线程(Thread)是计算机中能够执行独立任务的最小单元。线程是进程内的一个执行单位,一个进程可以包含多个线程
多线程是指在一个程序中同时执行多个线程的并发编程模型。在多线程模型中,程序被设计为可以同时执行多个任务,每个任务运行在独立的线程中
线程池其实就是一种多线程处理形式,处理过程中可以将任务添加到队列中,然后在创建线程后自动启动这些任务。这里的任务就是实现了Runnable 或 Callable 接口的实例对象;
减少线程的创建。
使用线程池最大的原因就是可以根据系统的需求和硬件环境灵活的控制线程的数量,且可以对所有线程进行统一的管理和控制,从而提高系统的运行效率,降低系统运行压力;
1、线程和任务分离,提升线程重用性;
2、控制线程并发数量,降低服务器压力,统一管理所有线程;
3、提升系统响应速度,
假如创建线程用的时间为T1,执行任务用的时间为T2,销毁线程用的时间为T3,那么使用线程池就免去了 T1 和 T3 的时间;
线程的重用性的解释:
如果我们创建一个线程,然后给它一个任务,那么这个线程执行完之后,就会销毁掉,因为无法让这个线程再执行其他任务,因为这个线程跟这个任务是绑定的,是耦合的,只能做这个任务。
如果线程要重复利用,那么就需要用到线程池,我们直接把任务给线程池,具体由哪个线程来执行,是线程池内部控制的。
1、并发处理:线程可以实现程序的并发执行,提高程序的运行效率。在服务器端开发中,可以使用多线程来处理多个客户端的请求,提高服务器的并发处理能力
2、云盘文件上传和下载,可以开多线程。
3、多线程批量发送短信邮件
4、框架内部很多都使用多线程提高效率(比如 RocketMQ的多个消息队列)
文件多线程下载解释:
多线程批量发送短信邮件解释:
比如过节,某公司给众多用户发送节假日的短信祝福,如果是单线程,用户量又大,结果单线程的情况下,短信发到第二天才发完,这就会搞出乌龙。用多线程批量发送能提高效率。
ThreadPoolExecutor 是 Java 提供的一个线程池实现类,用于管理和调度线程的执行。
表示线程池中的核心线程的数量,也可以称为可闲置的线程数量,
默认情况下,这个线程是不会被回收的,但是也可以通过设置进行回收。
注意点:设置2个核心线程,并不是说先创建出来的那2个线程就是核心线程。
假如有很多任务提交到线程池,线程池创建出线程1和线程2来执行任务,后面阻塞队列满了,根据需要又创建了线程3和线程4,后面当把阻塞队列的任务都执行完了,剩下自身线程的任务在执行,如果线程2和线程3先执行完了,在等待60秒都没有新的任务过来,那么线程2和线程3就会被回收销毁,剩下线程1和线程4。
并不是先创建的线程就是核心线程,而是最后做完任务的那两个线程,因为剩最后两个线程不会被回收,所以也可以当成是核心线程来说。
当任务比较繁忙,核心线程数在处理任务,其他任务会先被放入阻塞队列中,当阻塞队列任务放满之后,这时候会创建新的线程,创建的线程最多只能达到设置的最大线程数。
在任务繁忙时,会创建非核心的线程,创建的数量不会超过我们设置的最大线程数。然后这些线程在执行完阻塞队列中的任务后,如果在我们设置的等待时间内,没有新的任务需要处理,那么这些非核心线程就会被销毁掉。
非核心线程没事干,最多能等待的时间,就是这个非核心线程存活时间。
当核心线程都在处理任务的情况下,新加入的任务会被放到这个阻塞队列中,等待被线程获取。
这个队列的特点是先进先出。
既然是线程池,那自然少不了线程,线程该如何来创建呢?这个任务就交给了线程工厂 ThreadFactory 来完成。
就是线程池内部使用的线程,就是由这个工厂创建出来的。
当阻塞队列存放的任务满了,线程数也达到了最大线程数,对新加入的任务执行拒绝策略
策略1:AbortPolicy(默认策略)
丢弃任务 并抛出 RejectedExecutionException 异常。
策略2:DiscardPolicy
也是丢弃任务,但是不抛出异常。
策略3:DiscardOldestPolicy
丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
策略4:CallerRunsPolicy
由调用线程处理该任务
创建一个线程池,把任务丢给线程池,线程池就会开线程去执行任务。
如图:创建一个线程池,然后执行10个任务。
执行逻辑是:
当前有2个核心线程,这两个核心线程在处理任务时,
如果还有任务过来,这时线程池是不会创建线程的,是先把任务丢到这个缓存任务的阻塞队列里面去等待执行的。
如果这个阻塞队列满了,线程池才会继续创建线程来执行任务。
如果开的线程达到设置的最大线程数了,这时还有任务过来,线程池处理不过来,就会报错:RejectedExecutionException 拒绝执行的异常
线程池的 execute()方法 用于向线程池提交一个任务,该任务将由线程池中的线程来执行。
在execute()方法中,通常会传入一个Runnable对象,该对象表示要执行的任务逻辑。
这个线程池使用到的参数: 核心线程数(corePoolSize)、最大线程数 (maximumPoolSize)、非核心线程存活时间 (keepAliveTime)、缓存任务的阻塞队列 (BlockingQueue)
缓存任务的阻塞队列,容量为10,最大线程数是4,10+4=14;
加起来这个线程池能接受的最大的执行任务数量就是 14,
如果执行的任务是15次,**超过线程池的最大容量,**就会报错,触发拒绝策略
如果执行14次任务,在最大限度内,就不会报错。
没用这个参数的时候,创建的线程是默认的。
用这个参数来创建线程,可以自己定义线程的名字或其他的东西等。
按这个代码来画图分析
步骤解析:
1、我们设置的核心线程是2个,这个时候有任务提交到线程池,那么线程池就会创建线程来执行任务。
注意:一开始,没有任务提交到线程池的时候,这个线程池是没有任何线程的。直到提交任务到线程池,线程池才会创建线程来执行任务。
提交第1个任务,那么就创建一个线程1来执行,这时提交第2个任务,如果线程1还没执行完,那么就创建线程2来执行任务。
如果提交第2个任务的时候,线程1已经执行完了,那么该任务就交给线程1 来执行,就不会再创建线程2出来。
2、如果线程1和线程2还在执行任务,这个时候又来第3个任务提交到线程池。如图
线程池就会把任务先丢到阻塞队列里面去,等这两个核心线程有哪个先执行完自己的任务后,再去阻塞队列里面获取任务来执行。
注意:阻塞队列的特点是先进先出。
3、如果这个时候,又有11个任务提交到线程池。那么此时一共有14个任务提交到线程池了,如图
如果线程1和线程2 还在执行任务,然后阻塞队列又存满了,那么线程池就会继续创建线程来执行任务,但是创建的线程是不会超过设置的最大线程数的。
4、如果这个时候,再提交第15个任务到线程池,但是阻塞队列已经存满了,最大线程数4个,也都在执行任务,没有空闲的线程了,达到线程池的最大性能数了,那么就会触发任务的拒绝策略。
5、如果任务都执行完了,非核心线程在60秒内没有拿到其他任务,那么就会被销毁,就只剩下2个核心线程保留下来。
核心线程是一直存在的,不会被销毁的。
核心线程数、最大线程数、非核心线程数空闲存活时间,都是由我们自己设置的。
注意点:设置2个核心线程,并不是说先创建出来的那2个线程就是核心线程。
假如有很多任务提交到线程池,线程池创建出线程1和线程2来执行任务,后面阻塞队列满了,根据需要又创建了线程3和线程4,后面当把阻塞队列的任务都执行完了,剩下自身线程的任务在执行,如果线程2和线程3先执行完了,在等待60秒都没有新的任务过来,那么线程2和线程3就会被回收销毁,剩下线程1和线程4。
并不是先创建的线程就是核心线程,而是最后做完任务的那两个线程,因为剩最后两个线程不会被回收,所以也可以当成是核心线程来说。
策略1:AbortPolicy(默认策略)
丢弃任务 并抛出 RejectedExecutionException 异常。
策略2:DiscardPolicy
也是丢弃任务,但是不抛出异常。
如果,没看到-----执行第【14】个任务,因为这是最后一个任务,直接被丢弃了
策略3:DiscardOldestPolicy
丢弃阻塞队列最前面的任务,然后重新尝试执行任务(重复此过程)
策略4:CallerRunsPolicy
由调用线程处理该任务
因为是main这个线程调用了线程池,所以这第14个任务,就交由main线程处理;在main线程空闲的时候,就会处理这个任务。
关闭线程池的话,优先考虑这个方法。
线程池的 shutdown()方法 是用于关闭线程池的操作。
关闭线程池:调用这个方法之后,后续添加的任务是不会再被执行了,但是已经加入的任务会继续执行完
如图:在 i==10 之前,已经往线程池加入9个任务了。
调用shutdown()方法后,线程池将不再接受新的任务,并且会等待所有已提交的任务执行完毕。一旦所有任务完成,线程池中的线程将被终止,线程池也将被关闭。
关闭线程池:调用这个方法之后,后续添加的任务是不会再被执行了,但是已经加入的任务,如果还没有开始执行,就不会再去执行了。
停止所有正在执行的任务,暂停处理正在等待的任务,并返回等待执行的任务列表。
shutdownNow() 方法用于立即关闭线程池。与shutdown()方法不同的是,shutdownNow()方法会尝试停止所有正在执行的任务,并且不会等待尚未开始的任务执行完成。
shutdownNow() 方法会尝试通过中断正在执行的线程来停止任务的执行。如果一个线程已经被中断,则会抛出InterruptedException异常。如果线程池中的某个任务无法被中断,则该任务将继续运行直到完成。
关闭线程池的结果。
已经加入线程池的 2 ~ 9 个任务,可能正在执行,或等待执行,
正在执行的任务如果能被终止,那么就会被终止掉,然后抛出InterruptedException异常。
还没有执行但已经加入线程池的任务,也不会执行。
核心线程数的设计需要依据任务的处理时间和每秒产生的任务数量来确定。
例如:一个线程执行一个任务需要0.1秒,1秒就执行10个任务;系统80%的时间每秒都会产生100个任务,那么要想在1秒内处理完这100个任务,就需要10个线程。
此时我们就可以设计核心线程数为10;
当然实际情况不可能这么平均,所以我们一般按照二八原则设计即可,即按照80%的情况设计核心线程数,剩下的20%可以利用最大线程数处理;
任务队列长度,也就是设计 阻塞队列 能缓存多少个任务。
任务队列长度一般设计为:
核心线程数 / 单个任务执行时间 *2 即可;
例如上面的场景中,核心线程数设计为10,单个任务执行时间为0.1秒,则队列长度可以设计为200 ;
最大线程数的设计除了需要参照核心线程数的条件外,还需要参照系统每秒产生的最大任务数决定;
例如:上述环境中,如果系统每秒最大产生的任务是1000个,那么:
最大线程数 = ( 最大任务数 - 任务队列长度 ) * 单个任务执行时间;
即: 最大线程数 = (1000 - 200 ) * 0.1 = 80个;
这个参数的设计完全参考系统运行环境和硬件压力设定,没有固定的参考值,用户可以根据经验和系统产生任务的时间间隔合理设置一个值即可。
还是以这个代码来展开
先分析线程池的五种状态的特点:
源码:
分析图:
运行状态,该状态下线程池可以接受新的任务,也可以处理阻塞队列中的任务
执行 shutdown 方法可进入 SHUTDOWN 状态
执行 shutdownNow 方法可进入 STOP 状态
待关闭状态,不再接受新的任务,继续处理阻塞队列中的任务
当阻塞队列中的任务为空,并且工作线程数为0时,进入 TIDYING 状态
停止状态,不接收新任务,也不处理阻塞队列中的任务,并且会尝试结束执行中的任务
当工作线程数为0时,进入 TIDYING 状态
整理状态,此时任务都已经执行完毕,并且也没有工作线程
执行 terminated 方法后进入 TERMINATED 状态
终止状态,此时线程池完全终止了,并完成了所有资源的释放
execute() 方法是什么?
execute() 方法 是 线程池类ThreadPoolExecutor 中的一个方法。
通过 线程池对象 调用这个 execute() 方法,然后传入一个参数Runnable对象,Runnable对象就是我们让线程要执行的任务。
execute() 方法里面会进行一些判断,用于执行添加核心线程或非核心线程、以及抛异常或者触发任务拒绝策略等操作。
点进 execute() 方法,看代码的执行逻辑
详细分析execute() 方法源码思路:
下面的就是对这个execute() 方法进行展开分析:
如图:
解释下 ctl 这个变量:ctl 存储了线程池的状态和目前正在工作的线程数量
(这个 AtomicInteger 类保证了这个线程操作数值的安全)
ctl 这一个变量存了多个参数,肯定是使用了二进制的方式来表示。
ctl 这一个变量存了多个参数(线程池的状态和目前正在工作的线程数量 ),肯定是使用了二进制的方式存储。
还是上面的代码,启动后,获取ctl的值,用来分析 线程池的状态和工作线程的数量
可以看到此时 ctl 获取到的值是 -536870912
把这个值转成二进制来看:
( 因为 变量ctl 是int类型,占4个字节,就是32位。就看到32位就好)
1110 0000 0000 0000 0000 0000 0000 0000
从这个二进制值可以看出这个线程池处于正在运行的状态
解释为什么能看出线程池是什么状态:
如图:源码中设置的5种状态的对应的值。
-1 << COUNT_BITS 的意思对 -1 进行算术左移操作,移动的位数是COUNT_BITS(29位)
查下 -1 左移29位后的二进制数
线程池5种状态对应的二进制数:
注意:
因为有5种状态,所以需要用到3个bit。
为什么用到3bit?
如下图:五种状态在源码规定的往左移29位之后,是这样的:
RUNNING --> 111
SHUTDOWN --> 000
STOP --> 001
TIDYING --> 010
TERMINATED --> 011
所以五种状态,用3个bit来组合刚好够用。
1字节(byte) = 8 比特(bit,也可以说 位)
源码的设计是这样的,ctl 的长度是32位,前3位表示线程池的状态,后面的29位,表示正在工作的线程的数量。
所以回到上面的问题解答:
-536870912 转二进制:1110 0000 0000 0000 0000 0000 0000 0000
前3位是111,后面29位全是0。
跟线程池处于Running状态的二进制数一样
表示此时代码中的线程池处于运行的状态,然后工作线程的数量是0个。
====================================================
继续分析execute()方法的源码
这段代码的逻辑,就是上面说过的,通过ctl.get()方法拿到当前线程池的运行状态和工作线程数后,
if (workerCountOf? < corePoolSize) 这段代码用来判断当前线程池的工作线程数量是否小于核心线程数,如果小于核心线程数,那么就添加一个核心线程。
代码解释:
调用 workerCountOf? 方法,这段代码就是拿到c的值,这个值是二进制数,取出它二进制数后面29位,判断有没有工作线程。
c 的值是 -536870912 转二进制:1110 0000 0000 0000 0000 0000 0000 0000
COUNT_MASK = (1 << COUNT_BITS) - 1 : 1 转二进制数后,往左移29位再减1,得出的值是 536870911 ,如下图得出值
COUNT_MASK 的值是 536870911 ,转二进制是 0001 1111 1111 1111 1111 1111 1111 1111
然后 workerCountOf? 方法是要 return c & COUNT_MASK;
就是要拿 c 的二进制值 和 COUNT_MASK 的二进制值进行 & 运算,就是 按位与运算。
根据按位与运算的规则,对应位上的两个数如果都是1,结果就是1,否则是0。如图:
所以 workerCountOf(int c) { return c & COUNT_MASK; } 最后返回的是 0,因为0小于我们设置的核心线程数2,所以会添加一个核心线程
这个 true 表示是添加核心线程。
总结:这个if判断的代码就是用来添加核心线程的。
继续往下判断
这个 isRunning? 条件判断,判断这个 c (就是当前的线程池)状态是否 < SHUTDOWN ,
因为在线程池的五种状态中,SHUTDOWN = 0,比它小的之后 RUNNING ( -1 ) ,
所以主要用来判断当前线程池是否处于 RUNNING运行状态
workQueue.offer(command) 这个条件判断,是往队列中添加任务(如果添加成功,返回true,否则返回false),返回false表示往队列中添加任务失败,就是表示线程池的阻塞队列已经满了
先分析这一部分源码:
retry 标签的用法:
定义在最外层的for循环外面,break 可以根据这个标签的位置跳出最外层的for循环,这是一种写法。
for (; ; ){ }
这种没有条件的for循环,相当于 while(true),一样,一直循环,是死循环。
外循环的判断条件分析:
这个判断:判断当前线程池的运行状态
这个判断就是:当前线程池如果不是 Running 运行状态,那么就返回false,直接结束循环。
就没必要往下执行添加线程的操作了
这个判断:判断是否能添加核心线程或非核心线程
按位与 (&) 算法:
对于两个整数的每一位,如果两个相应位同时为 1,则结果的相应位为 1;否则为 0。
再解析这部分判断:
在调用addWorker() 方法的时候,会传一个布尔值 core 参数,
如果core为true,表示调用这个方法是要创建核心线程;
如果core为false,表示调用这个方法是要创建非核心线程;
然后如图:这个if条件判断:
是拿当前线程池的工作线程数去进行判断,
如果调用addWorker() 方法是要创建核心线程,而当前工作线程数大于核心线程数的话,则直接return结束循环,返回false; 不进行添加核心线程的操作。
如果是要创建非核心线程,而当前工作线程数大于最大线程数的话,则直接return结束循环,返回false; 不进行添加非核心线程的操作。
上面进行判断后,到这里就可以添加线程了。
调用这个compareAndIncrementWorkerCount? 方法,通过 CAS 操作,给 ctl 值进行 +1 的操作,就是在 ctl 的二进制数值的表示工作线程数的后29位那里进行+1操作。
通过 CAS 操作,能够在不断的循环中保证线程是安全的
添加线程数成功,则跳出最外层for循环,
添加失败的话,则继续往下判断。
注意:这里只是给线程池的工作线程数 +1 ,但是还没有真正创建该线程。
上面添加线程数成功,则跳出最外层for循环,
如果添加线程数失败的话,
则重新获取 ctl 的值,判断我们这个线程在执行这个内部for循环期间,有没有其他线程修改了这个线程池的状态,如果被修改成非 Running 运行状态,则直接跳过本次的双层for循环。
从头再开始判断。
判断当前线程池的状态,如果是Running运行状态,则继续往下走;
如果当前线程池的工作线程数小于核心线程数,则可以添加核心线程;
如果当前线程池的工作线程数小于最大线程数,则可以添加非核心线程;达成这条件,继续往下走;
通过 CAS 操作,对当前线程池的工作线程数量进行 +1 操作,如果工作线程+1成功,则跳出这个双层for循环,继续往下走;
如果添加工作线程失败,再分两种情况:
如果当前线程池的状态依然是 Running 运行状态,则只需要一直重复内层for循环,直到 +1 成功,才 break 跳出双层for循环,继续走下去;
如果当前线程池的状态不是 Running 运行状态,被其他线程修改成其他状态了,则 continue 跳过这次双重for循环,重新重头开始双层for循环,直到 +1 成功,才 break 跳出双层for循环,继续走下去;
线程start方法和run方法的关系:
start 方法只是用于启动一个新线程;
start 方法启动了一个新线程执行 run 方法中的任务代码,而直接调用 run 方法只是在当前线程中执行任务代码,没有新线程的效果。
run 方法包含了线程的实际任务代码,在新线程启动后会被自动调用来执行任务
总的来说:
线程池对象executor,调用execute()方法,execute()方法需要一个Runnable 类型的参数,这个参数对象里面有一个run方法,run方法里面封装着一些业务逻辑。
当我们把要执行的业务逻辑封装在Runnable 对象的run方法里面,作为参数传递给execute()方法,线程池就会根据自己的判断,或重用或创建线程,来执行这个Runnable 对象里面的run方法。
这个分析图是为了说明,addWorker方法里面最后新创建的工作线程,在调用start()方法启动线程后,执行的run方法,就是一开始线程池对象调用execute方法时传进去的Runnable对象里面的 run() 方法。(executor.execute(new Runnable(){})
就是执行封装着打印功能的run方法。
如果当前线程池状态是 Running 运行状态,或者是SHUTDOWN待关闭状态且传进来的任务是空的,才能继续往下执行
判断线程是否已经运行起来了:
就是判断此时的线程是否已经被start 启动过了,如果在其他地方被调用start方法启动了,那么这里后续就不能再调用线程的start方法了来执行其他操作了,所以抛个异常
这个workers是一个HashSet集合。
上面已经分析了工作线程的创建和启动。
接下来就是分析工作线程执行封装着业务逻辑的Runnable对象中的run方法的逻辑。
再回顾下这个代码逻辑
根据上面的分析,t.start() 工作线程启动后,就是执行Runnable对象中的run方法,就是执行这个 Worker 对象的run方法,
如图可以看出,worker对象的run方法传进一个this,这个this 就是Worker 对象本身;
而Worker 对象本身又存着 Runnable firstTask;这个成员变量;
而Runnable firstTask;这个成员变量就是一开始executor.execute(new Runnable(){}传进来的那个Runnable对象,而这个对象的run方法就封装着我们写的打印功能。
简单来说,绕来绕去,最终执行的就是执行 封装着我们写的打印功能 的那个run方法。
上面解释的执行这个 Worker 对象的run方法,那么肯定就调用到方法体里面的这个runWorker()方法,接下来就是对这个runWorker方法进行解释:
这个runWorker()方法具体就是在写工作线程的工作执行逻辑
一系列判断后,走到这个 task.run() ;这里,调用到了我们封装业务逻辑的run方法
getTask() 方法: 从阻塞队列里面取任务的方法
源码:
源码解释:
如图:演示下获取该对象的变量task后,再将该变量设置为null,那刚刚获取到的task的引用还有值吗?
答案是有的。
根据上面代码提问:一个demo对象有一个user类成员变量,然后用task接收 demo对象获取user这个成员变量,然后再把demo.user=null 设置为null,那此时的task还有值吗,还是也变成null,
解释:
堆中有 demo 和 user 两个对象,demo.user 变量存储的不是User对象本身,而是指向User对象的引用。
执行 demo.user = null; 时,只是断开了 demo 对象中对 user 成员变量的引用关系,并将其设置为 null,
但是实际的 User 对象本身依然存在,并不会因为 demo.user 的变化而被销毁
因为此时的task 还是引用着User对象,所以task还有值。
如图:解释这部分代码
这个代码没什么需要注意的,正常理解即可
钩子方法:
是一种重要的设计模式,它通过在父类中留出可重写的空方法,为子类提供了定制算法行为的灵活性
task.run(); 调用解释
就是调用我们封装业务逻辑的run方法
while 循环判断:
第一次执行的就是传进来的任务,后面主要就是看线程如何重复获取阻塞队列里面的任务来执行 。
getTask():从阻塞队列里面取任务的方法
这个while 的条件如果都是null,意味着这个线程的整个run方法就执行结束了,那么这个线程就进入销毁阶段了,如果在存活时间内没获取任务,那么该线程如果不是核心线程就会被销毁。
主要来看这个条件判断 (task = getTask()) != null
表示从阻塞队列里面也没取到任务,如果在存活时间内没获取任务,那么该线程如果不是核心线程就会被销毁
先看这部分,如果getTask()方法在阻塞队列中获取任务时,判断到当前线程池不是Running运行时状态,或者阻塞队列为空,那么直接返回null
单纯截图方便看对应的方法
假设当前线程是核心线程或非核心线程两种情况来看处理逻辑:
解释 allowCoreThreadTimeOut : 是否允许核心线程超时,默认是false; 如果把这个标记成true,那么核心线程就可以被销毁掉
因为allowCoreThreadTimeOut默认是false,所以只需要针对 wc > corePoolSize 来假设两种情况:
假设的情况一::假设当前工作线程数是 3 , 核心线程数是 2 ; 那么 timed —> 此时就为true
假设的情况二::假设当前工作线程数是 2 , 核心线程数是 2 ; 那么 timed —> 此时就为false
出现的情况结果如图
这是我创建线程池的数值:
核心线程数为2;最大线程数为4;非核心线程存活时间 60s;缓存任务的阻塞队列,容量为10,每个线程执行时间睡眠100秒,方便分析
演示设置的参数是否如所想的一样执行,看是不是先创建2个核心线程,还有任务就放阻塞队列,阻塞队列满了就继续创建非核心线程,达到最大线程数后还有任务进来,则线程池执行拒绝策略。
演示开始:
演示设置的参数是否如所想的一样执行,看是不是先创建2个核心线程,还有任务就放阻塞队列,阻塞队列满了就继续创建非核心线程,达到最大线程数后还有任务进来,则线程池执行拒绝策略
如果,线程池是按这个顺序在执行
allowCoreThreadTimeOut : 是否允许核心线程超时,默认是false; 如果把这个标记成true,那么核心线程就可以被销毁掉
整个ThreadPoolExecutor 线程池源码 大概就这个逻辑。
这个是几年前的 ThreadPoolExecutor 类思路图,拿来参考下;
此时的 ThreadPoolExecutor 类 的源代码也已经发生一些修改了,但是主要逻辑不变。
package java.util.concurrent;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ThreadPoolExecutor extends AbstractExecutorService
{
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//Integer.SIZE 是 32位 ; 32-3 = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
//1 往左移动29位 ,再 -1
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
//线程池的五种运行状态,状态以二进制的形式进行存储
private static final int RUNNING = -1 << COUNT_BITS; //运行状态
private static final int SHUTDOWN = 0 << COUNT_BITS; //待关闭状态
private static final int STOP = 1 << COUNT_BITS; //停止状态
private static final int TIDYING = 2 << COUNT_BITS; //整理状态
private static final int TERMINATED = 3 << COUNT_BITS; //终止状态
// Packing and unpacking ctl
private static int runStateOf(int c)
{
//~ 符号代表按位取反操作符,它会翻转操作数的每一位,即对操作数的每一位进行取反操作(0 变为 1,1 变为 0)
return c & ~COUNT_MASK;
}
private static int workerCountOf(int c)
{
//按位与 (&) 算法:对于两个整数的每一位,如果两个相应位同时为 1,则结果的相应位为 1;否则为 0。
return c & COUNT_MASK;
}
private static int ctlOf(int rs, int wc)
{
//| :按位或操作;按位或操作会将每个对应位的值进行逻辑或运算,即若两个位中任意一个位为 1,则结果对应位为 1,否则为 0
return rs | wc;
}
private static boolean runStateLessThan(int c, int s)
{
return c < s;
}
private static boolean runStateAtLeast(int c, int s)
{
return c >= s;
}
private static boolean isRunning(int c)
{
return c < SHUTDOWN;
}
private boolean compareAndIncrementWorkerCount(int expect)
{
return ctl.compareAndSet(expect, expect + 1);
}
private boolean compareAndDecrementWorkerCount(int expect)
{
return ctl.compareAndSet(expect, expect - 1);
}
private void decrementWorkerCount()
{
ctl.addAndGet(-1);
}
private final BlockingQueue<Runnable> workQueue;
private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet<>();
private final Condition termination = mainLock.newCondition();
private int largestPoolSize;
private long completedTaskCount;
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;
private volatile long keepAliveTime;
private volatile boolean allowCoreThreadTimeOut;
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
//Worker 对象实现类 Runnable 类 ,所以也是一个Runnable对象
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
//常量
private static final long serialVersionUID = 6138294804551838833L;
//属性
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
//构造器
Worker(Runnable firstTask)
{
setState(-1);
//把传进来的任务类(firstTask)设置到 Worker 对象的成员变量 firstTask 里面
this.firstTask = firstTask;
//调用线程工厂的 newThread 方法,创建一个线程,然后把这个线程设置到 Worker 对象的 thread 变量里面。
//this :在Worker构造器里面,就是指这个Worker 对象
this.thread = getThreadFactory().newThread(this);
}
public void run()
{
runWorker(this);
}
protected boolean isHeldExclusively()
{
return getState() != 0;
}
protected boolean tryAcquire(int unused)
{
if (compareAndSetState(0, 1))
{
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused)
{
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock()
{
acquire(1);
}
public boolean tryLock()
{
return tryAcquire(1);
}
public void unlock()
{
release(1);
}
public boolean isLocked()
{
return isHeldExclusively();
}
void interruptIfStarted()
{
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted())
{
try
{
t.interrupt();
} catch (SecurityException ignore)
{
}
}
}
}
private void advanceRunState(int targetState)
{
// assert targetState == SHUTDOWN || targetState == STOP;
for (; ; )
{
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
final void tryTerminate()
{
for (; ; )
{
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateLessThan(c, STOP) && !workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0)
{ // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0)))
{
try
{
terminated();
} finally
{
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally
{
mainLock.unlock();
}
// else retry on failed CAS
}
}
private void checkShutdownAccess()
{
// assert mainLock.isHeldByCurrentThread();
SecurityManager security = System.getSecurityManager();
if (security != null)
{
security.checkPermission(shutdownPerm);
for (Worker w : workers)
security.checkAccess(w.thread);
}
}
private void interruptWorkers()
{
// assert mainLock.isHeldByCurrentThread();
for (Worker w : workers)
w.interruptIfStarted();
}
private void interruptIdleWorkers(boolean onlyOne)
{
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
for (Worker w : workers)
{
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock())
{
try
{
t.interrupt();
} catch (SecurityException ignore)
{
} finally
{
w.unlock();
}
}
if (onlyOne)
break;
}
} finally
{
mainLock.unlock();
}
}
private void interruptIdleWorkers()
{
interruptIdleWorkers(false);
}
private static final boolean ONLY_ONE = true;
final void reject(Runnable command)
{
handler.rejectedExecution(command, this);
}
void onShutdown()
{
}
private List<Runnable> drainQueue()
{
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<>();
q.drainTo(taskList);
if (!q.isEmpty())
{
for (Runnable r : q.toArray(new Runnable[0]))
{
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
//添加线程的方法
private boolean addWorker(Runnable firstTask, boolean core)
{
//这是一个标签,可以根据标签的位置,帮助 break 跳出for循环,这里是帮助break跳出最外面的循环
retry:
//不断循环,每次循环都获取一次 ctl 的值。里面存着当前线程池的运行状态和工作线程数
for (int c = ctl.get(); ; )
{
//整体判断:判断当前线程池的状态是否 >= SHUTDOWN 且 >= STOP,或者 firstTask这个 Runnable对象不能为空,或者 workQueue 阻塞队列为空。
//简单的说:这个判断就是:当前线程池如果不是 Running 运行状态,那么就返回false,就没必要往下执行添加线程的操作了
//条件1:runStateAtLeast(c, SHUTDOWN):判断线程池当前的状态是否 >= SHUTDOWN 状态
//条件2:(runStateAtLeast(c, STOP):判断线程池当前的状态是否 >= STOP 状态
//条件3:firstTask != null:传进来的 Runnable 对象 不能为空
//条件4:workQueue.isEmpty():如果workQueue队列为空,则返回true
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
//根据判断,线程池都不是运行时状态,就没必要往下执行添加线程的操作了。直接 return false
return false;
//没有条件的for循环,相当于 while(true),是不断循环的
for (; ; )
{
//整体判断:通过传进来的 core 进行判断:
//core 为 true,表示要创建核心线程,则【判断当前线程池的工作线程数是否大于我们一开始设置的核心线程数,大于的话就不能再添加核心线程了】,返回return false,结束循环
//core 为 false,表示要创建非核心线程,则【判断当前线程池的工作线程数是否大于我们一开始设置的最大线程数,大于的话就不能再添加非核心线程了】,返回return false,结束循环
// & 位于运算 优先级 高于 >= 运算
//条件1:workerCountOf(c):当前线程池状态的二进制数,和 COUNT_MASK 变量去进行 & 按位与运算,用于获取:【当前的工作线程数量】
//条件2:(core ? corePoolSize : maximumPoolSize) ====> core == true ,表示核心线程数 ; core == false,表示最大线程数
//通过三元运算,拿核心线程数或最大线程数,和 COUNT_MASK 进行 & 按位与 运算。
if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
// 通过 CAS 操作,给 ctl 值进行 +1 的操作,就是在 ctl 的二进制数值的表示线程数的后29位那里进行+1操作
if (compareAndIncrementWorkerCount(c))
//如果 +1 成功,则跳出外层for循环
//通过这个retry标签的位置,跳出最外面的循环
break retry;
//如果自增+1失败,重新获取 ctl 的值
c = ctl.get();
//如果重新获取的线程池运行状态,和 for 循环外获取的线程池状态不一样,
//(因为在这个线程的内层for循环期间,可能有其它线程调用了 SHUTDOWN 等方法,导致线程池状态已经被改变了。
// 所以需要再判断一次,重新走一遍外层for循环,用于重新判断线程池状态)
if (runStateAtLeast(c, SHUTDOWN))
//经判断,如果线程池不是Running运行状态,则跳出这双层for循环。
continue retry;
}
}
//判断是否开启工作线程
boolean workerStarted = false;
//判断线程是否成功添加
boolean workerAdded = false;
//定义工作线程的变量
Worker w = null;
try
{
//创建Worker工作线程对象,firstTask是 Runnable 类对象,作为任务参数被传进来;
//Worker 对象实现类 Runnable 类 ,所以也是一个Runnable对象
w = new Worker(firstTask);
//取出 Worker 对象中的 thread 属性
final Thread t = w.thread;
//判断 t 线程不为空(自定义线程工厂是有可能返回 null 的)
if (t != null)
{
//进行加锁操作 ,是可重入锁,就是这个线程可以多次获取这个锁的资源
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
//获取当前线程池的状态
int c = ctl.get();
//整体来说:如果当前线程池状态是 Running 运行状态,或者是SHUTDOWN待关闭状态且传进来的任务是空的,才能继续往下执行
//条件1:如果当前线程池的状态 < SHUTDOWN,当前线程池为 Running 运行状态,则为true
//条件2: 当前线程池的状态 < STOP 且传进来的 firstTask == null
if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null))
{
//t.getState() 获取当前线程的状态
//Thread.State.NEW 是 Java 中线程的一种状态,表示线程已经被创建,但尚未启动
//判断线程池是否已经运行起来了,如果线程状态是已经被启动了,表示后续不能再调用线程的start方法了,所以抛个异常
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
//Set 集合,存储目前有哪些工作线程对象;用于后续销毁的时候知道有哪些工作线程
workers.add(w);
//标记线程成功添加
workerAdded = true;
//获取集合中工作线程的数量
int s = workers.size();
//largestPoolSize 标记线程池中使用到的最大线程数,用来统计而已。
if (s > largestPoolSize)
//如果此时的工作线程数量超过记载的使用到的最大线程数量,则更新
largestPoolSize = s;
}
} finally
{
//释放锁
mainLock.unlock();
}
//线程添加成功,workerAdded 为 true
if (workerAdded)
{
//启动这个刚添加的工作线程,就是启动 Worker对象 中的 thread 线程
t.start();
//标记新添加的工作线程已经被启动了
workerStarted = true;
}
}
} finally
{
if (!workerStarted)
//启动线程失败后的操作
addWorkerFailed(w);
}
//返回新添加的这个工作线程的启动状态
return workerStarted;
}
private void addWorkerFailed(Worker w)
{
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally
{
mainLock.unlock();
}
}
private void processWorkerExit(Worker w, boolean completedAbruptly)
{
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally
{
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP))
{
if (!completedAbruptly)
{
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && !workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
//从阻塞队列里面取任务的方法
private Runnable getTask()
{
//是否超时的标记
boolean timedOut = false;
//不断循环
for (; ; )
{
//获取当前线程池的状态和工作线程数
int c = ctl.get();
//判断线程池的状态(如果是非running状态为true) || 阻塞队列是否为空
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty()))
{
//worker 工作线程数量 -1 操作
decrementWorkerCount();
//线程池非Running运行状态或者阻塞队列没有任务为空了,直接返回null
return null;
}
//获取当前线程池中的工作线程数量
int wc = workerCountOf(c);
//allowCoreThreadTimeOut : 是否允许核心线程超时,默认是false; 如果把这个标记成true,那么核心线程就可以被销毁掉
//wc > corePoolSize:当前线程池的工作线程数是否 > 核心线程数
//假设的情况一::假设当前工作线程数是 3 , 核心线程数是 2 ; 那么 timed ===> 此时就为true
//假设的情况二::假设当前工作线程数是 2 , 核心线程数是 2 ; 那么 timed ===> 此时就为false
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//当前线程池中的工作线程数量 > 最大线程数
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty()))
{
//通过原子性的操作,把工作线程数 -1 ,然后返回null
if (compareAndDecrementWorkerCount(c))
//返回null,跳出for循环
return null;
continue;
}
try
{
//三元运算符,假设的情况一:那么timed ===> true
//假设等待60s都没有获取到元素,那么这个 r == null
Runnable r = timed ?
//假设情况一判断:在设定的keepAliveTime(非核心线程存活时间)之内获取元素,如果能获取到就返回该元素对象,获取不到返回null
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
//假设情况二的判断:不会有超时时间,会一直阻塞等待,直到有任务为止,代码走到这里就会一直卡在这里,那么就能保证这个线程就不会被销毁
workQueue.take();
if (r != null)
return r;
//假设的情况一: ,r == null 的话,那么是否超时的标记设置为true
timedOut = true;
} catch (InterruptedException retry)
{
timedOut = false;
}
}
}
//工作线程的工作逻辑
final void runWorker(Worker w)
{
//获取当前执行的线程
Thread wt = Thread.currentThread();
//获取传入的Runnable任务对象
Runnable task = w.firstTask;
//将任务对象设置为空
w.firstTask = null;
//允许中断情况下的解锁
w.unlock();
boolean completedAbruptly = true;
try
{
//判断任务是否为空(一般情况下,第一次传进来的任务task不为空)
//getTask():从阻塞队列里面取任务的方法
//(task = getTask()) != null 表示从阻塞队列里面也没取到任务,如果在存活时间内没获取任务,那么该线程如果不是核心线程就会被销毁
while (task != null || (task = getTask()) != null)
{
//对当前这个任务线程加锁
w.lock();
//判断是否要中断当前线程
//ctl.get() 获取当前线程池的状态和工作线程数 ;Thread.interrupted()判断当前线程是否被中断,中断则返回true
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
//中断线程
wt.interrupt();
try
{
//钩子方法:是一种重要的设计模式,它通过在父类中留出可重写的空方法,为子类提供了定制算法行为的灵活性
beforeExecute(wt, task);
try
{
//执行传入的任务类对象run方法
task.run();
afterExecute(task, null);
} catch (Throwable ex)
{
//捕捉异常
afterExecute(task, ex);
throw ex;
}
} finally
{
task = null;
w.completedTasks++;
//最后进行解锁操作
w.unlock();
}
}
completedAbruptly = false;
} finally
{
processWorkerExit(w, completedAbruptly);
}
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
{
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory)
{
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
{
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
{
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
/**
* executor.execute(new Runnable()------来到这个方法
* 线程池的execute()方法用于向线程池提交一个任务,该任务将由线程池中的线程来执行。
* 在execute()方法中,通常会传入一个Runnable对象,该对象表示要执行的任务逻辑。
* 这个方法就是通过一些判断来添加核心线程或非核心线程等操作。
*/
public void execute(Runnable command)
{
//判断任务是否为空,空的话直接抛异常
if (command == null)
throw new NullPointerException();
//获取到 ctl 的值,ctl 存储了线程池的状态和目前工作线程数
int c = ctl.get();
//意思: 判断如果 目前工作线程数 < 核心线程数,那么就添加一个核心线程
if (workerCountOf(c) < corePoolSize)
{
//增加一个核心线程,这个true表示是要添加核心线程
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果 工作线程数 > 核心线程数,则继续判断
//意思:判断如果当前线程池是RUNNING运行状态,且往阻塞队列里面添加任务成功,
//条件1 :判断当前线程池的运行状态,源码是判断 c < SHUTDOWN
//条件2:往队列中添加任务(如果添加成功,返回true,否则返回false)
if (isRunning(c) && workQueue.offer(command))
{
//重新获取ctl的值,作用是为了
int recheck = ctl.get();
//意思:这里有!取反,表示:如果当前线程不是运行状态,且移除刚刚添加到阻塞队列的任务的操作失败,就执行任务拒绝策略
//条件1:如果线程池不处于 RUNNING 运行状态
//条件2:把刚添加的任务移除掉,移除成功返回true,否则返回false
if (!isRunning(recheck) && remove(command))
//执行任务拒绝策略
reject(command);
//如果工作线程为0,继续添加工作线程
else if (workerCountOf(recheck) == 0)
// false 表示添加非核心线程
addWorker(null, false);
}
//任务满了,添加非核心线程,这个 false 表示是要添加非核心线程
//整个条件判断的返回值是表示是否添加成功
else if (!addWorker(command, false))
//添加非核心线程失败,执行拒绝策略
reject(command);
}
public void shutdown()
{
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally
{
mainLock.unlock();
}
tryTerminate();
}
public List<Runnable> shutdownNow()
{
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally
{
mainLock.unlock();
}
tryTerminate();
return tasks;
}
public boolean isShutdown()
{
return runStateAtLeast(ctl.get(), SHUTDOWN);
}
boolean isStopped()
{
return runStateAtLeast(ctl.get(), STOP);
}
public boolean isTerminating()
{
int c = ctl.get();
return runStateAtLeast(c, SHUTDOWN) && runStateLessThan(c, TERMINATED);
}
public boolean isTerminated()
{
return runStateAtLeast(ctl.get(), TERMINATED);
}
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException
{
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
while (runStateLessThan(ctl.get(), TERMINATED))
{
if (nanos <= 0L)
return false;
nanos = termination.awaitNanos(nanos);
}
return true;
} finally
{
mainLock.unlock();
}
}
@Deprecated(since = "9")
protected void finalize()
{
}
public void setThreadFactory(ThreadFactory threadFactory)
{
if (threadFactory == null)
throw new NullPointerException();
this.threadFactory = threadFactory;
}
public ThreadFactory getThreadFactory()
{
return threadFactory;
}
public void setRejectedExecutionHandler(RejectedExecutionHandler handler)
{
if (handler == null)
throw new NullPointerException();
this.handler = handler;
}
public RejectedExecutionHandler getRejectedExecutionHandler()
{
return handler;
}
public void setCorePoolSize(int corePoolSize)
{
if (corePoolSize < 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();
else if (delta > 0)
{
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true))
{
if (workQueue.isEmpty())
break;
}
}
}
public int getCorePoolSize()
{
return corePoolSize;
}
public boolean prestartCoreThread()
{
return workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true);
}
void ensurePrestart()
{
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
public int prestartAllCoreThreads()
{
int n = 0;
while (addWorker(null, true))
++n;
return n;
}
public boolean allowsCoreThreadTimeOut()
{
return allowCoreThreadTimeOut;
}
public void allowCoreThreadTimeOut(boolean value)
{
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
if (value != allowCoreThreadTimeOut)
{
allowCoreThreadTimeOut = value;
if (value)
interruptIdleWorkers();
}
}
public void setMaximumPoolSize(int maximumPoolSize)
{
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize;
if (workerCountOf(ctl.get()) > maximumPoolSize)
interruptIdleWorkers();
}
public int getMaximumPoolSize()
{
return maximumPoolSize;
}
public void setKeepAliveTime(long time, TimeUnit unit)
{
if (time < 0)
throw new IllegalArgumentException();
if (time == 0 && allowsCoreThreadTimeOut())
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
long keepAliveTime = unit.toNanos(time);
long delta = keepAliveTime - this.keepAliveTime;
this.keepAliveTime = keepAliveTime;
if (delta < 0)
interruptIdleWorkers();
}
public long getKeepAliveTime(TimeUnit unit)
{
return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
}
public BlockingQueue<Runnable> getQueue()
{
return workQueue;
}
public boolean remove(Runnable task)
{
boolean removed = workQueue.remove(task);
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
}
public void purge()
{
final BlockingQueue<Runnable> q = workQueue;
try
{
Iterator<Runnable> it = q.iterator();
while (it.hasNext())
{
Runnable r = it.next();
if (r instanceof Future<?> && ((Future<?>) r).isCancelled())
it.remove();
}
} catch (ConcurrentModificationException fallThrough)
{
for (Object r : q.toArray())
if (r instanceof Future<?> && ((Future<?>) r).isCancelled())
q.remove(r);
}
tryTerminate(); // In case SHUTDOWN and now empty
}
public int getPoolSize()
{
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
return runStateAtLeast(ctl.get(), TIDYING) ? 0
: workers.size();
} finally
{
mainLock.unlock();
}
}
public int getActiveCount()
{
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
int n = 0;
for (Worker w : workers)
if (w.isLocked())
++n;
return n;
} finally
{
mainLock.unlock();
}
}
public int getLargestPoolSize()
{
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
return largestPoolSize;
} finally
{
mainLock.unlock();
}
}
public long getTaskCount()
{
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
long n = completedTaskCount;
for (Worker w : workers)
{
n += w.completedTasks;
if (w.isLocked())
++n;
}
return n + workQueue.size();
} finally
{
mainLock.unlock();
}
}
public long getCompletedTaskCount()
{
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
long n = completedTaskCount;
for (Worker w : workers)
n += w.completedTasks;
return n;
} finally
{
mainLock.unlock();
}
}
public String toString()
{
long ncompleted;
int nworkers, nactive;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
ncompleted = completedTaskCount;
nactive = 0;
nworkers = workers.size();
for (Worker w : workers)
{
ncompleted += w.completedTasks;
if (w.isLocked())
++nactive;
}
} finally
{
mainLock.unlock();
}
int c = ctl.get();
String runState =
isRunning(c) ? "Running" :
runStateAtLeast(c, TERMINATED) ? "Terminated" :
"Shutting down";
return super.toString() +
"[" + runState +
", pool size = " + nworkers +
", active threads = " + nactive +
", queued tasks = " + workQueue.size() +
", completed tasks = " + ncompleted +
"]";
}
protected void beforeExecute(Thread t, Runnable r)
{
}
protected void afterExecute(Runnable r, Throwable t)
{
}
protected void terminated()
{
}
public static class CallerRunsPolicy implements RejectedExecutionHandler
{
public CallerRunsPolicy()
{
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
{
if (!e.isShutdown())
{
r.run();
}
}
}
public static class AbortPolicy implements RejectedExecutionHandler
{
public AbortPolicy()
{
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
{
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
public static class DiscardPolicy implements RejectedExecutionHandler
{
public DiscardPolicy()
{
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
{
}
}
public static class DiscardOldestPolicy implements RejectedExecutionHandler
{
public DiscardOldestPolicy()
{
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
{
if (!e.isShutdown())
{
e.getQueue().poll();
e.execute(r);
}
}
}
}