并发编程之JUC并发工具类下

发布时间:2024年01月10日

目录

CyclicBarrier(回环栅栏或循环屏障)

特点

常用方法

CyclicBarrier应用场景

CyclicBarrier与CountDownLatch区别

Exchanger

特点

常用方法

Exchanger的应用场景

Phaser(阶段协同器)

特点

常用方法

Phaser的应用场景


CyclicBarrier(回环栅栏或循环屏障)

? ?CyclicBarrier是Java中的并发工具类,用于在多个线程之间创建一个屏障,所有线程必须在这个屏障前等待,一旦所有线程都到达了屏障,屏障才会打开,允许所有线程继续执行。它与 CountDownLatch的区别在于,CountDownLatch是一次性的,而CyclicBarrier可以被重用,因为一旦屏障打开,它会被重置,可以再次使用。以下是CyclicBarrier的一些特点以及常见使用方法:

特点

1. 等待线程数量: 创建 CyclicBarrier时需要指定等待的线程数量,所有线程都到达之后,屏障被打破,所有线程同时继续执行。

2. 重用性: 一旦所有线程都到达屏障点并释放,CyclicBarrier就可以被重置,以便下一轮使用。

3. 回调函数: 可以在所有线程到达屏障点后,指定一个Runnable任务来执行。

常用方法

1. 构造方法

public CyclicBarrier(int parties)
public CyclicBarrier(int parties, Runnable barrierAction)

parties:指定需要等待的线程数量。
barrierAction:在所有线程都到达屏障点后要执行的Runnable任务。

2. await 方法

public int await() throws InterruptedException, BrokenBarrierException

调用线程告诉CyclicBarrier它已经到达了屏障点,并等待其他线程到达。
当前线程会阻塞,直到所有线程都到达屏障点。
返回值表示当前线程是第几个到达屏障的,方便在回调函数中识别。

3. reset 方法

public void reset()

reset方法会将屏障恢复到初始状态,允许线程再次等待。这对于周期性地执行同步操作的情况非常有用。

CyclicBarrier应用场景

多线程任务:CyclicBarrier可以用于将复杂的任务分配给多个线程执行,并在所有线程完成工作后触发后续操 作。

数据处理:CyclicBarrier可以用于协调多个线程间的数据处理,在所有线程处理完数据后触发后续操作。

举例模拟人满发车如下:

import java.util.concurrent.*;

public class CyclicBarrierDemo {

    public static void main(String[] args) {

        ExecutorService executorService = Executors.newFixedThreadPool(5);

        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("人齐了,准备发车"));

