Java线程池体系(超详细)【从0到自定义线程池】

发布时间:2023年12月25日

一. 线程池概念

简单来说,就是用一个池维护若干个线程,当需要执行任务时,直接调用其中某一个线程来执行。

二.无线程池的弊端

当我们需要一个新的线程执行任务时,可能会直接创建一个

new Thread(()->{
	// do something
}).start();

在业务量较少的情况,这样也没什么太大问题。但是如果任务频繁的话。频繁的创建和销毁线程是十分消耗性能的,甚至可能创建和销毁线程所用时间大于任务本身执行所用时间。如果业务量非常大,可能会占用过多的资源,导致整个服务由于资源不足而宕机。

三.线程池的作用

1.提升性能:线程池能独立负责线程的创建、维护和分配,重复利用已创建的线程,节省了频繁创建和销毁带来的性能损耗,

任务来了分配一个线程就可以立即干活,而不用等待线程重新创建。

2.线程管理:每个Java线程池会保持一些基本的线程统计信息,对线程进行有效管理,线程池控制线程数,

避免过多消耗服务器资源,亦更方便调优和监控。

四.线程池执行流程

线程池执行流程图

五.线程池架构

1.Executor

通过Executor,开发人员可以将任务提交给线程池来执行,而不需要直接管理线程的创建和销毁.Executor提供了execute()接口来执行已提交的Runnable执行目标实例,它只有1个方法:

void execute(Runnable command)

2.ExecutorService

ExecutorService?是?Executor?接口的子接口,它扩展了?Executor?的功能,提供了更多的方法来控制线程的生命周期,方便地管理线程的创建、调度和销毁,避免了手动创建和关闭线程的繁琐操作。

主要功能包括:

  • 提交任务:通过?submit()?或?execute()?方法,可以将任务提交给?ExecutorService?执行。submit()?方法返回一个?Future?对象,可以用来查询任务的状态或获取任务的结果;execute()?方法则直接启动任务的执行。
  • 控制线程的生命周期:ExecutorService?提供了?shutdown()?和?shutdownNow()?方法来关闭执行器。shutdown()?方法会等待所有正在执行的任务完成,而?shutdownNow()?方法则会尝试停止所有正在执行的任务并返回尚未开始执行的任务列表。
  • 获取任务结果:通过?Future?对象,可以获取任务的执行结果或者等待任务完成。

对外提供异步任务的接收服务,源码如下

//向线程池提交单个异步任务

<T>?Future<T>?submit(Callable<T>?task);

//线程池提交批量异步任务

?<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,

???????????????????????long timeout, TimeUnit unit)throws InterruptedException;

注:异步任务是指一个任务在开始执行后,不需要等待其完成就可以继续执行其他任务。异步任务的执行方式可以显著提高程序的执行效率,特别是在处理耗时较长的任务时。我们用多线程的就在执行异步任务。

3. AbstractExecutorService

AbstractExecutorService?是 Java 中的一个抽象类,它是?ExecutorService?接口的一个实现。AbstractExecutorService?提供了一些默认实现,以简化?ExecutorService?的实现要实现很多方法的问题

4. ThreadPoolExecutor

ThreadPoolExecutor?是 Java 中的一个类,它是?AbstractExecutorService?的一个实现,提供了线程池的功能。ThreadPoolExecutor?允许你配置线程池的各种参数,通过构造函数,可以设置线程池的核心线程数、最大线程数、线程存活时间等参数。这些参数可以根据实际需求进行调整,以达到最佳的性能表现。

5. ScheduledExecutorService

ScheduledExecutorService是专门用于调度任务的接口,它扩展了ExecutorService的功能,增加了定时任务相关的功能。通过ScheduledExecutorService,可以安排任务在给定的延迟后执行,或者定期执行,适用于需要定时或周期性执行的任务。

