简单来说,就是用一个池维护若干个线程,当需要执行任务时,直接调用其中某一个线程来执行。
当我们需要一个新的线程执行任务时,可能会直接创建一个
new Thread(()->{
// do something
}).start();
在业务量较少的情况,这样也没什么太大问题。但是如果任务频繁的话。频繁的创建和销毁线程是十分消耗性能的,甚至可能创建和销毁线程所用时间大于任务本身执行所用时间。如果业务量非常大,可能会占用过多的资源,导致整个服务由于资源不足而宕机。
1.提升性能:线程池能独立负责线程的创建、维护和分配,重复利用已创建的线程,节省了频繁创建和销毁带来的性能损耗,
任务来了分配一个线程就可以立即干活,而不用等待线程重新创建。
2.线程管理:每个Java线程池会保持一些基本的线程统计信息,对线程进行有效管理,线程池控制线程数,
避免过多消耗服务器资源,亦更方便调优和监控。
线程池执行流程图
通过Executor,开发人员可以将任务提交给线程池来执行,而不需要直接管理线程的创建和销毁.Executor提供了execute()接口来执行已提交的Runnable执行目标实例,它只有1个方法:
void execute(Runnable command)
ExecutorService?是?Executor?接口的子接口,它扩展了?Executor?的功能,提供了更多的方法来控制线程的生命周期,方便地管理线程的创建、调度和销毁,避免了手动创建和关闭线程的繁琐操作。
主要功能包括:
对外提供异步任务的接收服务,源码如下
//向线程池提交单个异步任务
<T>?Future<T>?submit(Callable<T>?task);
//线程池提交批量异步任务
?<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
???????????????????????long timeout, TimeUnit unit)throws InterruptedException;
注:异步任务是指一个任务在开始执行后,不需要等待其完成就可以继续执行其他任务。异步任务的执行方式可以显著提高程序的执行效率,特别是在处理耗时较长的任务时。我们用多线程的就在执行异步任务。
AbstractExecutorService?是 Java 中的一个抽象类,它是?ExecutorService?接口的一个实现。AbstractExecutorService?提供了一些默认实现,以简化?ExecutorService?的实现要实现很多方法的问题。
ThreadPoolExecutor?是 Java 中的一个类,它是?AbstractExecutorService?的一个实现,提供了线程池的功能。ThreadPoolExecutor?允许你配置线程池的各种参数,通过构造函数,可以设置线程池的核心线程数、最大线程数、线程存活时间等参数。这些参数可以根据实际需求进行调整,以达到最佳的性能表现。
ScheduledExecutorService是专门用于调度任务的接口,它扩展了ExecutorService的功能,增加了定时任务相关的功能。通过ScheduledExecutorService,可以安排任务在给定的延迟后执行,或者定期执行,适用于需要定时或周期性执行的任务。
而AbstractExecutorService是一个抽象类,它实现了ExecutorService接口的部分方法,为子类提供了一些默认实现。通过继承AbstractExecutorService,开发人员可以更容易地实现自己的ExecutorService接口,而不需要从头开始实现所有的方法。
ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类,它实现ScheduledExecutorService接口,因此提供了线程池和调度器的组合。
Executors是Java中用于创建线程池的工厂类,位于java.util.concurrent包下。Executors提供了几个静态方法来创建不同类型的线程池,如单线程线程池、固定线程数的线程池、缓存线程池等。
Executors创建线程的4种方法
①newSingleThreadExecutor()
②newFixedThreadPool(int nThreads)
③newCachedThreadPool()
④newScheduledThreadPool(int corePoolSize)
创建一个线程池实例的核心构造方法
public ThreadPoolExecutor(int corePoolSize,
??????????????????????????????int maximumPoolSize,
??????????????????????????????long keepAliveTime,
??????????????????????????????TimeUnit unit,
??????????????????????????????BlockingQueue<Runnable> workQueue,
??????????????????????????????ThreadFactory threadFactory,
??????????????????????????????RejectedExecutionHandler handler)
最初线程池里没有线程,一开始新建的就是核心线程
最大线程数
=核心线程数+非核心线程数
当线程数到了核心线程数且队列满了才会新建非核心线程
非核心线程存活时间
非核心线程一段时间不干活就会被销毁
通常,核心线程闲置也会保留在线程池里。但如果设置ThreadPoolExecutor的allowCoreThreadTimeOut属性为true,核心线程闲置一段时间也会被销毁。
非核心线程数存活时间单位
这个队列接收到任务的时候,会直接提交给线程处理,而不保留它。
如果所有线程都在工作怎么办?那就新建一个线程来处理这个任务
使用这个类型队列的时候,maximumPoolSize一般指定成Integer.MAX_VALUE。(保证不出现【线程数达到了maximumPoolSize而不能新建线程】的错误)
这个队列接收到任务的时候,如果当前线程数小于核心线程数,则新建线程(核心线程)处理任务
如果当前线程数等于核心线程数,则进入队列等待
这个队列没有最大值限制,所有超过核心线程数的任务都将被添加到队列中。(这也就导致了maximumPoolSize的设定失效,因为总线程数永远不会超过corePoolSize)
这个队列接收到任务的时候,如果当前线程数小于核心线程数,则新建线程(核心线程)执行任务
如果当前线程数等于核心线程数,则进入队列等待
如果队列已满,则新建线程(非核心线程)执行任务
如果总线程数到了maximumPoolSize,则触发拒绝策略
可以限定这个队列的长度
这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务队列内元素必须实现Delayed接口,即任务必须实现Delayed接口
①这是一个接口,需要实现它的Thread newThread(Runnable r)方法,可以对线程进行自定义的初始化,例如给线程设定名字,方便后期调试
public interface ThreadFactory {
????Thread newThread(Runnable r);
}
②只有1个方法,调用ThreadFactory的唯一方法newThread()创建新线程时,可以更改所创建的新线程的名称、线程组、优先级、守护进程状态等
③使用Executors创建新的线程池时,可以指定工厂,未指定是默认使用线程池时,也可以基于ThreadFactory(线程工厂)创建
?public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory);
?public static ExecutorService newFixedThreadPool(int nThreads)
注意:线程工厂和线程池工厂有区别,Executors为线程池工厂类,用于快捷创建线程池(Thread Pool);ThreadFactory为线程工厂类,用于创建线程(Thread)
注:当提交任务数超过maximumPoolSize+workQueue之和时触发
例子xxx
注:虽然?newFixedThreadPool?适合 CPU 密集型任务,但它并不一定适合 IO 密集型任务。对于 IO 密集型任务,如网络请求或数据库操作,线程可能会在等待 IO 操作完成时阻塞,导致线程资源的浪费。在这种情况下,可以考虑使用其他类型的线程池,如?newCachedThreadPool?或?newSingleThreadExecutor
void execute(Runnable command): Executor接口中的方法
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
这3个submit方法都是ExecutorService接口中的方法
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
?线程状态转换图
等待当前工作队列中的剩余任务全部执行完成之后,才会执行关闭,但是此方法被调用之后线程池的状态转为SHUTDOWN,线程池不会再接收新的任务
立即关闭线程池的方法,此方法会打断正在执行的工作线程,并且会清空当前工作队列中的剩余任务,返回的是尚未执行的任务
等待线程池完成关闭, shutdown()与shutdownNow()方法之后,用户程序都不会主动等待线程池关闭完成,在设置的时间timeout内如果线程池完成关闭,返回true, 否则返回false
那么,我们实际中用哪一种捏。
答案是,都不用。
因为,上述线程中
所以,我们来自定义线程池。
线程池使用步骤大概如下
//自定义线程池
ThreadPoolExecutor pool = new ThreadPoolExecutor(params...);
//任务提交
pool.execute(Runnable r);
//关闭
pool.shutdown();
①自定义工具类。
包装自定义线程池,对外提供静态方法方便使用。
这里为了方便测试,核心线程1,最大线程10,缓存队列10,该线程池最大接收20个任务
package com.test.threadpool;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CustomThreadPoolUtil {
private static Logger logger = LoggerFactory.getLogger(CustomThreadPoolUtil.class);
private static ThreadPoolExecutor pool = null;
static {
pool = new ThreadPoolExecutor(1, 10, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10), new CustomThreadFactory(), new CustomRejectedExecutionHandler());
}
public static void destory() {
pool.shutdown();
}
public static void execute(Runnable r) {
pool.execute(r);
}
private static class CustomThreadFactory implements ThreadFactory {
private AtomicInteger count = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
String threadName = CustomThreadPoolUtil.class.getSimpleName() + count.incrementAndGet();
t.setName(threadName);
return t;
}
}
private static class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
logger.error("任务执行失败 {}, 线程池已满 {}", r.toString(), executor.toString());
}
}
}
2 测试类。
创建21个任务,故意大于自定义线程池最大可处理量20
package com.test.threadpool;
public class TestThreadPool {
public static void main(String[] args) {
int num = 21;
for(int i=1; i<=num; i++) {
int j = i;
CustomThreadPoolUtil.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(100); // 模拟业务运行
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 执行了任务" + j);
}
});
}
CustomThreadPoolUtil.destory();
}
}
3 测试结果。
20个任务被正常执行。最后一个任务被拒绝,调用了我们的自定义拒绝方法
任务执行失败 com.threadpool.TestThreadPool$1@3af49f1c,
线程池已满 java.util.concurrent.ThreadPoolExecutor@19469ea2
[Running, pool size = 10, active threads = 10, queued tasks = 10, completed tasks = 0]
CustomThreadPoolUtil2 执行了任务12
CustomThreadPoolUtil1 执行了任务1
CustomThreadPoolUtil7 执行了任务17
CustomThreadPoolUtil3 执行了任务13
CustomThreadPoolUtil4 执行了任务14
CustomThreadPoolUtil5 执行了任务15
CustomThreadPoolUtil9 执行了任务19
CustomThreadPoolUtil8 执行了任务18
CustomThreadPoolUtil6 执行了任务16
CustomThreadPoolUtil10 执行了任务20
CustomThreadPoolUtil1 执行了任务3
CustomThreadPoolUtil2 执行了任务2
CustomThreadPoolUtil9 执行了任务8
CustomThreadPoolUtil3 执行了任务5
CustomThreadPoolUtil7 执行了任务4
CustomThreadPoolUtil8 执行了任务9
CustomThreadPoolUtil5 执行了任务7
CustomThreadPoolUtil10 执行了任务11
CustomThreadPoolUtil6 执行了任务10
CustomThreadPoolUtil4 执行了任务6
刚才参数只是为了方便测试,实际中,如何设置各个参数才更合理呢
参考 任务耗时 和 每秒任务数
假设一个任务耗时0.1秒,系统每秒产生100个任务。
如果想在1秒内处理完这100个任务,那么有 0.1 * 100 / corePoolSize = 1,得 corePoolSize = 10
同理,如果只是偶尔某一秒产生了100个任务,后面有更多时间去处理,如2秒,那么0.1 * 100 / corePoolSize = 2,得 corePoolSize = 5
tip: 根据8020法则,实际应用中,不会每秒一直产生100的任务量,所以最终核心线程数可以设置为计算所得的80%,即最终corePoolSize = 10 * 0.8 = 8。而有时100的任务量,还有缓存队列和最大线程数来保证可以执行。不过为了方便后续计算,这里还是先取 corePoolSize = 10。
参考 核心线程数 和 任务耗时
一般可设置为 核心线程数/单个任务执行时间*2
如本例中,缓存队列长度可设置为 10 / 0.1 * 2 = 200
参考 核心线程数,缓存队列长度,每秒最大任务数
一般可设置为 (最大任务数-任务队列长度)*单个任务执行时间
假设本例中,每秒最大任务数1000,则最大线程数 = (1000 - 200) * 0.1 = 80
参考系统运行环境和硬件压力设定
无固定参考值,可根据系统产生任务的时间间隔合理设置
参考 任务重要程度
任务不重要可直接丢弃,重要可自行采用缓冲机制