简单而言, 就是在 RocketMQ 延迟消息固定延迟等级的基础上, 通过多次转发延迟消息, 把任意延迟时长拆分为若干固定延迟等级的组合. 延迟到期后, 通过反射实例化回调类, 实现延迟业务逻辑的调用.
源码如下:
/*
* Copyright (c) 2020-2030 XXX.Co.Ltd. All Rights Reserved.
*/
package com.example.xxx.utils;
import com.vevor.bmp.crm.common.constants.MQConstants;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.Serializable;
import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.TimeUnit;
/**
 * Arbitrary-duration delay utility built on top of RocketMQ's fixed delay levels.
 *
 * <p>A message is re-sent to this component's own topic until its deadline is reached: every
 * time the message is consumed, the time remaining until {@code endTime} is recomputed and the
 * message is sent again with the largest delay level that does not overshoot the deadline.
 * Once no level fits any more, the configured {@link DelayMessageHandler} is instantiated
 * reflectively and invoked.
 *
 * <p>Precision note: when less than one second (the smallest level) remains, the handler is run
 * right away, so a callback may fire slightly before its nominal deadline.
 *
 * @version :1.8.0
 * @description :Arbitrary delay duration utility based on RocketMQ
 * @program :user-growth
 * @date :Created in 2023/5/22 3:35 PM
 * @since :1.8.0
 */
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = MQConstants.CRM_DELAY_QUEUE_TOPIC_GROUP,
        topic = MQConstants.CRM_DELAY_QUEUE_TOPIC,
        // Consume mode
        consumeMode = ConsumeMode.CONCURRENTLY,
        // Maximum number of re-consume attempts
        maxReconsumeTimes = 3)
public class RocketMQDelayQueueUtils implements RocketMQListener<RocketMQDelayQueueUtils.DelayTable<Object>> {

    /**
     * Timeout, in milliseconds, for a synchronous send to the delay topic.
     */
    private static final long SEND_TIMEOUT_MILLIS = 3000L;

    /**
     * Delay in milliseconds for every delay level, indexed by level (index 0 means "no delay"):
     * 0s, 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h.
     *
     * <p>The table must stay in ascending order; the level lookup relies on it.
     * NOTE(review): described as the MQ default delay levels - confirm it matches the broker
     * configuration actually deployed.
     */
    private static final long[] TIME_DELAY_LEVEL = new long[]{0L, 1000L, 5000L, 10000L,
            30000L, 60000L, 120000L, 180000L, 240000L, 300000L, 360000L, 420000L,
            480000L, 540000L, 600000L, 1200000L, 1800000L, 3600000L, 7200000L};

    /**
     * RocketMQ client used to (re-)send the delayed message.
     */
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Consumes one hop of a delayed message.
     *
     * <p>If the deadline has not been reached, the hop bookkeeping is updated and the message is
     * sent again with the next suitable delay level. Otherwise the handler class carried by the
     * message (if any) is instantiated through its no-arg constructor and {@code handle()} is
     * called. The message content is not passed to the handler.
     *
     * <p>Any exception is rethrown as-is (via {@code @SneakyThrows}) out of this listener method.
     *
     * @param message the delayed message being consumed
     */
    @SneakyThrows
    @Override
    public void onMessage(DelayTable<Object> message) {
        int delayLevel = getDelayLevel(message.getEndTime());
        if (delayLevel != 0) {
            // Deadline not reached yet: record the new hop and keep delaying.
            message.setCurrentDelayCount(message.getCurrentDelayCount() + 1);
            message.setCurrentDelayLevel(delayLevel);
            message.setCurrentDelayMillis(TIME_DELAY_LEVEL[delayLevel]);
            this.sendDelayMessage(message);
            return;
        }

        // Deadline reached: run the business callback.
        log.info("delay message end! start to process business...");
        Class<? extends DelayMessageHandler> messageHandler = message.getMessageHandler();
        if (messageHandler == null) {
            return;
        }
        // The generic bound is erased at runtime and the class comes from the message payload,
        // so verify the type BEFORE running any constructor of it.
        if (!DelayMessageHandler.class.isAssignableFrom(messageHandler)) {
            throw new IllegalStateException("Delay handler " + messageHandler.getName()
                    + " does not implement " + DelayMessageHandler.class.getName());
        }
        // Class#newInstance() is deprecated; go through the no-arg constructor explicitly.
        DelayMessageHandler delayMessageHandler = messageHandler.getDeclaredConstructor().newInstance();
        delayMessageHandler.handle();
    }

    /**
     * Envelope of a delayed message; the same instance is re-sent on every hop.
     *
     * @param <E> type of the business content
     */
    @Data
    public static class DelayTable<E> implements Serializable {
        private static final long serialVersionUID = 2405172041950251807L;
        /**
         * Business content attached by the caller.
         */
        private E content;
        /**
         * Absolute point in time at which the delay ends.
         */
        private Date endTime;
        /**
         * Total requested delay, expressed in {@link #totalDelayTimeUnit} (not necessarily
         * milliseconds).
         */
        private long totalDelayTime;
        /**
         * Unit of {@link #totalDelayTime}.
         */
        private TimeUnit totalDelayTimeUnit;
        /**
         * Number of hops sent so far (1 for the initial send).
         */
        private int currentDelayCount;
        /**
         * Delay level used for the current hop.
         */
        private int currentDelayLevel;
        /**
         * Delay of the current hop in milliseconds.
         */
        private long currentDelayMillis;
        /**
         * Callback class to instantiate and invoke once the delay has elapsed; may be null,
         * in which case nothing is invoked.
         */
        private Class<? extends DelayMessageHandler> messageHandler;
    }