AbstractExecutorService是一个抽象类,它实现了ExecutorService接口的部分方法,为子类提供了一些默认实现。通过继承AbstractExecutorService,开发人员可以更容易地实现自己的ExecutorService接口,而不需要从头开始实现所有的方法。

6. ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutorThreadPoolExecutor的子类,它实现ScheduledExecutorService接口,因此提供了线程池和调度器的组合。

7. Executors

Executors是Java中用于创建线程池的工厂类,位于java.util.concurrent包下。Executors提供了几个静态方法来创建不同类型的线程池,如单线程线程池、固定线程数的线程池、缓存线程池等

Executors创建线程的4种方法

newSingleThreadExecutor()

newFixedThreadPool(int nThreads)

newCachedThreadPool()

newScheduledThreadPool(int corePoolSize)

.细说线程参数

创建一个线程池实例的核心构造方法

public ThreadPoolExecutor(int corePoolSize,

??????????????????????????????int maximumPoolSize,

??????????????????????????????long keepAliveTime,

??????????????????????????????TimeUnit unit,

??????????????????????????????BlockingQueue<Runnable> workQueue,

??????????????????????????????ThreadFactory threadFactory,

??????????????????????????????RejectedExecutionHandler handler)

1.corePoolSize: 核心线程数

最初线程池里没有线程,一开始新建的就是核心线程

2.maximumPoolSize:最大线程数

最大线程数

=核心线程数+非核心线程数

当线程数到了核心线程数且队列满了才会新建非核心线程

3.keepAliveTime:非核心线程存活时间

非核心线程存活时间

非核心线程一段时间不干活就会被销毁

通常,核心线程闲置也会保留在线程池里。但如果设置ThreadPoolExecutor的allowCoreThreadTimeOut属性为true,核心线程闲置一段时间也会被销毁。

4.TimeUnit unit:非核心线程存活时间单位

非核心线程数存活时间单位

5.workQueue:缓存队列

①SynchronousQueue

这个队列接收到任务的时候,会直接提交给线程处理,而不保留它。

如果所有线程都在工作怎么办?那就新建一个线程来处理这个任务

使用这个类型队列的时候,maximumPoolSize一般指定成Integer.MAX_VALUE。(保证不出现【线程数达到了maximumPoolSize而不能新建线程】的错误)

② ?LinkedBlockingQueue

这个队列接收到任务的时候,如果当前线程数小于核心线程数,则新建线程(核心线程)处理任务

如果当前线程数等于核心线程数,则进入队列等待

这个队列没有最大值限制,所有超过核心线程数的任务都将被添加到队列中。(这也就导致了maximumPoolSize的设定失效,因为总线程数永远不会超过corePoolSize)

③ ArrayBlockingQueue

这个队列接收到任务的时候,如果当前线程数小于核心线程数,则新建线程(核心线程)执行任务

如果当前线程数等于核心线程数,则进入队列等待

如果队列已满,则新建线程(非核心线程)执行任务

如果总线程数到了maximumPoolSize,则触发拒绝策略

可以限定这个队列的长度

④?DelayQueue

这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务队列内元素必须实现Delayed接口,即任务必须实现Delayed接口

6.threadFactory:线程工厂

①这是一个接口,需要实现它的Thread newThread(Runnable r)方法,可以对线程进行自定义的初始化,例如给线程设定名字,方便后期调试

public interface ThreadFactory {

????Thread newThread(Runnable r);

}

②只有1个方法,调用ThreadFactory的唯一方法newThread()创建新线程时,可以更改所创建的新线程的名称、线程组、优先级、守护进程状态等

③使用Executors创建新的线程池时,可以指定工厂,未指定是默认使用线程池时,也可以基于ThreadFactory(线程工厂)创建

?public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory);

?public static ExecutorService newFixedThreadPool(int nThreads)

注意:线程工厂和线程池工厂有区别,Executors为线程池工厂类,用于快捷创建线程池(Thread Pool);ThreadFactory为线程工厂类,用于创建线程(Thread)

