前言:
面试题,多线程的创建方式?线程池的参数有哪些?怎么配置核心线程数和最大可扩展线程数…
这次主讲实战线程池的实战使用
前景:
线程的多次使用
提高响应速度
线程可配置…
主要考虑以下情况:
下面是本人考虑的点,可能有误,还有其他考虑的点,可以留言,我这边同步修改下。
项目cpu的核数、任务是cpu密集型还是IO密集型、项目有几个地方会使用线程池…
核心线程数的考虑是:
设计思路:
1,当前项目是一个服务器,有几个接口功能数据,需要第三方推送数据,我这边主要是做大屏展示和数据综合处理,有伙伴会问,咋那边推送,为啥不加个中间件这样解耦,异步啥的,因为项目是还在初期阶段,目前还没立项,后期会接中间件,如果立项后,中间件实战使用我再更新…
2,为啥不使用单线程?
如果推送数据量很大的话,单线程可以实现,但是增加项目项目直接的传输时间,如果网络抖动,数据丢失,额外增加后期维护运维成本…
3,核心线程数的设置
目前有3个地方推送数据,目前没写定时任务,因为对方数据可能会变,那边是把数据汇总起来,然后通过传参的方式,我们这边提供openapi接口,那边直接调用即可
核心线程数设置为5个
原因:目前有4个地方时提供openapi接口,随时对方可能推送数据
最大可扩展线程数10个
功能主要是IO密集型,最大可扩展线程数 = 核心线程数 X 2
# 配置多线程参数
threadPool:
coreSize: 5
maxSize: 10
keepLiveTime: 120
blockQueueSize: 500
package com..xx.xx.config;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author psd 配置线程池的相关参数
* @create 2023-12-28 16:06
*/
public class ThreadPoolConfig {
/**
* xxx数据推送 线程池配置
*
* @param coreSize
* 核心线程数
* @param maxSize
* 最大可扩展线程数
* @param keepLiveTime
* 生存时间
* @param blockQueueSize
* 阻塞队列
* @return threadCustomPoolExecutor
*/
@Bean
public ThreadPoolExecutor threadCustomPoolExecutor(@Value("${threadPool.coreSize}") Integer coreSize, @Value("${threadPool.maxSize}") Integer maxSize,
@Value("${threadPool.keepLiveTime}") Integer keepLiveTime, @Value("${threadPool.blockQueueSize}") Integer blockQueueSize) {
return new ThreadPoolExecutor(coreSize, maxSize, keepLiveTime, TimeUnit.SECONDS, new ArrayBlockingQueue<>(blockQueueSize));
}
}
package com.xxx.xxxr.xx.service.impl.callback;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
/**
* @author psd xx接xx数据回调接口
*/
@Slf4j
@Service("callxxxBackxxService")
public class CallxxxBackxxServiceImpl implements CallBackService {
@Resource
private JointaaexxxInspectionService jointaaexxxInspectionService ;
@Resource
private OriginalDataLogService originalDataLogService;
@Autowired
private ThreadPoolExecutor threadCustomPoolExecutor;
@Autowired
private AcceptPushDataLogService acceptPushDataLogService;
@Override
@Transactional(rollbackFor = Exception.class)
public void callxxxBackxxInspection(JoxxtlyxxxInspectionObject networkInspectionObject, AcceptPushDataLogEntity acceptPushDataLogEntity) {
// xxxxx清单集合
List<JointlyxxDataObject> networkDataObjectList = networkInspectionObject.getDataList();
if (CollectionUtils.isEmpty(networkDataObjectList)) {
return;
}
// 待添加xxx清单集合
List<JointVenturexxxEntity> needToAddNetworkInspectionList = new ArrayList<>();
Date synDate = new Date();
// 批量插入原始数据日志 初始化一个没有返回结果集的子任务
CompletableFuture<Void> insertOriginalDataLogFuture = CompletableFuture.runAsync(() -> {
List<OriginalDataLogEntity> entityList = networkDataObjectList.stream().map(networkObject -> {
OriginalDataLogEntity entity = new OriginalDataLogEntity();
// 设置你要传递的参数
...
return entity;
}).collect(Collectors.toList());
// 批量处理接收原始数据日志表数据,用于后期做运维排查问题
int batchSize = ConstantUtils.BATCH_SIZE;
batchProcessingOriginalDataLog(entityList, batchSize);
}, threadCustomPoolExecutor);
// 批量插入xxx清单数据
CompletableFuture<Void> insertJointVentureNetWorkFuture = CompletableFuture.runAsync(() -> {
networkDataObjectList.stream().forEach(networkInspection -> {
JointxxturexxxkIxxectionEntity entity = new JointVentureNetworkInspectionEntity();
// 添加添加的数据
entity.setRetryxxxuntxxx(xxx);
...
needToAddNetworkInspectionList.add(entity);
});
// 批量添加xx清单数据
int batchSize = ConstantUtils.BATCH_SIZE;
for (int i = 0; i < needToAddNetworkInspectionList.size(); i += batchSize) {
int endIndex = Math.min(i + batchSize, needToAddNetworkInspectionList.size());
jointaaexxxInspectionService.batchInsertJointVentureNetworkInspection(needToAddNetworkInspectionList.subList(i, endIndex));
}
}, threadCustomPoolExecutor);
// 组合方法并阻塞,等待insertJointVentureNetWorkFuture, insertOriginalDataLogFuture都执行完后,才会放行
CompletableFuture.allOf(insertJointVentureNetWorkFuture, insertOriginalDataLogFuture).exceptionally(t -> {
log.error("xxx-xx的异步任务出现了异常:{}", t.getMessage());
// 记录到日志表里面,方便后期看错误信息
acceptPushDataLogEntity.setError(t.getMessage());
acceptPushDataLogService.updateById(acceptPushDataLogEntity);
return null;
}).join();
}
/**
* 批量处理接收推送原始数据日志表数据
*
* @param entityList
* 原始数据集合
* @param batchSize
* 批量处理的大小
*/
private void batchProcessingOriginalDataLog(List<OriginalDataLogEntity> entityList, int batchSize) {
for (int i = 0; i < entityList.size(); i += batchSize) {
int endIndex = Math.min(i + batchSize, entityList.size());
originalDataLogService.batchInsertOriginalDataLog(entityList.subList(i, endIndex));
}
}
}
1,使用多线程的方式处理任务,
目前这里使用的是2个线程,处理不同的表数据
2,使用异步编码编写代码
// 初始化一个没有返回结果集的子任务
CompletableFuture<Void> insertOriginalDataLogFuture = CompletableFuture.runAsync(() -> {
// 相关的业务逻辑
...
}, threadCustomPoolExecutor);
3,数据采用批量处理方式,提高插入数据库的效率
4,优雅使用Stream流的方式,对数据进行处理,不使用原始方式比如for 循环的方式
测试效率每分钟可处理10万数据,进行落库。
喜欢我的文章的话,麻烦点个阅读或者点个点赞,是我编写博客的动力