Java 多线程执行任务

发布时间:2023年12月22日

需求: 通过多线程 调用第三方的接口,处理数据,并得到返回值:

main方法测试:

package auto.thread;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TestThread {
    static int poolSize = 10;
  
	public static void main(String[] args) throws InterruptedException, ExecutionException {
		
		//程序计数器
		CountDownLatch countDownLatch = new CountDownLatch(poolSize);
		   
		//线程池 
		ExecutorService executorService = Executors.newFixedThreadPool(poolSize);
		
		List<UserCamera> originList = new ArrayList(); //模拟生成 1000条数据
		for(int i=0; i <= 999; i++){
			UserCamera camera = new UserCamera();
			camera.setCameraCode(String.valueOf(i));
			camera.setCameraName("test Name "+String.valueOf(i) );
			camera.setAuthTime(new Date());
			originList.add(camera);
		}
		
		long ss = System.currentTimeMillis();
		//把数据 根据线程的个数  等分 一下
		List<List<UserCamera>> dataSets = new ArrayList();
		int batchSize =  (int)Math.ceil((double)originList.size()/poolSize);
		System.out.println("batchSize="+batchSize);
		for(int i =0; i<originList.size(); i+=batchSize){
			int start =i;
			int end = Math.min(start + batchSize, originList.size());
			List<UserCamera> dataSet = originList.subList(start, end);
			dataSets.add(dataSet);
		}
		CopyOnWriteArrayList<UserCamera> rr = new CopyOnWriteArrayList<>();  //返回的结果方法  rr 中, 此处用  CopyOnWriteArrayList
		//在向线程池ThreadPoolExecutor提交任务时,一般为了方便操作采用execute提交任务,这时线程其实是无返回值的,
		//但是在生产中为了应对各种各样的需求,获取线程返回值是必不可少的,所以SDK提供另一种任务提交方式submit,方法签名如下
		for(List<UserCamera> dataSet : dataSets){
			DataProcessingTask task = new DataProcessingTask(dataSet, countDownLatch, rr);
			executorService.execute(task);
		}
		
		countDownLatch.await();  //等待所有的线程执行结束
		for(int i =0; i<rr.size(); i++){
			UserCamera aac = rr.get(i);
			System.out.println("结果:"+aac.getSignData());
		}
		executorService.shutdown();
		System.out.println("######总耗时:" + (System.currentTimeMillis() - ss));
	}
}

实体类

package auto.thread;

import java.util.Date;

public class UserCamera {

	 /**
     * 摄像机编码
     */
   
   private String cameraCode;
   /**
     * 用户编码
     */
   
   private String userCode;
   /**
     * 
     */
   
   private Date authTime;
   /**
     * 名称
     */
   
   private String cameraName;
  


   /**
    * 结果
    */
   
   private String signData;


//省略get/set 方法
   
   
}

处理逻辑

package auto.thread;

import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;

public class DataProcessingTask implements Runnable {

	private List<UserCamera> dataSet;
	
	private CopyOnWriteArrayList<UserCamera> resultList;
	
	private CountDownLatch countDownLatch;
	
	public DataProcessingTask(List<UserCamera> dataSet, CountDownLatch countDownLatch, CopyOnWriteArrayList<UserCamera> resultList) {
		this.dataSet = dataSet;
		this.countDownLatch = countDownLatch;
		this.resultList = resultList;
	}


	@Override
	public void run() {
		try{
			for(int i =0; i < dataSet.size(); i++){
				//调用 数据处理逻辑
				UserCamera camrea = dataSet.get(i);
				String sign = sign(camrea.getCameraName());
				camrea.setSignData(sign);
				
			}
			resultList.addAll(dataSet);
		}catch(Exception e){
			System.out.println(e.getMessage());
		}finally {
			
			countDownLatch.countDown();
		}
		
		
	}

	
	/**
	 * 此方法是处理数据的逻辑,具体可以调用三方接口
	 * @param sign
	 * @return
	 */
	 private String sign(String sign){
		 //long ss = System.currentTimeMillis();
         try {
             Random  random = new Random();
             int i = 1+random.nextInt(30); 
             Thread.sleep(i); // 暂停1-100随机毫秒
             System.out.println(i);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         
          String ss = "--"+sign+"--测试--"+DateUtil.format(new Date(), DatePattern.PURE_DATETIME_MS_FORMAT);
          return ss;
	 }
}

具体的线程也可以 使用springboot线程池:

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@Slf4j
public class ThreadPoolConfig {

    @Value("${thread.pool.config.core.size:0}")
    private int coreSize;

    @Value("${thread.pool.config.max.size:0}")
    private int maxSize;

    @Value("${thread.pool.config.queue.capacity:0}")
    private int queueCapacity;

    // 获取服务器的cpu个数
    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();// 获取cpu个数
    //private static final int COUR_SIZE = CPU_COUNT * 2;
    private static final int COUR_SIZE = CPU_COUNT;
    private static final int MAX_COUR_SIZE = CPU_COUNT * 4;

    // 接下来配置一个bean,配置线程池。
    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        if(coreSize == 0){
            coreSize = COUR_SIZE;
        }
        if(maxSize == 0){
            maxSize = MAX_COUR_SIZE;
        }
        if(queueCapacity == 0){
            queueCapacity = MAX_COUR_SIZE;
        }
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(coreSize);// 设置核心线程数
        threadPoolTaskExecutor.setMaxPoolSize(maxSize);// 配置最大线程数
        threadPoolTaskExecutor.setQueueCapacity(queueCapacity * 4);// 配置队列容量(这里设置成最大线程数的四倍)
        threadPoolTaskExecutor.setThreadNamePrefix("sign-data-thread-");// 给线程池设置名称
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 设置任务的拒绝策略
        log.info("######线程池配置:coreSize:{}, maxSize:{},queueCapacity:{}", coreSize,maxSize, queueCapacity*4);
        return threadPoolTaskExecutor;
    }
}

测试时可以改变线程的多少 测试运行速度

文章来源:https://blog.csdn.net/LZHH_2008/article/details/135159658
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。