线程池讲解

发布时间:2024年01月04日

一、为什么要使用线程池

在面向对象的编程中,对象的创建与销毁是比较消耗资源的,为了减少资源消耗,提高速度,产生了"池化技术"?

使用线程池的优点

1、提高响应速度,当任务到达时不需要等待创建就可以直接执行

2、减少资源消耗,重用存在的线程,减少创建

3、提高可管理性,可以提避免线程的乱创建,以及可以监控线程执行情况

二、怎么创建线程池

1、ThreadPoolExecutor() 是最原始的线程池创建,也是阿里巴巴 Java 开发手册中明确规范的创建线程池的方式。

2、通过?Executor?框架的工具类?Executors?来创建

三、Executors

1、内置线程池

(1)newSingleThreadExecutor:创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

(2)newFixedThreadPool:创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。如果希望在服务器上使用线程池,建议使用 newFixedThreadPool方法来创建线程池,这样能获得更好的性能。

(3) newCachedThreadPool:创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60 秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说 JVM)能够创建的最大线程大小。

(4)newScheduledThreadPool:创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。

2、为什么不推荐使用内置线程池

1、FixedThreadPool 和 SingleThreadExecutor:使用的是无界的 LinkedBlockingQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。源码中核心线程数量和最大线程数量是一样的,因为队列是最大长度,所以最大线程数量参数是无效的,只会有核心线程数在运行,

2、CachedThreadPool:使用的是同步队列 SynchronousQueue, corePoolSize 被设置为空(0),maximumPoolSize被设置为 Integer.MAX.VALUE,即它是无界的,这也就意味着如果主线程提交任务的速度高于 maximumPool 中线程处理任务的速度时,CachedThreadPool 会不断创建新的线程,允许创建的线程数量为 Integer.MAX_VALUE ,如果任务数量过多且执行速度较慢,可能会创建大量的线程,从而导致 OOM。

3、ScheduledThreadPool 和 SingleThreadScheduledExecutor : 使用的无界的延迟阻塞队列DelayedWorkQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM

DelayedWorkQueue 的内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构,可以保证每次出队的任务都是当前队列中执行时间最靠前的。DelayedWorkQueue 添加元素满了之后会自动扩容原来容量的 1/2,即永远不会阻塞,最大扩容可达 Integer.MAX_VALUE,所以最多只能创建核心线程数的线程。

四、ThreadPoolxecutor

1、自定义线程池

@Configuration
public class AlgoFilterPool {

    @Value("${algo.filter.pool.corePoolSize:4}")
    private int corePoolSize;

    @Value("${algo.filter.pool.maxPoolSize:16}")
    private int maxPoolSize;

    @Value("${algo.filter.pool.keepAliveSeconds:3000}")
    private int keepAliveSeconds;

    @Value("${algo.filter.pool.queueCapacity:8}")
    private int queueCapacity;

    @Bean("algoFilterExecutor")
    public Executor alertExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //核心线程池大小
        executor.setCorePoolSize(corePoolSize);
        //最大线程数
        executor.setMaxPoolSize(maxPoolSize);
        //队列容量
        executor.setQueueCapacity(queueCapacity);
        //活跃时间
        executor.setKeepAliveSeconds(keepAliveSeconds);
        //线程名字前缀
        executor.setThreadNamePrefix("ALGO FILTER Thread-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor.getThreadPoolExecutor();
    }

}

?使用

    @Resource(name = "algoFilterExecutor")
    private ExecutorService executorService;


    public void handleSync(String code, List<Map> data,String targetField){
     /*   for (IAlgoFilter iAlgoFilter : list) {
            new Thread(() -> {
                iAlgoFilter.execute(code, data, targetField);
            }, iAlgoFilter.getType() + "-----计算").start();
        }*/

        for (IAlgoFilter iAlgoFilter : list) {
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    log.info("当前线程:{},执行特征编号:{}的:{} 计算", Thread.currentThread().getName(),code, iAlgoFilter.getType());
                    iAlgoFilter.execute(code, data, targetField);
                }
            });
        }


    }

2、构造函数重要参数分析

ThreadPoolExecutor3 个最重要的参数:

  • corePoolSize :任务队列未达到队列容量时,最大可以同时运行的线程数量。
  • maximumPoolSize :任务队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
  • workQueue:当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,任务就会被存放在队列中。

ThreadPoolExecutor其他常见参数:

  1. keepAliveTime:线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁;
  2. unit :keepAliveTime 参数的时间单位。
  3. threadFactory:为线程池提供创建新线程的线程工厂
  4. handler :线程池任务队列超过 maxinumPoolSize 之后的拒绝策略

3、饱和策略

