RocketMQ5.0延时消息时间轮算法

发布时间:2024年01月03日

前言

RocketMQ 相较于其它消息队列产品的一个特性是支持延时消息,也就是说消息发送到 Broker 不会立马投递给消费者,要等待一个指定的延迟时间再投递,适用场景例如:下单后多长时间没付款系统自动关闭订单。
RocketMQ 4.x 版本的延时消息存在一定的局限性,实现原理是:Broker 内置了名称为SCHEDULE_TOPIC_XXXX的Topic,包含 18 个对应延时级别的队列,每个延时级别的时间是固定的。Broker 收到消息后,如果发现是延时消息就会改写消息的 Topic 和 queueID,再通过专门的线程定时扫描这些队列,把延迟时间已到的消息重新投递到真正的 Topic 里面。
这种实现方案相对简单,但也存在局限性。消息的延迟时间不能随意设置,只能按照给定的延迟级别来设置;最长的延迟级别是两小时,如果用户需要更久的延时,就只能自行改造方案。
RocketMQ 5.0 意识到了这个问题,终于延时消息迎来升级,废弃了之前延迟级别的设计,消费者可以设置 24 小时内的任意时间,这个限制其实还可以增加,不过堆积太多的消息会带来其它问题。

设计难点

延时消息的设计是存在一些难点的,下面逐个分析。

1、任意延迟时间如何扫描?
延迟消息肯定需要一个数据结构来存储,为了不影响消息发送的吞吐量,还必须保证写入的高性能。然后还需要有线程定时的扫描这个数据结构,把延迟时间已到的消息投递给消费者。
RocketMQ 4.x 为了保证高性能写,还是把延时消息正常写 CommitLog,只不过换个 Topic。不同延迟时长的消息写入不同队列,这样就能保证每个队列前面的消息肯定比后面的消息先投递,线程扫描的时候顺序扫,只要扫到第一个延迟时间还没到的消息,后面的消息就可以跳过了,避免全局扫描。

2、消息清理问题
RocketMQ 本身可以看作是一个缓冲区,是用来做削峰填谷的,消息不可能一直保留,所以要有定时清理机制。CommitLog 默认会保存三天,如果支持太久的延迟时间,万一 CommitLog 被清理了,延迟时间投递的时候就无法读取出原始消息内容了。

3、大量消息同时到期
大量消息同时到期也是延迟消息比较头疼的问题,毕竟是单线程扫描投递,如果大量消息同时到期,短时间内投递的消息太多,就会导致消息延迟,不过这个问题可以业务上加个随机时间解决。

时间轮算法

RocketMQ 5.0 引入了时间轮算法 (TimingWheel) 来支持任意时间的延时消息。
image.png
先看一下算法的思想,如图所示,圆环就是一个时间轮,它共有 8 个刻度,假设每个刻度代表一秒钟。延时任务会根据延迟时间添加到时间轮对应的刻度上。Data1、Data2 延迟时间都是一秒,所以被添加到刻度1上;Data4 延迟时间 14 秒,饶了一圈后被添加到刻度6上。同时,会有一个指向当前时间刻度的指针,沿着时间轮顺时针旋转,指针每秒前进一个刻度,并把当前刻度上所有的延迟时间已到的延时任务全部执行一遍。
基于时间轮算法,就可以实现任意时间的延时任务的调度执行,如果你觉得“秒”的精度不够,可以把刻度再拆分的更精细化一些,定时任务跑的频率更高一些即可。

设计实现

RocketMQ 是怎么实现时间轮算法的呢?
RocketMQ 用 TimerWheel 类来封装时间轮,它实际对应磁盘上一个固定大小的文件,默认文件路径是${StoreHome}/timerwheel,默认大小约 37 MB。
TimerWheel 由若干个刻度组成,一个刻度就代表一个时间单位,RocketMQ 用Slot类来描述刻度,默认一个 Slot 代表一秒。TimerWheel 默认有 14 天内以秒为单位的所有 Slot,即 1209600 个 Slot。
image.png
每个 Slot 占用固定的 32 字节,格式如下:

字段长度(字节)说明
timeMs8延迟时间
firstPos8第一个消息的位置
lastPos8最后一个消息的位置
num4消息数量
magic4魔数(废弃)

