详解Java多线程之循环栅栏技术CyclicBarrier

发布时间:2024年01月13日

第1章:引言

大家好,我是小黑,工作中,咱们经常会遇到需要多个线程协同工作的情况。CyclicBarrier,直译过来就是“循环屏障”。它是Java中用于管理一组线程,并让它们在某个点上同步的工具。简单来说,咱们可以把一群线程想象成一队马拉雪橇的驯鹿,CyclicBarrier就像是一个指定的集合点,所有驯鹿必须到齐了,才能继续下一段旅程。

不过别担心,这听起来比实际复杂。实际上,CyclicBarrier提供了一种简单的方式来达到这个同步目的。它通过一个计数器来实现,这个计数器初始值是线程的数量。当一个线程到达屏障点时,计数器就减一。当计数器减到0时,表示所有线程都到齐了,然后咱们可以执行一些操作,或者继续执行下一步。

第2章:CyclicBarrier基础

要深入理解CyclicBarrier,咱们首先得知道它是怎么工作的。CyclicBarrier在Java的java.util.concurrent包中,是并发编程的一部分。它主要用于让一组线程互相等待,直到所有线程都达到了一个公共屏障点(Barrier Point),然后这些线程才继续执行。

让小黑举个简单的例子。假设咱们有一个任务,需要四个线程同时开始执行。这就可以用CyclicBarrier来实现。小黑写了下面这段代码,来展示基本的用法:

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample {
    public static void main(String[] args) {
        // 创建一个新的CyclicBarrier,其中包括4个线程
        CyclicBarrier barrier = new CyclicBarrier(4, () -> System.out.println("所有线程到达屏障点,可以继续执行!"));

        // 创建四个线程
        for (int i = 0; i < 4; i++) {
            int threadNum = i;
            new Thread(() -> {
                try {
                    System.out.println("线程 " + threadNum + " 正在执行任务");
                    Thread.sleep(1000); // 模拟任务执行时间
                    System.out.println("线程 " + threadNum + " 到达屏障点");
                    barrier.await(); // 等待其他线程
                    System.out.println("线程 " + threadNum + " 继续执行其他任务");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

在这个例子中,咱们创建了一个CyclicBarrier实例,这个实例要求四个线程都达到屏障点后才能继续执行。每个线程在执行自己的任务后,会调用barrier.await();来等待其他线程。所有线程都调用了await()方法后,计数器变为0,屏障就被克服了,每个线程继续执行它们之后的任务。

第3章:CyclicBarrier的核心特性

了解了CyclicBarrier的基本用法后,咱们来深入探讨一下它的核心特性。这些特性让CyclicBarrier成为并发编程中一个非常有用的工具,特别是在处理多线程同步问题时。

重用性

CyclicBarrier的一个显著特点是它的重用性。这意味着一旦所有等待线程都到达屏障,它就可以重置并重用。这个特性使得CyclicBarrier非常适合于那些需要多次等待一组线程到达同一点的情况。

让小黑用一个例子来说明这一点。假设咱们有一个处理数据的多阶段任务,每个阶段都需要所有线程完成后才能进入下一阶段。这里就可以运用CyclicBarrier的重用性。

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierReuseExample {
    private static final int THREAD_COUNT = 3;

    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT, () -> System.out.println("所有线程完成当前阶段,准备进入下一阶段!"));

        for (int i = 0; i < THREAD_COUNT; i++) {
            int threadNum = i;
            new Thread(() -> {
                try {
                    for (int phase = 1; phase <= 3; phase++) { // 假设有三个阶段
                        System.out.println("线程 " + threadNum + " 完成阶段 " + phase);
                        barrier.await(); // 等待其他线程
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

在这个例子中,每个线程都会经历三个阶段。每个阶段都使用相同的CyclicBarrier来同步线程。线程完成一个阶段后,就会等待其他线程。一旦所有线程都完成了该阶段,CyclicBarrier就会重置,让线程开始下一个阶段。

同步辅助功能

CyclicBarrier还提供了一个同步辅助功能:当所有线程都到达屏障时,可以执行一个预定义的动作。这是通过在CyclicBarrier的构造函数中提供一个Runnable来实现的。

这个功能非常有用,因为它允许咱们在所有线程都到达屏障后,执行一些处理,比如更新共享资源、合并结果等。小黑再来给大家展示一个例子:

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierActionExample {
    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(3, () -> System.out.println("全部线程已到达屏障点,执行屏障动作"));

        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " 正在执行任务");
                    Thread.sleep(2000); // 模拟任务执行时间
                    barrier.await(); // 等待其他线程
                    System.out.println(Thread.currentThread().getName() + " 继续执行后续任务");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

在这个代码中,当所有线程都到达屏障点时,会执行一段指定的代码,即打印出“全部线程已到达屏障点,执行屏障动作”。这样的设计使得CyclicBarrier不仅仅是一个同步工具,还可以作为线程间协调的一种手段。

通过这些特性,CyclicBarrier成为了处理复杂同步问题的有力工具。它不仅能确保线程在继续执行前达到某个公共点,还能够在所有线程都准备好后执行。

第4章:CyclicBarrier的实际应用场景

并行计算

一个典型的应用场景是并行计算。假设咱们有一个大数据集,需要进行复杂的数据处理,这个处理过程可以分解为多个独立的子任务,每个子任务由一个单独的线程处理。但在进行下一步处理之前,必须确保所有子任务都完成了当前步骤。这里就是CyclicBarrier大显身手的时候。

来看看下面这个例子,小黑写了一段代码,模拟了这种情况:

import java.util.concurrent.CyclicBarrier;

public class ParallelComputationExample {
    private static final int THREAD_COUNT = 4;

    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT, () -> System.out.println("所有子任务处理完成,准备进入下一步!"));

        for (int i = 0; i < THREAD_COUNT; i++) {
            new Thread(new Worker(i, barrier)).start();
        }
    }

    static class Worker implements Runnable {
        private final int threadNumber;
        private final CyclicBarrier barrier;

        Worker(int threadNumber, CyclicBarrier barrier) {
            this.threadNumber = threadNumber;
            this.barrier = barrier;
        }

        @Override
        public void run() {
            try {
                System.out.println("线程 " + threadNumber + " 正在处理任务");
                Thread.sleep(2000); // 模拟任务处理时间
                System.out.println("线程 " + threadNumber + " 完成任务,等待其他线程");
                barrier.await(); // 等待其他线程完成
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

在这个例子中,咱们创建了四个线程,每个线程都代表一个数据处理任务。这些线程在完成自己的任务后,会等待其他线程完成,然后一起进入下一步。

第5章:深入CyclicBarrier的API

咱们已经看到了CyclicBarrier在实际场景中的一些应用,现在小黑要带大家更深入地了解一下CyclicBarrier的API。理解这些API对于充分利用CyclicBarrier的功能是至关重要的。

基本方法

CyclicBarrier提供了一些核心的方法来控制线程间的同步:

  • CyclicBarrier(int parties): 这是CyclicBarrier的构造函数,parties指的是必须调用await方法的线程数量。
  • CyclicBarrier(int parties, Runnable barrierAction): 这个构造函数除了指定线程数外,还可以指定当所有线程都到达屏障时,要执行的操作。
  • await(): 线程调用这个方法告诉CyclicBarrier它已到达屏障点。如果所有线程都到达屏障,它们就会继续执行;否则,调用await的线程会阻塞,等待其他线程。
示例:使用CyclicBarrier同步任务

为了更好地理解这些API,小黑准备了一个具体的例子。假设咱们有一个任务,需要多个线程协作完成,每个线程执行完各自的部分后,需要等待其他线程也执行完毕,然后统一进行下一步操作。

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierApiExample {
    public static void main(String[] args) {
        // 定义一个新的CyclicBarrier,需要3个线程协作
        CyclicBarrier barrier = new CyclicBarrier(3, () -> System.out.println("所有线程准备就绪,开始下一步操作"));

        for (int i = 0; i < 3; i++) {
            new Thread(new Task(barrier), "线程 " + i).start();
        }
    }

    static class Task implements Runnable {
        private final CyclicBarrier barrier;

        Task(CyclicBarrier barrier) {
            this.barrier = barrier;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " 正在执行任务");
                Thread.sleep(1000); // 模拟任务执行时间
                System.out.println(Thread.currentThread().getName() + " 完成任务,等待其他线程");
                barrier.await(); // 等待其他线程
                System.out.println(Thread.currentThread().getName() + " 开始执行后续操作");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

在这个例子中,每个线程都在执行它的任务。完成任务后,它会等待其他线程也完成任务。只有当所有线程都执行了barrier.await()方法之后,才会执行CyclicBarrier的barrierAction,即打印出“所有线程准备就绪,开始下一步操作”。

异常处理

处理异常也是使用CyclicBarrier时需要考虑的一个重要方面。如果任何线程在等待过程中被中断或超时,或者屏障被重置,或者屏障的await方法被中断,BrokenBarrierExceptionInterruptedException将会被抛出。这些异常需要被妥善处理,以确保程序的健壮性和正确性。

第6章:CyclicBarrier的高级用法

动态调整参与线程数

CyclicBarrier提供了一种机制,允许在运行时动态调整等待的线程数量。这在一些动态变化的并发场景中非常有用,比如线程数量会根据任务的不同而变化。

为了展示这个特性,小黑写了以下的例子。在这个例子中,咱们会创建一个CyclicBarrier,并在运行时根据需要动态调整它的屏障点:

import java.util.concurrent.CyclicBarrier;

public class DynamicCyclicBarrierExample {
    public static void main(String[] args) {
        // 初始时,屏障点设置为3
        CyclicBarrier barrier = new CyclicBarrier(3, () -> System.out.println("屏障点动作执行"));

        for (int i = 0; i < 2; i++) { // 初始只启动两个线程
            new Thread(new Worker(barrier), "线程 " + i).start();
        }

        // 动态调整屏障点,现在需要4个线程到达屏障点
        barrier.reset(); // 重置CyclicBarrier,这也会打破任何当前等待的线程
        barrier = new CyclicBarrier(4, () -> System.out.println("新的屏障点动作执行"));

        for (int i = 0; i < 4; i++) { // 现在启动四个线程
            new Thread(new Worker(barrier), "线程 " + i).start();
        }
    }

    static class Worker implements Runnable {
        private final CyclicBarrier barrier;

        Worker(CyclicBarrier barrier) {
            this.barrier = barrier;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " 到达屏障点");
                barrier.await();
                System.out.println(Thread.currentThread().getName() + " 继续执行");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

在这个例子中,咱们先创建了一个需要3个线程到达的CyclicBarrier。但后来因为需求变化,我们通过调用reset()方法重置了CyclicBarrier,并创建了一个新的CyclicBarrier,这次需要4个线程。这展示了如何根据实际情况调整同步点的数量。

结合其他并发工具使用

CyclicBarrier还可以与Java的其他并发工具一起使用,以解决更复杂的并发问题。例如,可以将其与ExecutorService结合使用,以管理线程池中的一组任务。

看看下面的例子,小黑展示了如何将CyclicBarrier与线程池结合使用:

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CyclicBarrierWithExecutorServiceExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        CyclicBarrier barrier = new CyclicBarrier(4, () -> System.out.println("所有任务完成,准备下一轮执行"));

        for (int i = 0; i < 4; i++) {
            executorService.execute(new Worker(barrier));
        }

        executorService.shutdown();
    }

    static class Worker implements Runnable {
        private final CyclicBarrier barrier;

        Worker(CyclicBarrier barrier) {
            this.barrier = barrier;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " 正在执行任务");
                Thread.sleep(1000);
                barrier.await();
                System.out.println(Thread.currentThread().getName() + " 任务完成");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

第7章:CyclicBarrier的问题和解决方案

1. BrokenBarrierException的处理

当参与CyclicBarrier的某个线程在等待期间被中断,或者CyclicBarrier被重置,或者在屏障点等待的线程超时时,就会抛出BrokenBarrierException异常。这通常意味着CyclicBarrier无法正常工作。

解决这个问题的关键是要正确处理这个异常。咱们可以设置适当的异常处理逻辑,确保即使在出现异常时,程序也能以一种预期的方式继续运行。例如,可以在捕获到BrokenBarrierException时重置CyclicBarrier,或者采取其他恢复措施。

2. 超时的处理

如果咱们希望线程在等待达到屏障点的过程中不要无限期地等待,可以使用await(long timeout, TimeUnit unit)方法,为等待设置一个超时时间。如果在指定的时间内没有所有的线程都到达屏障点,就会抛出TimeoutException

处理超时的策略可能包括重试机制或者回退逻辑。但重要的是要确保所有的线程在超时后都能正确地处理这种情况,避免资源泄漏或者线程阻塞。

示例代码:处理异常和超时

下面是一个示例,展示了如何在CyclicBarrier中处理BrokenBarrierException和超时异常:

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CyclicBarrierExceptionHandlingExample {
    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(3);

        for (int i = 0; i < 3; i++) {
            new Thread(new Task(barrier), "线程 " + i).start();
        }
    }

    static class Task implements Runnable {
        private final CyclicBarrier barrier;

        Task(CyclicBarrier barrier) {
            this.barrier = barrier;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " 正在执行任务");
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + " 到达屏障点,等待其他线程");
                barrier.await(2, TimeUnit.SECONDS); // 设置超时时间为2秒
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getName() + " 被中断");
            } catch (BrokenBarrierException e) {
                System.out.println(Thread.currentThread().getName() + " 检测到屏障损坏");
            } catch (TimeoutException e) {
                System.out.println(Thread.currentThread().getName() + " 等待超时");
            }
            System.out.println(Thread.currentThread().getName() + " 继续执行后续操作");
        }
    }
}

在这个例子中,每个线程在执行任务后会尝试等待其他线程,但如果等待超过2秒,就会抛出TimeoutException。同时,这个代码也演示了如何处理InterruptedExceptionBrokenBarrierException,确保线程在异常发生时能够正确地继续执行。

3. CyclicBarrier重置问题

在使用CyclicBarrier时,还可能遇到需要重置屏障的情况。这可以通过调用reset()方法实现,但要注意这个操作会打破正在等待的线程。因此,在重置CyclicBarrier之前,需要确保所有线程都已经离开屏障点,或者咱们愿意接受打断它们的等待过程。

第8章:总结

  • 基本用法:CyclicBarrier主要用于协调多个线程,确保它们在继续执行之前在某个公共点同步。
  • 重用性:一个CyclicBarrier可以被重复使用,这对于那些分阶段执行的多线程任务非常有用。
  • 异常处理:正确处理BrokenBarrierExceptionTimeoutException对于构建健壮的并发应用至关重要。
  • 与其他工具的结合:CyclicBarrier可以与Java的其他并发工具,如ExecutorService,配合使用,以处理更复杂的并发场景。

学习并发编程是一个持续的过程。技术总是在发展,新的挑战总是在出现。保持好奇心,不断学习,小黑相信你会在这条路上越走越远!

文章来源:https://blog.csdn.net/weixin_42116348/article/details/135564235
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。