👏作者简介:大家好,我是若明天不见,BAT的Java高级开发工程师,CSDN博客专家,后端领域优质创作者
📕系列专栏:多线程及高并发系列
📕其他专栏:微服务框架系列、MySQL系列、Redis系列、Leetcode算法系列、GraphQL系列
📜如果感觉博主的文章还不错的话,请👍点赞收藏关注👍支持一下博主哦??
?时间是条环形跑道,万物终将归零,亦得以圆全完美
多线程及高并发系列
在 Java 并发编程中,BlockingQueue
、Future
、FutureTask
和ThreadPoolExecutor
是相互关联的重要概念和组件
Future
的实现类,同时也是一个可执行的任务FutureTask
并通过BlockingQueue
进行交互BlockingQueue
是一个支持线程安全的、阻塞操作的队列,它的实现类都有这两个特性,在后文介绍时就不详细介绍了。常见的实现类有ArrayBlockingQueue
、LinkedBlockingQueue
、PriorityBlockingQueue
等。BlockingQueue
在并发编程中广泛应用于实现生产者-消费者模式,其中生产者将数据放入队列,消费者从队列中取出数据进行处理
BlockingQueue 的阻塞操作(如 put() 和 take())可以确保生产者和消费者之间的同步,避免了线程之间的竞争条件
BlockingQueue
类型:
ArrayBlockingQueue
:由数组结构组成的有界阻塞队列LinkedBlockingQueue
:由链表结构组成的有界阻塞队列PriorityBlockingQueue
:支持优先级排序的无界阻塞队列DealyQueue
:使用优先级队列实现的无界阻塞队列SynchronousQueue
:不存储元素的阻塞队列LinkedTransferQueue
:由链表结构组成的无界阻塞队列LinkedBlockingDeque
:由链表结构组成的双向阻塞队列
ConcurrentLinkedQueue
是一个线程安全的无界队列实现,队列按照FIFO
原则对元素进行排序。使用 链表数据结构 和 CAS 操作来实现高并发的插入和提取操作
BlockingQueue 具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:
抛异常 | 特定值 | 阻塞 | 超时 |
---|---|---|---|
插入 | add(o) | offer(o) | put(o) |
移除 | remove() | poll() | take() |
检查 | element() | peek() |
BlockingQueue
和BlockingDeque
是Java中用于多线程编程的接口,它们都提供了阻塞操作的功能,但在使用方式和特性上有一些异同。
相同点:
不同点:
数据结构差异:
BlockingQueue
是一种队列,它按照先进先出(FIFO)的顺序处理元素BlockingDeque
是一种双端队列,它允许在队列的两端进行插入和提取操作操作的位置差异:
BlockingQueue
的操作只涉及到队列的一端,即插入和提取操作只发生在队列的一端BlockingDeque
的操作可以在队列的两端进行,可以在队列的头部和尾部进行插入和提取操作BlockingDeque
为双端队列,因此可以根据使用方式模拟堆或栈的特性
BlockingDeque
模拟栈的使用示例:
public class BlockingDequeStackExample {
private BlockingDeque<Integer> stack;
public BlockingDequeStackExample() {
// 创建一个双端阻塞队列作为栈的实现
stack = new LinkedBlockingDeque<>();
}
public void push(int element) {
// 在队列的头部插入元素,模拟入栈操作。同addFirst方法
stack.push(element);
System.out.println("Pushed element: " + element);
}
public int pop() {
// 从队列的头部提取元素,模拟出栈操作。同removeFirst方法
int element = stack.pop();
System.out.println("Popped element: " + element);
return element;
}
public static void main(String[] args) {
BlockingDequeStackExample stackExample = new BlockingDequeStackExample();
// 模拟入栈和出栈操作
stackExample.push(1);
stackExample.push(2);
stackExample.push(3);
stackExample.pop();
stackExample.pop();
stackExample.pop();
}
}
ArrayBlockingQueue
和LinkedBlockingQueue
都是Java中的阻塞队列,一个是数组结构,一个是链表结构
异同点如下:
实现方式:
ArrayBlockingQueue
基于数组实现,内部使用ReentrantLock
来保证线程安全LinkedBlockingQueue
基于链表实现,内部使用两个锁(一个用于生产者,一个用于消费者)来保证线程安全长度限制:
ArrayBlockingQueue
在创建时需要指定一个固定的容量,即队列的长度是固定的,不能动态改变LinkedBlockingQueue
可以选择在创建时指定一个可选的固定容量,如果未指定,则默认为 Integer.MAX_VALUE,即队列长度可以无限扩展内存消耗:
ArrayBlockingQueue
使用数组作为底层数据结构,因此在创建时需要预分配固定大小的内存空间,即使队列中只有少量元素,也会占用整个数组的空间LinkedBlockingQueue
使用链表作为底层数据结构,内存空间按需分配,只会占用实际元素所需的内存空间公平性:
ArrayBlockingQueue
和LinkedBlockingQueue
都支持公平性设置。公平性表示线程是否按照它们加入队列的顺序来获取元素。当设置为公平模式时,线程将按照先进先出的顺序获取元素,但会对性能产生一定影响。默认情况下,ArrayBlockingQueue
和LinkedBlockingQueue
都是非公平,ArrayBlockingQueue
可以通过构造函数指定使用公平锁的ReentranLock
性能差异:
ArrayBlockingQueue
在高并发环境下的性能通常优于LinkedBlockingQueue
。这是因为ArrayBlockingQueue
使用单锁来保证线程安全,而LinkedBlockingQueue
使用两个锁,增加了一些额外的开销根据具体的使用场景和需求,可以选择适合的阻塞队列实现。最后根据场景,控制变量后分别压测,选择最合适的阻塞队列
PriorityBlockingQueue
是Java中的一个基于优先级的无界阻塞队列。它具有以下特性:
PriorityBlockingQueue
会根据元素的优先级进行排序。优先级高的元素在队列中排在前面。元素的优先级可以通过元素自身的比较器(Comparator
)或者元素自身的自然顺序来确定PriorityBlockingQueue
没有容量限制,可以根据需要动态地添加元素。它不会出现因队列已满而阻塞添加操作的情况import java.util.concurrent.PriorityBlockingQueue;
public class PriorityBlockingQueueExample {
public static void main(String[] args) {
// 创建一个PriorityBlockingQueue实例
PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
// 添加元素到队列中
queue.offer(5);
queue.offer(3);
queue.offer(1);
queue.offer(4);
queue.offer(2);
// 提取并打印队列中的元素 12345
while (!queue.isEmpty()) {
int element = queue.poll();
System.out.println("Polled element: " + element);
}
}
}
在实际应用中,PriorityBlockingQueue可用于实现任务调度、优先级队列等场景,其中需要按照优先级处理元素
DelayQueue是Java中的一个基于延迟时间的阻塞队列。它具有以下特性:
DelayQueue
中的元素必须实现Delayed
接口。Delayed
接口定义了一个getDelay(TimeUnit unit)
方法,用于获取元素的剩余延迟时间。只有当延迟时间小于等于零时,元素才可以从队列中提取DelayQueue
根据元素的延迟时间进行排序。延迟时间越短的元素在队列中排在前面DelayQueue
没有容量限制,可以根据需要动态地添加元素。它不会出现因队列已满而阻塞添加操作的情况在下述示例中,我们创建了一个DelayQueue
实例,并添加了一些延迟元素。延迟元素的延迟时间通过构造函数指定,并在getDelay
方法中计算剩余延迟时间
public class DelayQueueExample {
static class DelayedElement implements Delayed {
private String value;
private long endTime;
public DelayedElement(String value, long delayMs) {
this.value = value;
this.endTime = System.currentTimeMillis() + delayMs;
}
@Override
public long getDelay(TimeUnit unit) {
long remainingTime = endTime - System.currentTimeMillis();
return unit.convert(remainingTime, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed other) {
long diff = this.getDelay(TimeUnit.MILLISECONDS) - other.getDelay(TimeUnit.MILLISECONDS);
return Long.compare(diff, 0);
}
@Override
public String toString() {
return "DelayedElement{" +
"value='" + value + '\'' +
", endTime=" + endTime +
'}';
}
}
public static void main(String[] args) {
// 创建一个DelayQueue实例
DelayQueue<DelayedElement> queue = new DelayQueue<>();
// 添加延迟元素到队列中
queue.offer(new DelayedElement("Element 1", 2000));
queue.offer(new DelayedElement("Element 2", 5000));
queue.offer(new DelayedElement("Element 3", 3000));
// 提取并打印延迟元素
while (!queue.isEmpty()) {
try {
DelayedElement element = queue.take();
System.out.println("Polled element: " + element);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
使用take
方法从队列中提取延迟元素,并将其打印出来。由于DelayQueue
是一个阻塞队列,当队列为空时,提取操作会被阻塞,直到有元素的延迟时间到期
输出结果示例
Polled element: DelayedElement{value='Element 1', endTime=1641418006091}
Polled element: DelayedElement{value='Element 3', endTime=1641418009091}
Polled element: DelayedElement{value='Element 2', endTime=1641418013091}
在实际应用中,DelayQueue可用于实现定时任务、缓存过期等场景,其中需要根据延迟时间对元素进行排序和处理
Future
用于异步结果计算。它提供了一些方法来检查计算是否完成,使用get
方法将阻塞线程直到结果返回
cancel
:尝试取消任务的执行,如果任务已完成或已取消,此操作无效isCancelled
:任务是否已取消isDone
:任务是否已完成get
:阻塞线程以获取计算结果,直至任务执行完毕返回结果get(long timeout, TimeUnit unit)
:阻塞线程以获取计算结果,若在指定时间没返回结果,则返回nullpublic interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Future
结合线程池的使用
public void futureTest(){
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<String> nickFuture = executorService.submit(() -> userService.getNick());
Future<String> nameFuture = executorService.submit(() -> userService.getUserName());
// 阻塞开始,等待结果
String nick = nickFuture.get(1000, TimeUnit.MILLISECONDS);
String name = nameFuture.get();
}
FutureTask
是 Java 中的一个类,实现了Future
接口,同时也可以用作可执行的任务,特别适用于需要异步执行任务并获取结果的场景。FutureTask
常用来封装Callable
和Runnable
,将任务提交给线程池执行,并通过FutureTask
获取任务的执行结果。同时,FutureTask
也提供了一些方法来管理和控制任务的执行状态、取消任务的执行,并处理任务执行过程中的异常
FutureTask
是使用CAS操作来实现对任务状态的并发操作的。CAS机制保证了对任务状态的更新操作是原子性的,避免了竞态条件和数据不一致的问题
FutureTask
主要包括以下几种状态:
ThreadPoolExecutor 是 Java 中 Executor 框架提供的一个线程池实现类。它提供了一种方便的方式来管理和复用线程,并执行提交的任务。线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程
常用默认实现:
Executors#newCachedThreadPool
:无边界线程池,带有自动线程回收Executors#newFixedThreadPool
:固定大小的线程池Executors#newSingleThreadExecutor
:单个后台线程,大多数场景用于预初始化配置有需要执行的任务进入线程池时
handler
的策略)当ThreadPoolExecutor
执行execute
方法时,当前worker数小于corePoolSize
,会调用addWorker
方法,而workers.add(w)
是在ReentranLock全局锁里执行的,可能会导致以下问题:
ReentrantLock
的锁范围内执行workers.add(w)
操作,那么其他线程在尝试获取该锁时将被阻塞,直到当前线程释放锁。这可能会导致其他线程在等待期间出现延迟或阻塞ReentrantLock
的释放。这可能导致线程竞争和延迟,从而降低整体性能预热线程池是一种优化方法,可以在系统启动时提前创建一定数量的线程,以减少在系统运行时动态创建线程的开销
corePoolSize
核心线程数。空闲时仍会保留在池中的线程数,除非设置了allowCoreThreadTimeOut
参数
maximumPoolSize
最大线程数。允许在池中的最大线程数
keepAliveTime
存活时间。当前线程数大于核心线程数时,空余线程的最长存活时间
unit
单位。keepAliveTime
参数的时间单位
workQueue
工作队列,接口类为阻塞队列。任务执行前存储的队列,只有通过submit
方法提交的任务才会进入队列
threadFactory
线程工厂。创建线程。默认使用Executors.defaultThreadFactory()
,所有的线程都属于同一个ThreadGroup
,都有相同的优先级,且均不是守护线程。
(可用new NamedThreadFactory("test")
来对线程池中的线程添加前缀标识)
handler
任务丢弃策略。若线程池已经关闭、或线程池已满,那么新的任务会被拒绝。
ThreadPoolExecutor.AbortPolicy
:丢弃任务并抛出RejectedExecutionException
异常ThreadPoolExecutor.DiscardPolicy
:丢弃任务,但不抛出异常。ThreadPoolExecutor.DiscardOldestPolicy
:丢弃队列最前面的任务,然后重新尝试执行任务(循环此过程)ThreadPoolExecutor.CallerRunsPolicy
:由调用线程处理该任务线程池必须手动通过ThreadPoolExecutor
的构造函数来声明,避免使用Executors
类创建线程池,否则会因为使用了无界队列或任务队列最大长度为 Integer.MAX_VALUE,导致堆积大量的请求 会有 OOM 风险
推荐使用有界队列,可以有效地控制线程池占用的内存和其他资源的数量,且maximumPoolSize
配置能排上用场
如果线程池的工作队列已满,但是线程池的线程数还没有达到maximumPoolSize
,那么线程池会创建新的非核心线程来处理这些任务,以避免任务积压和系统性能下降。
CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1。比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用
可以利用ThreadPoolExecutor
的相关API
做一个基础的监控。从下图可以看出,ThreadPoolExecutor
提供了获取线程池当前的线程数和活跃线程数、已经执行完成的任务数、正在排队中的任务数等等
也可以使用 SpringBoot 中的 Actuator 组件或 有监控功能的开源动态线程池Dynamic TP
通过ThreadPoolExecutor
提供的 public 方法可以动态修改参数配置
注意的是程序运行期间的时候,我们调用
setCorePoolSize()
这个方法的话,线程池会首先判断当前工作线程数是否大于corePoolSize
,如果大于的话就会回收工作线程
更多动态修改线程池参数的功能,可以使用开源软件:
在@Async
注解在使用时,不指定线程池的名称,默认SimpleAsyncTaskExecutor
线程池。
默认的线程池配置为核心线程数为8,等待队列为无界队列,即当所有核心线程都在执行任务时,后面的任务会进入队列等待,若逻辑执行速度较慢会导致线程池阻塞,从而出现监听器抛弃和无响应的结果
spring默认线程池配置参数
org.springframework.boot.autoconfigure.task.TaskExecutionProperties
/**
* Configuration properties for task execution.
*
* @author Stephane Nicoll
* @since 2.1.0
*/
@ConfigurationProperties("spring.task.execution")
public class TaskExecutionProperties {
private final Pool pool = new Pool();
/**
* Prefix to use for the names of newly created threads.
*/
private String threadNamePrefix = "task-";
public static class Pool {
/**
* Queue capacity. An unbounded capacity does not increase the pool and therefore
* ignores the "max-size" property.
*/
private int queueCapacity = Integer.MAX_VALUE;
/**
* Core number of threads.
*/
private int coreSize = 8;
/**
* Maximum allowed number of threads. If tasks are filling up the queue, the pool
* can expand up to that size to accommodate the load. Ignored if the queue is
* unbounded.
*/
private int maxSize = Integer.MAX_VALUE;
/**
* Whether core threads are allowed to time out. This enables dynamic growing and
* shrinking of the pool.
*/
private boolean allowCoreThreadTimeout = true;
/**
* Time limit for which threads may remain idle before being terminated.
*/
private Duration keepAlive = Duration.ofSeconds(60);
//getter/setter
}
}
线程池和 ThreadLocal共用,可能会导致线程从ThreadLocal获取到的是旧值/脏数据。这是因为线程池会复用线程对象,与线程对象绑定的类的静态属性 ThreadLocal 变量也会被重用,这就导致一个线程可能获取到其他线程的ThreadLocal 值
阿里开源的TransmittableThreadLocal(TTL)能解决线程池中ThreadLocal的问题。
TransmittableThreadLocal
类继承并加强了 JDK 内置的InheritableThreadLocal
类,在使用线程池等会池化复用线程的执行组件情况下,提供ThreadLocal值的传递功能,解决异步执行时上下文传递的问题
参考资料: