降低资源消耗:通过重复利用已创建的线程,降低线程创建和销毁造成的消耗。
提高响应速度:当任务到达时,任务可以不需要等待线程创建就能立即执行。
提高线程的可管理性:线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性;使用线程池可以进行统一的分配、监控和调优。
/**
 * Spring configuration that switches on {@code @Async} processing and registers
 * the thread pool bean named {@code "AiTaskExecutor"}.
 */
@EnableAsync
// Configuration class
@Configuration
public class ThreadPoolsConfig {

    /**
     * Upper bound, in seconds, that container shutdown blocks while queued and
     * running tasks finish.
     */
    private static final int AWAIT_TERMINATION_SECONDS = 60;

    /**
     * Builds the {@code "AiTaskExecutor"} pool: 10 core threads, at most 100 threads,
     * a queue of 500 pending tasks, and threads named with the {@code aiThread} prefix.
     *
     * @return the configured executor (a {@link VisibleThreadPoolTaskExecutor})
     */
    @Bean("AiTaskExecutor")
    public Executor visible() {
        ThreadPoolTaskExecutor executor = new VisibleThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(100);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("aiThread");
        executor.setThreadPriority(Thread.MAX_PRIORITY);

        // On shutdown, let already-submitted tasks finish instead of interrupting them ...
        executor.setWaitForTasksToCompleteOnShutdown(true);
        // ... and actually block the container shutdown while they do. Without an await
        // timeout Spring keeps destroying other beans while the tasks are still running.
        executor.setAwaitTerminationSeconds(AWAIT_TERMINATION_SECONDS);

        // Rejection policy: when pool and queue are both saturated, the submitting
        // thread runs the task itself rather than the task being dropped.
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }
}
package com.cmes.ghg.ai.common;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * A {@link ThreadPoolTaskExecutor} that logs a snapshot of the pool's counters
 * immediately before each task is handed to the underlying pool.
 *
 * <p>Author: lizj. Created: 2023-05-30 16:38.
 */
public class VisibleThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
    private static final Logger log = LoggerFactory.getLogger(VisibleThreadPoolTaskExecutor.class);

    /**
     * Logs the current state of the pool: thread name prefix, calling method,
     * total task count, completed task count, active thread count and queue size.
     *
     * @param method name of the submitting method, used only as a label in the log line
     */
    private void log(String method) {
        ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
        if (threadPoolExecutor == null) {
            // Defensive guard: nothing to report without an underlying pool.
            return;
        }
        // One placeholder per argument: the total-task-count placeholder was missing,
        // which shifted every value under the wrong label and dropped the queue size.
        log.info("线程池:{}, 执行方法:{}, 任务总数 [{}], 完成任务数量 [{}], 活跃线程数 [{}], 队列长度 [{}]",
                this.getThreadNamePrefix(),
                method,
                threadPoolExecutor.getTaskCount(),
                threadPoolExecutor.getCompletedTaskCount(),
                threadPoolExecutor.getActiveCount(),
                threadPoolExecutor.getQueue().size());
    }

    // Each override below logs the pool state, then delegates to the parent implementation.

    /** Logs the pool state, then executes the task. */
    @Override
    public void execute(Runnable task) {
        log("execute");
        super.execute(task);
    }

    /** Logs the pool state, then executes the task with the given start timeout. */
    @Override
    public void execute(Runnable task, long startTimeout) {
        log("execute");
        super.execute(task, startTimeout);
    }

    /** Logs the pool state, then submits the runnable. */
    @Override
    public Future<?> submit(Runnable task) {
        log("submit");
        return super.submit(task);
    }

    /** Logs the pool state, then submits the callable. */
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        // Label now matches submit(Runnable); it previously carried an unrelated copy-pasted name.
        log("submit");
        return super.submit(task);
    }

    /** Logs the pool state, then submits the runnable and returns a listenable future. */
    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        log("submitListenable");
        return super.submitListenable(task);
    }

    /** Logs the pool state, then submits the callable and returns a listenable future. */
    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        log("submitListenable");
        return super.submitListenable(task);
    }
}
/**
 * AIS data calculation entry point, annotated to run asynchronously on the
 * executor bean named {@code "AiTaskExecutor"}.
 *
 * <p>The body is elided in this listing (the placeholder line reads
 * "business implementation omitted").
 *
 * @param startTime     presumably the start of the time window to process (format not shown here; confirm)
 * @param endTime       presumably the end of the time window to process (format not shown here; confirm)
 * @param jobId         presumably the identifier of the triggering job (confirm against caller)
 * @param jobFinishTime presumably the job's finish time (confirm against caller)
 * @throws Exception declared by the signature; the concrete failures are not visible here
 */
