LinkedBlockingQueue

发布时间:2024年01月15日

LinkedBlockingQueue

1.1 LinkedBlockingQueue的底层实现

查看LinkedBlockingQueue是如何存储数据,并且实现链表结构的。

// Node对象就是存储数据的单位
static class Node<E> {
    // 存储的数据
    E item;
	// 指向下一个数据的指针
    Node<E> next;
	// 有参构造
    Node(E x) { item = x; }
}

查看LinkedBlockingQueue的有参构造

// 可以手动指定LinkedBlockingQueue的长度,如果没有指定,默认为Integer.MAX_VALUE
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    // 在初始化时,构建一个item为null的节点,作为head和last
	 // 这种node可以成为哨兵Node,
    // 如果没有哨兵节点,那么在获取数据时,需要判断head是否为null,才能找next
    // 如果没有哨兵节点,那么在添加数据时,需要判断last是否为null,才能找next
    last = head = new Node<E>(null);
}

查看LinkedBlockingQueue的其他属性

// 因为是链表,没有想数组的length属性,基于AtomicInteger来记录长度
private final AtomicInteger count = new AtomicInteger();
// 链表的头,取
transient Node<E> head;
// 链表的尾,存
private transient Node<E> last;
// 消费者的锁
private final ReentrantLock takeLock = new ReentrantLock();
// 消费者的挂起操作,以及唤醒用的condition
private final Condition notEmpty = takeLock.newCondition();
// 生产者的锁
private final ReentrantLock putLock = new ReentrantLock();
// 生产者的挂起操作,以及唤醒用的condition
private final Condition notFull = putLock.newCondition();

1.2 生产者方法实现原理

1.2.1 add方法

你懂得,还是走offer方法

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
1.2.2 offer方法
public boolean offer(E e) {
    // 非空校验
    if (e == null) throw new NullPointerException();
    // 拿到存储数据条数的count
    final AtomicInteger count = this.count;
    // 查看当前数据条数,是否等于队列限制长度,达到了这个长度,直接返回false
    if (count.get() == capacity)
        return false;
    // 声明c,作为标记存在
    int c = -1;
    // 将存储的数据封装为Node对象
    Node<E> node = new Node<E>(e);
    // 获取生产者的锁。
    final ReentrantLock putLock = this.putLock;
    // 竞争锁资源
    putLock.lock();
    try {
        // 再次做一个判断,查看是否还有空间
        if (count.get() < capacity) {
            // enqueue,扔数据
            enqueue(node);
            // 将数据个数 + 1
            c = count.getAndIncrement();
            // 拿到count的值 小于 长度限制
            // 有生产者在基于await挂起,这里添加完数据后,发现还有空间可以存储数据,
            // 唤醒前面可能已经挂起的生产者
            // 因为这里生产者和消费者不是互斥的,写操作进行的同时,可能也有消费者在消费数据。
            if (c + 1 < capacity)
                // 唤醒生产者
                notFull.signal();
        }
    } finally {
        // 释放锁资源
        putLock.unlock();
    }
    // 如果c == 0,代表添加数据之前,队列元素个数是0个。
    // 如果有消费者在队列没有数据的时候,来消费,此时消费者一定会挂起线程
    if (c == 0)
        // 唤醒消费者
        signalNotEmpty();
    // 添加成功返回true,失败返回-1
    return c >= 0;
}

//================================================
private void enqueue(Node<E> node) {
    // 将当前Node设置为last的next,并且再将当前Node作为last
    last = last.next = node;
}
//================================================
private void signalNotEmpty() {
    // 获取读锁
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        // 唤醒。
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}
sync -> wait / notify
1.2.3 offer(time,unit)方法
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
	 // 非空检验
    if (e == null) throw new NullPointerException();
    // 将时间转换为纳秒
    long nanos = unit.toNanos(timeout);
    // 标记
    int c = -1;
    // 写锁,数据条数
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 允许中断的加锁方式
    putLock.lockInterruptibly();
    try {
        // 如果元素个数和限制个数一致,直接准备挂起
        while (count.get() == capacity) {
            // 挂起的时间是不是已经没了
            if (nanos <= 0)
                // 添加失败,返回false
                return false;
            // 挂起线程
            nanos = notFull.awaitNanos(nanos);
        }
        // 有空余位置,enqueue添加数据
        enqueue(new Node<E>(e));
        // 元素个数 + 1
        c = count.getAndIncrement();
        // 当前添加完数据,还有位置可以添加数据,唤醒可能阻塞的生产者
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        // 释放锁
        putLock.unlock();
    }
    // 如果之前元素个数是0,唤醒可能等待的消费者
    if (c == 0)
        signalNotEmpty();
    return true;
}
1.2.4 put方法
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            // 一直挂起线程,等待被唤醒
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}

1.3 消费者方法实现原理

从remove方法开始,查看消费者获取数据的方式

1.3.1 remove方法
public E remove() {
    E x = poll();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}
1.3.2 poll方法
public E poll() {
    // 拿到队列数据个数的计数器
    final AtomicInteger count = this.count;
    // 当前队列中数据是否0
    if (count.get() == 0)
        // 说明队列没数据,直接返回null即可
        return null;
    // 声明返回结果
    E x = null;
    // 标记
    int c = -1;
    // 获取消费者的takeLock
    final ReentrantLock takeLock = this.takeLock;
    // 加锁
    takeLock.lock();
    try {
        // 基于DCL,确保当前队列中依然有元素
        if (count.get() > 0) {
            // 从队列中移除数据
            x = dequeue();
            // 将之前的元素个数获取,并--
            c = count.getAndDecrement();
            if (c > 1)
                // 如果依然有数据,继续唤醒await的消费者。
                notEmpty.signal();
        }
    } finally {
        // 释放锁资源
        takeLock.unlock();
    }
    // 如果之前的元素个数为当前队列的限制长度,
    // 现在消费者消费了一个数据,多了一个空位可以添加
    if (c == capacity)
        // 唤醒阻塞的生产者
        signalNotFull();
    return x;
}

//================================================

private E dequeue() {
    // 拿到队列的head位置数据
    Node<E> h = head;
    // 拿到了head的next,因为这个是哨兵Node,需要拿到的head.next的数据
    Node<E> first = h.next;
    // 将之前的哨兵Node.next置位null。help GC。
    h.next = h; 
    // 将first置位新的head
    head = first;
    // 拿到返回结果first节点的item数据,也就是之前head.next.item
    E x = first.item;
    // 将first数据置位null,作为新的head
    first.item = null;
    // 返回数据
    return x;
}

//================================================

private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        // 唤醒生产者。
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}
1.3.3 poll(time,unit)方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // 返回结果
    E x = null;
    // 标识
    int c = -1;
    // 将挂起实现设置为纳秒级别
    long nanos = unit.toNanos(timeout);
    // 拿到计数器
    final AtomicInteger count = this.count;
    // take锁加锁
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // 如果没数据,进到while
        while (count.get() == 0) {
            if (nanos <= 0)
                return null;
            // 挂起当前线程
            nanos = notEmpty.awaitNanos(nanos);
        }
        // 剩下内容,和之前一样。
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
} 
1.3.4 take方法
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // 相比poll(time,unit)方法,这里的出口只有一个,就是中断标记位,抛出异常,否则一直等待
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
文章来源:https://blog.csdn.net/m0_63694520/article/details/135605581
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。