【多线程及高并发 六】并发集合及线程池详解

发布时间:2024年01月15日

👏作者简介:大家好,我是若明天不见,BAT的Java高级开发工程师,CSDN博客专家,后端领域优质创作者
📕系列专栏:多线程及高并发系列
📕其他专栏:微服务框架系列MySQL系列Redis系列Leetcode算法系列GraphQL系列
📜如果感觉博主的文章还不错的话,请👍点赞收藏关注👍支持一下博主哦??
?时间是条环形跑道,万物终将归零,亦得以圆全完美


多线程及高并发系列


在 Java 并发编程中,BlockingQueueFutureFutureTaskThreadPoolExecutor是相互关联的重要概念和组件

  • BlockingQueue:是一个支持线程安全的、阻塞操作的队列。提供了线程间的数据传递机制
  • Future:是一个接口,表示一个异步计算的结果。提供了异步任务的结果获取机制
  • FutureTask: 是Future的实现类,同时也是一个可执行的任务
  • ThreadPoolExecutor:线程池是一个线程管理的工具,用于管理和复用线程资源。管理和调度任务的执行,将任务封装成FutureTask并通过BlockingQueue进行交互

BlockingQueue

并发集合

BlockingQueue是一个支持线程安全的、阻塞操作的队列,它的实现类都有这两个特性,在后文介绍时就不详细介绍了。常见的实现类有ArrayBlockingQueueLinkedBlockingQueuePriorityBlockingQueue等。BlockingQueue在并发编程中广泛应用于实现生产者-消费者模式,其中生产者将数据放入队列,消费者从队列中取出数据进行处理

BlockingQueue 的阻塞操作(如 put() 和 take())可以确保生产者和消费者之间的同步,避免了线程之间的竞争条件

BlockingQueue类型:

  • ArrayBlockingQueue:由数组结构组成的有界阻塞队列
  • LinkedBlockingQueue:由链表结构组成的有界阻塞队列
  • PriorityBlockingQueue:支持优先级排序的无界阻塞队列
  • DealyQueue:使用优先级队列实现的无界阻塞队列
  • SynchronousQueue:不存储元素的阻塞队列
  • LinkedTransferQueue:由链表结构组成的无界阻塞队列
  • LinkedBlockingDeque:由链表结构组成的双向阻塞队列

ConcurrentLinkedQueue是一个线程安全的无界队列实现,队列按照FIFO原则对元素进行排序。使用 链表数据结构CAS 操作来实现高并发的插入和提取操作

BlockingQueue 具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:

抛异常特定值阻塞超时
插入add(o)offer(o)put(o)
移除remove()poll()take()
检查element()peek()

BlockingQueue & BlockingDeque 对比

BlockingQueueBlockingDeque是Java中用于多线程编程的接口,它们都提供了阻塞操作的功能,但在使用方式和特性上有一些异同。

相同点:

  • 都是用于在多线程环境下进行安全的数据交换的接口
  • 都提供了阻塞操作,即在队列为空时,获取元素的操作会被阻塞,直到队列中有元素可用;在队列已满时,插入元素的操作会被阻塞,直到队列有空闲位置

不同点:

  1. 数据结构差异:

    • BlockingQueue是一种队列,它按照先进先出(FIFO)的顺序处理元素
    • BlockingDeque是一种双端队列,它允许在队列的两端进行插入和提取操作
  2. 操作的位置差异:

    • BlockingQueue的操作只涉及到队列的一端,即插入和提取操作只发生在队列的一端
    • BlockingDeque的操作可以在队列的两端进行,可以在队列的头部和尾部进行插入和提取操作

BlockingDeque为双端队列,因此可以根据使用方式模拟堆或栈的特性

BlockingDeque模拟栈的使用示例:

public class BlockingDequeStackExample {
    private BlockingDeque<Integer> stack;

    public BlockingDequeStackExample() {
        // 创建一个双端阻塞队列作为栈的实现
        stack = new LinkedBlockingDeque<>();
    }