@Override
@Async("AiTaskExecutor")
public void AisDataCal(String startTime,String endTime,String jobId,String jobFinishTime) throws Exception {
...省略业务代码实现
}
/**
 * Loads the ship list, splits it into fixed-size batches and hands each batch
 * to {@link ICalProxyService#AisDataCal}.
 */
@Service
public class InitThreadPoolImpl implements InitThreadPool {
    /** Number of ships handed to the calculation service per call. */
    private static final int SHIP_BATCH_SIZE = 80;

    // Interface exposing the core calculation method run by the worker threads.
    @Autowired
    ICalProxyService calProxyService;

    /**
     * Queries the ships with {@code GlobalVariables.shipSql}, partitions the result
     * into batches of {@value #SHIP_BATCH_SIZE} and dispatches every batch to the
     * calculation service.
     *
     * @return always {@code false}
     *         NOTE(review): callers may expect {@code true} on success; confirm the contract.
     * @throws Exception if the query or the dispatch of a batch fails
     */
    @Override
    public boolean CalAisDataByShipInfo() throws Exception {
        List<Map<String, String>> shipMaps = ClickHouseConfig.exeSelectSql(GlobalVariables.shipSql);
        List<List<Map<String, String>>> shipListSplit = Lists.partition(shipMaps, SHIP_BATCH_SIZE);
        for (List<Map<String, String>> maps : shipListSplit) {
            // Pass the current batch, not the full list: passing shipMaps made every
            // call reprocess all ships, once per batch.
            calProxyService.AisDataCal(maps, "", "", "", "");
        }
        return false;
    }
}
/**
 * Flags the request as an export and computes the assessment, annotated to run
 * asynchronously on the executor bean named {@code "AiTaskExecutor"}.
 *
 * <p>The incoming map is modified in place: {@code "isExportFlag"} is set to {@code "1"}
 * before it is passed to {@code getAssessment}.
 *
 * @param map request parameters; mutated by this call
 * @return an already-completed future holding the result of {@code getAssessment}
 * @throws Exception propagated from {@code getAssessment}
 */
@Async("AiTaskExecutor")
public CompletableFuture<Map> ExportListAssessment(Map map) throws Exception {
    // Mark this as an export request before delegating.
    map.put("isExportFlag", "1");
    Map exportResult = this.getAssessment(map);
    CompletableFuture<Map> completed = CompletableFuture.completedFuture(exportResult);
    return completed;
}
/**
 * POST {@code /exportListAssessment}: starts the export assessment and returns its
 * result wrapped in a success {@code ResultData}.
 *
 * <p>The handler blocks on the returned future until the assessment is available.
 *
 * @param paramMap request body forwarded unchanged to {@code ExportListAssessment}
 * @return success response carrying the assessment map
 * @throws Exception if starting the assessment or waiting for its result fails
 */
@RequestMapping(value = "/exportListAssessment", method = RequestMethod.POST)
@ApiOperation("社会船评估列表导出")
public ResultData<Map<String,Object>> exportListAssessment(@RequestBody Map paramMap) throws Exception {
    CompletableFuture<Map> pending = emissionsAssessment.ExportListAssessment(paramMap);
    // Wait for the asynchronous computation to finish before building the response.
    Map assessment = pending.get();
    return ResultData.success(assessment);
}