在并发编程中, 经常会遇到需要等待某些任务完成后再执行其他任务的情况。这就涉及到线程之间的协作和同步。
CountDownLatch 是 Java 并发包中的一个类, 它允许一个或多个线程等待其他线程完成操作, 再继续执行。
其灵活性和简洁性使得它成为并发编程中的不可或缺的工具。
CountDownLatch 的核心特点在于它是一种倒计数器, 初始值设定为某个正整数。
在多线程任务中, 当一个线程完成了一定的操作, 可通过调用 countDown() 方法来使计数器减 1, 而其他线程可通过 await() 方法等待计数器变为零。
一旦计数器变为零, 等待的线程将被唤醒继续执行。
为了能够理解 CountDownLatch, 举一个很通俗的例子。
运动员进行跑步比赛时, 假设有 6 名运动员参与比赛, 裁判员在终点会为这 6 名运动员分别计时, 可以想象每当一个运动员到达终点的时候,
对于裁判员来说就少了一个计时任务。直到所有运动员都到达终点了, 裁判员的任务也才完成。这 6 名运动员可以类比成 6 个线程, 当线程
调用 CountDownLatch.countDown 方法时就会对计数器的值减一, 直到计数器的值为 0 的时候, 裁判员 (调用 await 方法的线程) 才能继续往下执行。
下面通过代码的形式模拟一下上面的过程, 了解一下 CountDownLatch 的具体用法:
public class CountDownLatchDemo {
private static CountDownLatch startSignal = new CountDownLatch(1);
//用来表示裁判员需要维护的是 6 个运动员
private static CountDownLatch endSignal = new CountDownLatch(6);
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(6);
for (int i = 0; i < 6; i++) {
executorService.execute(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 运动员等待裁判员响哨!!!");
// 这时候线程会被挂起
startSignal.await();
System.out.println(Thread.currentThread().getName() + "正在全力冲刺");
endSignal.countDown();
System.out.println(Thread.currentThread().getName() + " 到达终点");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
System.out.println("裁判员发号施令啦!!!");
// startSignal 里面的计数值 - 1, 本身维护的就是 1, 减 1 后, 为 0, 所有卡在 startSignal 的线程都会唤醒
// 即线程池中的 6 个线程都会唤醒
startSignal.countDown();
// 等待 endSignal 中的计数值 - 1
endSignal.await();
// endSignal 中的计数值变为 0, 继续执行了
System.out.println("所有运动员到达终点, 比赛结束!");
executorService.shutdown();
}
}
输出结果:
pool-1-thread-2 运动员等待裁判员响哨!!!
pool-1-thread-3 运动员等待裁判员响哨!!!
pool-1-thread-1 运动员等待裁判员响哨!!!
pool-1-thread-4 运动员等待裁判员响哨!!!
pool-1-thread-5 运动员等待裁判员响哨!!!
pool-1-thread-6 运动员等待裁判员响哨!!!
裁判员发号施令啦!!!
pool-1-thread-2正在全力冲刺
pool-1-thread-2 到达终点
pool-1-thread-3正在全力冲刺
pool-1-thread-3 到达终点
pool-1-thread-1正在全力冲刺
pool-1-thread-1 到达终点
pool-1-thread-4正在全力冲刺
pool-1-thread-4 到达终点
pool-1-thread-5正在全力冲刺
pool-1-thread-5 到达终点
pool-1-thread-6正在全力冲刺
pool-1-thread-6 到达终点
所有运动员到达终点, 比赛结束!
该示例代码中设置了两个 CountDownLatch, 第一个 endSignal 用于控制让 main 线程 (裁判员) 必须等到其他线程 (运动员) 让 CountDownLatch
维护的数值 counter 减到 0 为止。另一个 startSignal 用于让 main 线程对其他线程进行 “发号施令”, startSignal 引用的 CountDownLatch 初始值为 1,
而其他线程执行的 run 方法中都会先通过 startSignal.await() 让这些线程都被阻塞, 直到 main 线程通过调用 startSignal.countDown() 将
值 N 减 1, CountDownLatch 维护的数值 counter 为 0 后, 其他线程才能往下执行, 并且, 每个线程执行的 run 方法中都会通过 endSignal.countDown(),
对 endSignal 维护的数值进行减一, 由于往线程池提交了 6 个任务, 会被减 6 次, 所以 endSignal 维护的值最终会变为 0, 因此 main 线程在
latch.await() 阻塞结束, 才能继续往下执行。
另外, 需要注意的是, 当调用 CountDownLatch 的 countDown 方法时, 当前线程是不会被阻塞, 会继续往下执行, 比如在该例中会继续输出
pool-1-thread-4 到达终点。
先从 CountDownLatch 的构造方法看起:
public CountDownLatch(int count){
if (count < 0)
throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
构造方法会传入一个整数 count, 表示当前的计算器的起始值。
这个值会在 CountDownLatch 的 countDown 方法被调用时 - 1, 直到减到 0 为止, 就会唤醒调用 CountDownLatch await 方法的线程, 让他继续执行下去。
CountDownLatch 的方法不是很多, 这里列举几个比较常用的:
- await() throw InterruptedException: 调用该方法的线程被挂起, 直到构造方法传入的 count 减到 0 的时候, 才能继续往下执行
- await(long timeout, TimeUnit unit): 与上面的 await 方法功能一致, 只不过这里有了时间限制, 调用该方法的线程等到指定的 timeout 时间后, 不管 count 是否减至为 0, 都会继续往下执行
- countDown(): 使 CountDownLatch 计数值 count 减 1
- long getCount(): 获取当前 CountDownLatch 当前的计数值
CountDownLatch 内部是通过 AQS 的共享锁实现的, 所以只要理解了 CountDownLatch 的同步器, 基本就能了解大体的实现了。
public class CountDownLatch {
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
// 这里的 count 就是声明 CountDownLatch 构造函数传入的计算器
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
// 状态值 == 0 的话, 获取共享锁成功, 否则失败
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// cas 交换状态值
for (;;) {
// 状态值 == 0, 是否锁失败
int c = getState();
if (c == 0)
return false;
// 否则 状态值减 1
int nextc = c - 1;
if (compareAndSetState(c, nextc))
// 减到状态值为 0 释放锁成功
return nextc == 0;
}
}
}
}
简简单单的一个同步器, 实现了共享锁的获取和释放的判断规则
public class CountDownLatch {
public void await() throws InterruptedException {
// 1. 先调用到 AbstractQueuedSynchronizer 的 acquireSharedInterruptibly
// 2. 在 AQS 的 acquireSharedInterruptibly 中先通过 CountDownLatch 自定义的 Sync 的 tryAcquireShared() 方法判断是否可以获取锁
// 在 tryAcquireShared 方法中只要状态值, 即设置的计数值不等于 0, 都是获取锁失败
// 3. 获取锁失败后, 会加入同步队列, 等待唤醒
sync.acquireSharedInterruptibly(1);
}
}
await(long timeout, TimeUnit unit) 方法这里就不展开了, 实质就是带超时时间的 await 方法,
将内部调用的 acquireSharedInterruptibly 方法换成了带超时时间的 acquireSharedInterruptibly 的方法就是了。
public class CountDownLatch {
public void countDown() {
// 1. 先调用到 AbstractQueuedSynchronizer 的 releaseShared
// 2. 在 AQS 的 releaseShared 中先通过 CountDownLatch 自定义的 Sync 的 tryReleaseShared() 方法判断是否可以获取锁
// 在 tryReleaseShared 方法中, 获取到当前的状态值, 即计数值, 等于 0, 直接释放锁失败
// 否则就给计数值 - 1, 减 1 后, 如果最新的计数值为 0 了, 释放锁就成功了, 同时唤醒阻塞在同步队列中的线程
sync.releaseShared(1);
}
}
总的来说 CountDownLatch 是一个很简单的类, 全部的实现都是通过 AbstractQueuedSynchronizer 的共享锁实现的。