任务需要延迟一段时间再进行处理。
生产者
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("ip:9876");
producer.start();
List<Order> F = OrderBuilder.build(1, "A", "B", "C");
List<Order> S = OrderBuilder.build(2, "D", "Q");
List<Order> T = OrderBuilder.build(3, "N", "Q", "R");
ArrayList<Order> orders = new ArrayList<Order>() {{
addAll(F);
addAll(S);
addAll(T);
}};
for (Order order : orders) {
Message msg = new Message("test-topic", "test-topic_str", order.toString().getBytes());
msg.setKeys("test-topic_trace");
// 官网提供了这些延迟级别 分别对应 0 1 2
// messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
// 重要的逻辑在这里设置队列延迟等级
msg.setDelayTimeLevel(3);
producer.send(msg);
}
System.out.println("finish");
// 这里发送了两个Tag 的消息
// 下面这个消息没有设置延迟时间
for (Order order : orders) {
Message msg = new Message("test-topic", "test-topic_str_other", ("other" + order.toString()).getBytes());
msg.setKeys("test-topic_trace_other");
producer.send(msg);
}
System.out.println("finish");
}
}
消费者1订阅tag 为*的消息
public class Consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-group");
consumer.setNamesrvAddr("ip:9876");
// *表示订阅所有的消息
consumer.subscribe("test-topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(String.format("msg {%s} recvTime %s", new String(msg.getBody()), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
可以看到我们没有设置延迟发送和延迟发送的sentTime和recvTime是很有区别的:
我们看到test-group里面的消息总共有16条
消费者订阅tag为test-topic_str_other的消息
consumer.subscribe("test-topic", "test-topic_str_other");
消费者订阅tag为test-topic_str的消息
consumer.subscribe("test-topic", "test-topic_str");
通过上面的案例验证了:
// 后面这个值是根据消息的tag进行正则匹配的
consumer.subscribe("test-topic", "*");
源码的注解也有说明: