并发编程之阻塞队列

发布时间:2024年01月15日

目录

什么是队列?

?Queue接口

阻塞队列

应用场景

ArrayBlockingQueue

ArrayBlockingQueue使用

ArrayBlockingQueue的原理

数据结构

入队put方法

出队take方法

LinkedBlockingQueue

LinkedBlockingQueue使用

LinkedBlockingQueue原理

数据结构

入队put方法

出队take方法

LinkedBlockingQueue与ArrayBlockingQueue对比

DelayQueue

DelayQueue的使用

DelayQueue原理

数据结构

入队put方法

出队take方法


什么是队列?

? ? ? ?Java中,队列(Queue)是一种常见的数据结构,它按照先进先出(FIFO)的原则管理元素。限定在一端进行插入,另一端进行删除的特殊线性表,允许出队的一端称为队头,允许入队的一端称为队尾。队列通常用于在数据集合中添加和删除元素的场景。在Java中,Queue接口扩展了java.util.Collection接口,提供了用于处理队列的方法。

? ? ? ?

?Queue接口

public interface Queue<E> extends Collection<E> {
 //添加一个元素,添加成功返回true, 如果队列满了,就会抛出异常
 boolean add(E e);
 //添加一个元素,添加成功返回true, 如果队列满了,返回false
 boolean offer(E e);
 //返回并删除队首元素,队列为空则抛出异常
 E remove();
 //返回并删除队首元素,队列为空则返回null
 E poll();
 //返回队首元素,但不移除,队列为空则抛出异常
 E element();
 //获取队首元素,但不移除,队列为空则返回null
 E peek();
 }

阻塞队列

? ? ? ?阻塞队列(Blocking Queue)是一种特殊类型的队列,它具有一些额外的特性,使得在队列的操作上能够实现线程之间的协调和同步。阻塞队列的主要特点是当队列为空时,从队列中取元素的操作将会被阻塞,直到队列中有元素。当队列已满时,往队列中放入元素的操作将会被阻塞,直到队列中有空闲位置。这种特性使得阻塞队列很适合在多线程编程中用于线程之间的数据传递和协调。

? ? ? ?Java中,java.util.concurrent?包提供了BlockingQueue接口,该接口定义了阻塞队列的基本操作。常见的实现类有:

1. ArrayBlockingQueue:?一个由数组支持的有界阻塞队列。

2. LinkedBlockingQueue:?一个由链表支持的可选有界阻塞队列。

3. PriorityBlockingQueue:?一个无界阻塞队列,其中元素被排序按优先级。

4. DelayQueue:?一个无界阻塞队列,只有在延迟期满时才能取出元素。

5. SynchronousQueue:?一个没有存储元素的阻塞队列,用于直接传递数据。

6. LinkedTransferQueue:?一个由链表支持的无界阻塞队列,其中元素按照 FIFO(先进先出)排序。


应用场景

1.?线程池

? ? ? ?线程池中的任务队列通常是一个阻塞队列。当任务数超过线程池的容量时,新提交的任务将被放入任 务队列中等待执行。线程池中的工作线程从任务队列中取出任务进行处理,如果队列为空,则工作线 程会被阻塞,直到队列中有新的任务被提交。

2. 生产者-消费者模型

? ? ? ?在生产者-消费者模型中,生产者向队列中添加元素,消费者从队列中取出元素进行处理。阻塞队列可 以很好地解决生产者和消费者之间的并发问题,避免线程间的竞争和冲突。

3. 消息队列

? ? ? ?消息队列使用阻塞队列来存储消息,生产者将消息放入队列中,消费者从队列中取出消息进行处理。 消息队列可以实现异步通信,提高系统的吞吐量和响应性能,同时还可以将不同的组件解耦,提高系 统的可维护性和可扩展性。

4. 缓存系统

? ? ? ?缓存系统使用阻塞队列来存储缓存数据,当缓存数据被更新时,它会被放入队列中,其他线程可以从 队列中取出最新的数据进行使用。使用阻塞队列可以避免并发更新缓存数据时的竞争和冲突。

5. 并发任务处理

