提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
提升–17—线程池–03----ThreadPoolExecutor源码解析
4、提交执行task的过程----execute()
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
// worker数量比核心线程数小,直接创建worker执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// worker数量超过核心线程数,任务直接进入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 线程池状态不是RUNNING状态,说明执行过shutdown命令,需要对新加入的任务执行reject()操作。
// 这儿为什么需要recheck,是因为任务入队列前后,线程池的状态可能会发生变化。
if (! isRunning(recheck) && remove(command))
reject(command);
// 这儿为什么需要判断0值,主要是在线程池构造方法中,核心线程数允许为0
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果线程池不是运行状态,或者任务进入队列失败,则尝试创建worker执行任务。
// 这儿有3点需要注意:
// 1. 线程池不是运行状态时,addWorker内部会判断线程池状态
// 2. addWorker第2个参数表示是否创建核心线程
// 3. addWorker返回false,则说明任务执行失败,需要执行reject操作
else if (!addWorker(command, false))
reject(command);
第一种情况,线程池的状态已经是STOP,TIDYING, TERMINATED,或者是SHUTDOWN且工作队列为空;
第二种情况,工作线程数已经大于最大线程数或当前工作线程已超时,且,还有其他工作线程或任务队列为空
第二种情况
(1)首先也是一个自旋,当allowCoreThreadTimeout(运行空闲核心线程超时) 或 wc>corePoolSize(当前线程数量大于核心线程数量) 时,timed会标识为true,表示需要进行超时判断。
(2)当wc(当前工作者数量)大于 最大线程数 或 空闲线程的空闲时间大于keepAliveTime(timed && timeout),以及wc>1或(workQueue)任务队列为空时,会进入compareAndDecrementWorkerCount方法,对wc的值减1。
(3)当compareAndDecrementWorkerCount方法返回true时,则getTask方法会返回null,终止getTask方法的自旋。这时候回到runWorker方法,就会进入到processWorkerExit方法,进行销毁worker。
compareAndDecrementWorkerCount中操作的是ctl属性:
(1)ctl是中心控制器,一个AtomicInteger类型的整数,通过数字的二进制编码的位进行分段,不同的二进制位段表示有不同的含义。
(2)在ctl中,低29为表示线程池的容量,即线程池最大容量为 536870911 = 000
11111111111111111111111111111。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {//自旋
int c = ctl.get();
int rs = runStateOf(c);
/** 对线程池状态的判断,两种情况会 workerCount-1,并且返回 null
1. 线程池状态为 shutdown,且 workQueue 为空(反映了 shutdown 状态的线程池还是
要执行 workQueue 中剩余的任务的)
2. 线程池状态为 stop(shutdownNow()会导致变成 STOP)(此时不用考虑 workQueue
的情况)
**/
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;;//返回 null,则当前 worker 线程会退出
}
int wc = workerCountOf(c);
// timed 变量用于判断是否需要进行超时控制。
// allowCoreThreadTimeOut 默认是 false,也就是核心线程不允许进行超时;
// wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
// 对于超过核心线程数量的这些线程,需要进行超时控制
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/**
1. 线程数量超过 maximumPoolSize 可能是线程池在运行时被调用了 setMaximumPoolSize()
被改变了大小,否则已经 addWorker()成功不会超过 maximumPoolSize
2. timed && timedOut 如果为 true,表示当前操作需要进行超时控制,并且上次从阻塞队列中
获取任务发生了超时.其实就是体现了空闲线程的存活时间
第一次进来时候,timed && timedOut 为false,直接跳过此次if,如果大于核心线程,
则所有线程自旋都会执行下面的workQueue.poll,不需要此线程是核心线程,还是非核心线程。
如果超时获取的为null,则在下次自旋时候,通过原子的减法操作,如果减法失败,则进行下一次自 旋,如果减法成功,则释放线程
**/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
// 原子操作,只有当减法成功时候,才会释放线程。
return null;
continue;
}
try {
/**
根据 timed 来判断,如果为 true,则通过阻塞队列 poll 方法进行超时控制,如果在
keepaliveTime 时间内没有获取到任务,则返回 null.
否则通过 take 方法阻塞式获取队列中的任务
**/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null) //如果拿到的任务不为空,则直接返回给 worker 进行处理
return r;
timedOut = true;// 如果 r==null,说明已经超时了,队列中仍然没有任务,此时设置timedOut=true,在下次自旋的时候进行回收
} catch (InterruptedException retry) {
// 如果获取任务时当前线程发生了中断,则设置 timedOut 为false 并返回循环重试
timedOut = false;
}
}
}