JUC BlockingQueue(阻塞队列)

发布时间:2024年01月23日

BlockingQueue 阻塞队列

BlockingQueue 解决了多线程中,如何高效安全“传输”数据的问题。我们可以把它当做一个线程安全的容器,有了它,我们可以更简单且安全的进行在多线程下存取“数据”存取操作。

BlockingQueue 支持以下两个重要阻塞操作

  • 获取元素时等待队列变为非空(队列为空是取数据将阻塞等待)
  • 存储元素时等待队列变为可用(队列已满时存储数据将阻塞等待)。

BlockingQueue 的继承关系

在这里插入图片描述

由此我们可以知道,BlockingQueue 也是集合的一种

BlockingQueue 接口方法

public interface BlockingQueue<E> extends Queue<E> {
    /**
     * 在不违反队列容量限制的情况下,将指定的元素插入此队列
     * 如果容量已满,则抛出 IllegalStateException 
     *
     * @param e 要添加的元素
     * @return 是否成功添加
     * @throws IllegalStateException 如果当前时间无法成功添加元素(队列已满)
     * @throws ClassCastException 如果指定元素的类与此队列不兼容
     * @throws NullPointerException 操作的元素为null
     * @throws IllegalArgumentException 元素的某些属性阻止元素加入此队列
     */
    boolean add(E e);

    /**
     * 在不违反队列容量限制的情况下,将指定的元素插入此队列
     * 成功插入 true,否则(容量已满时)为 false
     *
     * @param e 要添加的元素
     * @return 成功插入 true,否则为 false
     * @throws ClassCastException 如果指定元素的类与此队列不兼容
     * @throws NullPointerException 操作的元素为null
     * @throws IllegalArgumentException 元素的某些属性阻止元素加入此队列
     */
    boolean offer(E e);

    /**
     * 在不违反队列容量限制的情况下,将指定的元素插入此队列
     * 容量已满时将等待直到有可用容量(阻塞)
     *
     * @param e 要添加的元素
     * @throws InterruptedException 等待被打断时
     * @throws ClassCastException 如果指定元素的类与此队列不兼容
     * @throws NullPointerException 操作的元素为null
     * @throws IllegalArgumentException 元素的某些属性阻止元素加入此队列
     */
    void put(E e) throws InterruptedException;

    /**
     * 在不违反队列容量限制的情况下,将指定的元素插入此队列
     * 容量已满时将等待直到有可用容量或等待时间到
     *
     * @param e 要添加的元素
     * @param timeout 等待超时时间
     * @param unit 等待超时时间单位
     * @return 成功添加返回true,如果队列满了且等待超时返回false
     * @throws InterruptedException 等待被打断时
     * @throws ClassCastException 如果指定元素的类与此队列不兼容
     * @throws NullPointerException 操作的元素为null
     * @throws IllegalArgumentException 元素的某些属性阻止元素加入此队列
     */
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 删除并获得队列的对头(header)元素
     * 当队列为空是将等待(阻塞)
     *
     * @return 对头的元素
     * @throws InterruptedException 等待被打断
     */
    E take() throws InterruptedException;

    /**
     * 删除并获得队列的对头(header)元素
     * 当队列为空是将等待(阻塞),最长阻塞相应的时间
     *
     * @param timeout 等待超时时间
     * @param unit 等待超时时间单位
     * @return 对头的元素,当超过等待时间是还没有元素则返回 null
     * @throws InterruptedException 等待被打断
     */
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 检查队列的剩余容量并返回
     * 但这并不是准确的(因为此时另一个线程可能已经新添加的数据到此队列)
     *
     * @return 队列的剩余容量
     */
    int remainingCapacity();

    /**
     * 删除队列元素(通过 o.equals(e) 方法比对,只要 equals 返回 true 就删除)
     * 如果删除成功返回 true
     * 
     * @param o 要删除的元素
     * @return 如果队列更改成功返回 true
     * @throws ClassCastException 如果指定元素的类与此队列不兼容
     * @throws NullPointerException 操作的元素为null
     */
    boolean remove(Object o);

    /**
     * 通过 o.equals(e) 方法比对,只要 equals 返回 true 就表示队列中包含该元素
     * 如果队列中包含该元素返回 true
     *
     * @param o 要判断的元素
     * @return 如果队列中包含该元素返回 true
     * @throws ClassCastException 如果指定元素的类与此队列不兼容
     * @throws NullPointerException 操作的元素为null
     */
    public boolean contains(Object o);

