RokcetMQ源码解析12-延迟消息

发布时间:2024年01月11日

1.生产者发送延迟消息

public class Producer {

   public static void main(String[] args) throws Exception {
      // 实例化一个生产者来产生延时消息
      DefaultMQProducer producer = new DefaultMQProducer("DELAY_P_G");
      producer.setNamesrvAddr("127.0.0.1:9876");
      // 启动生产者
      producer.start();
      for (int i = 0; i < 1; i++) {
          Message message = new Message("TopicTest", ("Hello scheduled message " + i).getBytes());
          /**
           * MessageStoreConfig
           * messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
           *
           * 共18个等级,依次是从1-18
           * 比如,level=3, 表示延迟10s 消费
           */
          message.setDelayTimeLevel(4);

          // 发送消息
          SendResult send = producer.send(message);
          System.out.println("send = " + send);
      }
       // 关闭生产者
      producer.shutdown();
  }
}

发送一条延迟消息,只需要在生产者代码中调用Message对象的setDelayTimeLevel(int level)方法设置一个延迟等级,最终延迟等级会putpropertiesmap集合中。

那么延迟到底是什么时候触发的呢?这里先保留问题接着往下分析

?1.1延迟等级对应的延迟时间

2.存储延迟消息?

其实延迟消息和普通消息并没有多大的差异,只不过broker在存储消息时,会判断消息的延迟属性是否为空,如果不为空,则判断是延迟消息,并且会将原始topic替换成SCHEDULE_TOPIC_XXXX,那么我们就看下broker存储时判断是否为延迟消息的逻辑:

CommitLog#asyncPutMessage(..)

public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
        // Set the storage time 设置存储时间
        msg.setStoreTimestamp(System.currentTimeMillis());
        // Set the message body BODY CRC (consider the most appropriate setting
        // on the client)
        //设置消息体CRC
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        // Back to Results
        AppendMessageResult result = null;
        //获取消息存储对象
        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

        String topic = msg.getTopic();
        int queueId = msg.getQueueId();
        // 获取事务状态
        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        //判断事务状态
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
                || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // Delay Delivery 延时消息的处理
            if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }
                // 设置延迟队列
                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                //将queueid替换为(延迟级别-1)
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId 备份信息
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

        ......
            return putMessageResult;
        });
    }

其实它要做的事情很简单,简单总结下:

  1. 将原始topic替换为延迟消息固定的topic:SCHEDULE_TOPIC_XXXX(所有的延时消息共用这一个topic)
  2. 将原始queueid替换为(延迟级别-1)
  3. 备份原始topic/queueid, 保存到原始消息的properties属性中

3.延迟消息的投递

上面broker将延迟消息写到了commitlog中,由于broker替换了我们的原始topic,所以订阅该topic的消费者此时还无法消费该消息,只有当时间到了消费者才可以消费,那么我们就看下broker是如何处理的。 首先处理延迟消息的是ScheduleMessageService

public class ScheduleMessageService extends ConfigManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

    private static final long FIRST_DELAY_TIME = 1000L;
    private static final long DELAY_FOR_A_WHILE = 100L;
    private static final long DELAY_FOR_A_PERIOD = 10000L;
    
    //TODO:broker启动时会初始化这个Map,key是延迟等级,共计18个,value就是延迟等级对应的时间
    private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
        new ConcurrentHashMap<Integer, Long>(32);

    private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
        new ConcurrentHashMap<Integer, Long>(32);
    

    //TODO:省略其他属性和方法
    

    //TODO:broker启动时,会调用该方法
    public void start() {
        if (started.compareAndSet(false, true)) {
            super.load();
            this.timer = new Timer("ScheduleMessageTimerThread", true);
            for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
                Integer level = entry.getKey();
                Long timeDelay = entry.getValue();
                Long offset = this.offsetTable.get(level);
                if (null == offset) {
                    offset = 0L;
                }

                if (timeDelay != null) {
                    //TODO:处理延迟消息
                    this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
                }
            }

            this.timer.scheduleAtFixedRate(new TimerTask() {

                @Override
                public void run() {
                    try {
                        //TODO:持久化
                        if (started.get()) ScheduleMessageService.this.persist();
                    } catch (Throwable e) {
                        log.error("scheduleAtFixedRate flush exception", e);
                    }
                }
            }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
        }
    }

   
   //TODO:省略其他方法
}

关注的地方主要就是2个,一个是处理延迟消息,一个是持久化,那么我们就分别看下:

3.1处理延迟消息?

