RocketMQ是由阿里巴巴开发的一款分布式消息中间件系统。它以高吞吐量、低延迟和可扩展性为特点,可以提供可靠的消息传递和分布式通信能力。
RocketMQ是基于发布/订阅模式的消息中间件,支持多种消息模型,包括点对点和广播。它由四个核心组件组成:
RocketMQ还拥有一些高级特性,例如消息的可靠性投递、消息的顺序性、消息的事务性等。借助这些特性,RocketMQ可以满足各种场景下的消息传递需求,例如电商、金融、物流等。
延迟队列说白了就是延迟的消息处理。
RocketMQ的延迟队列是通过消息的延迟级别来实现的。延迟级别是指消息从发送到被消费之间的时间间隔,可以设置为任意时间段,单位为毫秒。
在RocketMQ中,使用MessageDelayLevel
构成了延迟级别,它是一个枚举类型,定义了多个级别,例如MessageDelayLevel.TIME_DELAY_LEVEL_1
、MessageDelayLevel.TIME_DELAY_LEVEL_2
等。每个级别都对应一个延迟时间。
Java示例:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.List;
public class DelayMessageProducer {
public static void main(String[] args) throws Exception {
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
try {
// 创建消息
Message message = new Message(
"topic_name",
"Hello, RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 设置延迟级别
message.setDelayTimeLevel(2);
// 发送消息
SendResult sendResult = producer.send(
message,
new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 根据业务需求选择消息队列
return mqs.get(0);
}
},
null);
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
System.out.println("消息发送成功");
} else {
System.out.println("消息发送失败");
}
} finally {
// 关闭生产者
producer.shutdown();
}
}
}
首先创建了一个生产者实例,设置了NameServer的地址。然后创建消息,并设置了延迟级别为2。最后通过生产者发送消息,同时传入一个MessageQueueSelector
实例,用于指定消息发送到哪个消息队列。
通过这种方式,可以实现消息的延迟投递,即消息将在指定的延迟时间后才会被消费者消费。在消费者端,可以监听指定的延迟级别来消费延迟消息。
请注意,延迟队列只对那些配置了延迟级别的消息生效。如果消息没有设置延迟级别,将会按照正常的消息流程进行处理。