线程池主要解决两个问题:
一是当执行大量异步任务时线程池能够提供较好的性能。
在不使用线程池时,每当需要执行异步任务时直接new一个线程来运行,而线程的创建和销毁是需要开销的。
线程池里面的线程是可复用的,不需要每次执行异步任务时都重新创建和销毁线程。
二是线程池提供了一种资源限制和管理的手段,比如可以限制线程的个数,动态新增线程等。
每个ThreadPoolExecutor也保留了一些基本的统计数据,比如当前线程池完成的任务数目等。
另外,线程池也提供了许多可调参数和可扩展性接口,以满足不同情境的需要,程序员可以使用更方便的Executors的工厂方法,比如newCachedThreadPool(线程池线程个数最多可达Integer.MAX_VALUE,线程自动回收)、newFixedThreadPool(固定大小的线程池)和newSingleThreadExecutor(单个线程)等来创建线程池,当然用户还可以自定义。
Executors其实是个工具类,里面提供了好多静态方法,这些方法根据用户选择返回不同的线程池实例。
ThreadPoolExecutor继承了AbstractExecutorService,成员变量ctl是一个Integer的原子变量,用来记录线程池状态和线程池中线程个数,类似于ReentrantReadWriteLock使用一个变量来保存两种信息。
这里假设Integer类型是32位二进制表示,则其中高3位用来表示线程池状态,后面29位用来记录线程池线程个数。
线程池状态:
线程池状态含义如下
线程池状态转换列举如下
线程池参数如下
线程池类型如下
newFixedThreadPool:创建一个核心线程个数和最大线程个数都为nThreads的线程池,并且阻塞队列长度为Integer.MAX_VALUE。keeyAliveTime=0说明只要线程个数比核心线程个数多并且当前空闲则回收。
newSingleThreadExecutor:创建一个核心线程个数和最大线程个数都为1的线程池,并且阻塞队列长度为Integer.MAX_VALUE。keeyAliveTime=0说明只要线程个数比核心线程个数多并且当前空闲则回收。
newCachedThreadPool:创建一个按需创建线程的线程池,初始线程个数为0,最多线程个数为Integer.MAX_VALUE,并且阻塞队列为同步队列。
keeyAliveTime=60说明只要当前线程在60s内空闲则回收。这个类型的特殊之处在于,加入同步队列的任务会被马上执行,同步队列里面最多只有一个任务。
如上ThreadPoolExecutor类图所示,其中mainLock是独占锁,用来控制新增Worker线程操作的原子性。
termination是该锁对应的条件队列,在线程调用awaitTermination时用来存放阻塞的线程。
Worker继承AQS和Runnable接口,是具体承载任务的对象。
Worker继承了AQS,自己实现了简单不可重入独占锁,其中state=0表示锁未被获取状态,state=1表示锁已经被获取的状态,state=-1是创建Worker时默认的状态,创建时状态设置为-1是为了避免该线程在运行runWorker()方法前被中断,下面会具体讲解。
其中变量firstTask记录该工作线程执行的第一个任务,thread是具体执行任务的线程。
DefaultThreadFactory是线程工厂,newThread方法是对线程的一个修饰。其中poolNumber是个静态的原子变量,用来统计线程工厂的个数,threadNumber用来记录每个线程工厂创建了多少线程,这两个值也作为线程池和线程的名称的一部分。
execute方法的作用是提交任务command到线程池进行执行。
用户线程提交任务到线程池的模型图如图所示。
ThreadPoolExecutor的实现实际是一个生产消费模型,当用户添加任务到线程池时相当于生产者生产元素,workers线程工作集中的线程直接执行任务或者从任务队列里面获取任务时则相当于消费者消费元素。
用户线程提交任务的execute方法的具体代码如下。
下面分析下新增线程的addWorkder方法,代码如下。
代码比较长,主要分两个部分:
第一部分双重循环的目的是通过CAS操作增加线程数;
第二部分主要是把并发安全的任务添加到workers里面,并且启动任务执行。
首先来分析第一部分的代码(6)。
内层循环的作用是使用CAS操作增加线程数,代码(7.1)判断如果线程个数超限则返回false,否则执行代码(7.2)CAS操作设置线程个数,CAS成功则退出双循环,CAS失败则执行代码(7.3)看当前线程池的状态是否变化了,如果变了,则再次进入外层循环重新获取线程池状态,否则进入内层循环继续进行CAS尝试。
执行到第二部分的代码(8)时说明使用CAS成功地增加了线程个数,但是现在任务还没开始执行。
这里使用全局的独占锁来控制把新增的Worker添加到工作集workers中。
代码(8.1)创建了一个工作线程Worker。
代码(8.2)获取了独占锁,代码(8.3)重新检查线程池状态,这是为了避免在获取锁前其他线程调用了shutdown关闭了线程池。
如果线程池已经被关闭,则释放锁,新增线程失败,否则执行代码(8.4)添加工作线程到线程工作集,然后释放锁。代码(8.5)判断如果新增工作线程成功,则启动工作线程。
用户线程提交任务到线程池后,由Worker来执行。
先看下Worker的构造函数。
在构造函数内首先设置Worker的状态为-1,这是为了避免当前Worker在调用runWorker方法前被中断(当其他线程调用了线程池的shutdownNow时,如果Worker状态>=0则会中断该线程)。
这里设置了线程的状态为-1,所以该线程就不会被中断了。
在如下runWorker代码中,运行代码(9)时会调用unlock方法,该方法把status设置为了0,所以这时候调用shutdownNow会中断Worker线程。
代码(11)执行清理任务,其代码如下。
调用shutdown方法后,线程池就不会再接受新的任务了,但是工作队列里面的任务还是要执行的。该方法会立刻返回,并不等待队列任务完成再返回。
其中代码(13)的内容如下,如果当前线程池状态>=SHUTDOWN则直接返回,否则设置为SHUTDOWN状态。
调用shutdownNow方法后,线程池就不会再接受新的任务了,并且会丢弃工作队列里面的任务,正在执行的任务会被中断,该方法会立刻返回,并不等待激活的任务执行完成。
返回值为这时候队列里面被丢弃的任务列表。
当线程调用awaitTermination方法后,当前线程会被阻塞,直到线程池状态变为TERMINATED才返回,或者等待时间超时才返回。整个过程中独占锁的代码如下。
如上代码首先获取独占锁,然后在无限循环内部判断当前线程池状态是否至少是TERMINATED状态,如果是则直接返回,否则说明当前线程池里面还有线程在执行,则看设置的超时时间nanos是否小于0,小于0则说明不需要等待,那就直接返回,如果大于0则调用条件变量termination的awaitNanos方法等待nanos时间,期望在这段时间内线程池状态变为TERMINATED。
在讲解shutdown方法时提到过,当线程池状态变为TERMINATED时,会调用termination.signalAll()用来激活调用条件变量termination的await系列方法被阻塞的所有线程,所以如果在调用awaitTermination之后又调用了shutdown方法,并且在shutdown内部将线程池状态设置为TERMINATED,则termination.awaitNanos方法会返回。
另外在工作线程Worker的runWorker方法内,当工作线程运行结束后,会调用processWorkerExit方法,在processWorkerExit方法内部也会调用tryTerminate方法测试当前是否应该把线程池状态设置为TERMINATED,如果是,则也会调用termination.signalAll()用来激活调用线程池的awaitTermination方法而被阻塞的线程。
而且当等待时间超时后,termination.awaitNanos也会返回,这时候会重新检查当前线程池状态是否为TERMINATED,如果是则直接返回,否则继续阻塞挂起自己。