【Java 并发】CountDownLatch 介绍

发布时间:2023年12月20日

在并发编程中, 经常会遇到需要等待某些任务完成后再执行其他任务的情况。这就涉及到线程之间的协作和同步。
CountDownLatch 是 Java 并发包中的一个类, 它允许一个或多个线程等待其他线程完成操作, 再继续执行。
其灵活性和简洁性使得它成为并发编程中的不可或缺的工具。

1 CountDownLatch 简介

CountDownLatch 的核心特点在于它是一种倒计数器, 初始值设定为某个正整数。
在多线程任务中, 当一个线程完成了一定的操作, 可通过调用 countDown() 方法来使计数器减 1, 而其他线程可通过 await() 方法等待计数器变为零。
一旦计数器变为零, 等待的线程将被唤醒继续执行。

为了能够理解 CountDownLatch, 举一个很通俗的例子。
运动员进行跑步比赛时, 假设有 6 名运动员参与比赛, 裁判员在终点会为这 6 名运动员分别计时, 可以想象每当一个运动员到达终点的时候,
对于裁判员来说就少了一个计时任务。直到所有运动员都到达终点了, 裁判员的任务也才完成。这 6 名运动员可以类比成 6 个线程, 当线程
调用 CountDownLatch.countDown 方法时就会对计数器的值减一, 直到计数器的值为 0 的时候, 裁判员 (调用 await 方法的线程) 才能继续往下执行。

下面通过代码的形式模拟一下上面的过程, 了解一下 CountDownLatch 的具体用法:

public class CountDownLatchDemo {

    private static CountDownLatch startSignal = new CountDownLatch(1);

    //用来表示裁判员需要维护的是 6 个运动员
    private static CountDownLatch endSignal = new CountDownLatch(6);

    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newFixedThreadPool(6);

