ThreadPoolExecutor是Java中的一个线程池实现类,它提供了一种方便管理和复用线程的方式。通过使用ThreadPoolExecutor,可以更好地控制和管理多线程任务的执行。
ThreadPoolExecutor的构造函数有多个参数,其中一些重要的参数如下:
一般而言,使用ThreadPoolExecutor的步骤如下:
为了提供程序的并行处理能力,引入多线程,但是线程本身的创建、调度等也需要耗时,如果一个线程创建好后使用完直接销毁掉,那么再需要用到线程处理任务的时候,又需要再次创建,显然这是耗时的且多余的,所以需要借用一种机制让线程驻留在内存中,下次使用的时候直接获取即可,这就是线程池机制
JDK自带的线程池创建方法:
// 第一种线程池:固定个数的线程池,可以为每个CPU核绑定一定数量的线程数
1、ExecutorService fixedThreadPool = Executors.newFixedThreadPool(processors * 2);
// 缓存线程池,无上限
2、ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
// 单一线程池,永远会维护存在一条线程
3、ExecutorService singleThreadPool = Executors.newSingleThreadExecutor(); // 固定个数的线程池,可以执行延时任务,也可以执行带有返回值的任务。
4、ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
// 自定义线程池的全部参数
5、new ThreadPoolExecutor(int corePoolSize,//核心线程数
int maximumPoolSize,//最大线程数
long keepAliveTime,//线程空闲时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//任务队列
ThreadFactory threadFactory,//线程工厂
RejectedExecutionHandler handler//拒绝策略
设置规则: CPU密集型(计算密集型,指的是运算较多,cpu占用高,读/写I/O(硬盘/内存)较少):corePoolSize = CPU核数 + 1 IO密集型(与cpu密集型相反,大部分的状况是CPU在等I/O (硬盘/内存) 的读/写操作):corePoolSize = CPU核数 * 2
最大线程数,默认为Integer.MAX_VALUE,一般设置为和核心线程数一样
默认为60s,一般设置为默认60s
队列,当线程数目超过核心线程数时用于保存任务的队列
常见的队列分3种:
未处理的任务一直往队列里面的放,如果任务比较耗时容易导致OOM
Executors.newFixedThreadPool 采用就是 LinkedBlockingQueue 所以不推荐使用
遵循FIFO原则的有界队列,当使用有限的 maximumPoolSizes 时,有界队列有助于防止资源耗尽
如果不希望任务在队列中等待而是希望将任务直接移交给工作线程,可使用SynchronousQueue作为等待队列。SynchronousQueue不是一个真正的队列,而是一种线程之间移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接收这个元素。只有在使用无界线程池或者有饱和策略时才建议使用该队列。
为了统一在创建线程时设置一些参数,如是否守护线程,线程一些特性等,如优先级。通过这个TreadFactory创建出来的线程能保证有相同的特性
拒绝策略,默认是AbortPolicy,会抛出异常。 当线程数已经达到maxPoolSize,且队列已满,会拒绝新任务。 当线程池被调用shutdown()后,会等待线程池里的任务执行完毕再shutdown。如果在调用shutdown()和线程池真正shutdown之间提交任务,会拒绝新任务。 AbortPolicy 丢弃任务,抛运行时异常。 CallerRunsPolicy 由当前调用的任务线程执行任务。 DiscardPolicy 忽视,什么都不会发生。 DiscardOldestPolicy 从队列中踢出最先进入队列(最后一个执行)的任务。
//获取cpu核心数
int i = Runtime.getRuntime().availableProcessors();
//核心线程数
int corePoolSize = i * 2;
//最大线程数
int maximumPoolSize = i * 2;
//线程无引用存活时间
long keepAliveTime = 60;
//时间单位
TimeUnit unit = TimeUnit.SECONDS;
//任务队列,接收一个整型的参数,这个整型参数指的是队列的长度,
/**
* 【1】有界阻塞队列,先进先出,存取相互排斥
* 【2】数据结构:静态数组(容量固定须指定长度,没有扩容机制,没有元素的位置也占用空间,被null占位)
* 【3】ReentrantLock锁保证互斥性:存取都是同一把锁,操作的是同一个数组对象,存取相互排斥
* 【4】阻塞对象(notEmpty【出队:队列count=0,无元素可取时,阻塞在该对象上】,notFull【入队:队列count=length,放不进元素时,阻塞在该对象上】)
* 【5】入队,从队首开始添加元素,记录putIndex(到队尾时设置为0),唤醒notEmpty
* 【6】出队,从队首开始添加元素,记录takeIndex(到队尾时设置为0),唤醒notFull
* 【7】两个指针都是从队首向队尾移动,保证队列的先进先出原则(亮点:利用指针和数组,形成环状结构,重复利用内存空间)
*/
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue(i * 2 * 10);
//线程工厂
//defaultThreadFactory()
ThreadFactory threadFactory = Executors.defaultThreadFactory();
//拒绝执行处理器
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
//创建线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
// 提交任务到线程池
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
for (int i=0;i<5;i++){
System.out.println("线程1:"+i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
});