ArrayBlockingQueue在初始化的时候,必须指定当前队列的长度。
因为ArrayBlockingQueue是基于数组实现的队列结构,数组长度不可变,必须提前设置数组长度信息。
public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
// 必须设置队列的长度
ArrayBlockingQueue queue = new ArrayBlockingQueue(4);
// 生产者扔数据
queue.add("1");
queue.offer("2");
queue.offer("3",2,TimeUnit.SECONDS);
queue.put("2");
// 消费者取数据
System.out.println(queue.remove());
System.out.println(queue.poll());
System.out.println(queue.poll(2,TimeUnit.SECONDS));
System.out.println(queue.take());
}
生产者添加数据到队列的方法比较多,需要一个一个查看
ArrayBlockingQueue中的成员变量
lock = 就是一个ReentrantLock
count = 就是当前数组中元素的个数
iterms = 就是数组本身
# 基于putIndex和takeIndex将数组结构实现为了队列结构
putIndex = 存储数据时的下标
takeIndex = 去数据时的下标
notEmpty = 消费者挂起线程和唤醒线程用到的Condition(看成sync的wait和notify)
notFull = 生产者挂起线程和唤醒线程用到的Condition(看成sync的wait和notify)
add方法本身就是调用了offer方法,如果offer方法返回false,直接抛出异常
public boolean add(E e) {
if (offer(e))
return true;
else
// 抛出的异常
throw new IllegalStateException("Queue full");
}
public boolean offer(E e) {
// 要求存储的数据不允许为null,为null就抛出空指针
checkNotNull(e);
// 当前阻塞队列的lock锁
final ReentrantLock lock = this.lock;
// 为了保证线程安全,加锁
lock.lock();
try {
// 如果队列中的元素已经存满了,
if (count == items.length)
// 返回false
return false;
else {
// 队列没满,执行enqueue将元素添加到队列中
enqueue(e);
// 返回true
return true;
}
} finally {
// 操作完释放锁
lock.unlock();
}
}
//==========================================================
private void enqueue(E x) {
// 拿到数组的引用
final Object[] items = this.items;
// 将元素放到指定位置
items[putIndex] = x;
// 对inputIndex进行++操作,并且判断是否已经等于数组长度,需要归位
if (++putIndex == items.length)
// 将索引设置为0
putIndex = 0;
// 元素添加成功,进行++操作。
count++;
// 将一个Condition中阻塞的线程唤醒。
notEmpty.signal();
}
生产者在添加数据时,如果队列已经满了,阻塞一会。
// 如果线程在挂起的时候,如果对当前阻塞线程的中断标记位进行设置,此时会抛出异常直接结束
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
// 非空检验
checkNotNull(e);
// 将时间单位转换为纳秒
long nanos = unit.toNanos(timeout);
// 加锁
final ReentrantLock lock = this.lock;
// 允许线程中断并排除异常的加锁方式
lock.lockInterruptibly();
try {
// 为什么是while(虚假唤醒)
// 如果元素个数和数组长度一致,队列慢了
while (count == items.length) {
// 判断等待的时间是否还充裕
if (nanos <= 0)
// 不充裕,直接添加失败
return false;
// 挂起等待,会同时释放锁资源(对标sync的wait方法)
// awaitNanos会挂起线程,并且返回剩余的阻塞时间
// 恢复执行时,需要重新获取锁资源
nanos = notFull.awaitNanos(nanos);
}
// 说明队列有空间了,enqueue将数据扔到阻塞队列中
enqueue(e);
return true;
} finally {
// 释放锁资源
lock.unlock();
}
}
如果队列是满的, 就一直挂起,直到被唤醒,或者被中断
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
// await方法一直阻塞,直到被唤醒或者中断标记位
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
// remove方法就是调用了poll
public E remove() {
E x = poll();
// 如果有数据,直接返回
if (x != null)
return x;
// 没数据抛出异常
else
throw new NoSuchElementException();
}
// 拉取数据
public E poll() {
// 加锁操作
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果没有数据,直接返回null,如果有数据,执行dequeue,取出数据并返回
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
//==========================================================
// 取出数据
private E dequeue() {
// 将成员变量引用到局部变量
final Object[] items = this.items;
// 直接获取指定索引位置的数据
E x = (E) items[takeIndex];
// 将数组上指定索引位置设置为null
items[takeIndex] = null;
// 设置下次取数据时的索引位置
if (++takeIndex == items.length)
takeIndex = 0;
// 对count进行--操作
count--;
// 迭代器内容,先跳过
if (itrs != null)
itrs.elementDequeued();
// signal方法,会唤醒当前Condition中排队的一个Node。
// signalAll方法,会将Condition中所有的Node,全都唤醒
notFull.signal();
// 返回数据。
return x;
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 转换时间单位
long nanos = unit.toNanos(timeout);
// 竞争锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果没有数据
while (count == 0) {
if (nanos <= 0)
// 没数据,也无法阻塞了,返回null
return null;
// 没数据,挂起消费者线程
nanos = notEmpty.awaitNanos(nanos);
}
// 取数据
return dequeue();
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 虚假唤醒
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
阻塞队列中,如果需要线程挂起操作,判断有无数据的位置采用的是while循环 ,为什么不能换成if
肯定是不能换成if逻辑判断
线程A,线程B,线程E,线程C。 其中ABE生产者,C属于消费者
假如线程的队列是满的
// E,拿到锁资源,还没有走while判断
while (count == items.length)
// A醒了
// B挂起
notFull.await();
enqueue(e);
C此时消费一条数据,执行notFull.signal()唤醒一个线程,A线程被唤醒
E走判断,发现有空余位置,可以添加数据到队列,E添加数据,走enqueue
如果判断是if,A在E释放锁资源后,拿到锁资源,直接走enqueue方法。
此时A线程就是在putIndex的位置,覆盖掉之前的数据,造成数据安全问题