? ? ? ?在并发任务处理中,可以将待处理的任务放入阻塞队列中,多个工作线程可以从队列中取出任务进行 处理。使用阻塞队列可以避免多个线程同时处理同一个任务的问题,并且可以将任务的提交和执行解 耦,提高系统的可维护性和可扩展性。


ArrayBlockingQueue

? ? ? ? ArrayBlockingQueue是Java中的一个具体实现了BlockingQueue接口的类,它使用数组作为其内部数据结构来存储元素。利用 ReentrantLock 实现线程安全。ArrayBlockingQueue可以用于实现数据缓存、限流、生产者-消费者模式等各种应用。与其他BlockingQueue实现不同,ArrayBlockingQueue 具有固定的容量,即在创建队列时就需要指定最大容量,之后不可更改。

? ? ? ?在生产者-消费者模型中使用时,如果生产速度和消费速度基本匹配的情况下,使用 ArrayBlockingQueue是个不错选择;当如果生产速度远远大于消费速度,则会导致队列填满,大量生 产线程被阻塞。

ArrayBlockingQueue使用

 BlockingQueue queue = new ArrayBlockingQueue(1024);
 queue.put("1"); //向队列中添加元素
 Object object = queue.take(); //从队列中取出元素

ArrayBlockingQueue的原理

? ? ? ? ArrayBlockingQueue使用独占锁ReentrantLock实现线程安全,入队和出队操作使用同一个锁对象, 也就是只能有一个线程可以进行入队或者出队操作。这也就意味着生产者和消费者无法并行操作,在高并发场景下会成为性能瓶颈。

数据结构

利用了Lock锁的Condition通知机制进行阻塞控制。核心:一把锁,两个条件。

 //数据元素数组
 final Object[] items;
 //下一个待取出元素索引
 int takeIndex;
 //下一个待添加元素索引
 int putIndex;
 //元素个数
 int count;
 //内部锁
 final ReentrantLock lock;
 //消费者
 private final Condition notEmpty;
 //生产者
 private final Condition notFull;

 public ArrayBlockingQueue(int capacity) {
 this(capacity, false);
 }
 public ArrayBlockingQueue(int capacity, boolean fair) {
 //...
 lock = new ReentrantLock(fair); //公平,非公平
 notEmpty = lock.newCondition();
 notFull = lock.newCondition();
 }
入队put方法
        public void put(E e) throws InterruptedException {
            //检查是否为空
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            //加锁,如果线程中断抛出异常
            lock.lockInterruptibly();
            try {
                //阻塞队列已满,则将生产者挂起,等待消费者唤醒
                //设计注意点: 用while不用if是为了防止虚假唤醒
                while (count == items.length)
                    notFull.await(); //队列满了,使用notFull等待(生产者阻塞)
                // 入队
                enqueue(e);
            } finally {
                lock.unlock(); // 唤醒消费者线程
            }
        }

        private void enqueue(E x) {
            final Object[] items = this.items;
            //入队 使用的putIndex
            items[putIndex] = x;
            if (++putIndex == items.length)
                putIndex = 0; //设计的精髓: 环形数组,putIndex指针到数组尽头了,返回头部
            count++;
            //notEmpty条件队列转同步队列,准备唤醒消费者线程,因为入队了一个元素,肯定不为空了
            notEmpty.signal();
        }
出队take方法
        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            //加锁,如果线程中断抛出异常
            lock.lockInterruptibly();
            try {
                //如果队列为空,则消费者挂起
                while (count == 0)
                    notEmpty.await();
                //出队
                return dequeue();
            } finally {
                lock.unlock();// 唤醒生产者线程
            }
        }
        private E dequeue() {
            final Object[] items = this.items;
            @SuppressWarnings("unchecked")
            E x = (E) items[takeIndex]; //取出takeIndex位置的元素
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0; //设计的精髓: 环形数组,takeIndex 指针到数组尽头了,返回头部
            count--;
            if (itrs != null)
                itrs.elementDequeued();
            //notFull条件队列转同步队列,准备唤醒生产者线程,此时队列有空位
            notFull.signal();
            return x;
        }

