DelayQueue就是一个延迟队列,生产者写入一个消息,这个消息还有直接被消费的延迟时间。
需要让消息具有延迟的特性。
DelayQueue也是基于二叉堆结构实现的,甚至本事就是基于PriorityQueue实现的功能。二叉堆结构每次获取的是栈顶的数据,需要让DelayQueue中的数据,在比较时,跟根据延迟时间做比较,剩余时间最短的要放在栈顶。
查看DelayQueue类信息:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
// 发现DelayQueue中的元素,需要继承Delayed接口。
}
// ==========================================
// 接口继承了Comparable,这样就具备了比较的能力。
public interface Delayed extends Comparable<Delayed> {
// 抽象方法,就是咱们需要设置的延迟时间
long getDelay(TimeUnit unit);
// Comparable接口提供的:public int compareTo(T o);
}
基于上述特点,声明一个可以写入DelayQueue的元素类
public class Task implements Delayed {
/** 任务的名称 */
private String name;
/** 什么时间点执行 */
private Long time;
/**
*
* @param name
* @param delay 单位毫秒。
*/
public Task(String name, Long delay) {
// 任务名称
this.name = name;
this.time = System.currentTimeMillis() + delay;
}
/**
* 设置任务什么时候可以出延迟队列
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
// 单位是毫秒,视频里写错了,写成了纳秒,
return unit.convert(time - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
}
/**
* 两个任务在插入到延迟队列时的比较方式
* @param o
* @return
*/
@Override
public int compareTo(Delayed o) {
return (int) (this.time - ((Task)o).getTime());
}
}
在使用时,查看到DelayQueue底层用了PriorityQueue,在一定程度上,DelayQueue也是无界队列。
测试效果
public static void main(String[] args) throws InterruptedException {
// 声明元素
Task task1 = new Task("A",1000L);
Task task2 = new Task("B",5000L);
Task task3 = new Task("C",3000L);
Task task4 = new Task("D",2000L);
// 声明阻塞队列
DelayQueue<Task> queue = new DelayQueue<>();
// 将元素添加到延迟队列中
queue.put(task1);
queue.put(task2);
queue.put(task3);
queue.put(task4);
// 获取元素
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
// A,D,C,B
}
在应用时,外卖,15分钟商家需要节点,如果不节点,这个订单自动取消。
可以每下一个订单,就放到延迟队列中,如果规定时间内,商家没有节点,直接通过消费者获取元素,然后取消订单。
只要是有需要延迟一定时间后,再执行的任务,就可以通过延迟队列去实现。
可以查看到DelayQueue就四个核心属性
// 因为DelayQueue依然属于阻塞队列,需要保证线程安全。看到只有一把锁,生产者和消费者使用的是一个lock
private final transient ReentrantLock lock = new ReentrantLock();
// 因为DelayQueue还是基于二叉堆结构实现的,没有必要重新搞一个二叉堆,直接使用的PriorityQueue
private final PriorityQueue<E> q = new PriorityQueue<E>();
// leader一般会存储等待栈顶数据的消费者,在整体写入和消费的过程中,会设置的leader的一些判断。
private Thread leader = null;
// 生产者在插入数据时,不会阻塞的。当前的Condition就是给消费者用的
// 比如消费者在获取数据时,发现栈顶的数据还又没到延迟时间。
// 这个时候,咱们就需要将消费者线程挂起,阻塞一会,阻塞到元素到了延迟时间,或者是,生产者插入的元素到了栈顶,此时生产者会唤醒消费者。
private final Condition available = lock.newCondition();
Delay是无界的,数组可以动态的扩容,不需要关注生产者的阻塞问题,他就没有阻塞问题。
这里只需要查看offer方法即可。
public boolean offer(E e) {
// 直接获取lock,加锁。
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 直接调用PriorityQueue的插入方法,这里会根据之前重写Delayed接口中的compareTo方法做排序,然后调整上移和下移操作。
q.offer(e);
// 调用优先级队列的peek方法,拿到堆顶的数据
// 拿到堆顶数据后,判断是否是刚刚插入的元素
if (q.peek() == e) {
// leader赋值为null。在消费者的位置再提一嘴
leader = null;
// 唤醒消费者,避免刚刚插入的数据的延迟时间出现问题。
available.signal();
}
// 插入成功,
return true;
} finally {
// 释放锁
lock.unlock();
}
}
消费者依然还是存在阻塞的情况,因为有两个情况
依然需要查看四个方法的实现
// 依然是AbstractQueue提供的方法,有结果就返回,没结果扔异常
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
// poll是浅尝一下,不会阻塞消费者,能拿就拿,拿不到就拉倒
public E poll() {
// 消费者和生产者是一把锁,先拿锁,加锁。
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 拿到栈顶数据。
E first = q.peek();
// 如果元素为null,直接返回null
// 如果getDelay方法返回的结果是大于0的,那说明当前元素还每到延迟时间,元素无法返回,返回null
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
// 到这说明元素不为null,并且已经达到了延迟时间,直接调用优先级队列的poll方法
return q.poll();
} finally {
// 释放锁。
lock.unlock();
}
}
这个是允许阻塞的,并且指定一定的时间
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 先将时间转为纳秒
long nanos = unit.toNanos(timeout);
// 拿锁,加锁。
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 死循环。
for (;;) {
// 拿到堆顶数据
E first = q.peek();
// 如果元素为null
if (first == null) {
// 并且等待的时间小于等于0。不能等了,直接返回null
if (nanos <= 0)
return null;
// 说明当前线程还有可以阻塞的时间,阻塞指定时间即可。
else
// 这里挂起线程后,说明队列没有元素,在生产者添加数据之后,会唤醒
nanos = available.awaitNanos(nanos);
// 到这说明,有数据
} else {
// 有数据的话,先获取数据现在是否可以执行,延迟时间是否已经到了指定时间
long delay = first.getDelay(NANOSECONDS);
// 延迟时间是否已经到了,
if (delay <= 0)
// 时间到了,直接执行优先级队列的poll方法,返回元素
return q.poll();
// ==================延迟时间没到,消费者需要等一会===================
// 这个是查看消费者可以等待的时间,
if (nanos <= 0)
// 直接返回nulll
return null;
// ==================延迟时间没到,消费者可以等一会===================
// 把first赋值为null
first = null;
// 如果等待的时间,小于元素剩余的延迟时间,消费者直接挂起。反正暂时拿不到,但是不能保证后续是否有生产者添加一个新的数据,我是可以拿到的。
// 如果已经有一个消费者在等待堆顶数据了,我这边不做额外操作,直接挂起即可。
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
// 当前消费者的阻塞时间可以拿到数据,并且没有其他消费者在等待堆顶数据
else {
// 拿到当前消费者的线程对象
Thread thisThread = Thread.currentThread();
// 将leader设置为当前线程
leader = thisThread;
try {
// 会让当前消费者,阻塞这个元素的延迟时间
long timeLeft = available.awaitNanos(delay);
// 重新计算当前消费者剩余的可阻塞时间,。
nanos -= delay - timeLeft;
} finally {
// 到了时间,将leader设置为null
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 没有消费者在等待元素,队列中的元素不为null
if (leader == null && q.peek() != null)
// 只要当前没有leader在等,并且队列有元素,就需要再次唤醒消费者。、
// 避免队列有元素,但是没有消费者处理的问题
available.signal();
// 释放锁
lock.unlock();
}
}
这个是允许阻塞的,但是可以一直等,要么等到元素,要么等到被中断。
public E take() throws InterruptedException {
// 正常加锁,并且允许中断
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 拿到元素
E first = q.peek();
if (first == null)
// 没有元素挂起。
available.await();
else {
// 有元素,获取延迟时间。
long delay = first.getDelay(NANOSECONDS);
// 判断延迟时间是不是已经到了
if (delay <= 0)
// 基于优先级队列的poll方法返回
return q.poll();
first = null;
// 如果有消费者在等,就正常await挂起
if (leader != null)
available.await();
// 如果没有消费者在等的堆顶数据,我来等
else {
// 获取当前线程
Thread thisThread = Thread.currentThread();
// 设置为leader,代表等待堆顶的数据
leader = thisThread;
try {
// 等待指定(堆顶元素的延迟时间)时长,
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
// leader赋值null
leader = null;
}
}
}
}
} finally {
// 避免消费者无线等,来一个唤醒消费者的方法,一般是其他消费者拿到元素走了之后,并且延迟队列还有元素,就执行if内部唤醒方法
if (leader == null && q.peek() != null)
available.signal();
// 释放锁
lock.unlock();
}
}