这是一个可以在指定一定延迟时间后或者定时进行任务调度执行的线程池。
Executors其实是个工具类,它提供了好多静态方法,可根据用户的选择返回不同的线程池实例。
ScheduledThreadPoolExecutor继承了ThreadPoolExecutor并实现了ScheduledExecutorService接口。
线程池队列是DelayedWorkQueue,其和DelayedQueue类似,是一个延迟队列。
ScheduledFutureTask是具有返回值的任务,继承自FutureTask。
FutureTask的内部有一个变量state用来表示任务的状态,一开始状态为NEW,所有状态为。
可能的任务状态转换路径为
ScheduledFutureTask内部还有一个变量period用来表示任务的类型,任务类型如下:
ScheduledThreadPoolExecutor的一个构造函数如下,由该构造函数可知线程池队列是DelayedWorkQueue。
该方法的作用是提交一个延迟执行的任务,任务从提交时间算起延迟单位为unit的delay时间后开始执行。
提交的任务不是周期性任务,任务只会执行一次,代码如下。
代码(2)装饰任务,把提交的command任务转换为ScheduledFutureTask。
ScheduledFutureTask是具体放入延迟队列里面的东西。
由于是延迟任务,所以ScheduledFutureTask实现了long getDelay(TimeUnit unit)和int compareTo(Delayed other)方法。
triggerTime方法将延迟时间转换为绝对时间,也就是把当前时间的纳秒数加上延迟的纳秒数后的long型值。
ScheduledFutureTask的构造函数如下。
在构造函数内部首先调用了父类FutureTask的构造函数,父类FutureTask的构造函数代码如下。
compareTo(Delayed other)方法的代码如下:
compareTo的作用是加入元素到延迟队列后,在内部建立或者调整堆时会使用该元素的compareTo方法与队列里面其他元素进行比较,让最快要过期的元素放到队首。
所以无论什么时候向队列里面添加元素,队首的元素都是最快要过期的元素。
代码(3)将任务添加到延迟队列,delayedExecute的代码如下。
代码(4)首先判断当前线程池是否已经关闭了,如果已经关闭则执行线程池的拒绝策略,否则执行代码(5)将任务添加到延迟队列。
添加完毕后还要重新检查线程池是否被关闭了,如果已经关闭则从延迟队列里面删除刚才添加的任务,但是此时有可能线程池中的线程已经从任务队列里面移除了该任务,也就是该任务已经在执行了,所以还需要调用任务的cancle方法取消任务。
如果代码(6)判断结果为false,则会执行代码(7)确保至少有一个线程在处理任务,即使核心线程数corePoolSize被设置为0。
ensurePrestart的代码如下。
上面我们分析了如何向延迟队列添加任务,下面我们来看线程池里面的线程如何获取并执行任务。
在前面讲解ThreadPoolExecutor时我们说过,具体执行任务的线程是Worker线程,Worker线程调用具体任务的run方法来执行。
由于这里的任务是ScheduledFutureTask,所以我们下面看看ScheduledFutureTask的run方法。
代码(8)中的isPeriodic的作用是判断当前任务是一次性任务还是可重复执行的任务,isPeriodic的代码如下。
代码(9)判断当前任务是否应该被取消,canRunInCurrentRunState的代码如下。
这里传递的periodic的值为false,所以isRunningOrShutdown的参数为executeExistingDelayedTasksAfterShutdown。
executeExistingDelayedTasksAfterShutdown默认为true,表示当其他线程调用了shutdown命令关闭了线程池后,当前任务还是要执行,否则如果为false,则当前任务要被取消。
由于periodic的值为false,所以执行代码(10)调用父类FutureTask的run方法具体执行任务。FutureTask的run方法的代码如下。
如果任务执行成功则执行代码(13.2)修改任务状态,set方法的代码如下。
如上代码首先使用CAS将当前任务的状态从NEW转换到COMPLETING。
这里当有多个线程调用时只有一个线程会成功。
成功的线程再通过UNSAFE.putOrderedInt设置任务的状态为正常结束状态,这里没有使用CAS是因为对于同一个任务只可能有一个线程运行到这里。
在这里使用putOrderedInt比使用CAS或者putLongvolatile效率要高,并且这里的场景不要求其他线程马上对设置的状态值可见。
请思考个问题,在什么时候多个线程会同时执行CAS将当前任务的状态从NEW转换到COMPLETING?其实当同一个command被多次提交到线程池时就会存在这样的情况,因为同一个任务共享一个状态值state。
如果任务执行失败,则执行代码(13.1)。setException的代码如下,可见与set函数类似。
该方法的作用是,当任务执行完毕后,让其延迟固定时间后再次运行(fixed-delay任务)。
其中initialDelay表示提交任务后延迟多少时间开始执行任务command,delay表示当任务执行完毕后延长多少时间后再次运行command任务,unit是initialDelay和delay的时间单位。
任务会一直重复运行直到任务运行中抛出了异常,被取消了,或者关闭了线程池。
scheduleWithFixedDelay的代码如下。
将任务添加到延迟队列后线程池线程会从队列里面获取任务,然后调用ScheduledFutureTask的run方法执行。
由于这里period<0,所以isPeriodic返回true,所以执行代码(11)。runAndReset的代码如下。
该代码和FutureTask的run方法类似,只是任务正常执行完毕后不会设置任务的状态,这样做是为了让任务成为可重复执行的任务。
这里多了代码(19),这段代码判断如果当前任务正常执行完毕并且任务状态为NEW则返回true,否则返回false。
如果返回了true则执行代码(11.1)的setNextRunTime方法设置该任务下一次的执行时间。setNextRunTime的代码如下。
该方法相对起始时间点以固定频率调用指定的任务(fixed-rate任务)。
当把任务提交到线程池并延迟initialDelay时间(时间单位为unit)后开始执行任务command。
然后从initialDelay+period时间点再次执行,而后在initialDelay+2*period时间点再次执行,循环往复,直到抛出异常或者调用了任务的cancel方法取消了任务,或者关闭了线程池。
scheduleAtFixedRate的原理与scheduleWithFixedDelay类似,下面我们讲下它们之间的不同点。
首先调用scheduleAtFixedRate的代码如下。