线程池是一种利用池化技术思想来实现的线程管理技术,主要是为了复用线程、便利地管理线程和任务、并将线程
的创建和任务的执行解耦开来。我们可以创建线程池来复用已经创建的线程来降低频繁创建和销毁线程所带来的资
源消耗。在 JAVA 中主要是使用 ThreadPoolExecutor 类来创建线程池,并且 JDK 中也提供了 Executors 工厂类来
创建线程池(不推荐使用)。
降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
提高响应速度:任务到达时,无需等待线程创建即可立即执行。
提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布
导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池
ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。
线程池 ThreadPoolExecutor 分别有五种状态分别为:RUNNING、SHUTDOWN、STOP、TIDYING、
TERMINATED。
使用一个 AtomicInteger 类型的 ctl 字段来描述线程池地运行状态和线程数量,通过 ctl 的高三位来表示线程池的
5 种状态,低 29 位表示线程池中现有的线程数量。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
/**
* 线程池线程数地bit数
*/
private static final int COUNT_BITS = Integer.SIZE - 3;
/**
* 线程池中最大线程容量
*/
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/**
* 表示可接受新任务,且可执行队列中的任务;
*/
private static final int RUNNING = -1 << COUNT_BITS;
/**
* 表示不接受新任务,但可执行队列中的任务;
*/
private static final int SHUTDOWN = 0 << COUNT_BITS;
/**
* 表示不接受新任务,且不再执行队列中的任务,且中断正在执行的任务;
*/
private static final int STOP = 1 << COUNT_BITS;
/**
* 所有任务已经中止,且工作线程数量为0,最后变迁到这个状态的线程将
* 要执行terminated()钩子方法,只会有一个线程执行这个方法;
*/
private static final int TIDYING = 2 << COUNT_BITS;
/**
* TERMINATED,中止状态,已经执行完terminated()钩子方法;
*/
private static final int TERMINATED = 3 << COUNT_BITS;
RUNNING:线程池一旦被创建,就处于 RUNNING 状态,任务数为 0,能够接收新任务,对已排队的任务进行处理。
SHUTDOWN:不接收新任务,但能处理已排队的任务。调用线程池的 shutdown() 方法,线程池由
RUNNING 转变为 SHUTDOWN 状态。
STOP:不接收新任务,不处理已排队的任务,并且会中断正在处理的任务。调用线程池的 shutdownNow()
方法,线程池由(RUNNING 或 SHUTDOWN ) 转变为 STOP 状态。
TIDYING: SHUTDOWN 状态下,任务数为0, 其他所有任务已终止,线程池会变为 TIDYING 状态,会执行
terminated() 方法。线程池中的 terminated() 方法是空实现,可以重写该方法进行相应的处理。 线程池在
SHUTDOWN 状态,任务队列为空且执行中任务为空,线程池就会由 SHUTDOWN 转变为 TIDYING 状态。 线
程池在 STOP 状态,线程池中执行中任务为空时,就会由 STOP 转变为 TIDYING 状态。
TERMINATED:线程池彻底终止。线程池在 TIDYING 状态执行完 terminated() 方法就会由 TIDYING 转变为
TERMINATED 状态。
线程池状态之间的流转图如下所示:
Executors 类主要用于提供线程池相关的操作,它提供了一系列工厂方法用于创建线程池,返回的线程池都实
现了ExecutorService接口。
Executors 的几种线程池实现分别如下:
newFixedThreadPool:创建一个固定的长度的线程池,每当提交一个任务就创建一个线程,知道达到线程池
的最大数量,这时线程规模将不再变化,当线程发生未预期的错误而结束时,线程池会创建一个新的线程继续
运行队列里的任务。
newSingleThreadExecutor:这是一个单线程的 Executor ,它创建单个工作线程来执行任务,如果这个线程
异常结束,会创建一个新的来代替它;它的特点是能确保依照任务在队列中的顺序来串行执行。
newCachedThreadPool:创建一个可缓存的线程池,如果线程池的规模超过了处理需求,将自动回收空闲线
程,而当需求正驾驶,则可以自动添加新线程,线程池的规模不存在任何限制。
newScheduledThreadPool:创建一个固定长度的线程池,而且以延迟或定时的方式来执行任务,类似于
Timer。
newSingleThreadScheduledExecutor:单线程可执行周期性任务的线程池。
newWorkStealingPool:任务窃取线程池,不保证执行顺序,适合任务耗时差异较大。线程池中有多个线程队
列,有的线程队列中有大量的比较耗时的任务堆积,而有的线程队列却是空的,就存在有的线程处于饥饿状
态,当一个线程处于饥饿状态时,它就会去其它的线程队列中窃取任务。解决饥饿导致的效率问题。默认创建
的并行 level 是 CPU 的核数。主线程结束,即使线程池有任务也会立即停止。
Executors 的其中四种线程池其实都是调用 ThreadPoolExecutor 实现的:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
Executors工具类创建的线程池队列或线程默认为 Integer.MAX_VALUE
,容易堆积请求 阿里巴巴 Java 开发手
册:
FixedThreadPool 和 SingleThreadPool:允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请
求,从而导致 OOM
CachedThreadPool:允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致
OOM。
推荐使用 ThreadPoolExecutor 类根据实际需要自定义创建。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
ThreadPoolExecutor 类主要有以下七个参数:
int corePoolSize
:核心线程池大小,线程池中常驻线程的最大数量int maximumPoolSize
:最大核心线程池大小(包括核心线程和非核心线程)long keepAliveTime
:线程空闲后的存活时间(仅适用于非核心线程)TimeUnit unit
:超时单位BlockingQueue<Runnable> workQueue
:阻塞队列ThreadFactory threadFactory threadFactory
:线程工厂,创建线程的,一般默认RejectedExecutionHandler handle
:拒绝策略拒绝策略就是当队列满时,线程如何去处理新来的任务。
功能:当触发拒绝策略时,直接抛出拒绝执行的异常。
使用场景:ThreadPoolExecutor 中默认的策略就是 AbortPolicy,由于 ExecutorService 接口的系列
ThreadPoolExecutor 都没有显示的设置拒绝策略,所以默认的都是这个。
功能:只要线程池没有关闭,就由提交任务的当前线程处理。
使用场景:一般在不允许失败、对性能要求不高、并发量较小的场景下使用。
功能:直接丢弃这个任务,不触发任何动作
使用场景: 提交的任务无关紧要,一般用的少。
功能:抛弃下一个将要被执行的任务,相当于排队的时候把第一个人打死,然后自己代替。
使用场景:发布消息、修改消息类似场景。当老消息还未执行,此时新的消息又来了,这时未执行的消息的版本比
现在提交的消息版本要低就可以被丢弃了。
ArrayBlockingQueue:使用数组实现的有界阻塞队列,特性先进先出
LinkedBlockingQueue:使用链表实现的阻塞队列,特性先进先出,可以设置其容量,默认为
Interger.MAX_VALUE
PriorityBlockingQueue:使用平衡二叉树堆,实现的具有优先级的无界阻塞队列
DelayQueue:无界阻塞延迟队列,队列中每个元素均有过期时间,当从队列获取元素时,只有过期元素才会
出队列。队列头元素是最块要过期的元素。
SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作,必须等到另一个线程调用移除操作,否
则插入操作一直处于阻塞状态
线程池处理流程图如下所示:
判断线程池里的核心线程是否都在执行任务?
否:调用/创建一个新的核心线程来执行任务
是:工作队列是否已满?
否:将新提交的任务存储在工作队列里
是:线程池里的线程数是否达到最大线程值?
否:调用/创建一个新的非核心线程来执行任务
是:执行线程池饱和策略
一个线程池 core 7;max 20;queue 50;100并发进来怎么分配的?
先有 7 个能直接得到执行, 接下来把 50 个进入队列排队等候, 在多开 13 个继续执行。 现在 70 个被安排上了,
剩下 30 个默认执行饱和策略。
execute:提交没有返回值,不能判断是否执行成功,只能提交一个 Runnable 的对象。
submit:会返回一个 Future 对象,通过 Future 的 get() 方法来获取返回值,submit 提交线程可以吃掉线程
中产生的异常,达到线程复用。当 get() 执行结果时异常才会抛出,原因是通过 submit 提交的线程,当发生
异常时,会将异常保存,待 future.get() 时才会抛出。
线程出现异常,线程会退出,并重新创建新的线程执行队列里任务,不能复用线程。
当业务代码的异常捕获了,线程就可以复用。
使用 ThreadFactory 的 UncaughtExceptionHandler 保证线程的所有异常都能捕获(包括业务的异常),兜底
的。如果提交方式用 execute,不能复用线程。
setUncaughtExceptionHandler+submit:可以吃掉异常并复用线程(是吃掉,不报错)。
setUncaughtExceptionHandler+submit+future.get():可以获取到异常并复用线程。
线程池中的一个 Work 对象可以看做是一个线程,如果线程池中的线程数已经到达最大值,则可以复用 Worker 中
的线程,即不断循环从队列中获取任务然后然后执行任务,如果从阻塞队列中获取到的任务不未null,这样则能够
复用线程执行任务,
线程池中有个 allowCoreThreadTimeOut 字段能够描述是否回收核心工作线程,线程池默认是 false 表示不回收
核心线程,我们可以使用allowCoreThreadTimeOut(true)方法来设置线程池回收核心线程。
提交线程的业务异常用 try catch 处理,保证线程不会异常退出。
业务之外的异常我们不可预见的,创建线程池设置 ThreadFactory 的 UncaughtExceptionHandler 可以对未
捕获的异常做保底处理,通过 submit 提交任务,可以吃掉异常并复用线程,想要捕获异常这时用
future.get()。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.6</version>
<relativePath/>
</parent>
<groupId>com.example</groupId>
<artifactId>ThreadPoolDemo1</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>ThreadPoolDemo1</name>
<description>ThreadPoolDemo1</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
universe.thread.pool.executor.threadNamePrefix=threadPoolTaskExecutor-
universe.thread.pool.executor.queueCapacity=100
universe.thread.pool.executor.rejectedExecutionHandler=java.util.concurrent.ThreadPoolExecutor$AbortPolicy
universe.thread.pool.executor.keepAliveSeconds=60
package com.example.threadpooldemo1.config;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* @author tom
*/
@Component
@Data
@Slf4j
public class PoolExecutorConfig {
/**
* 获取CPU核数
*/
private static final int CPU_NUM;
/**
* 配置线程池的前缀
*/
@Value("${universe.thread.pool.executor.threadNamePrefix}")
private String threadNamePrefix;
/**
* 线程池的核心线程数,在没有设置allowCoreThreadTimeOut为true的情况下
* 核心线程会在线程池中一直存活,即使处于闲置状态
*/
private Integer corePoolSize;
/**
* 线程池中的任务队列,通过线程池的execute()方法提交的Runnable对象会存储在该队列中
*/
@Value("${universe.thread.pool.executor.queueCapacity}")
private Integer queueCapacity;
/**
* 线程池所能容纳的最大线程数,当活动线程(核心线程+非核心线程)达到这个数值后
* 后续任务将会根据RejectedExecutionHandler来进行拒绝策略处理
*/
private Integer maxPoolSize;
/**
* 当任务无法被执行时(超过线程最大容量maximum并且workQueue已经被排满了)的处理策略
* - AbortPolicy: 丢弃任务并抛出RejectedExecutionException异常
* - DiscardPolicy: 丢弃任务,但是不抛出异常。
* - DiscardOldestPolicy: 丢弃队列最前面的任务,然后重新提交被拒绝的任务
* - CallerRunsPolicy: 由调用线程(提交任务的线程)处理该任务
*/
@Value("${universe.thread.pool.executor.rejectedExecutionHandler}")
private String rejectedExecutionHandler;
/**
* 非核心线程闲置时的超时时长,超过该时长,非核心线程就会被回收,若线程池通设置
* 核心线程也允许timeOut,即allowCoreThreadTimeOut为true,则该时长
* 同样会作用于核心线程,在超过aliveTime时,核心线程也会被回收,AsyncTask
* 配置的线程池就是这样设置的
*/
@Value("${universe.thread.pool.executor.keepAliveSeconds}")
private Integer keepAliveSeconds;
public PoolExecutorConfig() {
log.info("该台服务器的CPU核心数为{}", CPU_NUM);
this.corePoolSize = CPU_NUM + 1;
this.maxPoolSize = 2 * CPU_NUM;
}
static {
CPU_NUM = Runtime.getRuntime().availableProcessors();
}
}
package com.example.threadpooldemo1.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionHandler;
/**
* @author tom
* @description 线程池配置类
*/
@Configuration
@Slf4j
public class OurPoolExecutor {
private PoolExecutorConfig threadPoolExecutorConfig;
@Autowired
public void setThreadPoolExecutorConfig(PoolExecutorConfig threadPoolExecutorConfig) {
this.threadPoolExecutorConfig = threadPoolExecutorConfig;
}
@Bean
public Executor threadPoolTaskExecutor() {
String threadNamePrefix = threadPoolExecutorConfig.getThreadNamePrefix();
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
try {
log.info("开始初始化线程池 -->threadPoolTaskExecutor");
taskExecutor.setThreadNamePrefix(threadNamePrefix);
taskExecutor.setCorePoolSize(threadPoolExecutorConfig.getCorePoolSize());
taskExecutor.setMaxPoolSize(threadPoolExecutorConfig.getMaxPoolSize());
taskExecutor.setQueueCapacity(threadPoolExecutorConfig.getQueueCapacity());
// 通过反射获取RejectedExecutionHandlerClass的类模板
Class<?> rejectedExecutionHandlerClass = Class.forName(threadPoolExecutorConfig.getRejectedExecutionHandler());
// 获取RejectedExecutionHandlerClass类的实例
RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) rejectedExecutionHandlerClass.newInstance();
taskExecutor.setRejectedExecutionHandler(rejectedExecutionHandler);
taskExecutor.setKeepAliveSeconds(threadPoolExecutorConfig.getKeepAliveSeconds());
// 进行加载
taskExecutor.initialize();
log.info("初始化线程池完成:{}核心线程为{}-->", threadNamePrefix, taskExecutor.getCorePoolSize());
return taskExecutor;
} catch (Exception e) {
e.printStackTrace();
log.error("初始化线程池失败:{}失败原因为:{}", threadNamePrefix, e.getMessage());
return null;
}
}
}
# 输出
......
2023-12-15 21:41:13.577 INFO 12036 --- [ main]
2023-12-15 21:41:13.577 INFO 12036 --- [ main] c.e.t.ThreadPoolDemo1Application : No active profile set, falling back to default profiles: default
2023-12-15 21:41:14.317 INFO 12036 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8080 (http)
2023-12-15 21:41:14.332 INFO 12036 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2023-12-15 21:41:14.332 INFO 12036 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.54]
2023-12-15 21:41:14.386 INFO 12036 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2023-12-15 21:41:14.386 INFO 12036 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 771 ms
2023-12-15 21:41:14.417 INFO 12036 --- [ main] c.e.t.config.PoolExecutorConfig : 该台服务器的CPU核心数为8
2023-12-15 21:41:14.433 INFO 12036 --- [ main] c.e.t.config.OurPoolExecutor : 开始初始化线程池 -->threadPoolTaskExecutor
2023-12-15 21:41:14.433 INFO 12036 --- [ main] c.e.t.config.OurPoolExecutor : 初始化线程池完成:threadPoolTaskExecutor-核心线程为9-->
2023-12-15 21:41:14.649 INFO 12036 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2023-12-15 21:41:14.664 INFO 12036 --- [ main] c.e.t.ThreadPoolDemo1Application : Started ThreadPoolDemo1Application in 1.406 seconds (JVM running for 2.63)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.6</version>
<relativePath/>
</parent>
<groupId>com.example</groupId>
<artifactId>ThreadPoolDemo2</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>ThreadPoolDemo2</name>
<description>ThreadPoolDemo2</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
package com.example.threadpooldemo2.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author tom
*/
@Configuration
@EnableAsync
public class ThreadPoolConfig {
/**
* 下面的配置是配置Springboot的@Async注解所用的线程池
*/
@Bean("taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 设置线程池核心容量
executor.setCorePoolSize(4);
// 设置线程池最大容量
executor.setMaxPoolSize(8);
// 设置任务队列长度
executor.setQueueCapacity(200);
// 设置线程超时时间
executor.setKeepAliveSeconds(60);
// 设置线程名称前缀
executor.setThreadNamePrefix("xyjAsyncPool-");
// 设置任务丢弃后的处理策略,当poolSize已达到maxPoolSize,如何处理新任务(是拒绝还是交由其它线程处理)
// CallerRunsPolicy: 不在新线程中执行任务,而是由调用者所在的线程来执
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
package com.example.threadpooldemo2.service;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
/**
* @author tom
*/
@Service
public class AsyncService {
/**
* 异步方法,如果@Async加在类的上面,则整个类中的方法都是异步的
*/
@Async
public void print() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1000);
System.out.println(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package com.example.threadpooldemo2.controller;
import com.example.threadpooldemo2.service.AsyncService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.Executor;
/**
* @author 徐一杰
* @date 2022/9/20 10:30
* @description
*/
@RestController
@RequestMapping("/test")
public class TestController {
private Executor execute;
private AsyncService asyncService;
@Autowired
@Qualifier("taskExecutor")
public void setExecute(Executor execute) {
this.execute = execute;
}
@Autowired
public void setAsyncService(AsyncService asyncService) {
this.asyncService = asyncService;
}
/**
* http://localhost:8080/test/helloThread
*/
@GetMapping("/helloThread")
public void helloThread() {
execute.execute(() -> {
for (int i = 0; i < 10; i++) {
System.out.println("111");
}
});
execute.execute(() -> {
for (int i = 0; i < 10; i++) {
System.out.println("222");
}
});
}
/**
* http://localhost:8080/test/helloAsync
*
* @return
*/
@GetMapping("/helloAsync")
public String helloAsync() {
// 这个方法是异步的
asyncService.print();
System.out.println("print方法还在循环,但我已经可以执行了");
return "print方法还在循环,但我已经可以执行了";
}
}
# 请求
http://localhost:8080/test/helloThread
# 输出
111
222
222
222
222
222
111
111
222
222
222
222
222
111
111
111
111
111
111
111
# 请求
http://localhost:8080/test/helloAsync
print方法还在循环,但我已经可以执行了
# 输出
print方法还在循环,但我已经可以执行了
0
1
2
3
4
5
6
7
8
9
CompletableFuture,结合了 Future 的优点,提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编
程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合
CompletableFuture 的方法。
CompletableFuture 可以传入自定义线程池,否则使用自己默认的线程池,我们习惯做法是自定义线程池,控制
整个项目的线程数量,不使用自定义的线程池,做到可控可调。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.6</version>
<relativePath/>
</parent>
<groupId>com.example</groupId>
<artifactId>ThreadPoolDemo3</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>ThreadPoolDemo3</name>
<description>ThreadPoolDemo3</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
service.thread.coreSize = 10
service.thread.maxSize = 100
service.thread.keepAliveTime = 100
package com.example.threadpooldemo3.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* @Description: 线程池属性
* @Author: tom
*/
@ConfigurationProperties(prefix = "service.thread")
@Data
public class ThreadPoolConfigProperties {
private Integer coreSize;
private Integer maxSize;
private Integer keepAliveTime;
}
package com.example.threadpooldemo3.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author tom
* @Description: 线程池配置类:根据不同业务定义不同的线程池配置
**/
@EnableConfigurationProperties(ThreadPoolConfigProperties.class)
@Configuration
@Slf4j
public class MyServiceThreadConfig {
@Bean
public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties pool) {
log.info("{}", pool.getCoreSize());
log.info("{}", pool.getMaxSize());
log.info("{}", pool.getKeepAliveTime());
return new ThreadPoolExecutor(pool.getCoreSize(), pool.getMaxSize(), pool.getKeepAliveTime(), TimeUnit.SECONDS, new LinkedBlockingDeque<>(100000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
}
}
package com.example.threadpooldemo3;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
@SpringBootTest
@Slf4j
class ThreadPoolDemo3ApplicationTests {
@Autowired
private ThreadPoolExecutor threadPoolExecutor;
@Test
void contextLoads() {
log.info("main.................start.....");
CompletableFuture.runAsync(() -> {
System.out.println("当前线程: " + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果: " + i);
}, threadPoolExecutor);
log.info("main.................end......");
}
}
# 输出
......
2023-12-16 11:02:12.908 INFO 5872 --- [ main] c.e.t.config.MyServiceThreadConfig : 10
2023-12-16 11:02:12.908 INFO 5872 --- [ main] c.e.t.config.MyServiceThreadConfig : 100
2023-12-16 11:02:12.908 INFO 5872 --- [ main] c.e.t.config.MyServiceThreadConfig : 100
2023-12-16 11:02:13.889 INFO 5872 --- [ main] c.e.t.ThreadPoolDemo3ApplicationTests : Started ThreadPoolDemo3ApplicationTests in 1.841 seconds (JVM running for 2.716)
2023-12-16 11:02:14.042 INFO 5872 --- [ main] c.e.t.ThreadPoolDemo3ApplicationTests : main.................start.....
2023-12-16 11:02:14.042 INFO 5872 --- [ main] c.e.t.ThreadPoolDemo3ApplicationTests : main.................end......
当前线程: 16
运行结果: 5
在现实的互联网项目开发中,针对高并发的请求,一般的做法是高并发接口单独线程池隔离处理,可能为某一高并
发的接口单独一个线程池。
使用 @Async
注解,在默认情况下用的是 SimpleAsyncTaskExecutor
线程池,该线程池不是真正意义上的线程
池。使用此线程池无法实现线程重用,每次调用都会新建一条线程。若系统中不断的创建线程,最终会导致系统占
用内存过高,引发 OutOfMemoryError 错误。
【建议】不同类型的业务任务尽量使用不同的线程池。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.6</version>
<relativePath/>
</parent>
<groupId>com.example</groupId>
<artifactId>ThreadPoolDemo4</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>ThreadPoolDemo4</name>
<description>ThreadPoolDemo4</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
不清楚线程池当时的情况,有多少线程在执行,多少在队列中等待呢?
创建了一个 ThreadPoolTaskExecutor
的子类,在每次提交线程任务的时候都会将当前线程池的运行状况打
印出来。
package com.example.threadpooldemo4.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author tom
*/
@Slf4j
public class VisibleThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
private void showThreadPoolInfo(String prefix) {
ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
if (null == threadPoolExecutor) {
return;
}
log.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]", this.getThreadNamePrefix(), prefix, threadPoolExecutor.getTaskCount(), threadPoolExecutor.getCompletedTaskCount(), threadPoolExecutor.getActiveCount(), threadPoolExecutor.getQueue().size());
}
@Override
public void execute(Runnable task) {
showThreadPoolInfo("1. do execute");
super.execute(task);
}
@Override
public void execute(Runnable task, long startTimeout) {
showThreadPoolInfo("2. do execute");
super.execute(task, startTimeout);
}
@Override
public Future<?> submit(Runnable task) {
showThreadPoolInfo("1. do submit");
return super.submit(task);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
showThreadPoolInfo("2. do submit");
return super.submit(task);
}
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
showThreadPoolInfo("1. do submitListenable");
return super.submitListenable(task);
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
showThreadPoolInfo("2. do submitListenable");
return super.submitListenable(task);
}
}
package com.example.threadpooldemo4.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @Description: 注解@async配置
* @Author: tom
*/
@Slf4j
@EnableAsync
@Configuration
public class AsyncThreadConfig implements AsyncConfigurer {
/**
* 定义@Async默认的线程池
* ThreadPoolTaskExecutor不是完全被IOC容器管理的bean,可以在方法上加上@Bean注解交给容器管理,这样可以将taskExecutor.initialize()方法调用去掉,容器会自动调用
*
* @return
*/
@Override
public Executor getAsyncExecutor() {
int processors = Runtime.getRuntime().availableProcessors();
// 常用的执行器
// ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
// 可以查看线程池参数的自定义执行器
ThreadPoolTaskExecutor taskExecutor = new VisibleThreadPoolTaskExecutor();
// 核心线程数
taskExecutor.setCorePoolSize(1);
taskExecutor.setMaxPoolSize(2);
// 线程队列最大线程数,默认:50
taskExecutor.setQueueCapacity(50);
// 线程名称前缀
taskExecutor.setThreadNamePrefix("My-Thread-");
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 执行初始化(重要)
taskExecutor.initialize();
return taskExecutor;
}
/**
* 异步方法执行的过程中抛出的异常捕获
*
* @return
*/
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params) -> log.error("线程池执行任务发送未知错误,执行方法:{}", method.getName(), ex.getMessage());
}
}
package com.example.threadpooldemo4.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.Random;
/**
* @author tom
*/
@Service
@Slf4j
public class TestService {
private Random random = new Random();
/**
* 默认线程池
*/
@Async
public void defaultThread() throws Exception {
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(1000));
long end = System.currentTimeMillis();
int i = 1 / 0;
log.info("使用默认线程池,耗时:" + (end - start) + "毫秒");
}
}
package com.example.threadpooldemo4.controller;
import com.example.threadpooldemo4.service.TestService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TestController {
private TestService testService;
@Autowired
public void setTestService(TestService testService) {
this.testService = testService;
}
@GetMapping("/test")
public void test() {
try {
testService.defaultThread();
} catch (Exception e) {
e.printStackTrace();
}
}
}
# 进行多次请求
http://localhost:8080/test
# 输出
......
2023-12-16 11:23:39.848 INFO 5872 --- [nio-8080-exec-3] c.e.t.c.VisibleThreadPoolTaskExecutor : My-Thread-, 2. do submit,taskCount [0], completedTaskCount [0], activeCount [0], queueSize [0]
2023-12-16 11:23:39.899 ERROR 5872 --- [ My-Thread-1] c.e.t.config.AsyncThreadConfig : 线程池执行任务发送未知错误,执行方法:defaultThread
2023-12-16 11:25:25.221 INFO 5872 --- [nio-8080-exec-5] c.e.t.c.VisibleThreadPoolTaskExecutor : My-Thread-, 2. do submit,taskCount [1], completedTaskCount [1], activeCount [0], queueSize [0]
2023-12-16 11:25:25.866 ERROR 5872 --- [ My-Thread-1] c.e.t.config.AsyncThreadConfig : 线程池执行任务发送未知错误,执行方法:defaultThread
2023-12-16 11:25:46.245 INFO 5872 --- [nio-8080-exec-6] c.e.t.c.VisibleThreadPoolTaskExecutor : My-Thread-, 2. do submit,taskCount [2], completedTaskCount [2], activeCount [0], queueSize [0]
2023-12-16 11:25:46.325 ERROR 5872 --- [ My-Thread-1] c.e.t.config.AsyncThreadConfig : 线程池执行任务发送未知错误,执行方法:defaultThread
2023-12-16 11:25:47.180 INFO 5872 --- [nio-8080-exec-8] c.e.t.c.VisibleThreadPoolTaskExecutor : My-Thread-, 2. do submit,taskCount [3], completedTaskCount [3], activeCount [0], queueSize [0]
2023-12-16 11:25:47.805 ERROR 5872 --- [ My-Thread-1] c.e.t.config.AsyncThreadConfig : 线程池执行任务发送未知错误,执行方法:defaultThread
2023-12-16 11:25:47.913 INFO 5872 --- [nio-8080-exec-9] c.e.t.c.VisibleThreadPoolTaskExecutor : My-Thread-, 2. do submit,taskCount [4], completedTaskCount [4], activeCount [0], queueSize [0]
2023-12-16 11:25:48.272 ERROR 5872 --- [ My-Thread-1] c.e.t.config.AsyncThreadConfig : 线程池执行任务发送未知错误,执行方法:defaultThread
由于业务需要,根据业务不同需要不同的线程池。
package com.example.threadpooldemo4.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @Description: 注解@async配置
* @Author: tom
*/
@Slf4j
@EnableAsync
@Configuration
public class MyAsyncThreadConfig {
@Bean("myServiceExecutor")
public Executor myServiceExecutor() {
// Java虚拟机可用的处理器数
int processors = Runtime.getRuntime().availableProcessors();
// 定义线程池
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
// 可以查看线程池参数的自定义执行器
// ThreadPoolTaskExecutor taskExecutor = new VisibleThreadPoolTaskExecutor();
// 核心线程数
taskExecutor.setCorePoolSize(processors);
taskExecutor.setMaxPoolSize(100);
// 线程队列最大线程数,默认:100
taskExecutor.setQueueCapacity(100);
// 线程名称前缀
taskExecutor.setThreadNamePrefix("My-Thread-");
// 线程池中线程最大空闲时间,默认:60,单位:秒
taskExecutor.setKeepAliveSeconds(60);
// 核心线程是否允许超时,默认:false
taskExecutor.setAllowCoreThreadTimeOut(false);
// IOC容器关闭时是否阻塞等待剩余的任务执行完成,默认:false(必须设置setAwaitTerminationSeconds)
taskExecutor.setWaitForTasksToCompleteOnShutdown(false);
// 阻塞IOC容器关闭的时间,默认:10秒(必须设置setWaitForTasksToCompleteOnShutdown)
taskExecutor.setAwaitTerminationSeconds(10);
/**
* 拒绝策略,默认是AbortPolicy
* AbortPolicy: 丢弃任务并抛出RejectedExecutionException异常
* DiscardPolicy: 丢弃任务但不抛出异常
* DiscardOldestPolicy: 丢弃最旧的处理程序,然后重试,如果执行器关闭,这时丢弃任务
* CallerRunsPolicy: 执行器执行任务失败,则在策略回调方法中执行任务,如果执行器关闭,这时丢弃任务
*/
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return taskExecutor;
}
}
package com.example.threadpooldemo4.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.Random;
/**
* @author tom
*/
@Service
@Slf4j
public class TestService {
private Random random = new Random();
/**
* 默认线程池
*/
@Async
public void defaultThread() throws Exception {
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(1000));
long end = System.currentTimeMillis();
int i = 1 / 0;
log.info("使用默认线程池,耗时:" + (end - start) + "毫秒");
}
/**
* 指定线程池myServiceExecutor
*
* @throws Exception
*/
@Async("myServiceExecutor")
public void myServiceExecutor() throws Exception {
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(1000));
long end = System.currentTimeMillis();
log.info("使用线程池myServiceExecutor,耗时:" + (end - start) + "毫秒");
}
}
package com.example.threadpooldemo4.controller;
import com.example.threadpooldemo4.service.TestService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TestController {
private TestService testService;
@Autowired
public void setTestService(TestService testService) {
this.testService = testService;
}
@GetMapping("/test")
public void test() {
try {
testService.defaultThread();
} catch (Exception e) {
e.printStackTrace();
}
}
@GetMapping("/test2")
public void test2() {
try {
testService.myServiceExecutor();
} catch (Exception e) {
e.printStackTrace();
}
}
}
# 多次请求
http://localhost:8080/test2
# 输出
......
2023-12-16 11:37:28.006 INFO 4016 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 0 ms
2023-12-16 11:37:28.126 INFO 4016 --- [ My-Thread-1] c.e.threadpooldemo4.service.TestService : 使用线程池myServiceExecutor,耗时:104毫秒
2023-12-16 11:37:34.607 INFO 4016 --- [ My-Thread-2] c.e.threadpooldemo4.service.TestService : 使用线程池myServiceExecutor,耗时:287毫秒
2023-12-16 11:37:35.284 INFO 4016 --- [ My-Thread-3] c.e.threadpooldemo4.service.TestService : 使用线程池myServiceExecutor,耗时:230毫秒
package com.example.threadpooldemo4.config;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @Description: 注解@async配置
* @Author: tom
*/
@Slf4j
@EnableAsync
@Configuration
public class MyAsyncThreadConfig {
@Bean("myServiceExecutor")
public Executor myServiceExecutor() {
// Java虚拟机可用的处理器数
int processors = Runtime.getRuntime().availableProcessors();
// 定义线程池
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
// 可以查看线程池参数的自定义执行器
// ThreadPoolTaskExecutor taskExecutor = new VisibleThreadPoolTaskExecutor();
// 核心线程数
taskExecutor.setCorePoolSize(processors);
taskExecutor.setMaxPoolSize(100);
// 线程队列最大线程数,默认:100
taskExecutor.setQueueCapacity(100);
// 线程名称前缀
taskExecutor.setThreadNamePrefix("My-Thread-");
// 线程池中线程最大空闲时间,默认:60,单位:秒
taskExecutor.setKeepAliveSeconds(60);
// 核心线程是否允许超时,默认:false
taskExecutor.setAllowCoreThreadTimeOut(false);
// IOC容器关闭时是否阻塞等待剩余的任务执行完成,默认:false(必须设置setAwaitTerminationSeconds)
taskExecutor.setWaitForTasksToCompleteOnShutdown(false);
// 阻塞IOC容器关闭的时间,默认:10秒(必须设置setWaitForTasksToCompleteOnShutdown)
taskExecutor.setAwaitTerminationSeconds(10);
/**
* 拒绝策略,默认是AbortPolicy
* AbortPolicy: 丢弃任务并抛出RejectedExecutionException异常
* DiscardPolicy: 丢弃任务但不抛出异常
* DiscardOldestPolicy: 丢弃最旧的处理程序,然后重试,如果执行器关闭,这时丢弃任务
* CallerRunsPolicy: 执行器执行任务失败,则在策略回调方法中执行任务,如果执行器关闭,这时丢弃任务
*/
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 使用guava的ThreadFactoryBuilder
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("Test-Thread" + "-%d").setDaemon(true).build();
taskExecutor.setThreadFactory(threadFactory);
return taskExecutor;
}
}
测试:
# 多次请求:
http://localhost:8080/test2
# 输出
......
2023-12-16 11:55:18.226 INFO 12316 --- [ Test-Thread-0] c.e.threadpooldemo4.service.TestService : myServiceExecutor,耗时:113毫秒
2023-12-16 11:55:20.743 INFO 12316 --- [ Test-Thread-1] c.e.threadpooldemo4.service.TestService : myServiceExecutor,耗时:335毫秒
2023-12-16 11:55:21.091 INFO 12316 --- [ Test-Thread-3] c.e.threadpooldemo4.service.TestService : myServiceExecutor,耗时:357毫秒
2023-12-16 11:55:21.185 INFO 12316 --- [ Test-Thread-2] c.e.threadpooldemo4.service.TestService : myServiceExecutor,耗时:614毫秒
2023-12-16 11:55:21.217 INFO 12316 --- [ Test-Thread-6] c.e.threadpooldemo4.service.TestService : myServiceExecutor,耗时:82毫秒
2023-12-16 11:55:21.242 INFO 12316 --- [ Test-Thread-4] c.e.threadpooldemo4.service.TestService : myServiceExecutor,耗时:387毫秒
2023-12-16 11:55:21.370 INFO 12316 --- [ Test-Thread-7] c.e.threadpooldemo4.service.TestService : myServiceExecutor,耗时:85毫秒
2023-12-16 11:55:21.414 INFO 12316 --- [ Test-Thread-5] c.e.threadpooldemo4.service.TestService : myServiceExecutor,耗时:407毫秒
自己实现 ThreadFactory接口:
package com.example.threadpooldemo4.config;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author tom
* 自己实现 ThreadFactory接口
* 线程工厂,它设置线程名称,有利于我们定位问题
*/
public final class NamingThreadFactory implements ThreadFactory {
private static final AtomicInteger POOL_SEQ = new AtomicInteger(1);
private final AtomicInteger mThreadNum = new AtomicInteger(1);
private final String mPrefix;
private final boolean mDaemo;
private final ThreadGroup mGroup;
public NamingThreadFactory() {
this("pool-" + POOL_SEQ.getAndIncrement(), false);
}
public NamingThreadFactory(String prefix) {
this(prefix, false);
}
public NamingThreadFactory(String prefix, boolean daemo) {
mPrefix = prefix + "-thread-";
mDaemo = daemo;
SecurityManager s = System.getSecurityManager();
mGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
}
@Override
public Thread newThread(Runnable runnable) {
String name = mPrefix + mThreadNum.getAndIncrement();
Thread ret = new Thread(mGroup, runnable, name, 0);
ret.setDaemon(mDaemo);
return ret;
}
public ThreadGroup getThreadGroup() {
return mGroup;
}
}
package com.example.threadpooldemo4.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @Description: 注解@async配置
* @Author: tom
*/
@Slf4j
@EnableAsync
@Configuration
public class MyAsyncThreadConfig {
@Bean("myServiceExecutor")
public Executor myServiceExecutor() {
// Java虚拟机可用的处理器数
int processors = Runtime.getRuntime().availableProcessors();
// 定义线程池
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
// 可以查看线程池参数的自定义执行器
// ThreadPoolTaskExecutor taskExecutor = new VisibleThreadPoolTaskExecutor();
// 核心线程数
taskExecutor.setCorePoolSize(processors);
taskExecutor.setMaxPoolSize(100);
// 线程队列最大线程数,默认:100
taskExecutor.setQueueCapacity(100);
// 线程名称前缀
taskExecutor.setThreadNamePrefix("My-Thread-");
// 线程池中线程最大空闲时间,默认:60,单位:秒
taskExecutor.setKeepAliveSeconds(60);
// 核心线程是否允许超时,默认:false
taskExecutor.setAllowCoreThreadTimeOut(false);
// IOC容器关闭时是否阻塞等待剩余的任务执行完成,默认:false(必须设置setAwaitTerminationSeconds)
taskExecutor.setWaitForTasksToCompleteOnShutdown(false);
// 阻塞IOC容器关闭的时间,默认:10秒(必须设置setWaitForTasksToCompleteOnShutdown)
taskExecutor.setAwaitTerminationSeconds(10);
/**
* 拒绝策略,默认是AbortPolicy
* AbortPolicy: 丢弃任务并抛出RejectedExecutionException异常
* DiscardPolicy: 丢弃任务但不抛出异常
* DiscardOldestPolicy: 丢弃最旧的处理程序,然后重试,如果执行器关闭,这时丢弃任务
* CallerRunsPolicy: 执行器执行任务失败,则在策略回调方法中执行任务,如果执行器关闭,这时丢弃任务
*/
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 自己实现 ThreadFactory接口
ThreadFactory threadFactory = new NamingThreadFactory("Dev");
taskExecutor.setThreadFactory(threadFactory);
return taskExecutor;
}
}
测试:
# 多次请求:
http://localhost:8080/test2
# 输出
......
2023-12-16 12:10:10.947 INFO 3304 --- [ Dev-thread-2] c.e.threadpooldemo4.service.TestService : myServiceExecutor,耗时:274毫秒
2023-12-16 12:10:11.222 INFO 3304 --- [ Dev-thread-3] c.e.threadpooldemo4.service.TestService : myServiceExecutor,耗时:432毫秒
2023-12-16 12:10:11.388 INFO 3304 --- [ Dev-thread-1] c.e.threadpooldemo4.service.TestService : myServiceExecutor,耗时:990毫秒
2023-12-16 12:10:11.502 INFO 3304 --- [ Dev-thread-5] c.e.threadpooldemo4.service.TestService : myServiceExecutor,耗时:429毫秒
2023-12-16 12:10:11.567 INFO 3304 --- [ Dev-thread-4] c.e.threadpooldemo4.service.TestService : myServiceExecutor,耗时:627毫秒
2023-12-16 12:10:11.750 INFO 3304 --- [ Dev-thread-7] c.e.threadpooldemo4.service.TestService : myServiceExecutor,耗时:410毫秒
2023-12-16 12:10:11.836 INFO 3304 --- [ Dev-thread-6] c.e.threadpooldemo4.service.TestService : myServiceExecutor,耗时:648毫秒
2023-12-16 12:10:11.918 INFO 3304 --- [ Dev-thread-8] c.e.threadpooldemo4.service.TestService : myServiceExecutor,耗时:466毫秒
2023-12-16 12:10:12.108 INFO 3304 --- [ Dev-thread-2] c.e.threadpooldemo4.service.TestService : myServiceExecutor,耗时:503毫秒
这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程
是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空
闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处
理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体
的计算方法是 2N。
CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。单凡涉及到网络读取,
文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大
部分时间都花在了等待 IO 操作完成上。
// 获取逻辑核心数,如6核心12线程,那么返回的是12
Runtime.getRuntime().availableProcessors()
# 总核数 = 物理CPU个数 X 每颗物理CPU的核数
# 总逻辑CPU数 = 物理CPU个数 X 每颗物理CPU的核数 X 超线程数
# 查看物理CPU个数
cat /proc/cpuinfo| grep "physical id"| sort| uniq| wc -l
# 查看每个物理CPU中core的个数(即核数)
cat /proc/cpuinfo| grep "cpu cores"| uniq
# 查看逻辑CPU的个数
cat /proc/cpuinfo| grep "processor"| wc -l