每个 Slot 都有对应的延迟时间,相同延迟时间的消息才会被添加进来,多个延时消息会串联成链表,Slot 用两个指针分别指向了链表的首尾地址。现在的问题是,延时消息存到哪里呢?

为了保证消息写入的性能,延时消息会顺序写入到timerlog文件里,它有点类似 CommitLog,区别是 timerlog 不会存储完整的消息,因为太浪费空间了。延时消息会被转换成固定的 52 字节的 TimerLog 写入。
写入的 TimerLog 格式如下:

字段长度(字节)说明
size4log 长度,固定值 52
prev pos8前一条 log 的位置
magic4魔数
curr write time8写入时间
delayed time4延迟时间
offsetPy8实际消息在 CommitLog 偏移量
sizePy4实际消息大小
hash code of real topic4真实 Topic 哈希码
reserved value8保留位 未使用

当有延时消息要被添加到 TimerWheel,首先要根据消息的延迟时间定位到 Slot,然后转换成 52 字节的 TimerLog,顺序写入 timerlog 文件,同时更新 Slot。
image.png
如图所示,现在要在 1号 Slot 上再添加一条延时消息1-4,需要先把1-4写入 timerlog,1-4的 prevPos 指针会指向1-3串联成链表,再把 Slot -> lastPos 指向1-4
image.png

延时消息存储起来了,接下就是线程定时扫描时间轮上的 Slot,判断消息如果到期就投递到原始 Topic 里面让消费者开始消费,如果消息没到期就重新投递进延时队列,进入下一期的时间轮。

不管是延时消息写入 timerlog、还是从 timerlog 取出消息重新投递,这些工作都是通过起单独的线程定时执行的,这里列举下相关的线程 Service:

  • TimerEnqueueGetService:从rmq_sys_wheel_timer队列取出消息,构建TimerRequest放入enqueuePutQueue
  • TimerEnqueuePutService:从enqueuePutQueue取出TimerRequest,写入timerlog
  • TimerDequeueGetService:定时扫描时间轮,取出到期的TimerRequest,放入dequeueGetQueue
  • TimerDequeueGutMessageService:从dequeueGetQueue取出TimerRequest,从CommitLog查询完整消息,放入dequeuePutQueue
  • TimerDequeuePutMessageService:从dequeuePutQueue取出TimerRequest,判断消息是否到期,到期直接投递到目标Topic,没到期进入下一期时间轮

结合流程图再回顾一下整体流程,首先是延时消息发到 Broker 被写入 CommitLog:
image.png
然后是 TimerMessageStore 启动线程把延时消息写入 timerlog、再定时扫描到期消息重新投递的过程。
image.png

源码

客户端 SDK 发送延时消息前,会把延迟时间放在 Message -> system_properties 属性里面,Broker 收到消息准备 putMessage 前,会触发PutMessageHook预留的钩子函数,其中一个叫handleScheduleMessage的钩子就是专门处理延时消息的。
最终会调用HookUtils#handleScheduleMessage方法,如果判断 Broker 启用了时间轮算法,且接收到的是延时消息,就会对消息进行转换。

public static PutMessageResult handleScheduleMessage(BrokerController brokerController,
                                                     final MessageExtBrokerInner msg) {
    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        if (!isRolledTimerMessage(msg)) {
            if (checkIfTimerMessage(msg)) {
                if (!brokerController.getMessageStoreConfig().isTimerWheelEnable()) {
                    return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_NOT_ENABLE, null);
                }
                // 是延时消息 且启用时间轮算法 消息转换
                PutMessageResult tranformRes = transformTimerMessage(brokerController, msg);
                if (null != tranformRes) {
                    return tranformRes;
                }
            }
        }
        // Delay Delivery
        if (msg.getDelayTimeLevel() > 0) {
            transformDelayLevelMessage(brokerController, msg);
        }
    }
    return null;
}

转换的过程就是解析出延迟时间,然后把延迟时间和真实 Topic、QueueID 写入 properties,最后改写 Topic:

