RocketMQ 相较于其它消息队列产品的一个特性是支持延时消息,也就是说消息发送到 Broker 不会立马投递给消费者,要等待一个指定的延迟时间再投递,适用场景例如:下单后多长时间没付款系统自动关闭订单。
RocketMQ 4.x 版本的延时消息存在一定的局限性,实现原理是:Broker 内置了名称为SCHEDULE_TOPIC_XXXX
的Topic,包含 18 个对应延时级别的队列,每个延时级别的时间是固定的。Broker 收到消息后,如果发现是延时消息就会改写消息的 Topic 和 queueID,再通过专门的线程定时扫描这些队列,把延迟时间已到的消息重新投递到真正的 Topic 里面。
这种实现方案相对简单,但也存在局限性。消息的延迟时间不能随意设置,只能按照给定的延迟级别来设置;最长的延迟级别是两小时,如果用户需要更久的延时,就只能自行改造方案。
RocketMQ 5.0 意识到了这个问题,终于延时消息迎来升级,废弃了之前延迟级别的设计,消费者可以设置 24 小时内的任意时间,这个限制其实还可以增加,不过堆积太多的消息会带来其它问题。
延时消息的设计是存在一些难点的,下面逐个分析。
1、任意延迟时间如何扫描?
延迟消息肯定需要一个数据结构来存储,为了不影响消息发送的吞吐量,还必须保证写入的高性能。然后还需要有线程定时的扫描这个数据结构,把延迟时间已到的消息投递给消费者。
RocketMQ 4.x 为了保证高性能写,还是把延时消息正常写 CommitLog,只不过换个 Topic。不同延迟时长的消息写入不同队列,这样就能保证每个队列前面的消息肯定比后面的消息先投递,线程扫描的时候顺序扫,只要扫到第一个延迟时间还没到的消息,后面的消息就可以跳过了,避免全局扫描。
2、消息清理问题
RocketMQ 本身可以看作是一个缓冲区,是用来做削峰填谷的,消息不可能一直保留,所以要有定时清理机制。CommitLog 默认会保存三天,如果支持太久的延迟时间,万一 CommitLog 被清理了,延迟时间投递的时候就无法读取出原始消息内容了。
3、大量消息同时到期
大量消息同时到期也是延迟消息比较头疼的问题,毕竟是单线程扫描投递,如果大量消息同时到期,短时间内投递的消息太多,就会导致消息延迟,不过这个问题可以业务上加个随机时间解决。
RocketMQ 5.0 引入了时间轮算法 (TimingWheel) 来支持任意时间的延时消息。
先看一下算法的思想,如图所示,圆环就是一个时间轮,它共有 8 个刻度,假设每个刻度代表一秒钟。延时任务会根据延迟时间添加到时间轮对应的刻度上。Data1、Data2 延迟时间都是一秒,所以被添加到刻度1上;Data4 延迟时间 14 秒,饶了一圈后被添加到刻度6上。同时,会有一个指向当前时间刻度的指针,沿着时间轮顺时针旋转,指针每秒前进一个刻度,并把当前刻度上所有的延迟时间已到的延时任务全部执行一遍。
基于时间轮算法,就可以实现任意时间的延时任务的调度执行,如果你觉得“秒”的精度不够,可以把刻度再拆分的更精细化一些,定时任务跑的频率更高一些即可。
RocketMQ 是怎么实现时间轮算法的呢?
RocketMQ 用 TimerWheel 类来封装时间轮,它实际对应磁盘上一个固定大小的文件,默认文件路径是${StoreHome}/timerwheel
,默认大小约 37 MB。
TimerWheel 由若干个刻度组成,一个刻度就代表一个时间单位,RocketMQ 用Slot
类来描述刻度,默认一个 Slot 代表一秒。TimerWheel 默认有 14 天内以秒为单位的所有 Slot,即 1209600 个 Slot。
每个 Slot 占用固定的 32 字节,格式如下:
字段 | 长度(字节) | 说明 |
---|---|---|
timeMs | 8 | 延迟时间 |
firstPos | 8 | 第一个消息的位置 |
lastPos | 8 | 最后一个消息的位置 |
num | 4 | 消息数量 |
magic | 4 | 魔数(废弃) |
每个 Slot 都有对应的延迟时间,相同延迟时间的消息才会被添加进来,多个延时消息会串联成链表,Slot 用两个指针分别指向了链表的首尾地址。现在的问题是,延时消息存到哪里呢?
为了保证消息写入的性能,延时消息会顺序写入到timerlog
文件里,它有点类似 CommitLog,区别是 timerlog 不会存储完整的消息,因为太浪费空间了。延时消息会被转换成固定的 52 字节的 TimerLog 写入。
写入的 TimerLog 格式如下:
字段 | 长度(字节) | 说明 |
---|---|---|
size | 4 | log 长度,固定值 52 |
prev pos | 8 | 前一条 log 的位置 |
magic | 4 | 魔数 |
curr write time | 8 | 写入时间 |
delayed time | 4 | 延迟时间 |
offsetPy | 8 | 实际消息在 CommitLog 偏移量 |
sizePy | 4 | 实际消息大小 |
hash code of real topic | 4 | 真实 Topic 哈希码 |
reserved value | 8 | 保留位 未使用 |
当有延时消息要被添加到 TimerWheel,首先要根据消息的延迟时间定位到 Slot,然后转换成 52 字节的 TimerLog,顺序写入 timerlog 文件,同时更新 Slot。
如图所示,现在要在 1号 Slot 上再添加一条延时消息1-4
,需要先把1-4
写入 timerlog,1-4
的 prevPos 指针会指向1-3
串联成链表,再把 Slot -> lastPos 指向1-4
。
延时消息存储起来了,接下就是线程定时扫描时间轮上的 Slot,判断消息如果到期就投递到原始 Topic 里面让消费者开始消费,如果消息没到期就重新投递进延时队列,进入下一期的时间轮。
不管是延时消息写入 timerlog、还是从 timerlog 取出消息重新投递,这些工作都是通过起单独的线程定时执行的,这里列举下相关的线程 Service:
结合流程图再回顾一下整体流程,首先是延时消息发到 Broker 被写入 CommitLog:
然后是 TimerMessageStore 启动线程把延时消息写入 timerlog、再定时扫描到期消息重新投递的过程。
客户端 SDK 发送延时消息前,会把延迟时间放在 Message -> system_properties 属性里面,Broker 收到消息准备 putMessage 前,会触发PutMessageHook
预留的钩子函数,其中一个叫handleScheduleMessage
的钩子就是专门处理延时消息的。
最终会调用HookUtils#handleScheduleMessage
方法,如果判断 Broker 启用了时间轮算法,且接收到的是延时消息,就会对消息进行转换。
public static PutMessageResult handleScheduleMessage(BrokerController brokerController,
final MessageExtBrokerInner msg) {
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
if (!isRolledTimerMessage(msg)) {
if (checkIfTimerMessage(msg)) {
if (!brokerController.getMessageStoreConfig().isTimerWheelEnable()) {
return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_NOT_ENABLE, null);
}
// 是延时消息 且启用时间轮算法 消息转换
PutMessageResult tranformRes = transformTimerMessage(brokerController, msg);
if (null != tranformRes) {
return tranformRes;
}
}
}
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
transformDelayLevelMessage(brokerController, msg);
}
}
return null;
}
转换的过程就是解析出延迟时间,然后把延迟时间和真实 Topic、QueueID 写入 properties,最后改写 Topic:
private static PutMessageResult transformTimerMessage(BrokerController brokerController, MessageExtBrokerInner msg) {
//do transform
int delayLevel = msg.getDelayTimeLevel();
long deliverMs;
try {
// 从properties取出延迟时间
if (msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_SEC) != null) {
deliverMs = System.currentTimeMillis() + Long.parseLong(msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_SEC)) * 1000;
} else if (msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_MS) != null) {
deliverMs = System.currentTimeMillis() + Long.parseLong(msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_MS));
} else {
deliverMs = Long.parseLong(msg.getProperty(MessageConst.PROPERTY_TIMER_DELIVER_MS));
}
} catch (Exception e) {
return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_MSG_ILLEGAL, null);
}
if (deliverMs > System.currentTimeMillis()) {
if (delayLevel <= 0 && deliverMs - System.currentTimeMillis() > brokerController.getMessageStoreConfig().getTimerMaxDelaySec() * 1000) {
return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_MSG_ILLEGAL, null);
}
// 处理精度
int timerPrecisionMs = brokerController.getMessageStoreConfig().getTimerPrecisionMs();
if (deliverMs % timerPrecisionMs == 0) {
deliverMs -= timerPrecisionMs;
} else {
deliverMs = deliverMs / timerPrecisionMs * timerPrecisionMs;
}
if (brokerController.getTimerMessageStore().isReject(deliverMs)) {
return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_FLOW_CONTROL, null);
}
// 改写Topic,把真实Topic写入properties
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TIMER_OUT_MS, deliverMs + "");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(TimerMessageStore.TIMER_TOPIC);
msg.setQueueId(0);
} else if (null != msg.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY)) {
return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_MSG_ILLEGAL, null);
}
return null;
}
后续就是把消息写入 CommitLog,因为改写了 Topic,所以消费者现在是没办法消费延时消息的。
接着就是 TimerEnqueueGetService 线程消费rmq_sys_wheel_timer
队列,取出延时消息,构建 TimerRequest 放到 enqueuePutQueue。
public boolean enqueue(int queueId) {
......
ConsumeQueue cq = (ConsumeQueue) this.messageStore.getConsumeQueue(TIMER_TOPIC, queueId);
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(offset);
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
// 取出原始消息
MessageExt msgExt = getMessageByCommitOffset(offsetPy, sizePy);
// 延迟时间
long delayedTime = Long.parseLong(msgExt.getProperty(TIMER_OUT_MS));
// use CQ offset, not offset in Message
msgExt.setQueueOffset(offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE));
TimerRequest timerRequest = new TimerRequest(offsetPy, sizePy, delayedTime, System.currentTimeMillis(), MAGIC_DEFAULT, msgExt);
while (true) {
// 延时消息封装成TimerRequest入队
if (enqueuePutQueue.offer(timerRequest, 3, TimeUnit.SECONDS)) {
break;
}
if (!isRunningEnqueue()) {
return false;
}
}
}
......
}
再是 TimerEnqueuePutService 线程从 enqueuePutQueue 取出 先前构建好的TimerRequest,写入 timerlog。
public void run() {
......
while (!this.isStopped() || enqueuePutQueue.size() != 0) {
TimerRequest firstReq = enqueuePutQueue.poll(10, TimeUnit.MILLISECONDS);
// 延迟时间小于当前时间,入队dequeuePutQueue
if (shouldRunningDequeue && req.getDelayTime() < currWriteTimeMs) {
dequeuePutQueue.put(req);
} else {
// 写入timerlog
boolean doEnqueueRes = doEnqueue(req.getOffsetPy(), req.getSizePy(), req.getDelayTime(), req.getMsg());
req.idempotentRelease(doEnqueueRes || storeConfig.isTimerSkipUnknownError());
}
}
......
}
首先需要定位到 Slot,再顺序写 timerlog,最后更新 Slot 信息。
public boolean doEnqueue(long offsetPy, int sizePy, long delayedTime, MessageExt messageExt) {
LOGGER.debug("Do enqueue [{}] [{}]", new Timestamp(delayedTime), messageExt);
long tmpWriteTimeMs = currWriteTimeMs;
// 超过一轮时间周期,到期后需要重新进入下一期时间轮等待
boolean needRoll = delayedTime - tmpWriteTimeMs >= timerRollWindowSlots * precisionMs;
int magic = MAGIC_DEFAULT;
if (needRoll) {
magic = magic | MAGIC_ROLL;
if (delayedTime - tmpWriteTimeMs - timerRollWindowSlots * precisionMs < timerRollWindowSlots / 3 * precisionMs) {
//give enough time to next roll
delayedTime = tmpWriteTimeMs + (timerRollWindowSlots / 2) * precisionMs;
} else {
delayedTime = tmpWriteTimeMs + timerRollWindowSlots * precisionMs;
}
}
boolean isDelete = messageExt.getProperty(TIMER_DELETE_UNIQKEY) != null;
if (isDelete) {
magic = magic | MAGIC_DELETE;
}
String realTopic = messageExt.getProperty(MessageConst.PROPERTY_REAL_TOPIC);
// 定位Slot,顺序写timerLog
Slot slot = timerWheel.getSlot(delayedTime);
ByteBuffer tmpBuffer = timerLogBuffer;
tmpBuffer.clear();
tmpBuffer.putInt(TimerLog.UNIT_SIZE); //size
tmpBuffer.putLong(slot.lastPos); //prev pos
tmpBuffer.putInt(magic); //magic
tmpBuffer.putLong(tmpWriteTimeMs); //currWriteTime
tmpBuffer.putInt((int) (delayedTime - tmpWriteTimeMs)); //delayTime
tmpBuffer.putLong(offsetPy); //offset
tmpBuffer.putInt(sizePy); //size
tmpBuffer.putInt(hashTopicForMetrics(realTopic)); //hashcode of real topic
tmpBuffer.putLong(0); //reserved value, just set to 0 now
long ret = timerLog.append(tmpBuffer.array(), 0, TimerLog.UNIT_SIZE);
if (-1 != ret) {
// 更新timerWheel对应的Slot
timerWheel.putSlot(delayedTime, slot.firstPos == -1 ? ret : slot.firstPos, ret,
isDelete ? slot.num - 1 : slot.num + 1, slot.magic);
addMetric(messageExt, isDelete ? -1 : 1);
}
return -1 != ret;
}
不管是 timerlog 还是 timerWheel 文件,都是需要频繁写的,为了提高性能,RocketMQ 均使用 mmap 技术写,然后定时 flush 到磁盘。
然后是 TimerDequeueGetService 线程,定时扫描时间轮,取出到期的TimerRequest,放入dequeueGetQueue
public int dequeue() throws Exception {
// 定位到Slot
Slot slot = timerWheel.getSlot(currReadTimeMs);
if (-1 == slot.timeMs) {// Slot是空的
moveReadTime();
return 0;
}
long currOffsetPy = slot.lastPos;
while (currOffsetPy != -1) {
// 定位timerlog
timeSbr = timerLog.getWholeBuffer(currOffsetPy);
int position = (int) (currOffsetPy % timerLogFileSize);
timeSbr.getByteBuffer().position(position);
timeSbr.getByteBuffer().getInt(); //size
prevPos = timeSbr.getByteBuffer().getLong();
int magic = timeSbr.getByteBuffer().getInt();
long enqueueTime = timeSbr.getByteBuffer().getLong();
long delayedTime = timeSbr.getByteBuffer().getInt() + enqueueTime;
long offsetPy = timeSbr.getByteBuffer().getLong();
int sizePy = timeSbr.getByteBuffer().getInt();
// timerlog再转换成TimerRequest
TimerRequest timerRequest = new TimerRequest(offsetPy, sizePy, delayedTime, enqueueTime, magic);
timerRequest.setDeleteList(deleteUniqKeys);
for (List<TimerRequest> normalList : splitIntoLists(normalMsgStack)) {
for (TimerRequest tr : normalList) {
tr.setLatch(normalLatch);
}
// TimerRequest入队dequeueGetQueue
dequeueGetQueue.put(normalList);
}
}
......
}
再是 TimerDequeueGetMessageService 线程,从 dequeueGetQueue 取出 TimerRequest,从 CommitLog 查询完整消息,放入 dequeuePutQueue。
public void run() {
while (!this.isStopped()) {
List<TimerRequest> trs = dequeueGetQueue.poll(100 * precisionMs / 1000, TimeUnit.MILLISECONDS);
for (int i = 0; i < trs.size(); ) {
// CommitLog查询完整消息
MessageExt msgExt = getMessageByCommitOffset(tr.getOffsetPy(), tr.getSizePy());
String uniqkey = MessageClientIDSetter.getUniqID(msgExt);
if (null != uniqkey && tr.getDeleteList() != null && tr.getDeleteList().size() > 0 && tr.getDeleteList().contains(uniqkey)) {
doRes = true;
tr.idempotentRelease();
perfs.getCounter("dequeue_delete").flow(1);
} else {
tr.setMsg(msgExt);
while (!isStopped() && !doRes) {
// 入队
doRes = dequeuePutQueue.offer(tr, 3, TimeUnit.SECONDS);
}
}
}
}
}
最后是 TimerDequeuePutMessageService 线程,从 dequeuePutQueue 取出 TimerRequest,判断消息是否到期,到期直接投递到真实的 Topic,没到期进入下一期时间轮。
public void run() {
while (!this.isStopped() || dequeuePutQueue.size() != 0) {
TimerRequest tr = dequeuePutQueue.poll(10, TimeUnit.MILLISECONDS);
// 消息转换 如果到期则复原Topic queueID
MessageExtBrokerInner msg = convert(tr.getMsg(), tr.getEnqueueTime(), needRoll(tr.getMagic()));
doRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic()));
while (!doRes && !isStopped()) {
if (!isRunningDequeue()) {
dequeueStatusChangeFlag = true;
tmpDequeueChangeFlag = true;
break;
}
// 重写写入CommitLog
doRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic()));
Thread.sleep(500 * precisionMs / 1000);
}
}
.......
}
时间轮对应的类是 TimerWheel,它对应磁盘上的一个文件,由若干个 Slot 组成,因为要随机读写,所以 RocketMQ 使用 RandomAccessFile 来读写文件。
public class TimerWheel {
// 总的Slot数量
public final int slotsTotal;
// 时间精度
public final int precisionMs;
// 文件名
private String fileName;
private final RandomAccessFile randomAccessFile;
private final FileChannel fileChannel;
// mmap
private final MappedByteBuffer mappedByteBuffer;
private final ByteBuffer byteBuffer;
private final ThreadLocal<ByteBuffer> localBuffer = new ThreadLocal<ByteBuffer>() {
@Override
protected ByteBuffer initialValue() {
return byteBuffer.duplicate();
}
};
// 时间轮文件大小
private final int wheelLength;
}
Slot 类的属性:
public class Slot {
public static final short SIZE = 32;
public final long timeMs; // 延迟时间
public final long firstPos; // 第一个消息的位置
public final long lastPos; // 最后一个消息的位置
public final int num; // 消息数量
public final int magic; //no use now, just keep it
}
最后是 TimerLog,它底层对应一组文件,单个文件限制在 100MB 大小,如果写满了就创建新的文件继续写。因为是顺序写的,所以效率很高。
public class TimerLog {
private static InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
public final static int BLANK_MAGIC_CODE = 0xBBCCDDEE ^ 1880681586 + 8;
private final static int MIN_BLANK_LEN = 4 + 8 + 4;
public final static int UNIT_SIZE = 4 //size
+ 8 //prev pos
+ 4 //magic value
+ 8 //curr write time, for trace
+ 4 //delayed time, for check
+ 8 //offsetPy
+ 4 //sizePy
+ 4 //hash code of real topic
+ 8; //reserved value, just in case of
public final static int UNIT_PRE_SIZE_FOR_MSG = 28;
public final static int UNIT_PRE_SIZE_FOR_METRIC = 40;
private final MappedFileQueue mappedFileQueue;
private final int fileSize;
}
RocketMQ 5.0 解除了 4.x 版本延时消息延迟级别的时间限制,现在生产者可以设置任意延迟时间了,功能上更加强大,实现上也更加复杂。RocketMQ 引入了新的时间轮算法,简单理解就是把时间按照精度划分成 N 个Slot,消息会按照延迟时间加入到对应的 Slot,然后线程定时扫描时间轮,把 Slot 对应的到期消息重新投递即可。新的算法不仅实现更复杂,RocketMQ 还需要额外写 timerwheel 和 timerlog 文件,这两个文件也是要持久化定期刷盘的。