在面向对象的编程中,对象的创建与销毁是比较消耗资源的,为了减少资源消耗,提高速度,产生了"池化技术"?
使用线程池的优点
1、提高响应速度,当任务到达时不需要等待创建就可以直接执行
2、减少资源消耗,重用存在的线程,减少创建
3、提高可管理性,可以提避免线程的乱创建,以及可以监控线程执行情况
1、ThreadPoolExecutor() 是最原始的线程池创建,也是阿里巴巴 Java 开发手册中明确规范的创建线程池的方式。
2、通过?Executor?框架的工具类?Executors?来创建
(1)newSingleThreadExecutor:创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
(2)newFixedThreadPool:创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。如果希望在服务器上使用线程池,建议使用 newFixedThreadPool方法来创建线程池,这样能获得更好的性能。
(3) newCachedThreadPool:创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60 秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说 JVM)能够创建的最大线程大小。
(4)newScheduledThreadPool:创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
1、FixedThreadPool 和 SingleThreadExecutor:使用的是无界的 LinkedBlockingQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。源码中核心线程数量和最大线程数量是一样的,因为队列是最大长度,所以最大线程数量参数是无效的,只会有核心线程数在运行,
2、CachedThreadPool:使用的是同步队列 SynchronousQueue, corePoolSize 被设置为空(0),maximumPoolSize被设置为 Integer.MAX.VALUE,即它是无界的,这也就意味着如果主线程提交任务的速度高于 maximumPool 中线程处理任务的速度时,CachedThreadPool 会不断创建新的线程,允许创建的线程数量为 Integer.MAX_VALUE ,如果任务数量过多且执行速度较慢,可能会创建大量的线程,从而导致 OOM。
3、ScheduledThreadPool 和 SingleThreadScheduledExecutor : 使用的无界的延迟阻塞队列DelayedWorkQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM
DelayedWorkQueue 的内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构,可以保证每次出队的任务都是当前队列中执行时间最靠前的。DelayedWorkQueue 添加元素满了之后会自动扩容原来容量的 1/2,即永远不会阻塞,最大扩容可达 Integer.MAX_VALUE,所以最多只能创建核心线程数的线程。
@Configuration
public class AlgoFilterPool {
@Value("${algo.filter.pool.corePoolSize:4}")
private int corePoolSize;
@Value("${algo.filter.pool.maxPoolSize:16}")
private int maxPoolSize;
@Value("${algo.filter.pool.keepAliveSeconds:3000}")
private int keepAliveSeconds;
@Value("${algo.filter.pool.queueCapacity:8}")
private int queueCapacity;
@Bean("algoFilterExecutor")
public Executor alertExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心线程池大小
executor.setCorePoolSize(corePoolSize);
//最大线程数
executor.setMaxPoolSize(maxPoolSize);
//队列容量
executor.setQueueCapacity(queueCapacity);
//活跃时间
executor.setKeepAliveSeconds(keepAliveSeconds);
//线程名字前缀
executor.setThreadNamePrefix("ALGO FILTER Thread-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor.getThreadPoolExecutor();
}
}
?使用
@Resource(name = "algoFilterExecutor")
private ExecutorService executorService;
public void handleSync(String code, List<Map> data,String targetField){
/* for (IAlgoFilter iAlgoFilter : list) {
new Thread(() -> {
iAlgoFilter.execute(code, data, targetField);
}, iAlgoFilter.getType() + "-----计算").start();
}*/
for (IAlgoFilter iAlgoFilter : list) {
executorService.submit(new Runnable() {
@Override
public void run() {
log.info("当前线程:{},执行特征编号:{}的:{} 计算", Thread.currentThread().getName(),code, iAlgoFilter.getType());
iAlgoFilter.execute(code, data, targetField);
}
});
}
}
ThreadPoolExecutor3 个最重要的参数:
ThreadPoolExecutor其他常见参数:
如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任时,ThreadPoolTaskExecutor 定义一些策略:
举个例子: Spring 通过 ThreadPoolTaskExecutor 或者我们直接通过 ThreadPoolExecutor 的构造函数创建线程池的时候,当我们不指定 RejectedExecutionHandler 饱和策略的话来配置线程池的时候默认使用的是 ThreadPoolExecutor.AbortPolicy。在默认情况下,ThreadPoolExecutor 将抛出 RejectedExecutionException 来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用 ThreadPoolExecutor.CallerRunsPolicy。当最大池被填满时,此策略为我们提供可伸缩队列。
1.首先,项目中如果有多个场景需要使用线程池,那么最好的方式是:每一个业务场景使用独立的线程池。不要让所有的场景共用一个线程池。
分析:
1)独立的线城池之间互相不影响彼此的任务作业,更有利于保证本任务的独立性和完整性,更符合低耦合的设计思想
2)如果所有的场景共用一个线程池,可能会出现如下问题,举例:
比如有任务A、任务B、任务C? ?这三个任务场景共用一个线程池,配置如下
当任务A请求量剧烈增加的时候就会导致任务B和任务C,没有可用的线程? 可能出现迟迟获取不到资源的情况。比如任务A同时有3000个线程请求,此时就可能会导致? 任务B和任务C分配不到资源或者分配到很少的线程资源。
所以为了避免这种情况的产生,最好的方式是建立独立的线程池。
这样不管任务A的线程是堵塞还是其他原因,都不会影响任务B和任务C,同理任务B、任务C也一样不会影响其他场景任务的执行。
1、execute(),submit() 区别
execute() : 没有返回值,只能执行Runnable。
submit() : 返回一个实现Feature接口的类,并且可以执行Runnable,和Callable类型的任务。通过这个 Future 对象可以判断任务是否执行成功,并且可以通过 Future 的 get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)方法的话,如果在 timeout时间内任务还没有执行完,就会抛出 java.util.concurrent.TimeoutException
线程池和 ThreadLocal共用,可能会导致线程从ThreadLocal获取到的是旧值/脏数据。这是因为线程池会复用线程对象,与线程对象绑定的类的静态属性 ThreadLocal 变量也会被重用,这就导致一个线程可能获取到其他线程的ThreadLocal 值。
不要以为代码中没有显示使用线程池就不存在线程池了,像常用的 Web 服务器 Tomcat 处理任务为了提高并发量,就使用到了线程池,并且使用的是基于原生 Java 线程池改进完善得到的自定义线程池。
当然了,你可以将 Tomcat 设置为单线程处理任务。不过,这并不合适,会严重影响其处理任务的速度。
解决上述问题比较建议的办法是使用阿里巴巴开源的 TransmittableThreadLocal(TTL)。TransmittableThreadLocal类继承并加强了 JDK 内置的InheritableThreadLocal类,在使用线程池等会池化复用线程的执行组件情况下,提供ThreadLocal值的传递功能,解决异步执行时上下文传递的问题。
TransmittableThreadLocal 项目地址:https://github.com/alibaba/transmittable-thread-localopen in new window 。