private static PutMessageResult transformTimerMessage(BrokerController brokerController, MessageExtBrokerInner msg) {
    //do transform
    int delayLevel = msg.getDelayTimeLevel();
    long deliverMs;
    try {
        // 从properties取出延迟时间
        if (msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_SEC) != null) {
            deliverMs = System.currentTimeMillis() + Long.parseLong(msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_SEC)) * 1000;
        } else if (msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_MS) != null) {
            deliverMs = System.currentTimeMillis() + Long.parseLong(msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_MS));
        } else {
            deliverMs = Long.parseLong(msg.getProperty(MessageConst.PROPERTY_TIMER_DELIVER_MS));
        }
    } catch (Exception e) {
        return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_MSG_ILLEGAL, null);
    }
    if (deliverMs > System.currentTimeMillis()) {
        if (delayLevel <= 0 && deliverMs - System.currentTimeMillis() > brokerController.getMessageStoreConfig().getTimerMaxDelaySec() * 1000) {
            return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_MSG_ILLEGAL, null);
        }
        // 处理精度
        int timerPrecisionMs = brokerController.getMessageStoreConfig().getTimerPrecisionMs();
        if (deliverMs % timerPrecisionMs == 0) {
            deliverMs -= timerPrecisionMs;
        } else {
            deliverMs = deliverMs / timerPrecisionMs * timerPrecisionMs;
        }

        if (brokerController.getTimerMessageStore().isReject(deliverMs)) {
            return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_FLOW_CONTROL, null);
        }
        // 改写Topic,把真实Topic写入properties
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TIMER_OUT_MS, deliverMs + "");
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
        msg.setTopic(TimerMessageStore.TIMER_TOPIC);
        msg.setQueueId(0);
    } else if (null != msg.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY)) {
        return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_MSG_ILLEGAL, null);
    }
    return null;
}

后续就是把消息写入 CommitLog,因为改写了 Topic,所以消费者现在是没办法消费延时消息的。
接着就是 TimerEnqueueGetService 线程消费rmq_sys_wheel_timer队列,取出延时消息,构建 TimerRequest 放到 enqueuePutQueue。

public boolean enqueue(int queueId) {
	......
    ConsumeQueue cq = (ConsumeQueue) this.messageStore.getConsumeQueue(TIMER_TOPIC, queueId);
    SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(offset);
    for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
        // 取出原始消息
    	MessageExt msgExt = getMessageByCommitOffset(offsetPy, sizePy);
        // 延迟时间
        long delayedTime = Long.parseLong(msgExt.getProperty(TIMER_OUT_MS));
        // use CQ offset, not offset in Message
        msgExt.setQueueOffset(offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE));
        TimerRequest timerRequest = new TimerRequest(offsetPy, sizePy, delayedTime, System.currentTimeMillis(), MAGIC_DEFAULT, msgExt);
        while (true) {
            // 延时消息封装成TimerRequest入队
            if (enqueuePutQueue.offer(timerRequest, 3, TimeUnit.SECONDS)) {
                break;
            }
            if (!isRunningEnqueue()) {
                return false;
            }
        }
    }
	......
}

再是 TimerEnqueuePutService 线程从 enqueuePutQueue 取出 先前构建好的TimerRequest,写入 timerlog。

public void run() {
    ......
	while (!this.isStopped() || enqueuePutQueue.size() != 0) {
        TimerRequest firstReq = enqueuePutQueue.poll(10, TimeUnit.MILLISECONDS);
        // 延迟时间小于当前时间,入队dequeuePutQueue
        if (shouldRunningDequeue && req.getDelayTime() < currWriteTimeMs) {
            dequeuePutQueue.put(req);
        } else {
            // 写入timerlog
            boolean doEnqueueRes = doEnqueue(req.getOffsetPy(), req.getSizePy(), req.getDelayTime(), req.getMsg());
            req.idempotentRelease(doEnqueueRes || storeConfig.isTimerSkipUnknownError());
        }
    }
	......
}

首先需要定位到 Slot,再顺序写 timerlog,最后更新 Slot 信息。