7.RejectedExecutionHandler :拒绝策略:

  • ①AbortPolicy:默认策略。抛出RejectedExecutionException异常
  • ②DiscardPolicy:丢弃当前任务,不抛出任何异常
  • ③DIscardOldestPolicy:丢弃队列里最早添加的元素,再安排当前任务。如果失败则不断重试
  • ④CallerRunsPolicy:使用调用者自己的线程来执行任务。调用者线程会调用执行器的execute方法来执行该任务
  • ⑤自定义策略:实现RejectedExecutionHandler接口,实现rejectedExecution(Runnable r, ThreadPoolExecutor executor)方法

注:当提交任务数超过maximumPoolSize+workQueue之和时触发

七.Executors创建线程的4种方法

1. newSingleThreadExecutor创建“单线程化线程池”

  • 单线程化的线程池中的任务是按照提交的次序顺序执行的
  • 只有一个线程的线程池池中的唯一线程的存活时间是无限的
  • 当池中的唯一线程正繁忙时,新提交的任务实例会进入内部的阻塞队列中,并且其阻塞队列是无界的
  • 如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它
  • 适用场景:任务按照提交次序,一个任务一个任务地逐个执行的场景一个任务一个任务FIFO执行的场景
  • 缺点:问题和固定数量线程池一样,阻塞队列无界

例子xxx

2.newFixedThreadPool创建“固定数量的线程池

  • 如果线程数没有达到“固定数量”,每次提交一个任务线程池内就创建一个新线程,直到线程达到线程池固定的数量
  • 线程池的大小一旦达到“固定数量”就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程
  • 在接收异步任务的执行目标实例时,如果池中的所有线程均在繁忙状态,新任务会进入阻塞队列中(无界的阻塞队列)
  • 适用场景:需要任务长期执行的场CPU密集型任务执行长期的任务,性能好很多
  • 缺点:内部使用无界队列来存放排队任务,当大量任务超过线程池最大容量需要处理时,队列无限增大,使服务器资源迅速耗尽阻塞队列无界,队列很大,很有可能导致JVM出现OOM(Out Of Memory)异常,即内存资源耗尽

3.newCachedThreadPool创建“可缓存线程池”

  • 在接收新的异步任务target执行目标实例时,如果池内所有线程繁忙,此线程池就会添加新线程来处理任务,缓存队列是不存储任务的队列
  • 线程池不会对线程池大小进行限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小,无核心线程,最大线程数Integer.MAX_VALUE
  • 如果部分线程空闲,也就是存量线程的数量超过了处理任务数量,就会回收空闲(60秒不执行任务)线程
  • 适用场景:需要快速处理突发性强、耗时较短的任务场景,如Netty的NIO处理场景、REST API接口的瞬时削峰场景
  • 缺点:问题存在于其最大线程数量不设限上。由于其maximumPoolSize的值为Integer.MAX_VALUE(非常大),可以认为可以无限创建线程,如果任务提交较多,就会造成大量的线程被启动,很有可能造成OOM异常,甚至导致CPU线程资源耗尽

4.newScheduledThreadPool创建“可调度线程池”

  • 有固定核心线程,最大线程数Integer.MAX_VALUE
  • 定时以及周期性执行任务
  • 缺点:主要问题在于线程数不设上限

注:虽然?newFixedThreadPool?适合 CPU 密集型任务,但它并不一定适合 IO 密集型任务。对于 IO 密集型任务,如网络请求或数据库操作,线程可能会在等待 IO 操作完成时阻塞,导致线程资源的浪费。在这种情况下,可以考虑使用其他类型的线程池,如?newCachedThreadPool?或?newSingleThreadExecutor

八.线程提交任务两种方式

1.execute方法

void execute(Runnable command): Executor接口中的方法

2.submit方法

<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);

