首先,我们为什么需要线程池?
让我们先来了解下什么是?对象池?技术。某些对象(比如线程,数据库连接等),它们创建的代价是非常大的 —— 相比于一般对象,它们创建消耗的时间和内存都很大(而且这些对象销毁的代价比一般对象也大)。所以,如果我们维护一个?池,每次使用完这些对象之后,并不销毁它,而是将其放入池中,下次需要使用时就直接从池中取出,便可以避免这些对象的重复创建;同时,我们可以固定?池的大小,比如设置池的大小为 N —— 即池中只保留 N 个这类对象 —— 当池中的 N 个对象都在使用中的时候,为超出数量的请求设置一种策略,比如?排队等候?或者?直接拒绝请求?等,从而避免频繁的创建此类对象。
线程池?即对象池的一种(池中的对象为线程?Thread),类似的还有?数据库连接池(池中对象为数据库连接?Connection)。合理利用线程池能够带来三个好处(参考本节末的?References[1]):
本文只介绍 Java 中线程池的基本使用,不会过多的涉及到线程池的原理。如果有兴趣的读者需要深入理解线程池的实现原理,可以参考文末的?References。
JDK 中线程池的基础架构如下:
执行器?Executor?是顶级接口,只包含了一个?execute?方法,用来执行一个?Runnable?任务:
执行器服务?ExecutorService?接口继承了?Executor?接口,ExecutorService?是所有线程池的基础接口,它定义了 JDK 中线程池应该实现的基本方法:
线程池执行器?ThreadPoolExecutor?是基础线程池的核心实现,并且可以通过定制?ThreadPoolExecutor?的构造参数或者继承?ThreadPoolExecutor,实现自己的线程池;
ScheduledThreadPoolExecutor?继承自?ThreadPoolExecutor,是能执行周期性任务或定时任务的线程池;
ForkJoinPool?是 JDK1.7 时添加的类,作为对?Fork/Join?型线程池的实现。
本文只介绍?ThreadPoolExecutor?线程池的使用,ScheduledThreadPoolExecutor?和?ForkJoinPool?会在之后的文章中介绍。
查看?ThreadPoolExecutor?的源码可知,在?ThreadPoolExecutor?的内部,将每个池中的线程包装为了一个?Worker:
然后在?ThreadPoolExecutor?中定义了一个?HashSet<Worker>,作为?“池”:
设置一个合适的线程池(即自定义?ThreadPoolExecutor)是比较麻烦的,因此 JDK 通过?Executors?这个工厂类为我们提供了一些预先定义好的线程池:
1、固定大小的线程池
创建一个包含?nThreads?个工作线程的线程池,这?nThreads?个线程共享一个无界队列(即不限制大小的队列);当新任务提交到线程池时,如果当前没有空闲线程,那么任务将放入队列中进行等待,直到有空闲的线程来从队列中取出该任务并运行。
(通过?Runtime.getRuntime().availableProcessors()?可以获得当前机器可用的处理器个数,对于计算密集型的任务,固定大小的线程池的?nThreads?设置为这个值时,一般能获得最大的 CPU 使用率)
2、单线程线程池
创建一个只包含一个工作线程的线程池,它的功能可以简单的理解为 即?newFixedThreadPool?方法传入参数为?1?的情况。但是与?newFixedThreadPool(1)?不同的是,如果线程池中这个唯一的线程意外终止,线程池会创建一个新线程继续执行之后的任务。
3、可缓存线程的线程池
创建一个可缓存线程的线程池。当新任务提交到线程池时,如果当前线程池中有空闲线程可用,则使用空闲线程来运行任务,否则新建一个线程来运行该任务,并将该线程添加到线程池中;而且该线程池会终止并移除那些超过 60 秒未被使用的空闲线程。所以这个线程池表现得就像缓存,缓存的资源为线程,缓存的超时时间为 60 秒。根据 JDK 的文档,当任务的运行时间都较短的时候,该线程池有利于提高性能。
我们看到每个构造线程池的工厂方法都有一个带?ThreadFactory?的重载形式。ThreadFactory?即线程池用来新建线程的工厂,每次线程池需要新建一个线程时,调用的就是这个?ThreadFactory?的?newThread?方法:
(如果不提供自定义的?ThreadFactory,那么使用的就是?DefaultThreadFactory?——?Executors?内定义的内部类)
比如我们要为线程池中的每个线程提供一个特定的名字,那么我们就可以自定义?ThreadFactory?并重写其?newThread?方法:
public class SimpleThreadFactory implements ThreadFactory {
??? private AtomicInteger id = new AtomicInteger(1);
??? @Override
??? public Thread newThread(Runnable r) {
??????? Thread thread = new Thread(r);
??????? thread.setName("Test_Thread-" + id.getAndIncrement());
??????? return thread;
??? }
}
通过 JDK 的源码我们可以知道,以上三种线程池的实现都是基于?ThreadPoolExecutor:
下面我们来看一下线程池的基础接口?ExecutorService?中每个方法的含义。
首先是从?Executor?接口继承到的?execute?方法:
使用该方法即将一个?Runnable?任务交给线程池去执行。
submit?方法:
submit?方法会提交一个任务去给线程池执行,该任务可以是带返回结果的?Callable<V>?任务,也可以是一开始就指定结果的?Runnable?任务,或者不带结果的?Runnable?任务(此时即一开始指定结果为?null)。submit?方法会返回一个与所提交任务相关联的?Future<V>。Future<V>?的?get?方法可以等待任务执行完毕并返回结果。所以通过?Future<V>,我们可以与已经提交到线程池的任务进行交互。submit?提交任务及任务运行过程大致如下:
invokeAll?方法:
invokeAll?方法可以一次执行多个任务,但它并不同等于多次调用?submit?方法。submit?方法是非阻塞的,每次调用?submit?方法提交任务到线程池之后,会立即返回与任务相关联的?Future<V>,然后当前线程继续向后执行;
而?invokeAll?方法是阻塞的,只有当提交的多个任务都执行完毕之后,invokeAll?方法才会返回,执行结果会以List<Future<V>>?返回,该?List<Future<V>>?中的每个?Future<V>?是和提交任务时的?Collection<Callable<V>>?中的任务?Callable<V>?一 一对应的。带?timeout?参数的?invokeAll?就是设置一个超时时间,如果超过这个时间?invokeAll?中提交的所有任务还有没全部执行完,那么没有执行完的任务会被取消(中断),之后同样以一个?List<Future<V>>?返回执行的结果。
invokeAny?方法:
invokeAny?方法也是阻塞的,与?invokeAll?方法的不同之处在于,当所提交的一组任务中的任何一个任务完成之后,invokeAny?方法便会返回(返回的结果便是那个已经完成的任务的返回值),而其他任务会被取消(中断)。
举一个?invokeAny?使用的例子:电脑有 C、D、E、F 四个盘,我们需要找一个文件,但是我们不知道这个文件位于哪个盘中,我们便可以使用?invokeAny,并提交四个任务(对应于四个线程)分别查找 C、D、E、F 四个盘,如果哪个线程找到了这个文件,那么此时?invokeAny?便停止阻塞并返回结果,同时取消其他任务。
shutdown?方法:
shutdown?方法的作用是向线程池发送关闭的指令。一旦在线程池上调用?shutdown?方法之后,线程池便不能再接受新的任务;如果此时还向线程池提交任务,那么将会抛出?RejectedExecutionException?异常。之后线程池不会立刻关闭,直到之前已经提交到线程池中的所有任务(包括正在运行的任务和在队列中等待的任务)都已经处理完成,才会关闭。
shutdownNow?方法:
与?shutdown?不同,shutdownNow?会立即关闭线程池 —— 当前在线程池中运行的任务会全部被取消,然后返回线程池中所有正在等待的任务。
(值得注意的是,我们?必须显式的关闭线程池,否则线程池不会自己关闭)
awaitTermination?方法:
awaitTermination?可以用来判断线程池是否已经关闭。调用?awaitTermination?之后,在?timeout?时间内,如果线程池没有关闭,则阻塞当前线程,否则返回?true;当超过?timeout?的时间后,若线程池已经关闭则返回?true,否则返回?false。该方法一般这样使用:
isShutdown()?方法,如果线程池已经调用?shutdown?或者?shutdownNow,则返回?true,否则返回?false;
isTerminated()?方法,如果线程池已经调用?shutdown?并且线程池中所有的任务已经执行完毕,或者线程池调用了?shutdownNow,则返回?true,否则返回?false。
通过以上介绍,我们已经了解了?ExecutorService?中所有方法的功能,现在让我们来实践?ExecutorService?的功能。
两个例子中的任务,首先是任务类型为?Runnable?的情况:
我们来实践下?ScheduledThreadPoolExecutor?的?scheduleAtFixedRate?方法:
import java.util.*;
import java.util.concurrent.*;
public class RunnableTest {
public static void main(String[] args) throws Exception {
System.out.println("使用线程池运行 Runnable 任务:");
ExecutorService threadPool = Executors.newFixedThreadPool(5); // 创建大小固定为 5 的线程池
List<AccumRunnable> tasks = new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
AccumRunnable task = new AccumRunnable(i * 10 + 1, (i + 1) * 10);
tasks.add(task);
threadPool.execute(task); // 让线程池执行任务 task
}
threadPool.shutdown(); // 向线程池发送关闭的指令,等到已经提交的任务都执行完毕之后,线程池会关闭
threadPool.awaitTermination(1, TimeUnit.HOURS); // 等待线程池关闭,等待的最大时间为 1 小时
int total = 0;
for (AccumRunnable task : tasks) {
total += task.getResult(); // 调用在 AccumRunnable 定义的 getResult 方法获得返回的结果
}
System.out.println("Total: " + total);
}
static final class AccumRunnable implements Runnable {
private final int begin;
private final int end;
private int result;
public AccumRunnable(int begin, int end) {
this.begin = begin;
this.end = end;
}
@Override
public void run() {
result = 0;
try {
for (int i = begin; i <= end; i++) {
result += i;
Thread.sleep(100);
}
} catch (InterruptedException ex) {
ex.printStackTrace(System.err);
}
System.out.printf("(%s) - 运行结束,结果为 %d\n",
Thread.currentThread().getName(), result);
}
public int getResult() {
return result;
}
}
}
运行结果:
可以看到?NetBeans?给出的运行时间为 2 秒 —— 因为每个任务需要 1 秒的时间,而线程池中的线程个数固定为 5 个,所以线程池最多同时处理 5 个任务,因而 10 个任务总共需要 2 秒的运行时间。
任务类型为?Callable:
import java.util.*;
import java.util.concurrent.*;
public class CallableTest {
public static void main(String[] args) throws Exception {
System.out.println("使用线程池运行 Callable 任务:");
ExecutorService threadPool = Executors.newFixedThreadPool(5); // 创建大小固定为 5 的线程池
List<Future<Integer>> futures = new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
AccumCallable task = new AccumCallable(i * 10 + 1, (i + 1) * 10);
Future<Integer> future = threadPool.submit(task); // 提交任务
futures.add(future);
}
threadPool.shutdown(); // 向线程池发送关闭的指令,等到已经提交的任务都执行完毕之后,线程池会关闭
int total = 0;
for (Future<Integer> future : futures) {
total += future.get(); // 阻塞,直到任务结束,返回任务的结果
}
System.out.println("Total: " + total);
}
static final class AccumCallable implements Callable<Integer> {
private final int begin;
private final int end;
public AccumCallable(int begin, int end) {
this.begin = begin;
this.end = end;
}
@Override
public Integer call() throws Exception {
int result = 0;
for (int i = begin; i <= end; i++) {
result += i;
Thread.sleep(100);
}
System.out.printf("(%s) - 运行结束,结果为 %d\n",
Thread.currentThread().getName(), result);
return result;
}
}
}
?
运行结果:
线程池是很强大而且很方便的工具,它提供了对线程进行统一的分配和调控的各种功能。自 JDK1.5 时 JDK 添加了线程池的功能之后,一般情况下更推荐使用线程池来编写多线程程序,而不是直接使用?Thread。
在 使用线程池 中已经介绍,JDK 1.5 时,标准类库添加了对线程池的支持,然后在线程池核心实现 ThreadPoolExecutor 的基础上,实现了 ScheduledThreadPoolExecutor,作为可以 定时和周期性执行任务 的线程池。ScheduledThreadPoolExecutor 的类图如下:
ScheduledThreadPoolExecutor 实现了 ScheduledExecutorService 接口,ScheduledExecutorService 继承了 ExecutorService 接口,所以首先 ScheduledThreadPoolExecutor 是一个 ExecutorService (线程池),然后除了具有线程池的功能,它还有定时和周期性执行任务的功能。ScheduledExecutorService 除了从 ExecutorService 继承的方法外,还包括如下四个方法:
第一个?Schedule?方法:
delay?指定的时间后,执行指定的?Runnable?任务,可以通过返回的?ScheduledFuture<?>?与该任务进行交互。
第二个?Schedule?方法:
delay?指定的时间后,执行指定的?Callable<V>?任务,可以通过返回的?ScheduledFuture<V>?与该任务进行交互。
(ScheduledFuture?接口 继承自?Future?接口,所以?ScheduledFuture?和任务的交互方式与?Future?一致。所以通过ScheduledFuture,可以 判断定时任务是否已经完成,获得定时任务的返回值,或者取消任务等)
scheduleAtFixedRate?方法:
initialDelay?指定的时间后,开始按周期?period?执行指定的?Runnable?任务。
假设调用该方法后的时间点为?0,那么第一次执行任务的时间点为?initialDelay,第二次为?initialDelay + period,第三次为?initialDelay + period + period,以此类推。
scheduleWithFixedDelay?方法:
initialDelay?指定的时间后,开始按指定的?delay?延期性的执行指定的?Runnable?任务。
假设调用该方法后的时间点为?0,每次任务需要耗时?T(i)(i?为第几次执行任务),那么第一次执行任务的时间点为?initialDelay,第一次完成任务的时间点为?initialDelay + T(1),则第二次执行任务的时间点为?initialDelay + T(1) + delay;第二次完成任务的时间点为?initialDelay + (T(1) + delay) + T(2),所以第三次执行任务的时间点为?initialDelay + T(1) + delay + T(2) + delay,以此类推。
我们来实践下?ScheduledThreadPoolExecutor
?的?scheduleAtFixedRate
?方法:?
public class ScheduledExecutorServiceTest {
public static void main(String[] args) throws Exception {
ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
TimerTask timerTask = new TimerTask(2000); // 任务需要 2000 ms 才能执行完毕
System.out.printf("起始时间:%s\n\n", new SimpleDateFormat("HH:mm:ss").format(new Date()));
// 延时 1 秒后,按 3 秒的周期执行任务
timer.scheduleAtFixedRate(timerTask, 1000, 3000, TimeUnit.MILLISECONDS);
}
private static class TimerTask implements Runnable {
private final int sleepTime;
private final SimpleDateFormat dateFormat;
public TimerTask(int sleepTime) {
this.sleepTime = sleepTime;
dateFormat = new SimpleDateFormat("HH:mm:ss");
}
@Override
public void run() {
System.out.println("任务开始,当前时间:" + dateFormat.format(new Date()));
try {
System.out.println("模拟任务运行...");
Thread.sleep(sleepTime);
} catch (InterruptedException ex) {
ex.printStackTrace(System.err);
}
System.out.println("任务结束,当前时间:" + dateFormat.format(new Date()));
System.out.println();
}
}
}
?运行结果:
可以看到运行结果完全符合预期 —— 延时 1 秒后,每隔 3 秒执行一次任务。
上面是任务的运行时间小于周期时间的情况 —— 那如果任务运行的时间大于给定的执行周期呢?(比如任务运行需要 3 s,但是我们指定的周期为 2 s)
修改?main
?方法:
public static void main(String[] args) throws Exception {
ScheduledExecutorService timer = Executors.newScheduledThreadPool(2);
TimerTask timerTask = new TimerTask(3000); // 每个任务需要 3000 ms 才能执行完毕
System.out.printf("起始时间:%s\n\n", new SimpleDateFormat("HH:mm:ss").format(new Date()));
timer.scheduleAtFixedRate(timerTask, 1000, 2000, TimeUnit.MILLISECONDS);
}
运行结果:
????????可以看到此时虽然我们指定的周期为 2 s,但是因为任务的运行就需要 3 s(超过周期),所以这种情况下?scheduleAtFixedRate
?的处理方式为?上一次任务刚完成,则紧接着立即运行下一次任务,而不是使用线程池中的空闲线程来运行任务以维护 2 秒这个周期 —— 由此可见,每个定时任务在?ScheduledThreadPoolExecutor
?中,都是串行运行的,即下一次运行任务一定在上一次任务结束之后。
当一个任务正在运行的过程中,而我们却发现这个任务已经没有必要继续运行了,那么我们便产生了取消任务的需要。比如?线程池的?invokeAny?方法,它可以在线程池中运行一组任务,当其中任何一个任务完成时,invokeAny?方法便会停止阻塞并返回,同时也会?取消其他任务。那我们如何取消一个正在运行的任务?
前面两篇多线程的文章都有提到?Future<V>?接口和它的一个实现类?FutureTask<V>,并且我们已经知道?Future<V>?可以用来和已经提交的任务进行交互。Future<V>?接口定义了如下几个方法:
get?方法:通过前面文章的介绍,我们已经了解了?get?方法的使用 ——?get?方法?用来返回和?Future?关联的任务的结果。带参数的?get?方法指定一个超时时间,在超时时间内该方法会阻塞当前线程,直到获得结果 。
不带参数的?get?可以理解为超时时间无限大,即一直等待直到获得结果或者出现异常。
cancel(boolean mayInterruptIfRunning)?方法:该方法是非阻塞的。通过 JDK 的文档,我们可以知道?该方法便可以用来(尝试)终止一个任务。
cancel(false)?与?cancel(true)的区别在于,cancel(false)?只?取消已经提交但还没有被运行的任务(即任务就不会被安排运行);而?cancel(true)?会取消所有已经提交的任务,包括?正在等待的?和?正在运行的?任务。
isCancelled?方法:该方法是非阻塞的。在任务结束之前,如果任务被取消了,该方法返回?true,否则返回?false;如果任务已经完成,该方法则一直返回?false。
isDone?方法:该方法同样是非阻塞的。如果任务已经结束(正常结束,或者被取消,或者执行出错),返回?true,否则返回?false。
然后我们来实践下?Future?的?cancel?方法的功能:
import java.util.concurrent.*;
public class FutureTest {
public static void main(String[] args) throws Exception {
ExecutorService threadPool = Executors.newSingleThreadExecutor();
SimpleTask task = new SimpleTask(3_000); // task 需要运行 3 秒
Future<Double> future = threadPool.submit(task);
threadPool.shutdown(); // 发送关闭线程池的指令
double time = future.get();
System.out.format("任务运行时间: %.3f s\n", time);
}
private static final class SimpleTask implements Callable<Double> {
private final int sleepTime; // ms
public SimpleTask(int sleepTime) {
this.sleepTime = sleepTime;
}
@Override
public Double call() throws Exception {
double begin = System.nanoTime();
Thread.sleep(sleepTime);
double end = System.nanoTime();
double time = (end - begin) / 1E9;
return time; // 返回任务运行的时间,以 秒 计
}
}
}
运行结果(任务正常运行):
然后我们定义一个用来取消任务的方法:
private static void cancelTask(final Future<?> future, final int delay) {
Runnable cancellation = new Runnable() {
@Override
public void run() {
try {
Thread.sleep(delay);
future.cancel(true); // 取消与 future 关联的正在运行的任务
} catch (InterruptedException ex) {
ex.printStackTrace(System.err);
}
}
};
new Thread(cancellation).start();
}
然后修改 main 方法:
public static void main(String[] args) {
ExecutorService threadPool = Executors.newSingleThreadExecutor();
SimpleTask task = new SimpleTask(3_000); // task 需要运行 3 秒
Future<Double> future = threadPool.submit(task);
threadPool.shutdown(); // 发送关闭线程池的指令
cancelTask(future, 2_000); // 在 2 秒之后取消该任务
try {
double time = future.get();
System.out.format("任务运行时间: %.3f s\n", time);
} catch (CancellationException ex) {
System.err.println("任务被取消");
} catch (InterruptedException ex) {
System.err.println("当前线程被中断");
} catch (ExecutionException ex) {
System.err.println("任务执行出错");
}
}
运行结果:
可以看到,当任务被取消时,Future?的?get?方法抛出了?CancellationException?异常,并且成功的取消了任务(从构建(运行)总时间可以发现)。
这样就可以了吗?调用?Future?的?cancel(true)?就一定能取消正在运行的任务吗?
我们来写一个真正的耗时任务,判断一个数是否为素数,测试数据为 1000000033 (它是一个素数)。
import java.util.concurrent.*;
public class FutureTest {
public static void main(String[] args) throws Exception {
ExecutorService threadPool = Executors.newSingleThreadExecutor();
long num = 1000000033L;
PrimerTask task = new PrimerTask(num);
Future<Boolean> future = threadPool.submit(task);
threadPool.shutdown();
boolean result = future.get();
System.out.format("%d 是否为素数? %b\n", num, result);
}
private static final class PrimerTask implements Callable<Boolean> {
private final long num;
public PrimerTask(long num) {
this.num = num;
}
@Override
public Boolean call() throws Exception {
// i < num 让任务有足够的运行时间
for (long i = 2; i < num; i++) {
if (num % i == 0) {
return false;
}
}
return true;
}
}
}
在我的机器上,这个任务需要 13 秒才能运行完毕:
然后我们修改?main
?方法,在任务运行到 2 秒的时候调用?Future
?的?cancel(true)
?:
public static void main(String[] args) throws Exception {
ExecutorService threadPool = Executors.newSingleThreadExecutor();
long num = 1000000033L;
PrimerTask task = new PrimerTask(num);
Future<Boolean> future = threadPool.submit(task);
threadPool.shutdown(); // 发送关闭线程池的指令
cancelTask(future, 2_000); // 在 2 秒之后取消该任务
try {
boolean result = future.get();
System.out.format("%d 是否为素数? %b\n", num, result);
} catch (CancellationException ex) {
System.err.println("任务被取消");
} catch (InterruptedException ex) {
System.err.println("当前线程被中断");
} catch (ExecutionException ex) {
System.err.println("任务执行出错");
}
}
程序运行到 2 秒时候的输出:
程序的最终输出:
可以发现,虽然我们取消了任务,Future
?的?get
?方法也对我们的取消做出了响应(即抛出?CancellationException
?异常),但是任务并没有停止,而是直到任务运行完毕了,程序才结束。
查看?Future
?的实现类?FutureTask
?的源码,我们来看一下调用?cancel(true)
?究竟发生了什么:
原来?cancel(true)
?方法的原理是向正在运行任务的线程发送中断指令 —— 即调用运行任务的?Thread
?的?interrupt()
?方法。
所以?如果一个任务是可取消的,那么它应该可以对?Thread
?的?interrupt()
?方法做出被取消时的响应。
而?Thread
?的?isInterrupted()
?方法,便可以用来判断当前?Thread
?是否被中断。任务开始运行时,运行任务的线程肯定没有被中断,所以?isInterruped()
?方法会返回?false
;而?interrupt()
?方法调用之后,isInterruped()
?方法会返回?true
。
(由此我们也可以知道,Thread.sleep
?方法是可以对中断做出响应的)
所以我们修改?PrimerTask
?的?call
?方法,让其可以对运行任务的线程被中断时做出停止运行(跳出循环)的响应:
@Override
public Boolean call() throws Exception {
// i < num 让任务有足够的运行时间
for (long i = 2; i < num; i++) {
if (Thread.currentThread().isInterrupted()) { // 任务被取消
System.out.println("PrimerTask.call: 你取消我干啥?");
return false;
}
if (num % i == 0) {
return false;
}
}
return true;
}
运行结果:
可以看到程序在 2 秒的时候停止了运行,任务被成功取消。
总结:如果要通过?Future?的?cancel?方法取消正在运行的任务,那么该任务必定是可以?对线程中断做出响应?的任务。通过?Thread.currentThread().isInterrupted()?方法,我们可以判断任务是否被取消,从而做出相应的取消任务的响应。