Phaser
是一个相对较新且功能强大的同步原语,它于Java 7中引入,用于协调并行任务的执行。与CyclicBarrier
和CountDownLatch
等传统的同步工具相比,Phaser
提供了更灵活和更高级的功能,特别是在处理动态和可变的并行任务集合时。
Phaser
是一个可重用的同步屏障,它允许一组线程互相等待,直到所有线程都到达某个屏障(barrier point)为止。但与CyclicBarrier
不同的是,Phaser
支持动态调整参与的线程数,并且允许线程在一个phaser阶段完成后注册参与下一个阶段。
每个Phaser
对象都有一个整数表示的阶段(phase
)计数,每个阶段可以看作是一组并行任务的一个协调点。当所有注册的线程通过调用arrive()
或arriveAndAwaitAdvance()
方法到达一个阶段时,该阶段就会结束,所有等待的线程将被释放以继续执行,同时阶段计数会增加。
构造函数:
Phaser
对象,其初始阶段计数为0,且没有注册的参与者。这通常用于层次结构的Phaser
,其中子Phaser
会动态地注册到父Phaser
。Phaser
对象,并初始化给定数量的参与者。这里的parties
表示在Phaser
更改阶段之前,必须通过调用arriveAndAwaitAdvance()
或arrive()
方法的线程数。Phaser
对象,并将其关联到给定的父Phaser
。子Phaser
的终止会影响到父Phaser
的终止,但反之则不然。此构造函数创建的Phaser
初始时没有注册的参与者。Phaser
对象,它既有父Phaser
又有初始注册的参与者数量。这个构造函数结合了前两个构造函数的功能。关键方法:
arrive()
:表示当前线程已经完成了当前阶段的工作,并减少到达该阶段的线程数。arriveAndAwaitAdvance()
:与arrive()
类似,但它还会使当前线程等待其他线程到达此阶段,然后一起进入下一个阶段。awaitAdvance(int phase)
:等待直到Phaser
的当前阶段改变为给定的phase
,或者当前线程被中断。register()
:在当前阶段增加一个未到达的线程。这允许动态地添加新的参与者。bulkRegister(int parties)
:一次性注册多个未到达的线程。import java.util.concurrent.Phaser;
public class PhaserExample {
public static void main(String[] args) {
final int numberOfTasks = 10;
final Phaser phaser = new Phaser(numberOfTasks); // 创建一个Phaser,初始参与者数量为10
for (int i = 0; i < numberOfTasks; i++) {
final int taskID = i;
new Thread(() -> {
System.out.println("Task " + taskID + " is starting.");
// 模拟计算工作
try {
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task " + taskID + " is finishing.");
phaser.arrive(); // 表示任务完成
}).start();
}
// 等待所有任务完成
phaser.awaitAdvance(phaser.getPhase());
System.out.println("All tasks are complete.");
}
}
运行结果:
Task 1 is starting.
Task 2 is starting.
Task 0 is starting.
Task 4 is starting.
Task 3 is starting.
Task 5 is starting.
Task 6 is starting.
Task 7 is starting.
Task 8 is starting.
Task 9 is starting.
Task 1 is finishing.
Task 4 is finishing.
Task 2 is finishing.
Task 8 is finishing.
Task 6 is finishing.
Task 3 is finishing.
Task 7 is finishing.
Task 9 is finishing.
Task 0 is finishing.
Task 5 is finishing.
All tasks are complete.
在这个例子中,我们创建了一个包含10
个任务的Phaser
,每个任务都在它自己的线程中运行。当每个任务完成时,它会通过调用arrive()
方法通知Phaser
。主线程通过调用awaitAdvance()
方法等待所有任务完成。
注意,虽然这个例子使用了固定数量的任务,但Phaser
的真正优势在于能够处理动态添加或移除的并行任务。通过调用register()
和arrive()
方法,可以在任何时候增加新的参与者到当前的阶段中。
Phaser
是Java并发工具包中一个强大而灵活的组件,它提供了协调并行任务执行的高级机制。在处理复杂的多线程问题时,考虑使用Phaser
可以使代码更加简洁且易于管理。
Phaser
是Java并发工具包中一个强大而灵活的组件,它提供了协调并行任务执行的高级机制。在处理复杂的多线程问题时,考虑使用Phaser
可以使代码更加简洁且易于管理