    /**
     * 删除队列中所有元素,并将它们转移到参数 c 集合中
     *
     * @param c 存储删除的元素集合
     * @return 返回删除成功的元素个数
     * @throws UnsupportedOperationException 当指定的集合不支持相应的元素存放
     * @throws ClassCastException 元素类型和集合类型不兼容
     * @throws NullPointerException 集合为空
     * @throws IllegalArgumentException 集合为队列自身或集合不支持队列中的元素
     */
    int drainTo(Collection<? super E> c);

    /**
     * 删除队列中的元素,并将它们转移到参数 c 集合中
     * 转移的元素个数受 maxElements 参数限制,表示最大删除 maxElements 个元素
     *
     * @param c 接受删除元素的个数
     * @param maxElements 最大删除的元素个数
     * @return 返回删除成功的元素个数
     * @throws UnsupportedOperationException 当指定的集合不支持相应的元素存放
     * @throws ClassCastException 元素类型和集合类型不兼容
     * @throws NullPointerException 集合为空
     * @throws IllegalArgumentException 集合为队列自身或集合不支持队列中的元素
     */
    int drainTo(Collection<? super E> c, int maxElements);
}

除了以上方法外,其还从 Queue 接口继承了两个方法


    /**
     * 检索但不删除队列的头,如果队列为空将抛出异常 NoSuchElementException 
     *
     * @return 对头元素
     * @throws NoSuchElementException 如果队列为空
     */
    E element();

    /**
     * 检索但不删除队列的头,如果此队列为空,则返回null。
     *
     * @return the head of this queue, or {@code null} if this queue is empty
     */
    E peek();

BlockingQueue 这么多方法,其实就两个操作,存和取。这些方法在队列为空或队列满的时候有不同的特性:

拋异常插入操作返回false,获取操作返回null阻塞阻塞,有超时时间
插入操作add(e)offer(e)put(e)offer(e, time, unit)
删除操作remove(e)poll()teak()poll(time, unit)
检查操作element()peek()

ArrayBlockQueue

由于这里的知识太过零散,用文字的方式不好表达,这里总结为思维导图,通过图片分享出来。思维导图请通过对应对应源码来看,会好一些,否则可能无法理解有些内容

在这里插入图片描述

LinkedBlockingQueue

有单向链表实现的阻塞队列

在这里插入图片描述

LinkedBlockingDeque 双向链表阻塞队列

其特点是可以从链表两端插入元素

在这里插入图片描述

SynchronousQueue

在这里插入图片描述

DelayQueue 延时阻塞队列

在这里插入图片描述

由于延时队列的元素最终是放在 PriorityQueue,所以其元素也要遵循 PriorityQueue 队列对元素的限制。(思维导图中 PriorityQueue 对元素的限制为可比较的且不能为 null)

延时队列示例

import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import java.util.Date;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

@Slf4j
public class DelayQueueTest {

    public static void main(String[] args) throws InterruptedException {

        DelayQueue<MyElement> q = new DelayQueue<>();

        // take 的时候,会等待 5000 毫秒才能获取成功
        q.put(new MyElement(1,new Date().getTime() + 5000));

        System.out.println(q.take());

    }
}

@Getter
@Setter
class MyElement implements Delayed {

    private int value;

    /**
     * 执行时间
     */
    private long time;

    public MyElement(int value,long time){
        this.value = value;
        this.time = time;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(time - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        if(this == o){
            return 0;
        }

        MyElement oo = (MyElement) o;
        long x = this.getTime() - oo.getTime();
        if(x < 0){
            return -1;
        }else if(x == 0){
            return 0;
        }else{
            return 1;
        }
    }
}

补充说明:Delayed 的 getDelay 方法,其实就是判断是否到了执行时间,只要 getDelay 返回小于等于 0,则任务出队执行,否则继续阻塞等待。

其他

在这里插入图片描述

PriorityQueue 的底层实现

在这里插入图片描述

如我们的思维导图,PriorityQueue 的底层是由数组实现的完全二叉树,如我们定义一个长度为6的数组,并将数字 6,5,4,3,2,1按规则放入该数组,那么该完全二叉树对应的数组下标和对应的数据存放位置如下:

在这里插入图片描述

如果顺序遍历该数组,打印将依次打印出对应元素:1,3,2,6,4,5。这其实就是我们的入队逻辑:依次放入数据,并保证父节点数据比任意子节点的数据都小

出队逻辑:每次从根节点出队,出队后选取数据更小的子节点数据存入根节点,移动后的节点依次类推,直到形成新的完全二叉树,如我们的示例,出队一次后,形成的新的完全二叉树如下:

在这里插入图片描述

以上入队出队逻辑,对应 PriorityQueue 的 offer 和 poll 方法的逻辑。(这里要注意:出队是出的根元素,之后的元素会按出队规则进行移动,形成新的、符合其规则的完全二叉树)

文章来源:https://blog.csdn.net/forlinkext/article/details/134794124
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。