这3个submit方法都是ExecutorService接口中的方法

3.区别

  • execute()方法只能接收Runnable类型的参数,而submit()方法可以接收Callable、Runnable两种类型的参数
  • Callable类型的任务是可以返回执行结果的,而Runnable类型的任务不可以返回执行结果
  • submit()提交任务后会有返回值,而execute()没有submit()方便Exception处理

九.线程池的关闭

1.线程池的5种状态:

 // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
  • RUNNING: 线程池创建之后的初始状态,这种状态下可以执行任务
  • SHUTDOWN:该状态下线程池不再接受新任务,但是会将工作队列中的任务执行完毕
  • STOP:该状态下线程池不再接受新任务,也不会处理工作队列中的剩余任务,并且将会中断所有工作线程
  • TIDYING:该状态下所有任务都已终止或者处理完成,将会执行terminated()钩子方法
  • TERMINATED:执行完terminated()钩子方法之后的状态

?线程状态转换图

2.shutdown()方法

等待当前工作队列中的剩余任务全部执行完成之后,才会执行关闭,但是此方法被调用之后线程池的状态转为SHUTDOWN,线程池不会再接收新的任务

3.shutdownNow()方法

立即关闭线程池的方法,此方法会打断正在执行的工作线程,并且会清空当前工作队列中的剩余任务,返回的是尚未执行的任务

4.awaitTermination()方法

等待线程池完成关闭, shutdown()与shutdownNow()方法之后,用户程序都不会主动等待线程池关闭完成,在设置的时间timeout内如果线程池完成关闭,返回true, 否则返回false

?十.自定义线程池

1.选择

那么,我们实际中用哪一种捏。

答案是,都不用。

因为,上述线程中

  • FixedThreadPool 和 SingleThreadExecutor 使用了 LinkedBlockingQueue ,这是个无界队列。当任务突发过多时,这个队列可能因为缓存太多任务而消耗非常多的内存资源,最终导致OOM
  • CachedThreadPool 和 ScheduledThreadPool 最大线程数是Integer.MAX_VALUE。即相当于没有对最大线程数做限制,任务突发过多时,可能因为创建大量线程导致资源耗尽,最终同样导致OOM

所以,我们来自定义线程池。

2.步骤

线程池使用步骤大概如下

  • 根据7大参数,建立一个线程池
  • 放入要执行的任务
  • 最后,如有必要,关闭线程池
//自定义线程池
ThreadPoolExecutor pool = new ThreadPoolExecutor(params...);

//任务提交
pool.execute(Runnable r);

//关闭
pool.shutdown();

3.自定义线程池举例

①自定义工具类。

包装自定义线程池,对外提供静态方法方便使用。

这里为了方便测试,核心线程1,最大线程10,缓存队列10,该线程池最大接收20个任务

package com.test.threadpool;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomThreadPoolUtil {
	
	private static Logger logger = LoggerFactory.getLogger(CustomThreadPoolUtil.class);
	
	private static ThreadPoolExecutor pool = null;

	static {
		pool = new ThreadPoolExecutor(1, 10, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10), new CustomThreadFactory(), new CustomRejectedExecutionHandler());
	}
	
	public static void destory() {
		pool.shutdown();
	}
	
	public static void execute(Runnable r) {
		pool.execute(r);
	}
	
	private static class CustomThreadFactory implements ThreadFactory {
		private AtomicInteger count = new AtomicInteger(0);	
		
		@Override
		public Thread newThread(Runnable r) {
			Thread t = new Thread(r);
			String threadName = CustomThreadPoolUtil.class.getSimpleName() + count.incrementAndGet();
			t.setName(threadName);
			
			return t;
		}
	}
	
	private static class CustomRejectedExecutionHandler implements RejectedExecutionHandler {

		@Override
		public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
			logger.error("任务执行失败 {}, 线程池已满 {}", r.toString(), executor.toString());
		}
		
	}
	
}

