CountDownLatch应用&源码分析

发布时间:2024年01月20日

一、CountDownLatch应用&源码分析

1.1 CountDownLatch介绍

CountDownLatch就是JUC包下的一个工具,整个工具最核心的功能就是计数器。

如果有三个业务需要并行处理,并且需要知道三个业务全部都处理完毕了。

需要一个并发安全的计数器来操作。

CountDownLatch就可以实现。

给CountDownLatch设置一个数值。可以设置3。

每个业务处理完毕之后,执行一次countDown方法,指定的3每次在执行countDown方法时,对3进行-1。

主线程可以在业务处理时,执行await,主线程会阻塞等待任务处理完毕。

当设置的3基于countDown方法减为0之后,主线程就会被唤醒,继续处理后续业务。

image.png

当咱们的业务中,出现2个以上允许并行处理的任务,并且需要在任务都处理完毕后,再做其他处理时,可以采用CountDownLatch去实现这个功能。

1.2 CountDownLatch应用

模拟有三个任务需要并行处理,在三个任务全部处理完毕后,再执行后续操作

CountDownLatch中,执行countDown方法,代表一个任务结束,对计数器 - 1

执行await方法,代表等待计数器变为0时,再继续执行

执行await(time,unit)方法,代表等待time时长,如果计数器不为0,返回false,如果在等待期间,计数器为0,方法就返回true

一般CountDownLatch更多的是基于业务去构建,不采用成员变量。

static ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);

static CountDownLatch countDownLatch = new CountDownLatch(3);

public static void main(String[] args) throws InterruptedException {
    System.out.println("主业务开始执行");
    sleep(1000);
    executor.execute(CompanyTest::a);
    executor.execute(CompanyTest::b);
    executor.execute(CompanyTest::c);
    System.out.println("三个任务并行执行,主业务线程等待");
    // 死等任务结束
    // countDownLatch.await();
    // 如果在规定时间内,任务没有结束,返回false
    if (countDownLatch.await(10, TimeUnit.SECONDS)) {
        System.out.println("三个任务处理完毕,主业务线程继续执行");
    }else{
        System.out.println("三个任务没有全部处理完毕,执行其他的操作");
    }
}

private static void a() {
    System.out.println("A任务开始");
    sleep(1000);
    System.out.println("A任务结束");
    countDownLatch.countDown();
}
private static void b() {
    System.out.println("B任务开始");
    sleep(1500);
    System.out.println("B任务结束");
    countDownLatch.countDown();
}
private static void c() {
    System.out.println("C任务开始");
    sleep(2000);
    System.out.println("C任务结束");
    countDownLatch.countDown();
}

private static void sleep(long timeout){
    try {
        Thread.sleep(timeout);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

1.3 CountDownLatch源码分析

保证CountDownLatch就是一个计数器,没有什么特殊的功能,查看源码也只是查看计数器实现的方式

发现CountDownLatch的内部类Sync继承了AQS,CountDownLatch就是基于AQS实现的计数器。

AQS就是一个state属性,以及AQS双向链表

猜测计数器的数值实现就是基于state去玩的。

主线程阻塞的方式,也是阻塞在了AQS双向链表中。

1.3.1 有参构造

就是构建内部类Sync,并且给AQS中的state赋值

// CountDownLatch的有参构造
public CountDownLatch(int count) {
    // 健壮性校验
    if (count < 0) throw new IllegalArgumentException("count < 0");
    // 构建内部类,Sync传入count
    this.sync = new Sync(count);
}

// AQS子类,Sync的有参构造
Sync(int count) {
    // 就是给AQS中的state赋值
    setState(count);
}
1.3.2 await方法

await方法就时判断当前CountDownLatch中的state是否为0,如果为0,直接正常执行后续任务

如果不为0,以共享锁的方式,插入到AQS的双向链表,并且挂起线程

// 一般主线程await的方法,阻塞主线程,等待state为0
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

// 执行了AQS的acquireSharedInterruptibly方法
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    // 判断线程是否中断,如果中断标记位是true,直接抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        // 共享锁挂起的操作
        doAcquireSharedInterruptibly(arg);
}

// tryAcquireShared在CountDownLatch中的实现
protected int tryAcquireShared(int acquires) {
    // 查看state是否为0,如果为0,返回1,不为0,返回-1
    return (getState() == 0) ? 1 : -1;
}

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    // 封装当前先成为Node,属性为共享锁
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 在这,就需要挂起当前线程。
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
1.3.3 countDown方法

countDown方法本质就是对state - 1,如果state - 1后变为0,需要去AQS的链表中唤醒挂起的节点

// countDown对计数器-1
public void countDown() {
    // 是-1。
    sync.releaseShared(1);
}

// AQS提供的功能
public final boolean releaseShared(int arg) {
    // 对state - 1
    if (tryReleaseShared(arg)) {
        // state - 1后,变为0,执行doReleaseShared
        doReleaseShared();
        return true;
    }
    return false;
}
// CountDownLatch的tryReleaseShared实现
protected boolean tryReleaseShared(int releases) {
    // 死循环是为了避免CAS并发问题
    for (;;) {
        // 获取state
        int c = getState();
        // state已经为0,直接返回false
        if (c == 0)
            return false;
        // 对获取到的state - 1
        int nextc = c-1;
        // 基于CAS的方式,将值赋值给state
        if (compareAndSetState(c, nextc))
            // 赋值完,发现state为0了。此时可能会有线程在await方法处挂起,那边挂起,需要这边唤醒
            return nextc == 0;
    }
}

// 如何唤醒在await方法处挂起的线程
private void doReleaseShared() {
    // 死循环
    for (;;) {
        // 拿到head
        Node h = head;
        // head不为null,有值,并且head != tail,代表至少2个节点
        // 一个虚拟的head,加上一个实质性的Node
        if (h != null && h != tail) {
            // 说明AQS队列中有节点
            int ws = h.waitStatus;
            // 如果head节点的状态为 -1.
            if (ws == Node.SIGNAL) {
                // 先对head节点将状态从-1,修改为0,避免重复唤醒的情况
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;  
                // 正常唤醒节点即可,先看head.next,能唤醒就唤醒,如果head.next有问题,从后往前找有效节点
                unparkSuccessor(h);
            }
            // 会在Semaphore中谈到这个位置
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;  
        }
        // 会在Semaphore中谈到这个位置
        if (h == head)  
            break;
    }
}
文章来源:https://blog.csdn.net/m0_63694520/article/details/135713866
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。