public boolean doEnqueue(long offsetPy, int sizePy, long delayedTime, MessageExt messageExt) {
    LOGGER.debug("Do enqueue [{}] [{}]", new Timestamp(delayedTime), messageExt);
    long tmpWriteTimeMs = currWriteTimeMs;
    // 超过一轮时间周期,到期后需要重新进入下一期时间轮等待
    boolean needRoll = delayedTime - tmpWriteTimeMs >= timerRollWindowSlots * precisionMs;
    int magic = MAGIC_DEFAULT;
    if (needRoll) {
        magic = magic | MAGIC_ROLL;
        if (delayedTime - tmpWriteTimeMs - timerRollWindowSlots * precisionMs < timerRollWindowSlots / 3 * precisionMs) {
            //give enough time to next roll
            delayedTime = tmpWriteTimeMs + (timerRollWindowSlots / 2) * precisionMs;
        } else {
            delayedTime = tmpWriteTimeMs + timerRollWindowSlots * precisionMs;
        }
    }
    boolean isDelete = messageExt.getProperty(TIMER_DELETE_UNIQKEY) != null;
    if (isDelete) {
        magic = magic | MAGIC_DELETE;
    }
    String realTopic = messageExt.getProperty(MessageConst.PROPERTY_REAL_TOPIC);
    // 定位Slot,顺序写timerLog
    Slot slot = timerWheel.getSlot(delayedTime);
    ByteBuffer tmpBuffer = timerLogBuffer;
    tmpBuffer.clear();
    tmpBuffer.putInt(TimerLog.UNIT_SIZE); //size
    tmpBuffer.putLong(slot.lastPos); //prev pos
    tmpBuffer.putInt(magic); //magic
    tmpBuffer.putLong(tmpWriteTimeMs); //currWriteTime
    tmpBuffer.putInt((int) (delayedTime - tmpWriteTimeMs)); //delayTime
    tmpBuffer.putLong(offsetPy); //offset
    tmpBuffer.putInt(sizePy); //size
    tmpBuffer.putInt(hashTopicForMetrics(realTopic)); //hashcode of real topic
    tmpBuffer.putLong(0); //reserved value, just set to 0 now
    long ret = timerLog.append(tmpBuffer.array(), 0, TimerLog.UNIT_SIZE);
    if (-1 != ret) {
        // 更新timerWheel对应的Slot
        timerWheel.putSlot(delayedTime, slot.firstPos == -1 ? ret : slot.firstPos, ret,
            isDelete ? slot.num - 1 : slot.num + 1, slot.magic);
        addMetric(messageExt, isDelete ? -1 : 1);
    }
    return -1 != ret;
}

不管是 timerlog 还是 timerWheel 文件,都是需要频繁写的,为了提高性能,RocketMQ 均使用 mmap 技术写,然后定时 flush 到磁盘。

然后是 TimerDequeueGetService 线程,定时扫描时间轮,取出到期的TimerRequest,放入dequeueGetQueue

public int dequeue() throws Exception {
    // 定位到Slot
    Slot slot = timerWheel.getSlot(currReadTimeMs);
    if (-1 == slot.timeMs) {// Slot是空的
        moveReadTime();
        return 0;
    }
    long currOffsetPy = slot.lastPos;
    while (currOffsetPy != -1) {
        // 定位timerlog
        timeSbr = timerLog.getWholeBuffer(currOffsetPy);
        int position = (int) (currOffsetPy % timerLogFileSize);
        timeSbr.getByteBuffer().position(position);
        timeSbr.getByteBuffer().getInt(); //size
        prevPos = timeSbr.getByteBuffer().getLong();
        int magic = timeSbr.getByteBuffer().getInt();
        long enqueueTime = timeSbr.getByteBuffer().getLong();
        long delayedTime = timeSbr.getByteBuffer().getInt() + enqueueTime;
        long offsetPy = timeSbr.getByteBuffer().getLong();
        int sizePy = timeSbr.getByteBuffer().getInt();
        // timerlog再转换成TimerRequest
        TimerRequest timerRequest = new TimerRequest(offsetPy, sizePy, delayedTime, enqueueTime, magic);
        timerRequest.setDeleteList(deleteUniqKeys);
        for (List<TimerRequest> normalList : splitIntoLists(normalMsgStack)) {
            for (TimerRequest tr : normalList) {
                tr.setLatch(normalLatch);
            }
            // TimerRequest入队dequeueGetQueue
            dequeueGetQueue.put(normalList);
        }
    }
	......
}

再是 TimerDequeueGetMessageService 线程,从 dequeueGetQueue 取出 TimerRequest,从 CommitLog 查询完整消息,放入 dequeuePutQueue。