Broker中同一等级的所有延时消息会被写入到consumequeue?目录中SCHEDULE_TOPIC_XXXX目录下相同Queue中。即一个Queue中消息投递时间的延迟等级时间是相同的。那么投递时间就取决于消息存储时间了。即按照消息被发送到Broker的时间进行排序的。

处理延迟消息核心代码在DeliverDelayedMessageTimerTask#executeOnTimeup()中,直接看

public void executeOnTimeup() {
    //TODO:根据延迟topic和延迟queueid 去获取Consumequeue
    ConsumeQueue cq =
        ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
            delayLevel2QueueId(delayLevel));

    long failScheduleOffset = offset;

    if (cq != null) {
        SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
        if (bufferCQ != null) {
            try {
                //TODO:offset用来标记队列读取到哪里了
                long nextOffset = offset;
                int i = 0;
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                    long offsetPy = bufferCQ.getByteBuffer().getLong();
                    int sizePy = bufferCQ.getByteBuffer().getInt();
                    long tagsCode = bufferCQ.getByteBuffer().getLong();

                    //TODO:....省略部分代码.....

                    long now = System.currentTimeMillis();
                    //TODO:计算投递时间,时间存储在了tag hashcode 中了
                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                    long countdown = deliverTimestamp - now;

                    //TODO:投递时间到了
                    if (countdown <= 0) {
                        //TODO:去broker中将消息读取出来
                        MessageExt msgExt =
                            ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                offsetPy, sizePy);

                        if (msgExt != null) {
                            try {
                                //TODO:构建新的消息体,将原来的消息信息设置到这里,并将topic和queueid设置为原始的topic和queueid(前面备份过)
                                MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                                    log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
                                            msgInner.getTopic(), msgInner);
                                    continue;
                                }
                                
                                //TODO:将消息再次写入commitlog中,topic是原始topic,这样消费者就可以去消费了
                                PutMessageResult putMessageResult =
                                    ScheduleMessageService.this.writeMessageStore
                                        .putMessage(msgInner);

                                
                              //TODO:....省略部分代码......  
                                
                            } catch (Exception e) {
                                /*
                                 * XXX: warn and notify me
                                 */
                                log.error(
                                    "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                                        + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
                                        + offsetPy + ",sizePy=" + sizePy, e);
                            }
                        }
                    } else {
                        ScheduleMessageService.this.timer.schedule(
                            new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                            countdown);
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    }
                } // end of for

                nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                    this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                return;
            } finally {

                bufferCQ.release();
            }
        } // end of if (bufferCQ != null)
        else {

            long cqMinOffset = cq.getMinOffsetInQueue();
            if (offset < cqMinOffset) {
                failScheduleOffset = cqMinOffset;
                log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                    + cqMinOffset + ", queueId=" + cq.getQueueId());
            }
        }
    } // end of if (cq != null)

    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
        failScheduleOffset), DELAY_FOR_A_WHILE);
}

?我们简单对代码总结下:

1.根据延迟topic和延迟queueid获取consumequeue,并从队列中读取索引单元

2.计算并判断消息的投递时间

3.如果投递时间到了

3.1 则根据索引单元中的commitlog offset 和 msg size 将该条消息A从commitlog中读取出 来.
3.2 将读取出来的消息属性复制到一个新的消息对象体B中,将A中备份的原始topic, queueid 读取 出来重新设置到B中,并清除延迟属性,使其成为一条普通消息.
3.3 调用CommitLog#putMessage(msg)方法,再次将消息B写入到commitlog中。这样消费者就可以消费到订阅了该topic的消息。

4.如果投递时间没到

4.1 计算剩余投递时间countdown(投递时间-当前时间), 然后开启一个JDK的Timer延迟任务,延迟时间就是countdown,继续执行DeliverDelayedMessageTimerTask的逻辑。

4.持久化

持久化就是通过定时任务,每隔10s将延迟队列的消费进度offset写到文件中。

文件默认路径:$user.home/store/config/delayOffset.json

5.总结?

  1. 发送消息时,通过setDelayTimeLevel(int level) 来设置延迟等级,RocketMQ默认支持18种延迟等级,每个延迟等级对应不同的延迟时间

  2. 所有延迟消息共用一个topic: SCHEDULE_TOPIC_XXXX

  3. 相同延迟等级的消息会放到同一个队列中(queueid=delayLevel - 1)

  4. 相同等级的消息会被同一个线程去处理

文章来源:https://blog.csdn.net/qq_38038507/article/details/135528988
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。