ScheduledThreadPoolExecutor是Java中的一个类,它继承自ThreadPoolExecutor,用于在给定的延迟后运行或定期执行任务。它提供了一种灵活的方式来调度和控制线程池中的线程。
带着下面几个问题来分析线程池:
ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,对于线程创建流程都是调用父类的。
在它里面有一个内部类,延时队列,这是需要重点关注的。
首先我们看构造方法
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
可以看到构造方法都是调用了ThreadPoolExecutor的构造方法,设置了最大线程数都是Integer.MAX_VALUE。最大线程数真的是这个值吗?我们继续 向下看
提交任务:
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
// 构建ScheduledFutureTask
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
// 提交任务
delayedExecute(t);
return t;
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
// 这里可以看到,首先将任务入队列,注意这里调用的DelayedWorkQueue
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 启动线程
ensurePrestart();
}
}
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
// 当线程数小于核心线程数 添加一个worker
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
// 当等于0的时候也会添加一个空闲的worker
addWorker(null, false);
}
所以,线程池线程最大个数就是corePoolSize.
线程工作仍然是运行的ThreadPoolExecutor的runWorker方法,所不同的是所有有关调用阻塞队列的方法,都变为DelayedWorkQueue的,下面详细看看这个延时阻塞队列。
承载数据的是数组,数据结构是大根堆,根据延时的时间排序time(The time the task is enabled to execute in nanoTime units)。
// 添加新的元素,上浮
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}
// 构建堆,下沉
private void siftDown(int k, RunnableScheduledFuture<?> key) {
int half = size >>> 1;
while (k < half) {
int child = (k << 1) + 1;
RunnableScheduledFuture<?> c = queue[child];
int right = child + 1;
if (right < size && c.compareTo(queue[right]) > 0)
c = queue[child = right];
if (key.compareTo(c) <= 0)
break;
queue[k] = c;
setIndex(c, k);
k = child;
}
queue[k] = key;
setIndex(key, k);
}
// 扩容
private void grow() {
int oldCapacity = queue.length;
int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
if (newCapacity < 0) // overflow
newCapacity = Integer.MAX_VALUE;
queue = Arrays.copyOf(queue, newCapacity);
}
兄弟们,堆排序算法核心的两个方法【上浮】和【下沉】在这儿了,值得大家学习。
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
// 到达队列长度,再进行扩容
if (i >= queue.length)
grow();
size = i + 1;
if (i == 0) {
// 如果是第一个元素,直接set
queue[0] = e;
setIndex(e, 0);
} else {
// 否则上浮
siftUp(i, e);
}
// 判断当前元素上浮的位置是不是头节点,如果是头节点
// 这里很重要啊,和take配合使用的,会唤醒available等待队列,并且让leader为null
// 下面看take的时候再详细分析
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
// 如果队列为空,直接阻塞
if (first == null)
available.await();
else {
// 如果当前获取到的任务时间已经到了,那么执行
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
// 当前元素需要等待,去除first引用
first = null; // don't retain ref while waiting
// 如果leader不为null,直接阻塞
if (leader != null)
available.await();
else {
// 1.注意看这里 leader为null,然后让leader指向了自己
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 2. 阻塞延迟的时间
available.awaitNanos(delay);
} finally {
// 3.最终如果leader的指向没有变,那么让leader为null
if (leader == thisThread)
leader = null;
}
}
// 这里着重解读一下leader的用处
// 我认为awaitNanos(delay)对于操作系统来说是消耗资源的,如果直接await那么只需要
// 在那等待唤醒就行,但是awaitNanos是自动醒来的,那么必然一直在轮询时间,消耗资源。
// 我们可以看到第一次leader为null的时候需要设置阻塞时间,后面再进来leader就不为null了,
// 说明leader已经设置时间阻塞,那么后面的干脆就不设置时间阻塞了,等leader唤醒好了,如果后面的delay都比leader的delay长的话,那么直接等leader唤醒,不需要消耗资源
// 问题:如果后面的时间比前面的短怎么办?
// 答:后面的任务在入队的时候会触发signal唤醒leader,然后再去拿queuep[0],所以最终一定会达到leader的delay是最短的
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
// 这其实也就是从堆中取元素,然后维护堆的操作
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
int s = --size;
RunnableScheduledFuture<?> x = queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);
setIndex(f, -1);
return f;
}
设计的确实非常巧妙。