    public void push(int element) {
        // 在队列的头部插入元素,模拟入栈操作。同addFirst方法
        stack.push(element);
        System.out.println("Pushed element: " + element);
    }

    public int pop() {
        // 从队列的头部提取元素,模拟出栈操作。同removeFirst方法
        int element = stack.pop();
        System.out.println("Popped element: " + element);
        return element;
    }

    public static void main(String[] args) {
        BlockingDequeStackExample stackExample = new BlockingDequeStackExample();

        // 模拟入栈和出栈操作
        stackExample.push(1);
        stackExample.push(2);
        stackExample.push(3);

        stackExample.pop();
        stackExample.pop();
        stackExample.pop();
    }
}

ArrayBlockingQueue & LinkedBlockingQueue 对比

ArrayBlockingQueueLinkedBlockingQueue都是Java中的阻塞队列,一个是数组结构,一个是链表结构

异同点如下:

  1. 实现方式:

    • ArrayBlockingQueue基于数组实现,内部使用ReentrantLock来保证线程安全
    • LinkedBlockingQueue基于链表实现,内部使用两个锁(一个用于生产者,一个用于消费者)来保证线程安全
  2. 长度限制:

    • ArrayBlockingQueue在创建时需要指定一个固定的容量,即队列的长度是固定的,不能动态改变
    • LinkedBlockingQueue可以选择在创建时指定一个可选的固定容量,如果未指定,则默认为 Integer.MAX_VALUE,即队列长度可以无限扩展
  3. 内存消耗:

    • ArrayBlockingQueue使用数组作为底层数据结构,因此在创建时需要预分配固定大小的内存空间,即使队列中只有少量元素,也会占用整个数组的空间
    • LinkedBlockingQueue使用链表作为底层数据结构,内存空间按需分配,只会占用实际元素所需的内存空间
  4. 公平性:

    • ArrayBlockingQueueLinkedBlockingQueue都支持公平性设置。公平性表示线程是否按照它们加入队列的顺序来获取元素。当设置为公平模式时,线程将按照先进先出的顺序获取元素,但会对性能产生一定影响。默认情况下,ArrayBlockingQueueLinkedBlockingQueue都是非公平
    • ArrayBlockingQueue可以通过构造函数指定使用公平锁的ReentranLock
  5. 性能差异:

    • 由于内部实现方式不同,ArrayBlockingQueue在高并发环境下的性能通常优于LinkedBlockingQueue。这是因为ArrayBlockingQueue使用单锁来保证线程安全,而LinkedBlockingQueue使用两个锁,增加了一些额外的开销

根据具体的使用场景和需求,可以选择适合的阻塞队列实现。最后根据场景,控制变量后分别压测,选择最合适的阻塞队列

PriorityBlockingQueue

PriorityBlockingQueue是Java中的一个基于优先级的无界阻塞队列。它具有以下特性:

  1. 按优先级排序PriorityBlockingQueue会根据元素的优先级进行排序。优先级高的元素在队列中排在前面。元素的优先级可以通过元素自身的比较器(Comparator)或者元素自身的自然顺序来确定
  2. 无界队列PriorityBlockingQueue没有容量限制,可以根据需要动态地添加元素。它不会出现因队列已满而阻塞添加操作的情况
  3. 线程安全
  4. 阻塞操作
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityBlockingQueueExample {
    public static void main(String[] args) {
        // 创建一个PriorityBlockingQueue实例
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();

        // 添加元素到队列中
        queue.offer(5);
        queue.offer(3);
        queue.offer(1);
        queue.offer(4);
        queue.offer(2);

        // 提取并打印队列中的元素 12345
        while (!queue.isEmpty()) {
            int element = queue.poll();
            System.out.println("Polled element: " + element);
        }
    }
}

在实际应用中,PriorityBlockingQueue可用于实现任务调度优先级队列等场景,其中需要按照优先级处理元素

DelayQueue