        for (int i = 0; i < 10; i++) {
            final int id = i + 1;
            executorService.submit(() -> {
                try {
                    System.out.println(id + "号马上就到");
                    int sleepMills = ThreadLocalRandom.current().nextInt(2000);
                    Thread.sleep(sleepMills);
                    System.out.println(id + "号到了,上车");
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
        }

        executorService.shutdown();
    }
}
CyclicBarrier与CountDownLatch区别

CountDownLatch是一次性的,CyclicBarrier是可循环利用的。

CyclicBarrier在所有线程都到达屏障点后,可以执行一个可选的Runnable任务;CountDownLatch主要用于等待其他线程完成,没有特定的任务或动作。


Exchanger

? ? ? ? Exchanger是一种并发工具类,用于在两个线程之间交换数据。它提供了一个同步点,在这个同步点上,两个线程可以安全地交换数据。以下是Exchanger的一些特点以及常见使用方法:

特点

1. 用途:Exchanger用于实现两个线程之间的数据交换,允许两个线程在同步点上交换数据。

2. 双向交换:允许两个线程在一个点上进行数据交换,每个线程可以通过exchange()方法把自己的数据传递给对方。

3. 阻塞操作:如果一个线程在调用exchange()时没有找到另一个线程,它将会阻塞等待,直到另一个线程到达为止。

4. 数据传递:数据传递是按需进行的,当两个线程都到达同步点时才会进行交换。

常用方法

1.exchange(V x,):这个方法用于在两个线程之间交换数据。当一个线程调用这个方法时,它会被阻塞,直到另一个线程也调用了exchange()方法,然后两个线程可以交换数据,并且返回对方的数据。

2.exchange(V x, long timeout, TimeUnit unit):允许设置超时时间,在指定的时间内如果没有线程到达交换点,则抛出TimeoutException异常。

Exchanger的应用场景

数据交换:在多线程环境中,两个线程可以通过 Exchanger 进行数据交换。

数据采集:在数据采集系统中,可以使用 Exchanger 在采集线程和处理线程间进行数据交换。

import java.util.concurrent.Exchanger;

public class ExchangerDemo {
    private static Exchanger<String> exchanger = new Exchanger<>();
    static String goods = "衣服";
    static String money = "200";

    public static void main(String[] args) throws InterruptedException {

        System.out.println("准备交易,一手交钱一手交货...");

        // 卖家
        new Thread(() -> {
            System.out.println("卖家到了,已经准备好货:" + goods);
            try {
                String receivedMoney = exchanger.exchange(goods);
                System.out.println("卖家收到钱:" + receivedMoney);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        Thread.sleep(3000);

        // 买家
        new Thread(() -> {
            try {
                System.out.println("买家到了,已经准备好钱:" + money);
                String receivedGoods = exchanger.exchange(money);
                System.out.println("买家收到货:" + receivedGoods);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

Phaser(阶段协同器)

? Phaser是一种并发工具类,用于协调多个线程的同步执行。它与CountDownLatch和CyclicBarrier类似,但提供了更灵活的功能。Phaser允许多个线程在特定的同步点上同步,并且可以动态地增加或减少参与同步的线程数量。

特点

1. 阶段同步:Phaser可以分成多个阶段,每个阶段可以有多个参与者线程。所有线程在达到同一个阶段之前将被阻塞,一旦所有线程都完成了当前阶段的任务,Phaser进入下一个阶段。

2.?动态注册:可以在Phaser运行时动态地注册新的参与者线程,也可以动态注销不再参与同步的线程。

3. 灵活的同步点:Phaser提供了arrive()和awaitAdvance()等方法,使线程能够在指定的同步点到达后继续执行。

4.?分层结构:?Phaser支持分层结构,可以创建父Phaser,子Phaser可以独立运行并通过其父Phaser进行同步。

常用方法

构造方法
Phaser()? 参与任务数0
Phaser(int parties)? ?指定初始参与任务数
Phaser(Phaser parent)? ?指定parent阶段器, 子对象作为一个整体加入parent对象, 当子对象中没有参与者时,会自动从parent对象解除注册
Phaser(Phaser parent,int parties)? ??集合上面两个方法

增减参与任务数方法
int register()
增加一个任务数,返回当前阶段号。
int bulkRegister(int parties) 增加指定任务个数,返回当前阶段号。
int arriveAndDeregister() 减少一个任务数,返回当前阶段号

到达、等待方法
int arrive()?
到达(任务完成),返回当前阶段号。
int arriveAndAwaitAdvance()? 到达后等待其他任务到达,返回到达阶段号。
int awaitAdvance(int phase)? 在指定阶段等待(必须是当前阶段才有效)
int awaitAdvanceInterruptibly(int phase)? 阶段到达触发动作
int awaitAdvanceInterruptiBly(int phase,long timeout,TimeUnit unit)
protected boolean onAdvance(int phase,int registeredParties)
类似CyclicBarrier的触发命令,通过重写该方法来增加阶段到达动作,该方法返回true将终结Phaser对象

Phaser的应用场景

1. 多线程任务分配:Phaser 可以用于将复杂的任务分配给多个线程执行,并协调线程间的合作。
2. 多级任务流程:Phaser 可以用于实现多级任务流程,在每一级任务完成后触发下一级任务的开始。
3. 模拟并行计算:Phaser 可以用于模拟并行计算,协调多个线程间的工作。
4. 阶段性任务:Phaser 可以用于实现阶段性任务,在每一阶段任务完成后触发下一阶段任务的开始。

以模拟公司团建举例如下:

import java.util.concurrent.Phaser;

public class CompanyOutingSimulation {
    public static void main(String[] args) throws InterruptedException {
        final int PARTICIPANTS = 5;
        Phaser phaser = new Phaser(PARTICIPANTS);
        System.out.println("阶段1: 公司集合");
        // 模拟阶段1: 到公司集合
        for (int i = 0; i < PARTICIPANTS; i++) {
            int participantId = i;
            Thread arrivalThread = new Thread(() -> {
                System.out.println("参与者 " + participantId + " 到公司集合");
                phaser.arriveAndAwaitAdvance(); // 等待其他线程到达阶段1
            });
            arrivalThread.start();
        }
        
        // 模拟阶段2: 出发去公园
        phaser.arriveAndAwaitAdvance(); // 所有线程到达阶段1后继续执行
        System.out.println("阶段2: 出发去公园");
        for (int i = 0; i < PARTICIPANTS; i++) {
            int participantId = i;
            Thread parkTripThread = new Thread(() -> {
                System.out.println("参与者 " + participantId + " 出发去公园");
                phaser.arriveAndAwaitAdvance(); // 等待其他线程到达阶段2
            });
            parkTripThread.start();
        }

        // 模拟阶段3: 去餐厅,有两个人离开
        phaser.arriveAndAwaitAdvance(); // 所有线程到达阶段2后继续执行
        System.out.println("阶段3: 出发去餐厅");

        // 模拟两个人离开
        for (int i = 0; i < 2; i++) {
            phaser.arriveAndDeregister(); // 离开的人取消注册
            System.out.println("有人离开,当前参与人数: " + phaser.getRegisteredParties());
        }

        // 模拟阶段4: 就餐,有三个人加入
        phaser.arriveAndAwaitAdvance(); // 所有线程到达阶段3后继续执行
        System.out.println("阶段4: 就餐");

        // 模拟三个人加入
        for (int i = 0; i < 3; i++) {
            phaser.register(); // 新加入的人注册
            System.out.println("有人加入,当前参与人数: " + phaser.getRegisteredParties());
        }
    }
}

? ? ? ?以上案例中,每个参与者都是一个线程,通过Phaser实现对四个阶段的同步。在第3阶段,两个人离开,通过phaser.arriveAndDeregister()???????取消注册。在第4阶段,三个人加入,通过phaser.register()注册。

文章来源:https://blog.csdn.net/m0_61853556/article/details/135504316
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。