    /**
     * Schedules a delayed callback.
     *
     * <p>A non-positive delay results in delay level 0, i.e. the message is sent without delay.
     *
     * @param message  business content stored in the envelope
     * @param delay    delay duration, in {@code timeUnit}
     * @param timeUnit unit of {@code delay}
     * @param handler  callback class invoked once the delay has elapsed
     * @param <E>      type of the business content
     */
    public <E> void delay(E message, int delay, TimeUnit timeUnit, Class<? extends DelayMessageHandler> handler) {
        long totalDelayMillis = timeUnit.toMillis(delay);
        // Stay in long arithmetic: narrowing the delay to int overflows beyond ~24.8 days and
        // would place the deadline in the past.
        Date endTime = new Date(System.currentTimeMillis() + totalDelayMillis);
        // Pick the first hop from the requested duration itself rather than from
        // "endTime - now": the latter is already a few milliseconds short, which would push an
        // exact level (e.g. 5s) down to the next lower one.
        int delayLevel = delayLevelFor(totalDelayMillis);

        DelayTable<E> delayTable = new DelayTable<>();
        // Data that stays constant across hops
        delayTable.setContent(message);
        delayTable.setMessageHandler(handler);
        delayTable.setEndTime(endTime);
        delayTable.setTotalDelayTime(delay);
        delayTable.setTotalDelayTimeUnit(timeUnit);
        // Data describing the current hop
        delayTable.setCurrentDelayCount(1);
        delayTable.setCurrentDelayLevel(delayLevel);
        delayTable.setCurrentDelayMillis(TIME_DELAY_LEVEL[delayLevel]);
        this.sendDelayMessage(delayTable);
    }

    /**
     * Computes the delay level for the time remaining until the given deadline.
     *
     * @param targetTime deadline of the delay
     * @return delay level to use for the next hop, or 0 if no further delay is needed
     */
    private static int getDelayLevel(Date targetTime) {
        return delayLevelFor(targetTime.getTime() - System.currentTimeMillis());
    }

    /**
     * Returns the largest delay level whose duration does not exceed the remaining time.
     *
     * @param remainingMillis time left until the deadline, in milliseconds
     * @return a level between 1 and the highest configured level, or 0 when the remaining time
     *         is non-positive or shorter than the smallest level
     */
    private static int delayLevelFor(long remainingMillis) {
        // Scan from the longest level down; the table is ascending, so the first match wins.
        for (int level = TIME_DELAY_LEVEL.length - 1; level > 0; level--) {
            if (remainingMillis >= TIME_DELAY_LEVEL[level]) {
                return level;
            }
        }
        return 0;
    }

    /**
     * Sends the envelope to the delay topic using its current delay level.
     *
     * @param delayTable envelope to send; it is reused across hops
     * @param <E>        type of the business content
     */
    private <E> void sendDelayMessage(DelayTable<E> delayTable) {
        Message<DelayTable<E>> message = MessageBuilder
                .withPayload(delayTable)
                .build();
        int delayLevel = delayTable.getCurrentDelayLevel();
        rocketMQTemplate.syncSend(MQConstants.CRM_DELAY_QUEUE_TOPIC, message,
                SEND_TIMEOUT_MILLIS, delayLevel);
        log.debug("delay count: {}, delay level: {}, time: {} milliseconds",
                delayTable.currentDelayCount, delayLevel, TIME_DELAY_LEVEL[delayLevel]);
    }

    /**
     * Callback contract for delayed business logic.
     *
     * <p>Callbacks must implement {@link #handle()}. Once the delay has elapsed, the
     * implementation class is instantiated reflectively through its no-arg constructor and
     * {@code handle()} is called on the new instance.
     */
    public interface DelayMessageHandler extends Serializable {
        long serialVersionUID = 2405172041950251807L;

        /**
         * Callback invoked when the delay has elapsed.
         */
        void handle();
    }
}
/*
* Copyright (c) 2020-2030 Sishun.Co.Ltd. All Rights Reserved.
*/
package com.vevor.bmp.crm.io.controller;
import com.vevor.bmp.crm.cpm.utils.RocketMQDelayQueueUtils;
import com.vevor.common.pojo.vo.ResponseResult;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
/**
 * Test endpoint for the RocketMQ based delay queue.
 *
 * @version :1.8.0
 * @description :Delay queue test
 * @program :user-growth
 * @date :Created in 2023/5/22 4:54 PM
 * @since :1.8.0
 */
@Slf4j
@RestController
public class DelayQueueController {

    /**
     * Delay utility used to schedule the callback.
     */
    @Resource
    private RocketMQDelayQueueUtils rocketMQDelayQueueUtils;

    /**
     * Schedules {@link CallBack} to run after the requested number of seconds.
     *
     * @param delay delay in seconds
     * @param task  value attached to the delayed message as its content
     * @return a success response
     */
    @GetMapping("/mq/delay")
    @SneakyThrows
    public ResponseResult<String> mqDelay(@RequestParam Integer delay, @RequestParam String task) {
        final Class<? extends RocketMQDelayQueueUtils.DelayMessageHandler> callbackType = CallBack.class;
        final int delaySeconds = delay;
        // Hand the task over to the delay utility; the callback runs once the delay has elapsed.
        rocketMQDelayQueueUtils.delay(task, delaySeconds, TimeUnit.SECONDS, callbackType);
        return ResponseResult.success();
    }

    /**
     * Sample callback that only logs the time at which it runs.
     *
     * @program :user-growth
     * @date :Created in 2023/5/23 2:11 PM
     */
    @Data
    public static class CallBack implements RocketMQDelayQueueUtils.DelayMessageHandler {

        /**
         * Logs the current timestamp in milliseconds.
         */
        @Override
        public void handle() {
            final long now = System.currentTimeMillis();
            log.info("i am business logical! {}", now);
        }
    }
}