CountDownLatch 是 Java 多线程并发包 (java.util.concurrent 包) 中的一种同步工具,用于协调多个线程之间的执行。它的主要作用是允许一个或多个线程等待其他线程完成操作。
主要特点和用途:
倒计数器: CountDownLatch 是一种倒计数的同步工具。在初始化时,它接受一个正整数作为计数值。计数值表示需要等待完成的线程数量。
等待阻塞: 当一个线程调用 await 方法时,它会阻塞,直到计数值减到零。当计数值不为零时,调用 await 的线程会等待。
计数递减: 其他线程完成任务时,可以通过调用 countDown 方法来递减计数值。一旦计数值减到零,所有等待的线程都将被释放。
一次性使用: 一旦计数值变为零,CountDownLatch 就不能再次使用。如果需要多次等待,可以考虑使用 CyclicBarrier。
基本用法:
CountDownLatch的主要构造方法如下:
CountDownLatch(int count)
count 用于初始化计数值,指定需要等待的线程数量。
分阶段任务: 适用于多个线程分阶段执行任务,每个阶段的任务需要等待所有线程完成后才能继续。
数据计算: 适用于将数据拆分给多个线程进行计算,最后合并计算结果。
可循环使用: 支持循环使用,一旦所有线程到达屏障,可以进行下一轮的等待。
动态添加: 允许在屏障等待的过程中动态添加等待线程。
并行任务: 适用于一组线程需要在主线程等待它们全部完成后才能继续执行的情况。
资源准备: 适用于主线程等待多个子线程完成资源准备工作后再开始工作。
不可重复使用: 一旦计数值变为零,就不能再次使用,需要重新创建。
静态计数: 初始化时就要指定计数值,不能在运行时动态改变计数值。
阶段性等待: 如果任务有多个阶段,每个阶段需要等待所有线程完成后再进行下一阶段,可以选择 CyclicBarrier。
一次性等待: 如果任务分为多个子任务,主线程需要等待所有子任务完成后才能继续执行,可以选择 CountDownLatch。
总的来说,CyclicBarrier 更适合处理多个线程相互等待的场景,而 CountDownLatch 更适合一组线程等待另一组线程完成的场景。
// 内部使用的同步器
private final Sync sync;
/**
* 等待计数值减到零。
*
* @throws InterruptedException 如果当前线程在等待时被中断
*/
public void await () throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* 等待计数值减到零,支持设置最大等待时间。
*
* @param timeout 最大等待时间
* @param unit 时间单位
* @return 如果在指定时间内计数值减到零,返回 true;否则返回 false
* @throws InterruptedException 如果当前线程在等待时被中断
*/
public boolean await ( long timeout, java.util.concurrent.TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
/**
* 递减计数值,表示一个线程已经完成任务。
*/
public void countDown () {
sync.releaseShared(1);
}
/**
* 尝试释放共享资源。
* 如果 tryReleaseShared 返回 true,表示成功释放资源,随后调用 doReleaseShared 来执行释放后的操作,
* 包括唤醒等待的线程。如果 tryReleaseShared 返回 false,表示释放失败,不执行后续操作。
*
* @param arg 释放资源的参数
* @return 如果释放成功,返回 true;否则返回 false
*/
public final boolean releaseShared ( int arg){
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
/**
* 尝试释放共享资源。
* 确保释放的操作能够传播,即使有其他正在进行的获取/释放操作。方法尝试通过尝试 unparkSuccessor 来唤醒等待的线程,
* 如果 unparkSuccessor 失败,则将等待状态设置为 PROPAGATE 以确保在释放时继续传播。此外,必须在进行此操作时循环,
* 以防在此期间添加了新节点。与 unparkSuccessor 的其他用法不同,我们需要知道如果 CAS 重置状态失败,如果失败则重新检查。
*/
private void doReleaseShared () {
for (; ; ) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases
unparkSuccessor(h);
} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
// 内部同步器
private static final class Sync extends AbstractQueuedSynchronizer {
// 构造方法,初始化计数值
Sync(int count) {
setState(count);
}
// 尝试递减计数值
@Override
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 尝试释放共享资源
@Override
protected boolean tryReleaseShared(int releases) {
// 递减计数值
for (; ; ) {
int current = getState();
if (current == 0) {
return false;
}
int next = current - 1;
if (compareAndSetState(current, next)) {
// 如果计数值减到零,唤醒等待的线程
return next == 0;
}
}
}
}
在一个任务分为多个子任务的情况下,主任务可以使用 CountDownLatch 来等待所有的子任务都完成后再继续执行。
CountDownLatch latch = new CountDownLatch(numTasks);
// 主任务
for (int i = 0; i < numTasks; i++) {
new Thread(() -> {
// 执行子任务
// ...
latch.countDown(); // 子任务完成,计数减一
}).start();
}
try {
latch.await(); // 主任务等待所有子任务完成
} catch (InterruptedException e) {
// 处理中断异常
}
在某些情况下,多个线程需要在开始工作之前先完成一些初始化工作,主线程可以使用 CountDownLatch 来等待所有线程初始化完成。
CountDownLatch initializationLatch = new CountDownLatch(numThreads);
// 主线程等待多个线程初始化完成
for (int i = 0; i < numThreads; i++) {
new Thread(() -> {
// 执行初始化工作
// ...
initializationLatch.countDown(); // 初始化完成,计数减一
}).start();
}
try {
initializationLatch.await(); // 主线程等待所有线程初始化完成
} catch (InterruptedException e) {
// 处理中断异常
}
在某些并发场景中,可能需要等待一组线程都准备好才能同时开始执行任务。
CountDownLatch startLatch = new CountDownLatch(1); // 主线程控制开始
CountDownLatch endLatch = new CountDownLatch(numThreads); // 主线程等待所有线程完成
// 多个线程等待开始信号
for (int i = 0; i < numThreads; i++) {
new Thread(() -> {
try {
startLatch.await(); // 等待开始信号
// 执行任务
// ...
} catch (InterruptedException e) {
// 处理中断异常
} finally {
endLatch.countDown(); // 任务完成,计数减一
}
}).start();
}
// 主线程发出开始信号
startLatch.countDown();
try {
endLatch.await(); // 主线程等待所有线程完成
} catch (InterruptedException e) {
// 处理中断异常
}
假设我们有一个模拟赛车比赛的场景,有多个赛车需要在比赛开始前进行准备,然后同时开始比赛。这个场景可以用 CountDownLatch 来模拟。
import java.util.concurrent.CountDownLatch;
class RaceCar implements Runnable {
private final String carName;
private final CountDownLatch preparationLatch;
private final CountDownLatch startLatch;
public RaceCar(String carName, CountDownLatch preparationLatch, CountDownLatch startLatch) {
this.carName = carName;
this.preparationLatch = preparationLatch;
this.startLatch = startLatch;
}
@Override
public void run() {
try {
System.out.println(carName + " 正在进行准备...");
// 模拟赛车准备工作
// 准备完成,等待其他赛车准备好
preparationLatch.countDown();
preparationLatch.await();
System.out.println(carName + " 准备就绪,等待比赛开始...");
// 模拟赛车等待比赛开始
// 比赛开始
startLatch.countDown();
System.out.println(carName + " 开始比赛!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("线程被中断:" + e.getMessage());
}
}
}
public class RaceSimulation {
public static void main(String[] args) {
int numCars = 5;
CountDownLatch preparationLatch = new CountDownLatch(numCars);
CountDownLatch startLatch = new CountDownLatch(1);
for (int i = 1; i <= numCars; i++) {
String carName = "赛车" + i;
new Thread(new RaceCar(carName, preparationLatch, startLatch)).start();
}
try {
System.out.println("等待所有赛车准备就绪...");
preparationLatch.await();
System.out.println("所有赛车准备就绪,发出比赛开始命令!");
startLatch.countDown();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("主线程被中断:" + e.getMessage());
}
}
}