SpringBoot集成线程池

发布时间:2024年01月23日
使用线程池的好处:
  • 降低资源消耗

:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

  • 提高响应速度

:当任务到达时,可以不需要等待线程创建就能立即执行。

  • 提高线程的可管理性

:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,监控和调优。

配置及调用:
1、创建配置类
@EnableAsync
//配置类
@Configuration
public class ThreadPoolsConfig {
@Bean("AiTaskExecutor")
    public Executor visible() {
        ThreadPoolTaskExecutor executor = new VisibleThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(100);
        executor.setQueueCapacity(500);//maxPoolSize * 1000
        executor.setThreadNamePrefix("aiThread");
        executor.setThreadPriority(Thread.MAX_PRIORITY);
        //设置线程池关闭的时候 等待所有的任务完成后再继续销毁其他的bean
        executor.setWaitForTasksToCompleteOnShutdown(true);
        //策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }
 }
2、创建线程打印类,用于跟踪线程使用情况
package com.cmes.ghg.ai.common;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @Author: lizj
 * @CreateTime: 2023-05-30  16:38
 * @Description: TODO
 */
public class VisibleThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
    private static final Logger log = LoggerFactory.getLogger(VisibleThreadPoolTaskExecutor.class);
    //2、编写打印线程池方法
    private void log(String method){
        ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();

        if(threadPoolExecutor==null){
            return;
        }

        log.info("线程池:{}, 执行方法:{}, 完成任务数量 [{}], 活跃线程数 [{}], 队列长度 [{}]",
                this.getThreadNamePrefix(),
                method,
                threadPoolExecutor.getTaskCount(),
                threadPoolExecutor.getCompletedTaskCount(),
                threadPoolExecutor.getActiveCount(),
                threadPoolExecutor.getQueue().size());
    }



    //3、重写方法,进行日志的记录
    @Override
    public void execute(Runnable task) {
        log("execute");
        super.execute(task);
    }

    @Override
    public void execute(Runnable task, long startTimeout) {
        log("execute");
        super.execute(task, startTimeout);
    }

    @Override
    public Future<?> submit(Runnable task) {
        log("submit");
        return super.submit(task);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        log("InitPortThread");
        return super.submit(task);
    }

    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        log("submitListenable");
        return super.submitListenable(task);
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        log("submitListenable");
        return super.submitListenable(task);
    }
}
3、通过注解调用异步线程池使用;
@Override
@Async("AiTaskExecutor")
public void AisDataCal(String startTime,String endTime,String jobId,String jobFinishTime) throws Exception {
    ...省略业务代码实现
    }
4、如果线程核心方法需要循环调用或需要创建多个异步线程,需要再创建一个接口调用,如果不需要则直接通过控制层调用
@Service
public class InitThreadPoolImpl implements InitThreadPool {
    @Autowired
    //线程核心方法接口
    ICalProxyService calProxyService;
    @Override
    public boolean CalAisDataByShipInfo() throws Exception {
        List<Map<String, String>> shipMaps = ClickHouseConfig.exeSelectSql(GlobalVariables.shipSql);
        List<List<Map<String, String>>> shipListSplit = Lists.partition(shipMaps, 80);
        for (List<Map<String, String>> maps : shipListSplit) {
            calProxyService.AisDataCal(shipMaps, "", "", "", "");
        }
        return false;
    }
}
5、调用返回示例

6、接收返回信息时,使用CompletableFuture进行接收
@Async("AiTaskExecutor")
public CompletableFuture<Map> ExportListAssessment(Map map) throws Exception {
    map.put("isExportFlag","1");
    Map assessment = this.getAssessment(map);
    return CompletableFuture.completedFuture(assessment);
}
7、Controller层接收
@RequestMapping(value = "/exportListAssessment",method = RequestMethod.POST)
@ApiOperation("社会船评估列表导出")
public ResultData<Map<String,Object>> exportListAssessment(@RequestBody Map paramMap) throws Exception {
    CompletableFuture<Map> mapCompletableFuture = emissionsAssessment.ExportListAssessment(paramMap);
    Map map = mapCompletableFuture.get();
    return ResultData.success(map);
}

文章来源:https://blog.csdn.net/u013682979/article/details/135754584
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。