目录
? ?延时队列相比于普通队列最大的区别就体现在其延时的属性上,普通队列的元素是先进先出,按入队顺序进行处理,而延时队列中的元素在入队时会指定一个延迟时间,表示其希望能够在经过该指定时间后处理或者是在某个时间进行处理。
以下是我做项目时遇到的例子:? 需求就是用户设定时间,到时间之后,系统自动执行某个任务
采用轮询的策略监听redis的key的值,将用户输入的时间在后端转换为一个时间戳,利用redis Zset的数据结构来存储,主要用来判断的就是时间戳,Zset是一个有序的集合,所有时间戳在前面的就是先要执行的事件,当然用时间戳来比较的话,就是从0到现在时间的时间戳来比较大小,如何redis存入的时间戳大于0且 小于当前时间戳就代表执行任务,否则就代表待执行
首先,封装了一个实体类
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DelayMessageVo implements Serializable {
/**
* 切记实例化
*/
private static final long serialVersionUID = -7671756385477179547L;
/**
* 消息 id
*/
private Integer id;
/**
* 消息内容
*/
private String content;
/**
* 消息到期时间
*/
private long expireTime;
}
?对Redis进行操作
@Component
public class DelayQueueService {
/**
* key后面拼接当前机器的内网ip : 用于集群区分,解决集群出现的并发问题
*/
private static final String KEY = "delay_queue:" + getHostAddress();
@Autowired
private RedisTemplate redisTemplate;
/**
* 添加消息到延时队列中
*/
public void put(DelayMessageVo message ) {
redisTemplate.opsForZSet().add(KEY, message, message.getExpireTime());
}
/**
* 从延时队列中删除消息
*/
public Long remove(DelayMessageVo message) {
Long remove = redisTemplate.opsForZSet().remove(KEY, message);
return remove;
}
/**
* 获取延时队列中已到期的消息
*/
public List<DelayMessageVo> getExpiredMessages() {
// 1 : 获取到开始时间
long minScore = 0;
// 2 : 获取当前时间
long maxScore = System.currentTimeMillis();
// 3 : 获取到指定范围区间的数据列表
Set<Object> messages = redisTemplate.opsForZSet().rangeByScore(KEY, minScore, maxScore);
if (messages == null || messages.isEmpty()) {
return Collections.emptyList();
}
// 4 : 把对象进行封装,返回
List<DelayMessageVo> result = new ArrayList<>();
for (Object message : messages) {
// 将 DelayMessageVo 对象转换为 JSON 字符串
String jsonMessage = JSON.toJSONString(message);
DelayMessageVo delayMessage = JSONObject.parseObject(jsonMessage, DelayMessageVo.class);
result.add(delayMessage);
}
return result;
}
/**
* 获取地址(服务器的内网地址)(内网ip)
*
* @return
*/
public static String getHostAddress() {
InetAddress localHost = null;
try {
localHost = InetAddress.getLocalHost();
} catch (
UnknownHostException e) {
e.printStackTrace();
}
return localHost.getHostAddress();
}
}
?轮询策略
@Component
public class DelayMessageHandler {
public static SimpleDateFormat dateTimeFormater = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Autowired
private DelayQueueService delayQueue;
@Autowired
private ExamineMapper examineMapper;
/**
* 处理已到期的消息(轮询)
*/
@Scheduled(fixedDelay = 60000)
public void handleExpiredMessages() {
String currentTime = getCurrentTime();
// 1 : 扫描任务,并将需要执行的任务加入到任务队列中
List<DelayMessageVo> messages = delayQueue.getExpiredMessages();
System.out.println(currentTime + " 待处理消息数量:" + messages.size());
// 2 : 开始处理消息
if (!messages.isEmpty()) {
for (DelayMessageVo message : messages) {
// 2.1 : 处理消息:先删除消息,获取当前消息是否已经被其他人消费
Long remove = delayQueue.remove(message);
if (remove > 0) {
// 2.2 : 开启线程异步处理消息:不让处理消息的时间阻塞当前线程
new Thread(() -> {
System.out.println(currentTime + " :" + message.getId() + " --> 消息开始处理");
Integer id = message.getId();
String content = message.getContent();
if (content.equals("任务开始时间")){
examineMapper.updateBeginExamineStatus(id);
}else if (content.equals("任务结束时间")){
examineMapper.updateFinishExamineStatus(id);
}else if (content.equals("公告结束时间")){
examineMapper.updatePublicityExamineStatus(id);
}
try {
// 2.1.1 : 模拟睡眠3秒,任务的处理时间(实际可能会更长)
Thread.sleep(3000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(currentTime + " :" + message.getId() + " --> 消息处理结束");
}).start();
}
}
}
}
/**
* 获取到的当前时分秒
*
* @return
*/
public static String getCurrentTime() {
String format = dateTimeFormater.format(new Date());
return format;
}
}