在 RocketMQ 中,消息的消费模式包括并发消费和顺序消费,它们分别适用于不同的业务场景。下面是对这两种消费模式的介绍:
特点:
适用场景:
并发消费适用于业务场景不要求消息的处理顺序,可以并行处理的情况。 适用于需要提高消息处理吞吐量的场景。
示例代码:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.MessageExt;
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YourConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");
// 设置消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
// 处理消息的业务逻辑...
System.out.println("Received message: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 订阅主题和标签
consumer.subscribe("YourTopic", "YourTag");
// 启动消费者
consumer.start();
在并发消费中,可能会有多个线程同时消费一个队列的消息,因此即使发送端通过发送顺序消息保证消息在同一个队列中按照FIFO的顺序,也无法保证消息实际被顺序消费。
特点:
适用场景:
顺序消费适用于业务场景要求消息按照一定顺序处理的情况,如订单处理、流程审批等。
因此RocketMQ提供了顺序消费的方式,顺序消费设置与并发消费API层面只有一处不同,在注册消费回调接口时传入 MessageListenerOrderly
接口的实现。 示例代码:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.MessageExt;
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YourConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");
// 设置消息监听器
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
// 保证同一队列上的消息按照顺序处理
for (MessageExt msg : msgs) {
// 处理消息的业务逻辑...
System.out.println("Received message: " + new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 订阅主题和标签
consumer.subscribe("YourTopic", "YourTag");
// 启动消费者
consumer.start();
注意:在顺序消费的情况下,确保同一消息队列上只有一个消费者实例处理消息是非常重要的,否则可能会导致消息处理的混乱。同时,消息的发送方也需要保证消息发送到同一个队列上,以确保顺序性。
持续更新ing,动动小手,点点关注,后续更精彩O