        for (int i = 0; i < 6; i++) {
            executorService.execute(() -> {
                try {

                    System.out.println(Thread.currentThread().getName() + " 运动员等待裁判员响哨!!!");
                    // 这时候线程会被挂起
                    startSignal.await();

                    System.out.println(Thread.currentThread().getName() + "正在全力冲刺");
                    
                    endSignal.countDown();
                    System.out.println(Thread.currentThread().getName() + "  到达终点");

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        System.out.println("裁判员发号施令啦!!!");
        // startSignal 里面的计数值 - 1, 本身维护的就是 1, 减 1 后, 为 0, 所有卡在 startSignal 的线程都会唤醒
        // 即线程池中的 6 个线程都会唤醒
        startSignal.countDown();

        // 等待 endSignal 中的计数值 - 1
        endSignal.await();

        // endSignal 中的计数值变为 0, 继续执行了
        System.out.println("所有运动员到达终点, 比赛结束!");
        executorService.shutdown();
    }
}

输出结果:

pool-1-thread-2 运动员等待裁判员响哨!!!
pool-1-thread-3 运动员等待裁判员响哨!!!
pool-1-thread-1 运动员等待裁判员响哨!!!
pool-1-thread-4 运动员等待裁判员响哨!!!
pool-1-thread-5 运动员等待裁判员响哨!!!
pool-1-thread-6 运动员等待裁判员响哨!!!
裁判员发号施令啦!!!
pool-1-thread-2正在全力冲刺
pool-1-thread-2  到达终点
pool-1-thread-3正在全力冲刺
pool-1-thread-3  到达终点
pool-1-thread-1正在全力冲刺
pool-1-thread-1  到达终点
pool-1-thread-4正在全力冲刺
pool-1-thread-4  到达终点
pool-1-thread-5正在全力冲刺
pool-1-thread-5  到达终点
pool-1-thread-6正在全力冲刺
pool-1-thread-6  到达终点
所有运动员到达终点, 比赛结束!

该示例代码中设置了两个 CountDownLatch, 第一个 endSignal 用于控制让 main 线程 (裁判员) 必须等到其他线程 (运动员) 让 CountDownLatch
维护的数值 counter 减到 0 为止。另一个 startSignal 用于让 main 线程对其他线程进行 “发号施令”, startSignal 引用的 CountDownLatch 初始值为 1,
而其他线程执行的 run 方法中都会先通过 startSignal.await() 让这些线程都被阻塞, 直到 main 线程通过调用 startSignal.countDown() 将
值 N 减 1, CountDownLatch 维护的数值 counter 为 0 后, 其他线程才能往下执行, 并且, 每个线程执行的 run 方法中都会通过 endSignal.countDown(),
对 endSignal 维护的数值进行减一, 由于往线程池提交了 6 个任务, 会被减 6 次, 所以 endSignal 维护的值最终会变为 0, 因此 main 线程在
latch.await() 阻塞结束, 才能继续往下执行。

另外, 需要注意的是, 当调用 CountDownLatch 的 countDown 方法时, 当前线程是不会被阻塞, 会继续往下执行, 比如在该例中会继续输出
pool-1-thread-4 到达终点。

2 CountDownLatch 的方法

先从 CountDownLatch 的构造方法看起:

public CountDownLatch(int count){
    if (count < 0) 
        throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

构造方法会传入一个整数 count, 表示当前的计算器的起始值。
这个值会在 CountDownLatch 的 countDown 方法被调用时 - 1, 直到减到 0 为止, 就会唤醒调用 CountDownLatch await 方法的线程, 让他继续执行下去。

CountDownLatch 的方法不是很多, 这里列举几个比较常用的:

  1. await() throw InterruptedException: 调用该方法的线程被挂起, 直到构造方法传入的 count 减到 0 的时候, 才能继续往下执行
  2. await(long timeout, TimeUnit unit): 与上面的 await 方法功能一致, 只不过这里有了时间限制, 调用该方法的线程等到指定的 timeout 时间后, 不管 count 是否减至为 0, 都会继续往下执行
  3. countDown(): 使 CountDownLatch 计数值 count 减 1
  4. long getCount(): 获取当前 CountDownLatch 当前的计数值

3 CountDownLatch 的源码实现

CountDownLatch 内部是通过 AQS 的共享锁实现的, 所以只要理解了 CountDownLatch 的同步器, 基本就能了解大体的实现了。

3.1 CountDownLatch 的同步器

public class CountDownLatch {

    private static final class Sync extends AbstractQueuedSynchronizer {
    	
        Sync(int count) {
            // 这里的 count 就是声明 CountDownLatch 构造函数传入的计算器
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            // 状态值 == 0 的话, 获取共享锁成功, 否则失败
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // cas 交换状态值
            for (;;) {
                // 状态值 == 0, 是否锁失败
                int c = getState();
                if (c == 0)
                    return false;
                // 否则 状态值减 1     
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    // 减到状态值为 0 释放锁成功
                    return nextc == 0;
            }
        }
    }
}

简简单单的一个同步器, 实现了共享锁的获取和释放的判断规则

3.2 CountDownLatch 的 await 方法

public class CountDownLatch {

    public void await() throws InterruptedException {
        // 1. 先调用到 AbstractQueuedSynchronizer 的 acquireSharedInterruptibly
        // 2. 在 AQS 的 acquireSharedInterruptibly 中先通过 CountDownLatch 自定义的 Sync 的 tryAcquireShared() 方法判断是否可以获取锁
        // 在 tryAcquireShared 方法中只要状态值, 即设置的计数值不等于 0, 都是获取锁失败
        // 3. 获取锁失败后, 会加入同步队列, 等待唤醒
        sync.acquireSharedInterruptibly(1);
    }
}

await(long timeout, TimeUnit unit) 方法这里就不展开了, 实质就是带超时时间的 await 方法,
将内部调用的 acquireSharedInterruptibly 方法换成了带超时时间的 acquireSharedInterruptibly 的方法就是了。

3.3 CountDownLatch 的 countDown 方法

public class CountDownLatch {

    public void countDown() {
        // 1. 先调用到 AbstractQueuedSynchronizer 的 releaseShared
        // 2. 在 AQS 的 releaseShared 中先通过 CountDownLatch 自定义的 Sync 的 tryReleaseShared() 方法判断是否可以获取锁
        // 在 tryReleaseShared 方法中, 获取到当前的状态值, 即计数值, 等于 0, 直接释放锁失败
        // 否则就给计数值 - 1, 减 1 后, 如果最新的计数值为 0 了, 释放锁就成功了, 同时唤醒阻塞在同步队列中的线程
        sync.releaseShared(1);
    }

}

总的来说 CountDownLatch 是一个很简单的类, 全部的实现都是通过 AbstractQueuedSynchronizer 的共享锁实现的。

4 参考

CountDownLatch与CyclicBarrier

文章来源:https://blog.csdn.net/LCN29/article/details/135096837
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。