查看LinkedBlockingQueue是如何存储数据,并且实现链表结构的。
// Node对象就是存储数据的单位
static class Node<E> {
// 存储的数据
E item;
// 指向下一个数据的指针
Node<E> next;
// 有参构造
Node(E x) { item = x; }
}
查看LinkedBlockingQueue的有参构造
// 可以手动指定LinkedBlockingQueue的长度,如果没有指定,默认为Integer.MAX_VALUE
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
// 在初始化时,构建一个item为null的节点,作为head和last
// 这种node可以成为哨兵Node,
// 如果没有哨兵节点,那么在获取数据时,需要判断head是否为null,才能找next
// 如果没有哨兵节点,那么在添加数据时,需要判断last是否为null,才能找next
last = head = new Node<E>(null);
}
查看LinkedBlockingQueue的其他属性
// 因为是链表,没有想数组的length属性,基于AtomicInteger来记录长度
private final AtomicInteger count = new AtomicInteger();
// 链表的头,取
transient Node<E> head;
// 链表的尾,存
private transient Node<E> last;
// 消费者的锁
private final ReentrantLock takeLock = new ReentrantLock();
// 消费者的挂起操作,以及唤醒用的condition
private final Condition notEmpty = takeLock.newCondition();
// 生产者的锁
private final ReentrantLock putLock = new ReentrantLock();
// 生产者的挂起操作,以及唤醒用的condition
private final Condition notFull = putLock.newCondition();
你懂得,还是走offer方法
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
public boolean offer(E e) {
// 非空校验
if (e == null) throw new NullPointerException();
// 拿到存储数据条数的count
final AtomicInteger count = this.count;
// 查看当前数据条数,是否等于队列限制长度,达到了这个长度,直接返回false
if (count.get() == capacity)
return false;
// 声明c,作为标记存在
int c = -1;
// 将存储的数据封装为Node对象
Node<E> node = new Node<E>(e);
// 获取生产者的锁。
final ReentrantLock putLock = this.putLock;
// 竞争锁资源
putLock.lock();
try {
// 再次做一个判断,查看是否还有空间
if (count.get() < capacity) {
// enqueue,扔数据
enqueue(node);
// 将数据个数 + 1
c = count.getAndIncrement();
// 拿到count的值 小于 长度限制
// 有生产者在基于await挂起,这里添加完数据后,发现还有空间可以存储数据,
// 唤醒前面可能已经挂起的生产者
// 因为这里生产者和消费者不是互斥的,写操作进行的同时,可能也有消费者在消费数据。
if (c + 1 < capacity)
// 唤醒生产者
notFull.signal();
}
} finally {
// 释放锁资源
putLock.unlock();
}
// 如果c == 0,代表添加数据之前,队列元素个数是0个。
// 如果有消费者在队列没有数据的时候,来消费,此时消费者一定会挂起线程
if (c == 0)
// 唤醒消费者
signalNotEmpty();
// 添加成功返回true,失败返回-1
return c >= 0;
}
//================================================
private void enqueue(Node<E> node) {
// 将当前Node设置为last的next,并且再将当前Node作为last
last = last.next = node;
}
//================================================
private void signalNotEmpty() {
// 获取读锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 唤醒。
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
sync -> wait / notify
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
// 非空检验
if (e == null) throw new NullPointerException();
// 将时间转换为纳秒
long nanos = unit.toNanos(timeout);
// 标记
int c = -1;
// 写锁,数据条数
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 允许中断的加锁方式
putLock.lockInterruptibly();
try {
// 如果元素个数和限制个数一致,直接准备挂起
while (count.get() == capacity) {
// 挂起的时间是不是已经没了
if (nanos <= 0)
// 添加失败,返回false
return false;
// 挂起线程
nanos = notFull.awaitNanos(nanos);
}
// 有空余位置,enqueue添加数据
enqueue(new Node<E>(e));
// 元素个数 + 1
c = count.getAndIncrement();
// 当前添加完数据,还有位置可以添加数据,唤醒可能阻塞的生产者
if (c + 1 < capacity)
notFull.signal();
} finally {
// 释放锁
putLock.unlock();
}
// 如果之前元素个数是0,唤醒可能等待的消费者
if (c == 0)
signalNotEmpty();
return true;
}
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
// 一直挂起线程,等待被唤醒
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
从remove方法开始,查看消费者获取数据的方式
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
public E poll() {
// 拿到队列数据个数的计数器
final AtomicInteger count = this.count;
// 当前队列中数据是否0
if (count.get() == 0)
// 说明队列没数据,直接返回null即可
return null;
// 声明返回结果
E x = null;
// 标记
int c = -1;
// 获取消费者的takeLock
final ReentrantLock takeLock = this.takeLock;
// 加锁
takeLock.lock();
try {
// 基于DCL,确保当前队列中依然有元素
if (count.get() > 0) {
// 从队列中移除数据
x = dequeue();
// 将之前的元素个数获取,并--
c = count.getAndDecrement();
if (c > 1)
// 如果依然有数据,继续唤醒await的消费者。
notEmpty.signal();
}
} finally {
// 释放锁资源
takeLock.unlock();
}
// 如果之前的元素个数为当前队列的限制长度,
// 现在消费者消费了一个数据,多了一个空位可以添加
if (c == capacity)
// 唤醒阻塞的生产者
signalNotFull();
return x;
}
//================================================
private E dequeue() {
// 拿到队列的head位置数据
Node<E> h = head;
// 拿到了head的next,因为这个是哨兵Node,需要拿到的head.next的数据
Node<E> first = h.next;
// 将之前的哨兵Node.next置位null。help GC。
h.next = h;
// 将first置位新的head
head = first;
// 拿到返回结果first节点的item数据,也就是之前head.next.item
E x = first.item;
// 将first数据置位null,作为新的head
first.item = null;
// 返回数据
return x;
}
//================================================
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// 唤醒生产者。
notFull.signal();
} finally {
putLock.unlock();
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 返回结果
E x = null;
// 标识
int c = -1;
// 将挂起实现设置为纳秒级别
long nanos = unit.toNanos(timeout);
// 拿到计数器
final AtomicInteger count = this.count;
// take锁加锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 如果没数据,进到while
while (count.get() == 0) {
if (nanos <= 0)
return null;
// 挂起当前线程
nanos = notEmpty.awaitNanos(nanos);
}
// 剩下内容,和之前一样。
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 相比poll(time,unit)方法,这里的出口只有一个,就是中断标记位,抛出异常,否则一直等待
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}