DelayQueue是Java中的一个基于延迟时间的阻塞队列。它具有以下特性:

  1. 延迟处理DelayQueue中的元素必须实现Delayed接口。Delayed接口定义了一个getDelay(TimeUnit unit)方法,用于获取元素的剩余延迟时间。只有当延迟时间小于等于零时,元素才可以从队列中提取
  2. 按延迟时间排序DelayQueue根据元素的延迟时间进行排序。延迟时间越短的元素在队列中排在前面
  3. 无界队列DelayQueue没有容量限制,可以根据需要动态地添加元素。它不会出现因队列已满而阻塞添加操作的情况
  4. 线程安全
  5. 阻塞操作

在下述示例中,我们创建了一个DelayQueue实例,并添加了一些延迟元素。延迟元素的延迟时间通过构造函数指定,并在getDelay方法中计算剩余延迟时间

public class DelayQueueExample {
    static class DelayedElement implements Delayed {
        private String value;
        private long endTime;

        public DelayedElement(String value, long delayMs) {
            this.value = value;
            this.endTime = System.currentTimeMillis() + delayMs;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            long remainingTime = endTime - System.currentTimeMillis();
            return unit.convert(remainingTime, TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            long diff = this.getDelay(TimeUnit.MILLISECONDS) - other.getDelay(TimeUnit.MILLISECONDS);
            return Long.compare(diff, 0);
        }

        @Override
        public String toString() {
            return "DelayedElement{" +
                    "value='" + value + '\'' +
                    ", endTime=" + endTime +
                    '}';
        }
    }

