JAVA并发编程入门之-闭锁、信号量、栅栏

发布时间:2024年01月05日

一、闭锁

闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态;闭锁的作用相当于一扇门,在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有线程通过;当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态;
闭锁是一次性对象,一旦进入终止状态就不能被重置;
闭锁可以用来确保某些活动直到其它活动都完成后才继续执行;适用场景:

应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行;
多玩家游戏中当所有玩家都就绪后才执行某项活动;
设想有这样一个功能需要Thread1、Thread2、Thread3、Thread4四条线程分别统计C、D、E、F四个盘的大小,所有线程都统计完毕交给主线程去做汇总,利用闭锁来完成就非常轻松;
闭锁状态包括一个计数器,该计数器被初始化为一个整数,表示需要等待的事件数量;

CountDownLatch

CountDownLatch是闭锁的一种实现;CountDownLatch是在java1.5被引入;

CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行;

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

主要API:

countDown():该方法递减计数器,表示有一个事件已经发生;
await():该方法等待计时器达到零,达到零后表示需要等待的所有事件都已发生;
如果计数器的值非零,await方法会一直阻塞直到计数器为零,或者等待中的线程中断,或者等待超时;

起始门(Starting Gate)
所有子线程等待计数器为零后一起执行

public class Appliction {
    private final static int NUM = 10;

    public static void main(String[] args) {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        for (int i = 0; i < NUM; i++) {
            new Thread(() -> {
                try {
                    countDownLatch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.err.println(Thread.currentThread().getName() + " started:" + System.currentTimeMillis());
            }).start();
        }
        countDownLatch.countDown();
        System.err.println("main thread exec end");
    }
}

结束门(Ending Gate)
等待所有子任务或子线程结束后(计数器为零),对执行结果进行统计或汇总

/**
 * 假设有10块磁盘,需要10个线程同时统计磁盘空间大小,统计完成后由主线程进行汇总
 */
public class Appliction {
    private final static int NUM = 10;

    public static void main(String[] args) throws InterruptedException {
           CountDownLatch countDownLatch = new CountDownLatch(NUM);
        List<Disk> tasks = new ArrayList<>(NUM);
        for (int i = 0; i < NUM; i++) {
            tasks.add(new Disk(i));
        }
        for (Disk dk : tasks) {
            new Thread(new DiskCountTask(countDownLatch, dk)).start();
        }
        countDownLatch.await();
        int size = tasks.stream().mapToInt(Disk::getSize).sum();
        System.err.println("All disk space size:" + size);
    }
}


class Disk {
    private Integer size;

    public Disk(Integer size) {
        this.size = size;
    }

    public Integer getSize() {
        return size;
    }

    public void setSize(Integer size) {
        this.size = size;
    }
}
class DiskCountTask implements Runnable {
    private Disk disk;
    private CountDownLatch downLatch;

    public DiskCountTask(CountDownLatch downLatch, Disk disk) {
        this.downLatch = downLatch;
        this.disk = disk;
    }

    @Override
    public void run() {
        int size = disk.getSize();
        try {
            TimeUnit.SECONDS.sleep(size);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.err.println(Thread.currentThread().getName() + " exec end[" + System.currentTimeMillis() + "], size:" + size);
        downLatch.countDown();
        System.out.println("count:"+downLatch.getCount());
    }
}

FutureTask

FutureTask也可以用作闭锁;

FutureTask实现了Future语义,表示一种抽象的可生成结果的计算;Future表示的计算是通过Callable来实现,相当于一种可生成结果的Runnable,并且可处于以下三种状态:等待运行,正在运行(Running)和运行完成(Completed);
FutureTask.get的行为取决于任务的状态;如果任务已经完成,那么get会立即返回结果,否则将阻塞直到任务进入完成状态,然后返回结果或抛出异常;
FutureTask将计算结果从执行计算的线程传递到获取这个结果的线程,而FutureTask的规范确保了这种传递过程能实现结果的安全发布;

示例:

public class Appliction {
    private final static int NUM = 10;