? ? ? ?通过出队和入队我们可以看到,在对数组的操作中使用了双指针takeIndex和putIndex,使用双指针的好处在于可以避免数组的复制操作。如果使用单指针,每次删除元素时需要将后面的元素全部向前移动,这样会导致时间复杂度为O(n)。而使用双指针,我们可以直接将takeIndex指向下一个元素,而不需要将其前面的元素全部向前移动。同样地,插入新的元素时,我们可以直接将新元 素插入到putIndex所指向的位置,而不需要将其后面的元素全部向后移动。这样可以使得插入和删除的时间复杂度都是O(1)级别,提高了队列的性能。


LinkedBlockingQueue

? ? ? ?LinkedBlockingQueue是BlockingQueue接口的一个具体实现,它是由链表支持的阻塞队列。这意味着它内部使用链表数据结构来存储元素,并且具有阻塞的特性,可以用于多线程环境中的线程协调和同步。默认情况下,该阻塞队列的大小为 Integer.MAX_VALUE,由于这个数值特别大,所以 LinkedBlockingQueue 也被称作无界队列,代表 它几乎没有界限,队列可以随着元素的添加而动态增长,但是如果没有剩余内存,则队列将抛出OOM错误。所以为了避免队列过大造成机器负载或者内存爆满的情况出现,使用的时候建议传一个队列的大小值。

LinkedBlockingQueue使用

 //指定队列的大小创建有界队列
 BlockingQueue<Integer> boundedQueue = new LinkedBlockingQueue<>(100);
 //无界队列
 BlockingQueue<Integer> unboundedQueue = new LinkedBlockingQueue<>();

LinkedBlockingQueue原理

? ? ? ?LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。 LinkedBlockingQueue采用两把锁的锁分离技术实现入队出队互不阻塞,添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行。

数据结构
        // 容量,指定容量就是有界队列
        private final int capacity;
        // 元素数量
        private final AtomicInteger count = new AtomicInteger();
        // 链表头 本身是不存储任何元素的,初始化时item指向null
        transient Node<E> head;
        // 链表尾
        private transient Node<E> last;
        // take锁 锁分离,提高效率
        private final ReentrantLock takeLock = new ReentrantLock();
        // notEmpty条件
        // 当队列无元素时,take锁会阻塞在notEmpty条件上,等待其它线程唤醒
        private final Condition notEmpty = takeLock.newCondition();
        // put锁
        private final ReentrantLock putLock = new ReentrantLock();
        // notFull条件
        // 当队列满了时,put锁会会阻塞在notFull上,等待其它线程唤醒
        private final Condition notFull = putLock.newCondition();

        //典型的单链表结构
        static class Node<E> {
            E item; //存储元素
            Node<E> next; //后继节点 单链表结构
            Node(E x) { item = x; }
        }

构造器

        public LinkedBlockingQueue() {
            // 如果没传容量,就使用最大int值初始化其容量
            this(Integer.MAX_VALUE);
        }

        public LinkedBlockingQueue(int capacity) {
            if (capacity <= 0) throw new IllegalArgumentException();
            this.capacity = capacity;
            // 初始化head和last指针为空值节点
            last = head = new Node<E>(null);
        }
入队put方法
    public void put(E e) throws InterruptedException {
        // 不允许null元素
        if (e == null) throw new NullPointerException();
        int c = -1;
        // 新建一个节点
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        // 使用put锁加锁
        putLock.lockInterruptibly();
        try {
            // 如果队列满了,就阻塞在notFull上等待被其它线程唤醒(阻塞生产者线程)
            while (count.get() == capacity) {
                notFull.await();
            }
            // 队列不满,就入队
            enqueue(node);
            c = count.getAndIncrement();// 队列长度加1,返回原值
            // 如果现队列长度小于容量,notFull条件队列转同步队列,准备唤醒一个阻塞在notFull条件上的线程(可以继续入队)

            // 这里为啥要唤醒一下呢?
            // 因为可能有很多线程阻塞在notFull这个条件上,而取元素时只有取之前队列是满的才会唤醒notFull,此处不用等到取元素时才唤醒

            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock(); // 真正唤醒生产者线程
        }
        // 如果原队列长度为0,现在加了一个元素后立即唤醒阻塞在notEmpty上的线程
        if (c == 0)
            signalNotEmpty();
    }
    private void enqueue(Node<E> node) {
        // 直接加到last后面,last指向入队元素
        last = last.next = node;
    }
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();// 加take锁
        try {
            notEmpty.signal();// notEmpty条件队列转同步队列,准备唤醒阻塞在notEmpty上的线程
        } finally {
            takeLock.unlock(); // 真正唤醒消费者线程
        }
    }
