消息队列-RockMQ-定时延时发送消息

发布时间:2024年01月09日

定时延时发送消息

任务需要延迟一段时间再进行处理。
生产者

public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        producer.setNamesrvAddr("ip:9876");
        producer.start();
        List<Order> F = OrderBuilder.build(1, "A", "B", "C");
        List<Order> S = OrderBuilder.build(2, "D", "Q");
        List<Order> T = OrderBuilder.build(3, "N", "Q", "R");
        ArrayList<Order> orders = new ArrayList<Order>() {{
            addAll(F);
            addAll(S);
            addAll(T);
        }};
        for (Order order : orders) {
            Message msg = new Message("test-topic", "test-topic_str", order.toString().getBytes());
            msg.setKeys("test-topic_trace");
            // 官网提供了这些延迟级别 分别对应 0 1 2
            // messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
            // 重要的逻辑在这里设置队列延迟等级
            msg.setDelayTimeLevel(3);
            producer.send(msg);
        }
        System.out.println("finish");
        // 这里发送了两个Tag 的消息
        // 下面这个消息没有设置延迟时间
        for (Order order : orders) {
            Message msg = new Message("test-topic", "test-topic_str_other", ("other" + order.toString()).getBytes());
            msg.setKeys("test-topic_trace_other");
            
            producer.send(msg);
        }
        System.out.println("finish");
    }
}

消费者1订阅tag 为*的消息

public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-group");
        consumer.setNamesrvAddr("ip:9876");
        // *表示订阅所有的消息
        consumer.subscribe("test-topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(String.format("msg {%s} recvTime %s", new String(msg.getBody()), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

可以看到我们没有设置延迟发送和延迟发送的sentTime和recvTime是很有区别的:
在这里插入图片描述

我们看到test-group里面的消息总共有16条
在这里插入图片描述
消费者订阅tag为test-topic_str_other的消息

consumer.subscribe("test-topic", "test-topic_str_other");

在这里插入图片描述
消费者订阅tag为test-topic_str的消息

consumer.subscribe("test-topic", "test-topic_str");

在这里插入图片描述
通过上面的案例验证了:

// 后面这个值是根据消息的tag进行正则匹配的
consumer.subscribe("test-topic", "*");

源码的注解也有说明:
在这里插入图片描述

文章来源:https://blog.csdn.net/qq_43259860/article/details/135476024
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。