    public static void main(String[] args) throws InterruptedException {
                  List<FutureTask<Integer>> taskList = new ArrayList<>();
        ExecutorService executorService = Executors.newFixedThreadPool(NUM);
        for (int i = 0; i < NUM; i++) {
            FutureTask<Integer> task = new FutureTask<>(new DiskFutureTask(new Disk(i)));
            taskList.add(task);
            executorService.execute(task);
        }
        executorService.shutdown();
        int result = 0;
        for (FutureTask<Integer> task : taskList) {
            Integer size = task.get();
            System.out.println("get size:"+size);
            result += size;
        }
        System.err.println("All disk space size:" + result);
    }
}


class Disk {
    private Integer size;

    public Disk(Integer size) {
        this.size = size;
    }

    public Integer getSize() {
        return size;
    }

    public void setSize(Integer size) {
        this.size = size;
    }
}
class DiskFutureTask implements Callable<Integer> {

    private Disk disk;

    public DiskFutureTask(Disk disk) {
        this.disk = disk;
    }

    @Override
    public Integer call(){
        Integer size = disk.getSize();
        try {
            TimeUnit.SECONDS.sleep(disk.getSize());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.err.println(Thread.currentThread().getName() + " exec end[" + System.currentTimeMillis() + "], size:" + size);
        return size;
    }
}

二、信号量

计数信号量(Counting Semaphore)用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量;

Semaphore

Semaphore管理着一组虚拟许可(permit),许可的初始数量可通过构造函数来指定;在执行操作时先获取许可(只要还有剩余的许可),使用完成后释放许可;如果没有许可那么acquire将阻塞直到有许可(或者被中断或操作超时);release方法将返回一个许可给信号量;

初始值为1的Semaphore称为二值信号量;二值信号量可以用作互斥体(mutex),并具备不可重入的加锁语义;谁拥有这个唯一的许可谁就拥有了互斥锁;

使用场景:

Semaphore是一件可以容纳N人的房间,如果人不满就可以进去,如果人满了,就要等待有人出来

/**
 * 假设公司体检,房间里一共有3位体检医师,所以一次可以进入3个人,有人出来就有人可以进去
 */
public class Appliction {
    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
        Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < 100; i++) {
            new Thread(new PhysicalExaminationTask(semaphore)).start();
        }
    }
}

class PhysicalExaminationTask implements Runnable{
    private Semaphore semaphore;

    public PhysicalExaminationTask(Semaphore semaphore){
        this.semaphore = semaphore;
    }

    @Override
    public void run() {
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        int time = new Random().nextInt(5);
        if (time > 0) {
            try {
                TimeUnit.SECONDS.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.err.println(Thread.currentThread().getName() + " finished");
        semaphore.release();
    }
}

三、栅栏(Barrier)

栅栏类似于闭锁,它能阻塞一组线程直到某个事件发生;

栅栏与闭锁的关键区别:所有线程必须同时达到栅栏位置,才能继续执行;闭锁用于等待事件,而栅栏用于等待其它线程;

CyclicBarrier(循环栅栏)

CyclicBarrier可以使一定数量的参与方反复地在栅栏位置汇集

CyclicBarrier初始化的时候,设置一个屏障数。线程调用await()方法的时候,这个线程就会被阻塞,当调用await()的线程数量到达屏障数的时候,主线程就会取消所有被阻塞线程的状态

示例

/**
 * 三个人一起爬山,达到第一集合点后必须等到所有人员到齐后再开始往第二集合点进发
 */
public class Appliction {
    private static final int NUM = 3;

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(NUM);
        String phaseName = "phase one";
        for (int i = 0; i < NUM; i++) {
            new Thread(new PhaseTask(phaseName, cyclicBarrier)).start();
        }
    }
}

class PhaseTask implements Runnable {
    private String phaseName;
    private CyclicBarrier cyclicBarrier;

    public PhaseTask(String phaseName, CyclicBarrier cyclicBarrier) {
        this.phaseName = phaseName;
        this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {
        sleep();
        System.err.println(Thread.currentThread().getName() + "到达第一个集合点");
        waitFor();

        sleep();
        System.err.println(Thread.currentThread().getName() + "到达第二个集合点");
        waitFor();
    }

    private void sleep() {
        int time = new Random().nextInt(10);
        if (time > 0) {
            try {
                TimeUnit.SECONDS.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private void waitFor() {
        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}
文章来源:https://blog.csdn.net/weixin_44345114/article/details/135392341
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。