基于Rocket MQ扩展的无限延迟消息队列

发布时间:2023年12月23日

基于Rocket MQ扩展的无限延迟消息队列

背景:

  • Rocket MQ支持的延迟队列时间是固定间隔的, 默认19个等级(包含0等级): 0s, 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h. 我们的需求是实现用户下单后48小时或72小时给用户发送逼单邮件. 使用默认的延迟消息无法实现该功能, 所以对方案进行了改造.

实现原理:

  • 简单而言, 就是在Rocket MQ延迟队列固定时间间隔的基础上, 通过多次发送延迟消息, 达到任意延时时间组合计算. 通过反射的方式, 实现延迟业务逻辑的调用.

  • 源码如下:

  • /*
     * Copyright (c) 2020-2030 XXX.Co.Ltd. All Rights Reserved.
     */
    package com.example.xxx.utils;
     
    import com.vevor.bmp.crm.common.constants.MQConstants;
    import lombok.Data;
    import lombok.SneakyThrows;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.spring.annotation.ConsumeMode;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;
     
    import javax.annotation.Resource;
    import java.io.Serializable;
    import java.util.Calendar;
    import java.util.Date;
    import java.util.concurrent.TimeUnit;
     
    /**
     * @version :1.8.0
     * @description :基于Rocket MQ的任意延迟时长工具
     * @program :user-growth
     * @date :Created in 2023/5/22 3:35 下午
     * @since :1.8.0
     */
    @Slf4j
    @Component
    @RocketMQMessageListener(consumerGroup = MQConstants.CRM_DELAY_QUEUE_TOPIC_GROUP,
            topic = MQConstants.CRM_DELAY_QUEUE_TOPIC,
            // 消息消费顺序
            consumeMode = ConsumeMode.CONCURRENTLY,
            // 最大消息重复消费次数
            maxReconsumeTimes = 3)
    public class RocketMQDelayQueueUtils implements RocketMQListener<RocketMQDelayQueueUtils.DelayTable<Object>> {
     
        /**
         * Rocket MQ客户端
         */
        @Resource
        private RocketMQTemplate rocketMQTemplate;
     
        /**
         * MQ默认延迟等级
         */
        private static final long[] TIME_DELAY_LEVEL = new long[]{0L, 1000L, 5000L, 10000L,
                30000L, 60000L, 120000L, 180000L, 240000L, 300000L, 360000L, 420000L,
                480000L, 540000L, 600000L, 1200000L, 1800000L, 3600000L, 7200000L};
     
        @SneakyThrows
        @Override
        public void onMessage(DelayTable<Object> message) {
            Date endTime = message.getEndTime();
            int delayLevel = getDelayLevel(endTime);
            // 继续延迟
            if (delayLevel != 0) {
                int currentDelayCount = message.getCurrentDelayCount();
                currentDelayCount++;
     
                message.setCurrentDelayCount(currentDelayCount);
                message.setCurrentDelayLevel(delayLevel);
                message.setCurrentDelayMillis(TIME_DELAY_LEVEL[delayLevel]);
                this.sendDelayMessage(message);
                return;
            }
     
            // 执行业务
            log.info("delay message end! start to process business...");
            Class<? extends DelayMessageHandler> messageHandler = message.getMessageHandler();
            if (messageHandler != null) {
                DelayMessageHandler delayMessageHandler = messageHandler.newInstance();
                delayMessageHandler.handle();
            }
        }
     
        /**
         * 延迟消息体
         *
         * @param <E> 消息类型
         */
        @Data
     
        public static class DelayTable<E> implements Serializable {
     
            private static final long serialVersionUID = 2405172041950251807L;
     
            /**
             * 延迟消息体
             */
            private E content;
     
            /**
             * 消息延迟结束时间
             */
            private Date endTime;
     
            /**
             * 总延迟毫秒数
             */
            private long totalDelayTime;
     
            /**
             * 总延迟时间单位
             */
            private TimeUnit totalDelayTimeUnit;
     
            /**
             * 当前延迟次数
             */
            private int currentDelayCount;
     
            /**
             * 当前延迟等级
             */
            private int currentDelayLevel;
     
            /**
             * 当前延迟毫秒数
             */
            private long currentDelayMillis;
     
            /**
             * 延迟处理逻辑
             */
            private Class<? extends DelayMessageHandler> messageHandler;
        }
     
        /**
         * 发送延迟消息
         *
         * @param message 消息体
         * @param delay 延迟时长
         * @param timeUnit 延迟时间单位
         * @param handler 延迟时间到了之后,需要处理的逻辑
         * @param <E> 延迟消息类型
         */
        public <E> void delay(E message, int delay, TimeUnit timeUnit, Class<? extends DelayMessageHandler> handler) {
            // 把延迟时间转换成时间戳(毫秒)
            long totalDelayMills = timeUnit.toMillis(delay);
     
            // 根据延迟时间计算结束时间
            Calendar instance = Calendar.getInstance();
            instance.add(Calendar.MILLISECOND, (int)totalDelayMills);
            Date endTime = instance.getTime();
     
            // 根据延迟时间匹配延迟等级(delay level)
            int delayLevel = getDelayLevel(endTime);
            long delayMillis = TIME_DELAY_LEVEL[delayLevel];
     
            // 发送消息
            DelayTable<E> delayTable = new DelayTable<>();
            // 全局数据
            delayTable.setContent(message);
            delayTable.setMessageHandler(handler);
            delayTable.setEndTime(endTime);
            delayTable.setTotalDelayTime(delay);
            delayTable.setTotalDelayTimeUnit(timeUnit);
     
            // 当前延迟等级数据
            delayTable.setCurrentDelayCount(1);
            delayTable.setCurrentDelayLevel(delayLevel);
            delayTable.setCurrentDelayMillis(delayMillis);
            this.sendDelayMessage(delayTable);
        }
     
        /**
         * 计算延迟等级
         *
         * @param targetTime 延迟截止时间
         * @return Rocket MQ延迟消息等级
         */
        private static int getDelayLevel(Date targetTime) {
            long currentTime = System.currentTimeMillis();
            long delayMillis = targetTime.getTime() - currentTime;
     
            if (delayMillis <= 0) {
                // 不延迟,即延迟等级为 0
                return 0;
            }
     
            // 判断处于哪个延迟等级
            // 0s, 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h
            for (int i = 1; i <= 18; i++) {
                long delayLevelTime = TIME_DELAY_LEVEL[i];
                if (delayMillis < delayLevelTime) {
                    return i - 1;
                } else if (delayMillis == delayLevelTime) {
                    return i;
                }
            }
     
            // 最大延迟等级为 18
            return 18;
        }
     
        /**
         * 发送延迟消息
         *
         * @param delayTable 延迟对象,可以循环使用
         */
        @SneakyThrows
        private <E> void sendDelayMessage(DelayTable<E> delayTable) {
            // 消息序列化
            Message<DelayTable<E>> message = MessageBuilder
                            .withPayload(delayTable)
                            .build();
     
            // 设置\发送延迟消息
            int delayLevel = delayTable.getCurrentDelayLevel();
            rocketMQTemplate.syncSend(MQConstants.CRM_DELAY_QUEUE_TOPIC, message
                    , 3000, delayLevel);
     
            log.debug("delay count: {}, delay level: {}, time: {} milliseconds",
                    delayTable.currentDelayCount, delayLevel, TIME_DELAY_LEVEL[delayLevel]);
        }
     
        /**
         * 延迟回调接口
         *
         * 回调逻辑必须实现该接口#hander()方法,在延迟结束后,会通过反射的方式调用该方法
         */
        public interface DelayMessageHandler extends Serializable {
            long serialVersionUID = 2405172041950251807L;
     
            /**
             * 回调函数
             */
            void handle();
        }
     
    }
    