出队take方法
  public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        // 使用takeLock加锁
        takeLock.lockInterruptibly();
        try {
            // 如果队列无元素,则阻塞在notEmpty条件上(消费者线程阻塞)
            while (count.get() == 0) {
                notEmpty.await();
            }
            // 否则,出队
            x = dequeue();
            c = count.getAndDecrement();//长度-1,返回原值
            if (c > 1)// 如果取之前队列长度大于1,notEmpty条件队列转同步队列,准备唤醒阻塞在notEmpty上的线程,原因与入队同理

                notEmpty.signal();
        } finally {
            takeLock.unlock(); // 真正唤醒消费者线程
        }
        // 为什么队列是满的才唤醒阻塞在notFull上的线程呢?
        // 因为唤醒是需要加putLock的,这是为了减少锁的次数,所以,这里索性在放完元素就检测一下,未满就唤醒其它notFull上的线程,

        // 这也是锁分离带来的代价
        // 如果取之前队列长度等于容量(已满),则唤醒阻塞在notFull的线程
        if (c == capacity)
            signalNotFull();
        return x;
    }
    private E dequeue() {
        // head节点本身是不存储任何元素的
        // 这里把head删除,并把head下一个节点作为新的值
        // 并把其值置空,返回原来的值
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // 方便GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();// notFull条件队列转同步队列,准备唤醒阻塞在notFull上的线程
        } finally {
            putLock.unlock(); // 解锁,这才会真正的唤醒生产者线程
        }
    }

LinkedBlockingQueue与ArrayBlockingQueue对比

1. 底层数据结构:
???????LinkedBlockingQueue 使用链表作为其底层数据结构。因此,它适用于需要高吞吐量、但队列大小可能变化较大的情况。
???????ArrayBlockingQueue 使用数组作为其底层数据结构。数组大小在创建时固定,因此它适用于固定大小的队列。
2. 容量限制:
???????LinkedBlockingQueue 可以是有界的,也可以是无界的。如果创建时指定了容量,它将限制队列的大小。
???????ArrayBlockingQueue 总是有界的,其容量在创建时就被固定,无法更改。
3. 内存占用:
???????由于 LinkedBlockingQueue 使用链表,它的大小可以根据实际存储的元素数量动态变化,因此它的内存占用可能更灵活。
????????ArrayBlockingQueue 的大小在创建时固定,因此它可能在一开始就分配了一定大小的内存,即使队列中的元素较少。
4. 公平性:
????????LinkedBlockingQueue 支持可选的公平性设置。在公平模式下,多个线程争夺队列中的元素时,它们将按照到达的顺序进行处理。
????????ArrayBlockingQueue 没有提供内置的公平性设置。
5. 性能:
????????LinkedBlockingQueue 在高并发、多线程环境下可能具有更好的性能,尤其是在元素的插入和移除操作频繁发生的情况下。
????????ArrayBlockingQueue 的性能可能在高并发时略逊于 LinkedBlockingQueue,但在一些情况下,由于数组的局部性,它也可能表现得很好。


DelayQueue

? ? ? ?DelayQueue 是 Java 中的一个特殊类型的阻塞队列,它实现了 BlockingQueue 接口。与普通的队列不同,DelayQueue 中的元素只有在一定的延迟时间过去后才能被取出。这个延迟时间由元素自身决定。
? ? ? ?要使用 DelayQueue,元素必须实现 Delayed 接口,该接口定义了一个方法getDelay(TimeUnit unit),用于返回元素距离过期还有多长时间。如果 getDelay 方法返回的时间小于等于零,则表示该元素已经过期,可以从队列中取出。

DelayQueue的使用

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayedElement implements Delayed {
    private long delayTime;
    private String data;

    public DelayedElement(long delay, String data) {
        this.delayTime = System.currentTimeMillis() + delay;
        this.data = data;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long diff = delayTime - System.currentTimeMillis();
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        if (this.delayTime < ((DelayedElement) o).delayTime) {
            return -1;
        }
        if (this.delayTime > ((DelayedElement) o).delayTime) {
            return 1;
        }
        return 0;
    }

    public String getData() {
        return data;
    }
}

