Phaser详解

发布时间:2024年01月11日

Phaser是一个相对较新且功能强大的同步原语,它于Java 7中引入,用于协调并行任务的执行。与CyclicBarrierCountDownLatch等传统的同步工具相比,Phaser提供了更灵活和更高级的功能,特别是在处理动态和可变的并行任务集合时。

1.Phaser基本概念

Phaser是一个可重用的同步屏障,它允许一组线程互相等待,直到所有线程都到达某个屏障(barrier point)为止。但与CyclicBarrier不同的是,Phaser支持动态调整参与的线程数,并且允许线程在一个phaser阶段完成后注册参与下一个阶段。
每个Phaser对象都有一个整数表示的阶段(phase)计数,每个阶段可以看作是一组并行任务的一个协调点。当所有注册的线程通过调用arrive()arriveAndAwaitAdvance()方法到达一个阶段时,该阶段就会结束,所有等待的线程将被释放以继续执行,同时阶段计数会增加。

2.构造函数和关键方法

构造函数:

  1. Phaser()
    创建一个新的Phaser对象,其初始阶段计数为0,且没有注册的参与者。这通常用于层次结构的Phaser,其中子Phaser会动态地注册到父Phaser
  2. Phaser(int parties)
    创建一个新的Phaser对象,并初始化给定数量的参与者。这里的parties表示在Phaser更改阶段之前,必须通过调用arriveAndAwaitAdvance()arrive()方法的线程数。
  3. Phaser(Phaser parent)
    创建一个新的Phaser对象,并将其关联到给定的父Phaser。子Phaser的终止会影响到父Phaser的终止,但反之则不然。此构造函数创建的Phaser初始时没有注册的参与者。
  4. Phaser(Phaser parent, int parties)
    创建一个新的Phaser对象,它既有父Phaser又有初始注册的参与者数量。这个构造函数结合了前两个构造函数的功能。

关键方法:

  • arrive():表示当前线程已经完成了当前阶段的工作,并减少到达该阶段的线程数。
  • arriveAndAwaitAdvance():与arrive()类似,但它还会使当前线程等待其他线程到达此阶段,然后一起进入下一个阶段。
  • awaitAdvance(int phase):等待直到Phaser的当前阶段改变为给定的phase,或者当前线程被中断。
  • register():在当前阶段增加一个未到达的线程。这允许动态地添加新的参与者。
  • bulkRegister(int parties):一次性注册多个未到达的线程。

3.Phaser的优势

  1. 灵活性:与CyclicBarrier相比,Phaser允许在运行时动态地添加或移除参与者。
  2. 多阶段支持:Phaser支持多个协调点,而不仅仅是单个屏障。
  3. 可重用性:一旦所有线程到达一个阶段,Phaser可以自动地或手动地重置为下一个阶段,而不需要重新创建。

4.使用Phaser实现并行计算

import java.util.concurrent.Phaser;  
  
public class PhaserExample {  
    public static void main(String[] args) {  
        final int numberOfTasks = 10;  
        final Phaser phaser = new Phaser(numberOfTasks); // 创建一个Phaser,初始参与者数量为10  
  
        for (int i = 0; i < numberOfTasks; i++) {  
            final int taskID = i;  
            new Thread(() -> {  
                System.out.println("Task " + taskID + " is starting.");  
                // 模拟计算工作  
                try {  
                    Thread.sleep((long) (Math.random() * 1000));  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
                System.out.println("Task " + taskID + " is finishing.");  
  
                phaser.arrive(); // 表示任务完成  
            }).start();  
        }  
  
        // 等待所有任务完成  
        phaser.awaitAdvance(phaser.getPhase());  
        System.out.println("All tasks are complete.");  
    }  
}

运行结果:

Task 1 is starting.
Task 2 is starting.
Task 0 is starting.
Task 4 is starting.
Task 3 is starting.
Task 5 is starting.
Task 6 is starting.
Task 7 is starting.
Task 8 is starting.
Task 9 is starting.
Task 1 is finishing.
Task 4 is finishing.
Task 2 is finishing.
Task 8 is finishing.
Task 6 is finishing.
Task 3 is finishing.
Task 7 is finishing.
Task 9 is finishing.
Task 0 is finishing.
Task 5 is finishing.
All tasks are complete.

在这个例子中,我们创建了一个包含10个任务的Phaser,每个任务都在它自己的线程中运行。当每个任务完成时,它会通过调用arrive()方法通知Phaser。主线程通过调用awaitAdvance()方法等待所有任务完成。
注意,虽然这个例子使用了固定数量的任务,但Phaser的真正优势在于能够处理动态添加或移除的并行任务。通过调用register()arrive()方法,可以在任何时候增加新的参与者到当前的阶段中。
Phaser是Java并发工具包中一个强大而灵活的组件,它提供了协调并行任务执行的高级机制。在处理复杂的多线程问题时,考虑使用Phaser可以使代码更加简洁且易于管理。

5.总结

Phaser是Java并发工具包中一个强大而灵活的组件,它提供了协调并行任务执行的高级机制。在处理复杂的多线程问题时,考虑使用Phaser可以使代码更加简洁且易于管理

文章来源:https://blog.csdn.net/qq_36649893/article/details/135540494
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。