CyclicBarrier是多个线程互相等待,并且可以循环使用的。CyclicBarrier是基于conditon同步队列实现的,而CountDownLatch则是基于AQS的state实现的。
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
Thread t1 = new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " before");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + " after");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}, "t1");
Thread t2 = new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " before");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + " after");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}, "t2");
t1.start();
t2.start();
System.out.println("执行完毕");
}
从执行结果来看,其实就是先t1和t2线程执行before 然后等待,之后统一执行after接口。一般在并发调用接口测试的时候可以使用。
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
// 参与的线程数 count记录当前多少线程没有达到栅栏
this.count = parties;
//代表每一代
this.barrierCommand = barrierAction;
}
// CyclicBarrierr是可以重复的,每次从开始使用到穿过栅栏当作一代
private static class Generation {
boolean broken = false;
}
private final ReentrantLock lock = new ReentrantLock();
// Condition 是“条件”的意思,CyclicBarrier 的等待线程通过 barrier 的“条件”是大家都到了栅栏上
private final Condition trip = lock.newCondition();
// 参与线程数
private final int parties;
// 越过栅栏之前 要执行响应的操作
private final Runnable barrierCommand;
// 当前所处的代
private Generation generation = new Generation();
// 2 // 还没有到栅栏的线程数,这个值初始为 parties,然后递减
//23 // 还没有到栅栏的线程数 = parties - 已经到栅栏的数量
private int count;
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
1.获取lock锁 生成一个 generation() 新的一代
2.如果非最后一个线程 ,//此线程会添加到Condition条件队列中,并在此阻塞。
3.当最后一个线程调用 ,先判断是否有需要执行的任务,有则执行,没有继续 唤醒等待线程,开启下一轮。
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 先获取锁
// await 适当锁 siganl 重新获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
// 检查栅栏是否被打破,被打破 抛出异常
if (g.broken)
throw new BrokenBarrierException();
// 中断 抛出异常
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
// 最后一个线程达到的时候,唤醒所有等待的线程,开启新的一代 (generation)
// 等于0,说明所有的线程都在栅栏上,准备通过
if (index == 0) { // tripped
boolean ranAction = false;
try {
// 如果初始化的时候,指定了通过栅栏要执行的操作,这里会直接执行run方法
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 唤醒等待的线程,开启新的一代
nextGeneration();
return 0;
} finally {
if (!ranAction)
// 发生了异常,需要打破栅栏
// 唤醒所有等待的线程 broken为true , 重置 count 为 parties
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
// 非最后一个线程执行的逻辑
for (;;) {
try {
if (!timed)
// 如果带有超时机制,调用带超时的 Condition 的 await 方法等待,直到最后一个线程调用 await
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
private void nextGeneration() {
// signal completion of last generation
trip.signalAll(); // 唤醒所有等待的线程
// set up next generation
count = parties; // 将当前的线程重新设置为parties
generation = new Generation(); // 生成一个新的一代
}
别的方法就比较简单的,整体上来说,其实就是利用 基于 Condition 来实现的。先阻塞线程到conditon队列中,因为condition会将线程再次从条件等待队列转移到aqs同步等待队列中阻塞,然后进一步通过最后的线程进行codniton的siganlAll()唤醒全部线程。