测试代码:

  • /*
     * Copyright (c) 2020-2030 Sishun.Co.Ltd. All Rights Reserved.
     */
    package com.vevor.bmp.crm.io.controller;
     
    import com.vevor.bmp.crm.cpm.utils.RocketMQDelayQueueUtils;
    import com.vevor.common.pojo.vo.ResponseResult;
    import lombok.Data;
    import lombok.SneakyThrows;
    import lombok.extern.slf4j.Slf4j;
    import org.redisson.api.RBlockingQueue;
    import org.redisson.api.RedissonClient;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
     
    import javax.annotation.Resource;
    import java.util.concurrent.TimeUnit;
     
    /**
     * @version :1.8.0
     * @description :延迟队列测试
     * @program :user-growth
     * @date :Created in 2023/5/22 4:54 下午
     * @since :1.8.0
     */
    @Slf4j
    @RestController
    public class DelayQueueController {
     
        @Resource
        private RocketMQDelayQueueUtils rocketMQDelayQueueUtils;
     
        @GetMapping("/mq/delay")
        @SneakyThrows
        public ResponseResult<String> mqDelay(@RequestParam Integer delay, @RequestParam String task) {
            // 获取延时队列
            rocketMQDelayQueueUtils.delay(task, delay, TimeUnit.SECONDS, CallBack.class);
            return ResponseResult.success();
        }
     
        /**
         * @version :
         * @description :
         * @program :user-growth
         * @date :Created in 2023/5/23 2:11 下午
         * @since :
         */
        @Data
        public static class CallBack implements RocketMQDelayQueueUtils.DelayMessageHandler {
     
            /**
             * 回调函数
             */
            @Override
            public void handle() {
                log.info("i am business logical! {}", System.currentTimeMillis());
            }
        }
    }
    

优缺点:

  • 优点: 与定时任务框架相比, 通过延迟消息的方式具实时性高、 支持分布式、轻量级、高并发等优点.
  • 缺点: 消息的准确性不可靠, 正常情况下准确性在秒级, 但是当MQ服务出现消息堆积时, 消息的时间就会偏差较大, 所以准确性依赖MQ服务的稳定.
文章来源:https://blog.csdn.net/Andrew_Chenwq/article/details/135045877
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。