DelayQueue是基于阻塞队列实现的,是JDK自带延迟队列。这种方案使用成本最低,而且不依赖任何第三方服务,减少了网络交互。但缺点也很明显,就是需要占用JVM内存,在数据量非常大的情况下可能会有问题。如果数据量非常大,DelayQueue不能满足业务需求,也可以替换为其它延迟队列方式,例如Redisson、MQ等。
首先来看一下DelayQueue的源码:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>?
implements BlockingQueue<E> {?
?
private final transient ReentrantLock lock = new ReentrantLock();?
private final PriorityQueue<E> q = new PriorityQueue<E>();?
?
// ... 略?
}
可以看到DelayQueue实现了BlockingQueue接口,是一个阻塞队列。队列就是容器,用来存储东西的。DelayQueue叫做延迟队列,其中存储的就是延迟执行的任务。
我们可以看到DelayQueue的泛型定义:
DelayQueue<E extends Delayed>
这说明存入 DelayQueue 内部的元素必须是 Delayed 类型,这其实就是一个延迟任务的规范接口。来看一下:
public interface Delayed extends Comparable<Delayed> {
/**
* Returns the remaining delay associated with this object, in the
* given time unit.
*
* @param unit the time unit
* @return the remaining delay; zero or negative values indicate
* that the delay has already elapsed
*/
long getDelay(TimeUnit unit);
}
从源码中可以看出,Delayed类型必须具备两个方法:
可见,Delayed类型的延迟任务具备两个功能:获取剩余延迟时间、比较执行顺序。当然,我们可以对Delayed做实现和功能扩展,比如添加延迟任务的数据。
将来每一次提交数据,就可以将数据保存在这样的一个 Delayed 类型的延迟任务里并设定延迟时间。然后交给 DelayQueue 队列。 DelayQueue 会调用compareTo方法,根据剩余延迟时间对任务排序。剩余延迟时间越短的越靠近队首,这样就会被优先执行。
import lombok.Data;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
@Data
public class DelayedTask implements Delayed {
// 消息体
private Object data;
// 过期时间
private Long deadTime;
/**
* 到期时间,毫秒
*/
public DelayedTask(Object data, Long expireTime) {
this.data = data;
this.deadTime = System.currentTimeMillis() + expireTime * 1000;
}
/**
* 获取元素的剩余的有效期
*/
@Override
public long getDelay(TimeUnit unit) {
long remain = this.deadTime - System.currentTimeMillis();
if (remain < 0) {
remain = 0;
}
return unit.convert(remain, TimeUnit.MILLISECONDS);
}
/**
* 比较元素的剩余,看谁先到期
*/
@Override
public int compareTo(Delayed o) {
long l = this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
if (l > 0) {
return 1;
} else if (l < 0) {
return -1;
} else {
return 0;
}
}
}
import com.example.delayedqueuedemo.task.DelayedTask;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.DelayQueue;
@Slf4j
public class DelayedQueueDemo {
public static void main(String[] args) throws InterruptedException {
// 1.初始化延迟队列
DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
DelayedTask task1 = new DelayedTask("data 5 seconds", 5L);
DelayedTask task2 = new DelayedTask("data 1 seconds", 1L);
DelayedTask task3 = new DelayedTask("data 3 seconds", 3L);
// 2.向队列中添加延迟执行的任务
delayQueue.put(task1);
delayQueue.put(task2);
delayQueue.put(task3);
// 3.尝试执行任务
log.info("消息发送完成");
// 如果队列中没有到期的元素,take会阻塞等待
while (true) {
DelayedTask task = delayQueue.take();
log.info("获取延迟任务:task:{}", task.getData());
}
}
}
在实际项目中,需要将发送端和接收端分离,避免死循环的队列接收阻塞主程序运行,故将列队接收代码抽离到 Spring 在生命周期回调期间执行,具体代码如下:
import com.example.delayedqueuedemo.task.DelayedTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Slf4j
@Service
public class DelayedService {
private ThreadPoolExecutor takeTaskPool = new ThreadPoolExecutor(1, 1, 120, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
private ThreadPoolExecutor executeTaskPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 120, TimeUnit.SECONDS, new ArrayBlockingQueue<>(Runtime.getRuntime().availableProcessors()), new ThreadPoolExecutor.CallerRunsPolicy());
private DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
/**
* 生命周期回调
*/
@PostConstruct
public void postConstruct() {
// 开启线程池,避免死循环阻塞主程序
takeTaskPool.execute(new Runnable() {
@Override
public void run() {
// 如果队列中没有到期的元素,take会阻塞等待
while (true) {
try {
DelayedTask task = delayQueue.take();
// 为了防止业务逻辑执行太长,阻塞消息的读取
executeTaskPool.execute(new Runnable() {
@Override
public void run() {
// 执行业务代码
log.info("获取延迟任务:task:{}", task.getData());
}
});
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
}
public void delayed(String data, Long expireTime) {
// 1.初始化延迟队列
DelayedTask task = new DelayedTask(data, expireTime);
// 2.向队列中添加延迟执行的任务
delayQueue.put(task);
// 3.尝试执行任务
log.info("消息发送完成");
}
}
若需要开启事务支持,直接用 @Transactional 注解会出现事务失效的情况(@Transactional 失效场景可参考该文章: 一口气怼完12种@Transactional的失效场景 - 知乎?),故有两种方法解决:
以下代码以编程式事务处理方案演示如何开启事务:
import com.example.delayedqueuedemo.task.DelayedTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import javax.annotation.PostConstruct;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Slf4j
@Service
public class DelayedService {
private ThreadPoolExecutor takeTaskPool = new ThreadPoolExecutor(1, 1, 120, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
private ThreadPoolExecutor executeTaskPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 120, TimeUnit.SECONDS, new ArrayBlockingQueue<>(Runtime.getRuntime().availableProcessors()), new ThreadPoolExecutor.CallerRunsPolicy());
private DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
@Autowired
private PlatformTransactionManager transactionManager;
/**
* 生命周期回调
*/
@PostConstruct
public void postConstruct() {
// 开启线程池,避免死循环阻塞主程序
takeTaskPool.execute(new Runnable() {
@Override
public void run() {
// 如果队列中没有到期的元素,take会阻塞等待
while (true) {
try {
DelayedTask task = delayQueue.take();
// 开启线程池,避免业务执行阻塞队列获取消息,核心线程数根据电脑cpu核数而定
executeTaskPool.execute(new Runnable() {
@Override
public void run() {
// 开启事务
TransactionDefinition definition = new DefaultTransactionDefinition();
TransactionStatus transaction = transactionManager.getTransaction(definition);
try {
// 业务逻辑
log.info("获取延迟任务:task:{}", task.getData());
// 提交事务
transactionManager.commit(transaction);
} catch (TransactionException e) {
// 回滚事务
transactionManager.rollback(transaction);
}
}
});
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
}
@Transactional
public void delayed(String data, Long expireTime) {
// 1.初始化延迟队列
DelayedTask task = new DelayedTask(data, expireTime);
// 2.向队列中添加延迟执行的任务
delayQueue.put(task);
// 3.尝试执行任务
log.info("消息发送完成");
}
}