    public static void main(String[] args) {
        // 创建一个DelayQueue实例
        DelayQueue<DelayedElement> queue = new DelayQueue<>();

        // 添加延迟元素到队列中
        queue.offer(new DelayedElement("Element 1", 2000));
        queue.offer(new DelayedElement("Element 2", 5000));
        queue.offer(new DelayedElement("Element 3", 3000));

        // 提取并打印延迟元素
        while (!queue.isEmpty()) {
            try {
                DelayedElement element = queue.take();
                System.out.println("Polled element: " + element);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

使用take方法从队列中提取延迟元素,并将其打印出来。由于DelayQueue是一个阻塞队列,当队列为空时,提取操作会被阻塞,直到有元素的延迟时间到期

输出结果示例

Polled element: DelayedElement{value='Element 1', endTime=1641418006091}
Polled element: DelayedElement{value='Element 3', endTime=1641418009091}
Polled element: DelayedElement{value='Element 2', endTime=1641418013091}

在实际应用中,DelayQueue可用于实现定时任务缓存过期等场景,其中需要根据延迟时间对元素进行排序和处理

Future

Future用于异步结果计算。它提供了一些方法来检查计算是否完成,使用get方法将阻塞线程直到结果返回

  • cancel:尝试取消任务的执行,如果任务已完成或已取消,此操作无效
  • isCancelled:任务是否已取消
  • isDone:任务是否已完成
  • get:阻塞线程以获取计算结果,直至任务执行完毕返回结果
  • get(long timeout, TimeUnit unit):阻塞线程以获取计算结果,若在指定时间没返回结果,则返回null
public  interface  Future<V> {
    boolean cancel(boolean  mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Future结合线程池的使用

public void futureTest(){
    ExecutorService executorService = Executors.newFixedThreadPool(10);

    Future<String> nickFuture = executorService.submit(() -> userService.getNick());
    Future<String> nameFuture = executorService.submit(() -> userService.getUserName());

    // 阻塞开始,等待结果
    String nick = nickFuture.get(1000, TimeUnit.MILLISECONDS);
    String name = nameFuture.get();
}

FutureTask

FutureTask是 Java 中的一个类,实现了Future接口,同时也可以用作可执行的任务,特别适用于需要异步执行任务并获取结果的场景。FutureTask常用来封装CallableRunnable,将任务提交给线程池执行,并通过FutureTask获取任务的执行结果。同时,FutureTask也提供了一些方法来管理和控制任务的执行状态、取消任务的执行,并处理任务执行过程中的异常

FutureTask是使用CAS操作来实现对任务状态的并发操作的。CAS机制保证了对任务状态的更新操作是原子性的,避免了竞态条件和数据不一致的问题

FutureTask主要包括以下几种状态:

  1. NEW:任务的初始状态,表示任务尚未执行
  2. COMPLETING:表示任务正在执行完成的过程中,但结果尚未设置完毕
  3. NORMAL:任务执行成功完成
  4. EXCEPTIONAL:任务执行过程中发生了异常
  5. CANCELLED:任务被取消
  6. INTERRUPTING:任务正在被中断的过程中
  7. INTERRUPTED:任务被中断

ThreadPoolExecutor 线程池

ThreadPoolExecutor 是 Java 中 Executor 框架提供的一个线程池实现类。它提供了一种方便的方式来管理和复用线程,并执行提交的任务。线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程

ThreadPoolExecutor运行流程

常用默认实现:

  1. Executors#newCachedThreadPool:无边界线程池,带有自动线程回收
  2. Executors#newFixedThreadPool:固定大小的线程池
  3. Executors#newSingleThreadExecutor:单个后台线程,大多数场景用于预初始化配置

有需要执行的任务进入线程池时

  • 当前线程数小于核心线程数时,创建线程。
  • 当前线程数大于等于核心线程数,且工作队列未满时,将任务放入工作队列。
  • 当前线程数大于等于核心线程数,且工作队列已满
    • 若线程数小于最大线程数,创建线程
    • 若线程数等于最大线程数,抛出异常,拒绝任务(具体处理方式取决于handler的策略)

任务调度流程

ThreadPoolExecutor执行execute方法时,当前worker数小于corePoolSize,会调用addWorker方法,而workers.add(w)是在ReentranLock全局锁里执行的,可能会导致以下问题:

  1. 阻塞其他线程:在ReentrantLock的锁范围内执行workers.add(w)操作,那么其他线程在尝试获取该锁时将被阻塞,直到当前线程释放锁。这可能会导致其他线程在等待期间出现延迟或阻塞
  2. 性能下降:它们将按顺序等待ReentrantLock的释放。这可能导致线程竞争和延迟,从而降低整体性能

预热线程池是一种优化方法,可以在系统启动时提前创建一定数量的线程,以减少在系统运行时动态创建线程的开销

配置参数

  • corePoolSize
    核心线程数。空闲时仍会保留在池中的线程数,除非设置了allowCoreThreadTimeOut参数

  • maximumPoolSize
    最大线程数。允许在池中的最大线程数

  • keepAliveTime
    存活时间。当前线程数大于核心线程数时,空余线程的最长存活时间

  • unit
    单位。keepAliveTime参数的时间单位

  • workQueue
    工作队列,接口类为阻塞队列。任务执行前存储的队列,只有通过submit方法提交的任务才会进入队列

  • threadFactory
    线程工厂。创建线程。默认使用Executors.defaultThreadFactory(),所有的线程都属于同一个ThreadGroup,都有相同的优先级,且均不是守护线程。
    (可用new NamedThreadFactory("test")来对线程池中的线程添加前缀标识)

  • handler
    任务丢弃策略。若线程池已经关闭、或线程池已满,那么新的任务会被拒绝。

    • ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常
    • ThreadPoolExecutor.DiscardPolicy:丢弃任务,但不抛出异常。
    • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(循环此过程)
    • ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

合理配置线程池

  1. 线程池必须手动通过ThreadPoolExecutor的构造函数来声明,避免使用Executors类创建线程池,否则会因为使用了无界队列任务队列最大长度为 Integer.MAX_VALUE,导致堆积大量的请求 会有 OOM 风险

  2. 推荐使用有界队列,可以有效地控制线程池占用的内存和其他资源的数量,且maximumPoolSize配置能排上用场

如果线程池的工作队列已满,但是线程池的线程数还没有达到maximumPoolSize,那么线程池会创建新的非核心线程来处理这些任务,以避免任务积压和系统性能下降。

  1. corePoolSize 配置
  • CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1。比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。

  • I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用

线程池监控

可以利用ThreadPoolExecutor的相关API做一个基础的监控。从下图可以看出,ThreadPoolExecutor提供了获取线程池当前的线程数和活跃线程数、已经执行完成的任务数、正在排队中的任务数等等

也可以使用 SpringBoot 中的 Actuator 组件或 有监控功能的开源动态线程池Dynamic TP

动态化线程池

通过ThreadPoolExecutor提供的 public 方法可以动态修改参数配置

注意的是程序运行期间的时候,我们调用setCorePoolSize()这个方法的话,线程池会首先判断当前工作线程数是否大于corePoolSize,如果大于的话就会回收工作线程

更多动态修改线程池参数的功能,可以使用开源软件:

  • Hippo4jopen:异步线程池框架,支持线程池动态变更&监控&报警,无需修改代码轻松引入。支持多种使用模式,轻松引入,致力于提高系统运行保障能力
  • Dynamic TP:轻量级动态线程池,内置监控告警功能,集成三方中间件线程池管理,基于主流配置中心(已支持 Nacos、Apollo,Zookeeper、Consul、Etcd,可通过 SPI 自定义实现)

@Async 自定义线程池

@Async注解在使用时,不指定线程池的名称,默认SimpleAsyncTaskExecutor线程池。

默认的线程池配置为核心线程数为8,等待队列为无界队列,即当所有核心线程都在执行任务时,后面的任务会进入队列等待,若逻辑执行速度较慢会导致线程池阻塞,从而出现监听器抛弃和无响应的结果

spring默认线程池配置参数org.springframework.boot.autoconfigure.task.TaskExecutionProperties

/**
 * Configuration properties for task execution.
 *
 * @author Stephane Nicoll
 * @since 2.1.0
 */
@ConfigurationProperties("spring.task.execution")
public class TaskExecutionProperties {

	private final Pool pool = new Pool();

	/**
	 * Prefix to use for the names of newly created threads.
	 */
	private String threadNamePrefix = "task-";

	public static class Pool {

		/**
		 * Queue capacity. An unbounded capacity does not increase the pool and therefore
		 * ignores the "max-size" property.
		 */
		private int queueCapacity = Integer.MAX_VALUE;

		/**
		 * Core number of threads.
		 */
		private int coreSize = 8;

		/**
		 * Maximum allowed number of threads. If tasks are filling up the queue, the pool
		 * can expand up to that size to accommodate the load. Ignored if the queue is
		 * unbounded.
		 */
		private int maxSize = Integer.MAX_VALUE;

		/**
		 * Whether core threads are allowed to time out. This enables dynamic growing and
		 * shrinking of the pool.
		 */
		private boolean allowCoreThreadTimeout = true;

		/**
		 * Time limit for which threads may remain idle before being terminated.
		 */
		private Duration keepAlive = Duration.ofSeconds(60);
		
		//getter/setter
		}
}

线程池和 ThreadLocal 共用的坑

线程池和 ThreadLocal共用,可能会导致线程从ThreadLocal获取到的是旧值/脏数据。这是因为线程池会复用线程对象,与线程对象绑定的类的静态属性 ThreadLocal 变量也会被重用,这就导致一个线程可能获取到其他线程的ThreadLocal 值

阿里开源的TransmittableThreadLocal(TTL)能解决线程池中ThreadLocal的问题。

TransmittableThreadLocal类继承并加强了 JDK 内置的InheritableThreadLocal类,在使用线程池等会池化复用线程的执行组件情况下,提供ThreadLocal值的传递功能,解决异步执行时上下文传递的问题


参考资料:

  1. Java线程池实现原理及其在美团业务中的实践
  2. Java 线程池最佳实践
  3. Java 线程池作用及类型
  4. Java 并发编程 Future及CompletionService
  5. TransmittableThreadLocal(TTL)
  6. 案例分析|线程池相关故障梳理&总结
文章来源:https://blog.csdn.net/why_still_confused/article/details/135425210
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。