DelayQueue

发布时间:2024年01月16日

DelayQueue

5.1 DelayQueue介绍&应用

DelayQueue就是一个延迟队列,生产者写入一个消息,这个消息还有直接被消费的延迟时间。

需要让消息具有延迟的特性。

DelayQueue也是基于二叉堆结构实现的,甚至本事就是基于PriorityQueue实现的功能。二叉堆结构每次获取的是栈顶的数据,需要让DelayQueue中的数据,在比较时,跟根据延迟时间做比较,剩余时间最短的要放在栈顶。

查看DelayQueue类信息:

public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
    // 发现DelayQueue中的元素,需要继承Delayed接口。
}
// ==========================================
// 接口继承了Comparable,这样就具备了比较的能力。
public interface Delayed extends Comparable<Delayed> {
    // 抽象方法,就是咱们需要设置的延迟时间
    long getDelay(TimeUnit unit);
  
    // Comparable接口提供的:public int compareTo(T o);
}

基于上述特点,声明一个可以写入DelayQueue的元素类

public class Task implements Delayed {

    /** 任务的名称 */
    private String name;

    /** 什么时间点执行 */
    private Long time;

    /**
     *
     * @param name
     * @param delay  单位毫秒。
     */
    public Task(String name, Long delay) {
        // 任务名称
        this.name = name;
        this.time = System.currentTimeMillis() + delay;
    }

    /**
     * 设置任务什么时候可以出延迟队列
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
		// 单位是毫秒,视频里写错了,写成了纳秒,
        return unit.convert(time - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
    }

    /**
     * 两个任务在插入到延迟队列时的比较方式
     * @param o
     * @return
     */
    @Override
    public int compareTo(Delayed o) {
        return (int) (this.time - ((Task)o).getTime());
    }
}

在使用时,查看到DelayQueue底层用了PriorityQueue,在一定程度上,DelayQueue也是无界队列。

测试效果

public static void main(String[] args) throws InterruptedException {
    // 声明元素
    Task task1 = new Task("A",1000L);
    Task task2 = new Task("B",5000L);
    Task task3 = new Task("C",3000L);
    Task task4 = new Task("D",2000L);
    // 声明阻塞队列
    DelayQueue<Task> queue = new DelayQueue<>();
    // 将元素添加到延迟队列中
    queue.put(task1);
    queue.put(task2);
    queue.put(task3);
    queue.put(task4);
    // 获取元素
    System.out.println(queue.take());
    System.out.println(queue.take());
    System.out.println(queue.take());
    System.out.println(queue.take());
    // A,D,C,B
}

在应用时,外卖,15分钟商家需要节点,如果不节点,这个订单自动取消。

可以每下一个订单,就放到延迟队列中,如果规定时间内,商家没有节点,直接通过消费者获取元素,然后取消订单。

只要是有需要延迟一定时间后,再执行的任务,就可以通过延迟队列去实现。

5.2、DelayQueue核心属性

可以查看到DelayQueue就四个核心属性

// 因为DelayQueue依然属于阻塞队列,需要保证线程安全。看到只有一把锁,生产者和消费者使用的是一个lock
private final transient ReentrantLock lock = new ReentrantLock();
// 因为DelayQueue还是基于二叉堆结构实现的,没有必要重新搞一个二叉堆,直接使用的PriorityQueue
private final PriorityQueue<E> q = new PriorityQueue<E>();
// leader一般会存储等待栈顶数据的消费者,在整体写入和消费的过程中,会设置的leader的一些判断。
private Thread leader = null;
// 生产者在插入数据时,不会阻塞的。当前的Condition就是给消费者用的
// 比如消费者在获取数据时,发现栈顶的数据还又没到延迟时间。
// 这个时候,咱们就需要将消费者线程挂起,阻塞一会,阻塞到元素到了延迟时间,或者是,生产者插入的元素到了栈顶,此时生产者会唤醒消费者。
private final Condition available = lock.newCondition();

5.3、DelayQueue写入流程分析

Delay是无界的,数组可以动态的扩容,不需要关注生产者的阻塞问题,他就没有阻塞问题。

这里只需要查看offer方法即可。

public boolean offer(E e) {
    // 直接获取lock,加锁。
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 直接调用PriorityQueue的插入方法,这里会根据之前重写Delayed接口中的compareTo方法做排序,然后调整上移和下移操作。
        q.offer(e);
        // 调用优先级队列的peek方法,拿到堆顶的数据
        // 拿到堆顶数据后,判断是否是刚刚插入的元素
        if (q.peek() == e) {
            // leader赋值为null。在消费者的位置再提一嘴
            leader = null;
            // 唤醒消费者,避免刚刚插入的数据的延迟时间出现问题。
            available.signal();
        }
        // 插入成功,
        return true;
    } finally {
        // 释放锁
        lock.unlock();
    }
}

5.4、DelayQueue读取流程分析

