【源码解析】从CyclicBarrier角度聊聊AQS

发布时间:2023年12月17日

案例

CyclicBarrier是多个线程互相等待,并且可以循环使用的。CyclicBarrier是基于conditon同步队列实现的,而CountDownLatch则是基于AQS的state实现的。
在这里插入图片描述

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        Thread t1 = new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " before");
                cyclicBarrier.await();
                System.out.println(Thread.currentThread().getName() + " after");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }, "t1");

        Thread t2 = new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " before");
                cyclicBarrier.await();
                System.out.println(Thread.currentThread().getName() + " after");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }, "t2");

        t1.start();
        t2.start();
        System.out.println("执行完毕");

    }

从执行结果来看,其实就是先t1和t2线程执行before 然后等待,之后统一执行after接口。一般在并发调用接口测试的时候可以使用。

源码解析

构造方法

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        // 参与的线程数 count记录当前多少线程没有达到栅栏
        this.count = parties;
        //代表每一代
        this.barrierCommand = barrierAction;
    }
    // CyclicBarrierr是可以重复的,每次从开始使用到穿过栅栏当作一代
    private static class Generation {
        boolean broken = false;
    }

    private final ReentrantLock lock = new ReentrantLock();

    // Condition 是“条件”的意思,CyclicBarrier 的等待线程通过 barrier 的“条件”是大家都到了栅栏上
    private final Condition trip = lock.newCondition();
   
    // 参与线程数
    private final int parties;

    // 越过栅栏之前 要执行响应的操作
    private final Runnable barrierCommand;

    // 当前所处的代
    private Generation generation = new Generation();

    // 2     // 还没有到栅栏的线程数,这个值初始为 parties,然后递减
    //23     // 还没有到栅栏的线程数 = parties - 已经到栅栏的数量
    private int count;

在这里插入图片描述

await

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

1.获取lock锁 生成一个 generation() 新的一代
2.如果非最后一个线程 ,//此线程会添加到Condition条件队列中,并在此阻塞。
3.当最后一个线程调用 ,先判断是否有需要执行的任务,有则执行,没有继续 唤醒等待线程,开启下一轮。

    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        // 先获取锁
        // await 适当锁 siganl 重新获取锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;
            // 检查栅栏是否被打破,被打破 抛出异常
            if (g.broken)
                throw new BrokenBarrierException();
            // 中断 抛出异常
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            int index = --count;

            // 最后一个线程达到的时候,唤醒所有等待的线程,开启新的一代 (generation)
            // 等于0,说明所有的线程都在栅栏上,准备通过
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    // 如果初始化的时候,指定了通过栅栏要执行的操作,这里会直接执行run方法
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;

                    // 唤醒等待的线程,开启新的一代
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        // 发生了异常,需要打破栅栏
                        // 唤醒所有等待的线程 broken为true , 重置 count 为 parties
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            // 非最后一个线程执行的逻辑
            for (;;) {
                try {
                    if (!timed)
                        // 如果带有超时机制,调用带超时的 Condition 的 await 方法等待,直到最后一个线程调用 await
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll(); // 唤醒所有等待的线程
        // set up next generation
        count = parties; // 将当前的线程重新设置为parties
        generation = new Generation(); // 生成一个新的一代
    }

别的方法就比较简单的,整体上来说,其实就是利用 基于 Condition 来实现的。先阻塞线程到conditon队列中,因为condition会将线程再次从条件等待队列转移到aqs同步等待队列中阻塞,然后进一步通过最后的线程进行codniton的siganlAll()唤醒全部线程。

CyclicBarrier与CountDownLatch的区别

  1. CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
  2. CyclicBarrier还提供getNumberWaiting(可以获得CyclicBarrier阻塞的线程数量)、 isBroken(用来知道阻塞的线程是否被中断)等方法。
  3. CountDownLatch会阻塞主线程,CyclicBarrier不会阻塞主线程,只会阻塞子线程。
  4. CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同。CountDownLatch一般用于一个或多个线程,等待其他线程执行完任务后,再执行。 CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行。
  5. CyclicBarrier 还可以提供一个 barrierAction,合并多线程计算结果。
  6. CyclicBarrier是通过ReentrantLock的"独占锁"和Conditon来实现一组线程的阻塞唤醒的,而CountDownLatch则是通过AQS的“共享锁”实现
文章来源:https://blog.csdn.net/jia970426/article/details/135050130
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。