2 测试类。

创建21个任务,故意大于自定义线程池最大可处理量20

package com.test.threadpool;

public class TestThreadPool {

	public static void main(String[] args) {
		int num = 21;
		for(int i=1; i<=num; i++) {
			int j = i;
			CustomThreadPoolUtil.execute(new Runnable() {

				@Override
				public void run() {
					try {
						Thread.sleep(100); // 模拟业务运行
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					System.out.println(Thread.currentThread().getName() + " 执行了任务" + j);
				}
			});
		}
		
		CustomThreadPoolUtil.destory();
		
	}
	
}

3 测试结果。

20个任务被正常执行。最后一个任务被拒绝,调用了我们的自定义拒绝方法

任务执行失败 com.threadpool.TestThreadPool$1@3af49f1c, 
线程池已满 java.util.concurrent.ThreadPoolExecutor@19469ea2
[Running, pool size = 10, active threads = 10, queued tasks = 10, completed tasks = 0]
CustomThreadPoolUtil2 执行了任务12
CustomThreadPoolUtil1 执行了任务1
CustomThreadPoolUtil7 执行了任务17
CustomThreadPoolUtil3 执行了任务13
CustomThreadPoolUtil4 执行了任务14
CustomThreadPoolUtil5 执行了任务15
CustomThreadPoolUtil9 执行了任务19
CustomThreadPoolUtil8 执行了任务18
CustomThreadPoolUtil6 执行了任务16
CustomThreadPoolUtil10 执行了任务20
CustomThreadPoolUtil1 执行了任务3
CustomThreadPoolUtil2 执行了任务2
CustomThreadPoolUtil9 执行了任务8
CustomThreadPoolUtil3 执行了任务5
CustomThreadPoolUtil7 执行了任务4
CustomThreadPoolUtil8 执行了任务9
CustomThreadPoolUtil5 执行了任务7
CustomThreadPoolUtil10 执行了任务11
CustomThreadPoolUtil6 执行了任务10
CustomThreadPoolUtil4 执行了任务6

十一.自定义线程池参数设置推荐

刚才参数只是为了方便测试,实际中,如何设置各个参数才更合理呢

1 核心线程数(corePoolSize)

参考 任务耗时 和 每秒任务数

假设一个任务耗时0.1秒,系统每秒产生100个任务。

如果想在1秒内处理完这100个任务,那么有 0.1 * 100 / corePoolSize = 1,得 corePoolSize = 10

同理,如果只是偶尔某一秒产生了100个任务,后面有更多时间去处理,如2秒,那么0.1 * 100 / corePoolSize = 2,得 corePoolSize = 5

tip: 根据8020法则,实际应用中,不会每秒一直产生100的任务量,所以最终核心线程数可以设置为计算所得的80%,即最终corePoolSize = 10 * 0.8 = 8。而有时100的任务量,还有缓存队列和最大线程数来保证可以执行。不过为了方便后续计算,这里还是先取 corePoolSize = 10。

2 任务队列长度(workQueue)

参考 核心线程数 和 任务耗时

一般可设置为 核心线程数/单个任务执行时间*2

如本例中,缓存队列长度可设置为 10 / 0.1 * 2 = 200

3 最大线程数(maximumPoolSize)

参考 核心线程数,缓存队列长度,每秒最大任务数

一般可设置为 (最大任务数-任务队列长度)*单个任务执行时间

假设本例中,每秒最大任务数1000,则最大线程数 = (1000 - 200) * 0.1 = 80

4 最大空闲时间(keepAliveTime)

参考系统运行环境和硬件压力设定

无固定参考值,可根据系统产生任务的时间间隔合理设置

5 拒绝策略(handler)

参考 任务重要程度

任务不重要可直接丢弃,重要可自行采用缓冲机制

文章来源:https://blog.csdn.net/m0_52226803/article/details/135201822
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。