消费者依然还是存在阻塞的情况,因为有两个情况

  • 消费者要拿到栈顶数据,但是延迟时间还没到,此时消费者需要等待一会。
  • 消费者要来拿数据,但是发现已经有消费者在等待栈顶数据了,这个后来的消费者也需要等待一会。

依然需要查看四个方法的实现

5.4.1 remove方法
// 依然是AbstractQueue提供的方法,有结果就返回,没结果扔异常
public E remove() {
    E x = poll();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}
5.4.2 poll方法
// poll是浅尝一下,不会阻塞消费者,能拿就拿,拿不到就拉倒
public E poll() {
    // 消费者和生产者是一把锁,先拿锁,加锁。
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
       	 // 拿到栈顶数据。
        E first = q.peek();
        // 如果元素为null,直接返回null
        // 如果getDelay方法返回的结果是大于0的,那说明当前元素还每到延迟时间,元素无法返回,返回null
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            // 到这说明元素不为null,并且已经达到了延迟时间,直接调用优先级队列的poll方法
            return q.poll();
    } finally {
        // 释放锁。
        lock.unlock();
    }
}
5.4.3 poll(time,unit)方法

这个是允许阻塞的,并且指定一定的时间

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // 先将时间转为纳秒
    long nanos = unit.toNanos(timeout);
    // 拿锁,加锁。
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 死循环。
        for (;;) {
            // 拿到堆顶数据
            E first = q.peek();
            // 如果元素为null
            if (first == null) {
                // 并且等待的时间小于等于0。不能等了,直接返回null
                if (nanos <= 0)
                    return null;
                // 说明当前线程还有可以阻塞的时间,阻塞指定时间即可。
                else
                    // 这里挂起线程后,说明队列没有元素,在生产者添加数据之后,会唤醒
                    nanos = available.awaitNanos(nanos);
            // 到这说明,有数据
            } else {
                // 有数据的话,先获取数据现在是否可以执行,延迟时间是否已经到了指定时间
                long delay = first.getDelay(NANOSECONDS);
                // 延迟时间是否已经到了,
                if (delay <= 0)
                    // 时间到了,直接执行优先级队列的poll方法,返回元素
                    return q.poll();
                // ==================延迟时间没到,消费者需要等一会===================
                // 这个是查看消费者可以等待的时间,
                if (nanos <= 0)
                    // 直接返回nulll
                    return null;
                // ==================延迟时间没到,消费者可以等一会===================
                // 把first赋值为null
                first = null; 
                // 如果等待的时间,小于元素剩余的延迟时间,消费者直接挂起。反正暂时拿不到,但是不能保证后续是否有生产者添加一个新的数据,我是可以拿到的。
                // 如果已经有一个消费者在等待堆顶数据了,我这边不做额外操作,直接挂起即可。
                if (nanos < delay || leader != null)
                    nanos = available.awaitNanos(nanos);
                // 当前消费者的阻塞时间可以拿到数据,并且没有其他消费者在等待堆顶数据
                else {
                    // 拿到当前消费者的线程对象
                    Thread thisThread = Thread.currentThread();
                    // 将leader设置为当前线程
                    leader = thisThread;
                    try {
                        // 会让当前消费者,阻塞这个元素的延迟时间
                        long timeLeft = available.awaitNanos(delay);
                        // 重新计算当前消费者剩余的可阻塞时间,。
                        nanos -= delay - timeLeft;
                    } finally {
                        // 到了时间,将leader设置为null
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 没有消费者在等待元素,队列中的元素不为null
        if (leader == null && q.peek() != null)
            // 只要当前没有leader在等,并且队列有元素,就需要再次唤醒消费者。、
            // 避免队列有元素,但是没有消费者处理的问题
            available.signal();
        // 释放锁
        lock.unlock();
    }
}
5.4.4 take方法

这个是允许阻塞的,但是可以一直等,要么等到元素,要么等到被中断。

public E take() throws InterruptedException {
    // 正常加锁,并且允许中断
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            // 拿到元素
            E first = q.peek();
            if (first == null)
                // 没有元素挂起。
                available.await();
            else {
                // 有元素,获取延迟时间。
                long delay = first.getDelay(NANOSECONDS);
                // 判断延迟时间是不是已经到了
                if (delay <= 0)
                    // 基于优先级队列的poll方法返回
                    return q.poll();
                first = null; 
                // 如果有消费者在等,就正常await挂起
                if (leader != null)
                    available.await();
                // 如果没有消费者在等的堆顶数据,我来等
                else {
                    // 获取当前线程
                    Thread thisThread = Thread.currentThread();
                    // 设置为leader,代表等待堆顶的数据
                    leader = thisThread;
                    try {
                        // 等待指定(堆顶元素的延迟时间)时长,
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            // leader赋值null
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 避免消费者无线等,来一个唤醒消费者的方法,一般是其他消费者拿到元素走了之后,并且延迟队列还有元素,就执行if内部唤醒方法
        if (leader == null && q.peek() != null)
            available.signal();
        // 释放锁
        lock.unlock();
    }
}
文章来源:https://blog.csdn.net/m0_63694520/article/details/135625305
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。