public void run() {
    while (!this.isStopped()) {
        List<TimerRequest> trs = dequeueGetQueue.poll(100 * precisionMs / 1000, TimeUnit.MILLISECONDS);
        for (int i = 0; i < trs.size(); ) {
            // CommitLog查询完整消息
            MessageExt msgExt = getMessageByCommitOffset(tr.getOffsetPy(), tr.getSizePy());
            String uniqkey = MessageClientIDSetter.getUniqID(msgExt);
            if (null != uniqkey && tr.getDeleteList() != null && tr.getDeleteList().size() > 0 && tr.getDeleteList().contains(uniqkey)) {
                doRes = true;
                tr.idempotentRelease();
                perfs.getCounter("dequeue_delete").flow(1);
            } else {
                tr.setMsg(msgExt);
                while (!isStopped() && !doRes) {
                    // 入队
                    doRes = dequeuePutQueue.offer(tr, 3, TimeUnit.SECONDS);
                }
            }
        }
    }
}

最后是 TimerDequeuePutMessageService 线程,从 dequeuePutQueue 取出 TimerRequest,判断消息是否到期,到期直接投递到真实的 Topic,没到期进入下一期时间轮。

public void run() {
    while (!this.isStopped() || dequeuePutQueue.size() != 0) {
        TimerRequest tr = dequeuePutQueue.poll(10, TimeUnit.MILLISECONDS);
        // 消息转换 如果到期则复原Topic queueID
        MessageExtBrokerInner msg = convert(tr.getMsg(), tr.getEnqueueTime(), needRoll(tr.getMagic()));
        doRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic()));
        while (!doRes && !isStopped()) {
            if (!isRunningDequeue()) {
                dequeueStatusChangeFlag = true;
                tmpDequeueChangeFlag = true;
                break;
            }
            // 重写写入CommitLog
            doRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic()));
            Thread.sleep(500 * precisionMs / 1000);
        }
    }
    .......
}

时间轮对应的类是 TimerWheel,它对应磁盘上的一个文件,由若干个 Slot 组成,因为要随机读写,所以 RocketMQ 使用 RandomAccessFile 来读写文件。

public class TimerWheel {
    // 总的Slot数量
    public final int slotsTotal;
    // 时间精度
    public final int precisionMs;
    // 文件名
    private String fileName;
    private final RandomAccessFile randomAccessFile;
    private final FileChannel fileChannel;
    // mmap
    private final MappedByteBuffer mappedByteBuffer;
    private final ByteBuffer byteBuffer;
    private final ThreadLocal<ByteBuffer> localBuffer = new ThreadLocal<ByteBuffer>() {
        @Override
        protected ByteBuffer initialValue() {
            return byteBuffer.duplicate();
        }
    };
    // 时间轮文件大小
    private final int wheelLength;
}

Slot 类的属性:

public class Slot {
    public static final short SIZE = 32;
    public final long timeMs; // 延迟时间
    public final long firstPos; // 第一个消息的位置
    public final long lastPos; // 最后一个消息的位置
    public final int num; // 消息数量
    public final int magic; //no use now, just keep it
}

最后是 TimerLog,它底层对应一组文件,单个文件限制在 100MB 大小,如果写满了就创建新的文件继续写。因为是顺序写的,所以效率很高。

public class TimerLog {
    private static InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    public final static int BLANK_MAGIC_CODE = 0xBBCCDDEE ^ 1880681586 + 8;
    private final static int MIN_BLANK_LEN = 4 + 8 + 4;
    public final static int UNIT_SIZE = 4  //size
            + 8 //prev pos
            + 4 //magic value
            + 8 //curr write time, for trace
            + 4 //delayed time, for check
            + 8 //offsetPy
            + 4 //sizePy
            + 4 //hash code of real topic
            + 8; //reserved value, just in case of
    public final static int UNIT_PRE_SIZE_FOR_MSG = 28;
    public final static int UNIT_PRE_SIZE_FOR_METRIC = 40;
    private final MappedFileQueue mappedFileQueue;
    private final int fileSize;
}

尾巴

RocketMQ 5.0 解除了 4.x 版本延时消息延迟级别的时间限制,现在生产者可以设置任意延迟时间了,功能上更加强大,实现上也更加复杂。RocketMQ 引入了新的时间轮算法,简单理解就是把时间按照精度划分成 N 个Slot,消息会按照延迟时间加入到对应的 Slot,然后线程定时扫描时间轮,把 Slot 对应的到期消息重新投递即可。新的算法不仅实现更复杂,RocketMQ 还需要额外写 timerwheel 和 timerlog 文件,这两个文件也是要持久化定期刷盘的。

文章来源:https://blog.csdn.net/qq_32099833/article/details/135370834
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。