public class DelayQueueExample {
    public static void main(String[] args) {
        DelayQueue<DelayedElement> delayQueue = new DelayQueue<>();

        delayQueue.put(new DelayedElement(1000, "One Second"));
        delayQueue.put(new DelayedElement(2000, "Two Seconds"));
        delayQueue.put(new DelayedElement(3000, "Three Seconds"));

        try {
            // 阻塞等待元素过期
            System.out.println(delayQueue.take().getData()); // 输出 "One Second"
            System.out.println(delayQueue.take().getData()); // 输出 "Two Seconds"
            System.out.println(delayQueue.take().getData()); // 输出 "Three Seconds"
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

? ? ? ?在以上代码中,DelayedElement 类实现了 Delayed 接口,表示一个具有延迟过期时间的元素。然后,这些元素被放入 DelayQueue 中,通过调用 take 方法,将阻塞等待队列中的元素过期并取出。

DelayQueue原理

数据结构
    //用于保证队列操作的线程安全
    private final transient ReentrantLock lock = new ReentrantLock();
    // 优先级队列,存储元素,用于保证延迟低的优先执行
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    // 用于标记当前是否有线程在排队(仅用于取元素时) leader 指向的是第一个从队列获取元素阻塞的线程
    private Thread leader = null;
    // 条件,用于表示现在是否有可取的元素 当新元素到达,或新线程可能需要成为leader时被通知
    private final Condition available = lock.newCondition();

    public DelayQueue() {}
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }
入队put方法
    public void put(E e) {
        offer(e);
    }
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 入队
            q.offer(e);
            if (q.peek() == e) {
                // 若入队的元素位于队列头部,说明当前元素延迟最小
                // 将 leader 置空
                leader = null;
                // available条件队列转同步队列,准备唤醒阻塞在available上的线程
                available.signal();
            }
            return true;
        } finally {
            lock.unlock(); // 解锁,真正唤醒阻塞的线程
        }
    }
出队take方法
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();// 取出堆顶元素( 最早过期的元素,但是不弹出对象)
            if (first == null)// 如果堆顶元素为空,说明队列中还没有元素,直接阻塞等待
                available.await();//当前线程无限期等待,直到被唤醒,并且释放锁。
            else {
                long delay = first.getDelay(NANOSECONDS);// 堆顶元素的到期时间


                if (delay <= 0)// 如果小于0说明已到期,直接调用poll()方法弹出堆顶元素
                    return q.poll();

                // 如果delay大于0 ,则下面要阻塞了
                // 将first置为空方便gc
                first = null;
                // 如果有线程争抢的Leader线程,则进行无限期等待。
                if (leader != null)
                    available.await();
                else {
                    // 如果leader为null,把当前线程赋值给它
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 等待剩余等待时间
                        available.awaitNanos(delay);
                    } finally {
                        // 如果leader还是当前线程就把它置为空,让其它线程有机会获取元素
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally{
    // 成功出队后,如果leader为空且堆顶还有元素,就唤醒下一个等待的线程
    if (leader == null && q.peek() != null)
        // available条件队列转同步队列,准备唤醒阻塞在available上的线程
        available.signal();
    // 解锁,真正唤醒阻塞的线程
    lock.unlock();
    }
}

? ? ? ? 以上代码解释:当获取元素时,先获取到锁对象。 获取最早过期的元素,但是并不从队列中弹出元素。 最早过期元素是否为空,如果为空则直接让当前线程无限期等待状态,并且让出当前锁对象。 如果最早过期的元素不为空 获取最早过期元素的剩余过期时间,如果已经过期则直接返回当前元素 如果没有过期,也就是说剩余时间还存在,则先获取Leader对象,如果Leader已经有线程在处理,则当前线程进行无限期等待,如果Leader为空,则首先将Leader设置为当前线程(确保资源后续的正确释放),并且让当前线程等待剩余时间。 最后将Leader线程设置为空。如果Leader已经为空,并且队列有内容则唤醒一个等待的队列。

文章来源:https://blog.csdn.net/m0_61853556/article/details/135603386
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。