需求: 通过多线程 调用第三方的接口,处理数据,并得到返回值:
main方法测试:
package auto.thread;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class TestThread {
static int poolSize = 10;
public static void main(String[] args) throws InterruptedException, ExecutionException {
//程序计数器
CountDownLatch countDownLatch = new CountDownLatch(poolSize);
//线程池
ExecutorService executorService = Executors.newFixedThreadPool(poolSize);
List<UserCamera> originList = new ArrayList(); //模拟生成 1000条数据
for(int i=0; i <= 999; i++){
UserCamera camera = new UserCamera();
camera.setCameraCode(String.valueOf(i));
camera.setCameraName("test Name "+String.valueOf(i) );
camera.setAuthTime(new Date());
originList.add(camera);
}
long ss = System.currentTimeMillis();
//把数据 根据线程的个数 等分 一下
List<List<UserCamera>> dataSets = new ArrayList();
int batchSize = (int)Math.ceil((double)originList.size()/poolSize);
System.out.println("batchSize="+batchSize);
for(int i =0; i<originList.size(); i+=batchSize){
int start =i;
int end = Math.min(start + batchSize, originList.size());
List<UserCamera> dataSet = originList.subList(start, end);
dataSets.add(dataSet);
}
CopyOnWriteArrayList<UserCamera> rr = new CopyOnWriteArrayList<>(); //返回的结果方法 rr 中, 此处用 CopyOnWriteArrayList
//在向线程池ThreadPoolExecutor提交任务时,一般为了方便操作采用execute提交任务,这时线程其实是无返回值的,
//但是在生产中为了应对各种各样的需求,获取线程返回值是必不可少的,所以SDK提供另一种任务提交方式submit,方法签名如下
for(List<UserCamera> dataSet : dataSets){
DataProcessingTask task = new DataProcessingTask(dataSet, countDownLatch, rr);
executorService.execute(task);
}
countDownLatch.await(); //等待所有的线程执行结束
for(int i =0; i<rr.size(); i++){
UserCamera aac = rr.get(i);
System.out.println("结果:"+aac.getSignData());
}
executorService.shutdown();
System.out.println("######总耗时:" + (System.currentTimeMillis() - ss));
}
}
实体类
package auto.thread;
import java.util.Date;
public class UserCamera {
/**
* 摄像机编码
*/
private String cameraCode;
/**
* 用户编码
*/
private String userCode;
/**
*
*/
private Date authTime;
/**
* 名称
*/
private String cameraName;
/**
* 结果
*/
private String signData;
//省略get/set 方法
}
处理逻辑
package auto.thread;
import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
public class DataProcessingTask implements Runnable {
private List<UserCamera> dataSet;
private CopyOnWriteArrayList<UserCamera> resultList;
private CountDownLatch countDownLatch;
public DataProcessingTask(List<UserCamera> dataSet, CountDownLatch countDownLatch, CopyOnWriteArrayList<UserCamera> resultList) {
this.dataSet = dataSet;
this.countDownLatch = countDownLatch;
this.resultList = resultList;
}
@Override
public void run() {
try{
for(int i =0; i < dataSet.size(); i++){
//调用 数据处理逻辑
UserCamera camrea = dataSet.get(i);
String sign = sign(camrea.getCameraName());
camrea.setSignData(sign);
}
resultList.addAll(dataSet);
}catch(Exception e){
System.out.println(e.getMessage());
}finally {
countDownLatch.countDown();
}
}
/**
* 此方法是处理数据的逻辑,具体可以调用三方接口
* @param sign
* @return
*/
private String sign(String sign){
//long ss = System.currentTimeMillis();
try {
Random random = new Random();
int i = 1+random.nextInt(30);
Thread.sleep(i); // 暂停1-100随机毫秒
System.out.println(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
String ss = "--"+sign+"--测试--"+DateUtil.format(new Date(), DatePattern.PURE_DATETIME_MS_FORMAT);
return ss;
}
}
具体的线程也可以 使用springboot线程池:
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@Slf4j
public class ThreadPoolConfig {
@Value("${thread.pool.config.core.size:0}")
private int coreSize;
@Value("${thread.pool.config.max.size:0}")
private int maxSize;
@Value("${thread.pool.config.queue.capacity:0}")
private int queueCapacity;
// 获取服务器的cpu个数
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();// 获取cpu个数
//private static final int COUR_SIZE = CPU_COUNT * 2;
private static final int COUR_SIZE = CPU_COUNT;
private static final int MAX_COUR_SIZE = CPU_COUNT * 4;
// 接下来配置一个bean,配置线程池。
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
if(coreSize == 0){
coreSize = COUR_SIZE;
}
if(maxSize == 0){
maxSize = MAX_COUR_SIZE;
}
if(queueCapacity == 0){
queueCapacity = MAX_COUR_SIZE;
}
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(coreSize);// 设置核心线程数
threadPoolTaskExecutor.setMaxPoolSize(maxSize);// 配置最大线程数
threadPoolTaskExecutor.setQueueCapacity(queueCapacity * 4);// 配置队列容量(这里设置成最大线程数的四倍)
threadPoolTaskExecutor.setThreadNamePrefix("sign-data-thread-");// 给线程池设置名称
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 设置任务的拒绝策略
log.info("######线程池配置:coreSize:{}, maxSize:{},queueCapacity:{}", coreSize,maxSize, queueCapacity*4);
return threadPoolTaskExecutor;
}
}
测试时可以改变线程的多少 测试运行速度