如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任时,ThreadPoolTaskExecutor 定义一些策略:

  • ThreadPoolExecutor.AbortPolicy:抛出 RejectedExecutionException来拒绝新任务的处理。
  • ThreadPoolExecutor.CallerRunsPolicy:调用执行自己的线程运行任务。您不会任务请求。但是这种策略会降低对于新任务提交速度,影响程序的整体性能。另外,这个策略喜欢增加队列容量。如果您的应用程序可以承受此延迟并且你不能任务丢弃任何一个任务请求的话,你可以选择这个策略。
  • ThreadPoolExecutor.DiscardPolicy:不处理新任务,直接丢弃掉。
  • ThreadPoolExecutor.DiscardOldestPolicy: 此策略将丢弃最早的未处理的任务请求。

举个例子: Spring 通过 ThreadPoolTaskExecutor 或者我们直接通过 ThreadPoolExecutor 的构造函数创建线程池的时候,当我们不指定 RejectedExecutionHandler 饱和策略的话来配置线程池的时候默认使用的是 ThreadPoolExecutor.AbortPolicy。在默认情况下,ThreadPoolExecutor 将抛出 RejectedExecutionException 来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用 ThreadPoolExecutor.CallerRunsPolicy。当最大池被填满时,此策略为我们提供可伸缩队列。

4、实现原理

  • 如果当前运行的线程数小于核心线程数,那么就会新建一个线程来执行任务。
  • 如果当前运行的线程数等于或大于核心线程数,但是小于最大线程数,那么就把该任务放入到任务队列里等待执行。
  • 如果向任务队列投放任务失败(任务队列已经满了),但是当前运行的线程数是小于最大线程数的,就新建一个线程来执行任务。(并不是直接就启动最大线程数量进行执行,而是一个一个启动)
  • 如果当前运行的线程数已经等同于最大线程数了,新建线程将会使当前运行的线程超出最大线程数,那么当前任务会被拒绝,饱和策略会调用RejectedExecutionHandler.rejectedExecution()方法

五、实践

多个业务场景公用一个线程池?

1.首先,项目中如果有多个场景需要使用线程池,那么最好的方式是:每一个业务场景使用独立的线程池。不要让所有的场景共用一个线程池。

分析:

1)独立的线城池之间互相不影响彼此的任务作业,更有利于保证本任务的独立性和完整性,更符合低耦合的设计思想

2)如果所有的场景共用一个线程池,可能会出现如下问题,举例:

比如有任务A、任务B、任务C? ?这三个任务场景共用一个线程池,配置如下

当任务A请求量剧烈增加的时候就会导致任务B和任务C,没有可用的线程? 可能出现迟迟获取不到资源的情况。比如任务A同时有3000个线程请求,此时就可能会导致? 任务B和任务C分配不到资源或者分配到很少的线程资源。

所以为了避免这种情况的产生,最好的方式是建立独立的线程池。

这样不管任务A的线程是堵塞还是其他原因,都不会影响任务B和任务C,同理任务B、任务C也一样不会影响其他场景任务的执行。

线程提交之后的返回类是什么?

1、execute(),submit() 区别

execute() : 没有返回值,只能执行Runnable。

submit() : 返回一个实现Feature接口的类,并且可以执行Runnable,和Callable类型的任务。通过这个 Future 对象可以判断任务是否执行成功,并且可以通过 Future 的 get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)方法的话,如果在 timeout时间内任务还没有执行完,就会抛出 java.util.concurrent.TimeoutException

线程池和 ThreadLocal 共用的坑

线程池和 ThreadLocal共用,可能会导致线程从ThreadLocal获取到的是旧值/脏数据。这是因为线程池会复用线程对象,与线程对象绑定的类的静态属性 ThreadLocal 变量也会被重用,这就导致一个线程可能获取到其他线程的ThreadLocal 值。

不要以为代码中没有显示使用线程池就不存在线程池了,像常用的 Web 服务器 Tomcat 处理任务为了提高并发量,就使用到了线程池,并且使用的是基于原生 Java 线程池改进完善得到的自定义线程池。

当然了,你可以将 Tomcat 设置为单线程处理任务。不过,这并不合适,会严重影响其处理任务的速度。

解决上述问题比较建议的办法是使用阿里巴巴开源的 TransmittableThreadLocal(TTL)。TransmittableThreadLocal类继承并加强了 JDK 内置的InheritableThreadLocal类,在使用线程池等会池化复用线程的执行组件情况下,提供ThreadLocal值的传递功能,解决异步执行时上下文传递的问题。

TransmittableThreadLocal 项目地址:https://github.com/alibaba/transmittable-thread-localopen in new window

文章来源:https://blog.csdn.net/cf18435109165/article/details/135368355
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。