ThreadPoolExecutor线程池和CompletableFuture 项目实战使用

发布时间:2024年01月21日

ThreadPoolExecutor线程池和CompletableFuture 项目实战使用

前言:
面试题,多线程的创建方式?线程池的参数有哪些?怎么配置核心线程数和最大可扩展线程数…
这次主讲实战线程池的实战使用
前景:

1、线程池的好处

线程的多次使用
提高响应速度
线程可配置…

2、核心线程数和最大可扩展线程数的设置

主要考虑以下情况:
下面是本人考虑的点,可能有误,还有其他考虑的点,可以留言,我这边同步修改下。
项目cpu的核数、任务是cpu密集型还是IO密集型、项目有几个地方会使用线程池…

代码实现

核心线程数的考虑是:
设计思路:

1,当前项目是一个服务器,有几个接口功能数据,需要第三方推送数据,我这边主要是做大屏展示和数据综合处理,有伙伴会问,咋那边推送,为啥不加个中间件这样解耦,异步啥的,因为项目是还在初期阶段,目前还没立项,后期会接中间件,如果立项后,中间件实战使用我再更新…

2,为啥不使用单线程?
如果推送数据量很大的话,单线程可以实现,但是增加项目项目直接的传输时间,如果网络抖动,数据丢失,额外增加后期维护运维成本…

3,核心线程数的设置
目前有3个地方推送数据,目前没写定时任务,因为对方数据可能会变,那边是把数据汇总起来,然后通过传参的方式,我们这边提供openapi接口,那边直接调用即可

核心线程数设置为5个
原因:目前有4个地方时提供openapi接口,随时对方可能推送数据

最大可扩展线程数10个
功能主要是IO密集型,最大可扩展线程数 = 核心线程数 X 2

yml 配置线程池的参数
# 配置多线程参数
threadPool:
  coreSize: 5
  maxSize: 10
  keepLiveTime: 120
  blockQueueSize: 500
线程池配置类
package com..xx.xx.config;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author psd 配置线程池的相关参数
 * @create 2023-12-28 16:06
 */

public class ThreadPoolConfig {

    /**
     * xxx数据推送 线程池配置
     * 
     * @param coreSize
     *            核心线程数
     * @param maxSize
     *            最大可扩展线程数
     * @param keepLiveTime
     *            生存时间
     * @param blockQueueSize
     *            阻塞队列
     * @return threadCustomPoolExecutor
     */
    @Bean
    public ThreadPoolExecutor threadCustomPoolExecutor(@Value("${threadPool.coreSize}") Integer coreSize, @Value("${threadPool.maxSize}") Integer maxSize,
            @Value("${threadPool.keepLiveTime}") Integer keepLiveTime, @Value("${threadPool.blockQueueSize}") Integer blockQueueSize) {
        return new ThreadPoolExecutor(coreSize, maxSize, keepLiveTime, TimeUnit.SECONDS, new ArrayBlockingQueue<>(blockQueueSize));
    }

}

线程使用
package com.xxx.xxxr.xx.service.impl.callback;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;

/**
 * @author psd xx接xx数据回调接口
 */
@Slf4j
@Service("callxxxBackxxService")
public class CallxxxBackxxServiceImpl implements CallBackService {

    @Resource
    private JointaaexxxInspectionService jointaaexxxInspectionService ;
    
    @Resource
    private OriginalDataLogService originalDataLogService;

    @Autowired
    private ThreadPoolExecutor threadCustomPoolExecutor;

    @Autowired
    private AcceptPushDataLogService acceptPushDataLogService;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void callxxxBackxxInspection(JoxxtlyxxxInspectionObject networkInspectionObject, AcceptPushDataLogEntity acceptPushDataLogEntity) {
        // xxxxx清单集合
        List<JointlyxxDataObject> networkDataObjectList = networkInspectionObject.getDataList();
        if (CollectionUtils.isEmpty(networkDataObjectList)) {
            return;
        }
        // 待添加xxx清单集合
        List<JointVenturexxxEntity> needToAddNetworkInspectionList = new ArrayList<>();
    
        Date synDate = new Date();
        // 批量插入原始数据日志 初始化一个没有返回结果集的子任务
        CompletableFuture<Void> insertOriginalDataLogFuture = CompletableFuture.runAsync(() -> {
            List<OriginalDataLogEntity> entityList = networkDataObjectList.stream().map(networkObject -> {
                OriginalDataLogEntity entity = new OriginalDataLogEntity();
                // 设置你要传递的参数
                ...
                return entity;
            }).collect(Collectors.toList());
            // 批量处理接收原始数据日志表数据,用于后期做运维排查问题
            int batchSize = ConstantUtils.BATCH_SIZE;
            batchProcessingOriginalDataLog(entityList, batchSize);
        }, threadCustomPoolExecutor);

        // 批量插入xxx清单数据
        CompletableFuture<Void> insertJointVentureNetWorkFuture = CompletableFuture.runAsync(() -> {
            networkDataObjectList.stream().forEach(networkInspection -> {
                JointxxturexxxkIxxectionEntity entity = new JointVentureNetworkInspectionEntity();
                // 添加添加的数据
                entity.setRetryxxxuntxxx(xxx);
                ...
                needToAddNetworkInspectionList.add(entity);
            });
            // 批量添加xx清单数据
            int batchSize = ConstantUtils.BATCH_SIZE;
            for (int i = 0; i < needToAddNetworkInspectionList.size(); i += batchSize) {
                int endIndex = Math.min(i + batchSize, needToAddNetworkInspectionList.size());
                jointaaexxxInspectionService.batchInsertJointVentureNetworkInspection(needToAddNetworkInspectionList.subList(i, endIndex));
            }
        }, threadCustomPoolExecutor);

        // 组合方法并阻塞,等待insertJointVentureNetWorkFuture, insertOriginalDataLogFuture都执行完后,才会放行
        CompletableFuture.allOf(insertJointVentureNetWorkFuture, insertOriginalDataLogFuture).exceptionally(t -> {
            log.error("xxx-xx的异步任务出现了异常:{}", t.getMessage());
            // 记录到日志表里面,方便后期看错误信息
            acceptPushDataLogEntity.setError(t.getMessage());
            acceptPushDataLogService.updateById(acceptPushDataLogEntity);
            return null;
        }).join();
    }
    

    /**
     * 批量处理接收推送原始数据日志表数据
     * 
     * @param entityList
     *            原始数据集合
     * @param batchSize
     *            批量处理的大小
     */
    private void batchProcessingOriginalDataLog(List<OriginalDataLogEntity> entityList, int batchSize) {
        for (int i = 0; i < entityList.size(); i += batchSize) {
            int endIndex = Math.min(i + batchSize, entityList.size());
            originalDataLogService.batchInsertOriginalDataLog(entityList.subList(i, endIndex));
        }
    }
    
}
    

代码亮点:

1,使用多线程的方式处理任务,
目前这里使用的是2个线程,处理不同的表数据
2,使用异步编码编写代码

 	// 初始化一个没有返回结果集的子任务
    CompletableFuture<Void> insertOriginalDataLogFuture = CompletableFuture.runAsync(() -> {
          // 相关的业务逻辑
          ...
        }, threadCustomPoolExecutor);

3,数据采用批量处理方式,提高插入数据库的效率
4,优雅使用Stream流的方式,对数据进行处理,不使用原始方式比如for 循环的方式

测试效率每分钟可处理10万数据,进行落库。
在这里插入图片描述

喜欢我的文章的话,麻烦点个阅读或者点个点赞,是我编写博客的动力

文章来源:https://blog.csdn.net/qq_45938780/article/details/135712375
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。