ThreadPoolExecutor线程池

发布时间:2024年01月22日

一、什么是ThreadPoolExecutor线程池

ThreadPoolExecutor是Java中的一个线程池实现类,它提供了一种方便管理和复用线程的方式。通过使用ThreadPoolExecutor,可以更好地控制和管理多线程任务的执行。

ThreadPoolExecutor的构造函数有多个参数,其中一些重要的参数如下:

  1. corePoolSize:核心线程池大小,即线程池中保持存活的线程数。
  2. maximumPoolSize:最大线程池大小,即线程池中允许的最大线程数。
  3. keepAliveTime:线程空闲时间,即当线程池中的线程数量超过corePoolSize时,多余的空闲线程等待新任务的最长时间。
  4. TimeUnit:表示keepAliveTime的时间单位,例如TimeUnit.SECONDS表示秒。
  5. workQueue:用于保存等待执行的任务的阻塞队列。
  6. threadFactory:用于创建线程的工厂。
  7. rejectedExecutionHandler:拒绝策略,即当线程池无法处理新提交的任务时的处理方式。

一般而言,使用ThreadPoolExecutor的步骤如下:

  1. 创建一个ThreadPoolExecutor对象,并根据需求设置不同的参数。
  2. 提交任务给线程池进行执行,可以使用execute()方法或submit()方法提交任务。
  3. 线程池会自动管理线程的创建、复用和销毁,同时控制并发执行的线程数量和任务队列的长度。
  4. 当不再需要线程池时,可以调用shutdown()方法关闭线程池。

二、为什么需要线程池

为了提供程序的并行处理能力,引入多线程,但是线程本身的创建、调度等也需要耗时,如果一个线程创建好后使用完直接销毁掉,那么再需要用到线程处理任务的时候,又需要再次创建,显然这是耗时的且多余的,所以需要借用一种机制让线程驻留在内存中,下次使用的时候直接获取即可,这就是线程池机制

三、Java如何使用线程池?

JDK自带的线程池创建方法:

// 第一种线程池:固定个数的线程池,可以为每个CPU核绑定一定数量的线程数

1、ExecutorService fixedThreadPool = Executors.newFixedThreadPool(processors * 2);

// 缓存线程池,无上限

2、ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

// 单一线程池,永远会维护存在一条线程

3、ExecutorService singleThreadPool = Executors.newSingleThreadExecutor(); // 固定个数的线程池,可以执行延时任务,也可以执行带有返回值的任务。

4、ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);

// 自定义线程池的全部参数

5、new ThreadPoolExecutor(int corePoolSize,//核心线程数

int maximumPoolSize,//最大线程数

long keepAliveTime,//线程空闲时间

TimeUnit unit,//时间单位

BlockingQueue<Runnable> workQueue,//任务队列

ThreadFactory threadFactory,//线程工厂

RejectedExecutionHandler handler//拒绝策略

推荐 new ThreadPoolExecutor()

corePoolSize:核心线程数,默认为1

设置规则: CPU密集型(计算密集型,指的是运算较多,cpu占用高,读/写I/O(硬盘/内存)较少):corePoolSize = CPU核数 + 1 IO密集型(与cpu密集型相反,大部分的状况是CPU在等I/O (硬盘/内存) 的读/写操作):corePoolSize = CPU核数 * 2

maximumPoolSize:最大线程数

最大线程数,默认为Integer.MAX_VALUE,一般设置为和核心线程数一样

keepAliveTime:线程空闲时间

默认为60s,一般设置为默认60s

unit:时间单位,默认为秒

workQueue: 工作队列

队列,当线程数目超过核心线程数时用于保存任务的队列

常见的队列分3种:

无界对列:LinkedBlockingQueue

未处理的任务一直往队列里面的放,如果任务比较耗时容易导致OOM

Executors.newFixedThreadPool 采用就是 LinkedBlockingQueue 所以不推荐使用

有界队列:ArrayBlockingQueue

遵循FIFO原则的有界队列,当使用有限的 maximumPoolSizes 时,有界队列有助于防止资源耗尽

同步移交队列:

如果不希望任务在队列中等待而是希望将任务直接移交给工作线程,可使用SynchronousQueue作为等待队列。SynchronousQueue不是一个真正的队列,而是一种线程之间移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接收这个元素。只有在使用无界线程池或者有饱和策略时才建议使用该队列。

threadFactory:线程工厂用来创建线程

为了统一在创建线程时设置一些参数,如是否守护线程,线程一些特性等,如优先级。通过这个TreadFactory创建出来的线程能保证有相同的特性

handler:拒绝策略

拒绝策略,默认是AbortPolicy,会抛出异常。 当线程数已经达到maxPoolSize,且队列已满,会拒绝新任务。 当线程池被调用shutdown()后,会等待线程池里的任务执行完毕再shutdown。如果在调用shutdown()和线程池真正shutdown之间提交任务,会拒绝新任务。 AbortPolicy 丢弃任务,抛运行时异常。 CallerRunsPolicy 由当前调用的任务线程执行任务。 DiscardPolicy 忽视,什么都不会发生。 DiscardOldestPolicy 从队列中踢出最先进入队列(最后一个执行)的任务。

创建ThreadPoolExecutor线程池

//获取cpu核心数
int i = Runtime.getRuntime().availableProcessors();
//核心线程数
int corePoolSize = i * 2;
//最大线程数
int maximumPoolSize = i * 2;
//线程无引用存活时间
long keepAliveTime = 60;
//时间单位
TimeUnit unit = TimeUnit.SECONDS;
//任务队列,接收一个整型的参数,这个整型参数指的是队列的长度,
/**
 *    【1】有界阻塞队列,先进先出,存取相互排斥
 *   【2】数据结构:静态数组(容量固定须指定长度,没有扩容机制,没有元素的位置也占用空间,被null占位)
 *   【3】ReentrantLock锁保证互斥性:存取都是同一把锁,操作的是同一个数组对象,存取相互排斥
 *   【4】阻塞对象(notEmpty【出队:队列count=0,无元素可取时,阻塞在该对象上】,notFull【入队:队列count=length,放不进元素时,阻塞在该对象上】)
 *   【5】入队,从队首开始添加元素,记录putIndex(到队尾时设置为0),唤醒notEmpty
 *   【6】出队,从队首开始添加元素,记录takeIndex(到队尾时设置为0),唤醒notFull
 *   【7】两个指针都是从队首向队尾移动,保证队列的先进先出原则(亮点:利用指针和数组,形成环状结构,重复利用内存空间)
 */
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue(i * 2 * 10);
//线程工厂
//defaultThreadFactory()
ThreadFactory threadFactory = Executors.defaultThreadFactory();
//拒绝执行处理器
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
//创建线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
// 提交任务到线程池
threadPoolExecutor.execute(new Runnable() {
    @Override
    public void run() {
        for (int i=0;i<5;i++){
            System.out.println("线程1:"+i);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }
});

文章来源:https://blog.csdn.net/xwh041213/article/details/135748897
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。