RocketMQ中的消费者组扮演着关键角色,主要有两种使用场景。首先,一个topic只被一个消费者组订阅,确保每条消息都被处理,同时通过负载均衡提高处理效率和可靠性。其次,多个消费者组可以独立完整地消费同一个topic,支持不同的消费逻辑和业务处理流程。
一个topic只被一个消费者组订阅,那么该消费者组都将完整的消费该topic中的消息,也就是这个消费者组中的每个consumer将近乎均等的消费topic中的消息,如下图所示:
在RocketMQ中,当一个topic只被一个消费者组订阅时,该消费者组将承担起完整消费该topic中所有消息的责任,这意味着,无论该topic中有多少条消息,这个消费者组都会确保每条消息都被至少一个消费者处理。
为了实现这种完整的消息消费,RocketMQ会在消费者组内的消费者之间进行消息的负载均衡,负载均衡是自动完成的,它根据消费者的数量和消费者的消费能力来动态分配消息,负载均衡算法确保每个消费者都能近乎均等地分配到一部分消息,从而避免某个消费者过载或某个消费者空闲的情况。
这里的“近乎均等”并不意味着每个消费者消费的消息数量完全相同,实际上,由于消息的产生速率、消费者的处理速度以及系统资源分配等因素的影响,每个消费者消费的消息数量可能会有所差异,但是,RocketMQ的负载均衡机制会尽量保证每个消费者都有机会消费到消息,并且消费的消息数量相对均衡。
因此,可以放心地使用RocketMQ的消费者组和负载均衡机制,来确保该topic中的所有消息都能被完整地消费掉。这种设计不仅提高了消息处理的效率,还保证了消息处理的可靠性,这是RocketMQ作为高性能消息中间件的重要特性之一。
一个topic被多个消费者组订阅,那么每一个消费者组都将完整的消费该topic中的消息,也就是说每个消费者组内的consumer将均分的消费该topic中的消息,如下图所示:
当一个topic被多个消费者组订阅时,每个消费者组都将独立且完整地消费该topic中的所有消息,这种设计确保了消息的充分利用,并且支持多种不同的消费逻辑或业务处理流程。
每个消费者组内的consumer会均分地消费该topic中的消息,RocketMQ会根据消费者组内的消费者数量和消费者的消费能力,自动进行消息的负载均衡,负载均衡算法会确保每个消费者都能获得相对均等的消息量,从而避免单个消费者过载或空闲的情况。
每个消费者组订阅同一个topic时,它们之间是相互独立的,不同消费者组中的consumer可以并行地消费消息,彼此之间不会相互干扰*,这种设计使得RocketMQ能够支持多种不同的消费模式,例如广播消费和集群消费,从而满足不同的业务需求。*
先创建一个producer,如下ProducerNormalMessageExample代码:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @创建人 程序员古德 <br>
* @创建时间 2024/1/17 10:00 <br>
* @修改人 暂无 <br>
* @修改时间 暂无 <br>
* @版本历史 暂无 <br>
*/
public class ProducerNormalMessageExample {
private static final Logger logger = LoggerFactory.getLogger(ProducerNormalMessageExample.class);
private static final String ENDPOINTS = "192.168.109.109:9876";
private ProducerNormalMessageExample() {}
public static void main(String[] args) throws Exception {
// 1. 创建一个消息生产者producer,指定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 2. 指定Nameserver地址
producer.setNamesrvAddr("192.168.109.109:9876");
// 3. 启动producer
producer.start();
for (int i = 0; i < 10; i++) {
// 4. 创建消息对象,指定主题Topic、Tag和消息体
Message msg = new Message("TestTopic", "TagA", ("Hello RocketMQ " + i).getBytes());
// 5. 发送消息
SendResult sendResult = producer.send(msg);
// 6. 打印发送结果
System.out.println(sendResult);
}
// 7. 关闭生产者producer
producer.shutdown();
}
}
实现第一种情况,创建两个consumer,分别为ConsumerExample_C1和ConsumerExample_C2,并且设置它们的groupName都为group1,如下代码:
ConsumerExample_C1,如下代码:
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
/**
* @创建人 程序员古德 <br>
* @创建时间 2024/1/17 10:00 <br>
* @修改人 暂无 <br>
* @修改时间 暂无 <br>
* @版本历史 暂无 <br>
*/
public class ConsumerExample_C1 {
private static String groupName = "group1";
public static void main(String[] args) throws Exception {
// 1. 创建一个消息消费者consumer,指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
// 2. 指定Nameserver地址
consumer.setNamesrvAddr("192.168.109.109:9876");
// 3. 订阅主题Topic和Tag来过滤需要消费的消息
consumer.subscribe("TestTopic", "*");
// 4. 设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
// 打印消息内容
System.out.printf("%s,,,%s Receive New Messages: %s %n", groupName,Thread.currentThread().getName(), new String(msg.getBody()));
}
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 5. 启动消费者consumer
consumer.start();
System.out.println("Consumer Started.");
}
}
ConsumerExample_C2,如下代码:
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
/**
* @创建人 程序员古德 <br>
* @创建时间 2024/1/17 10:00 <br>
* @修改人 暂无 <br>
* @修改时间 暂无 <br>
* @版本历史 暂无 <br>
*/
public class ConsumerExample_C2 {
private static String groupName = "group1";
public static void main(String[] args) throws Exception {
// 下面代码同 ConsumerExample_C1
}
}
实现第二种情况,需要在第一种情况的基础上在加上ConsumerExample_C3和ConsumerExample_C4。
ConsumerExample_C3,设置groupName为group2,如下代码:
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
/**
* @创建人 程序员古德 <br>
* @创建时间 2024/1/17 10:00 <br>
* @修改人 暂无 <br>
* @修改时间 暂无 <br>
* @版本历史 暂无 <br>
*/
public class ConsumerExample_C3 {
private static String groupName = "group2";
public static void main(String[] args) throws Exception {
// 下面代码同 ConsumerExample_C1
}
}
ConsumerExample_C4,设置groupName为group3,如下代码:
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
/**
* @创建人 程序员古德 <br>
* @创建时间 2024/1/17 10:00 <br>
* @修改人 暂无 <br>
* @修改时间 暂无 <br>
* @版本历史 暂无 <br>
*/
public class ConsumerExample_C4 {
private static String groupName = "group2";
public static void main(String[] args) throws Exception {
// 下面代码同 ConsumerExample_C1
}
}
消费者组中的每个consumer将近乎均等的消费topic中的消息,如下图所示:
当采用集群消费的消费模式时,消费者组中的每个consumer将按照近乎均等的原则来消费topic中的消息,这种消费模式确保了消息能够在多个consumer之间实现负载均衡,从而提高消息处理的并行性和吞吐量。
消费者组中的每个consumer将完完整整的消费topic中的消息,如下图所示:
在广播消费模式下,RocketMQ会将topic中的每条消息都发送给消费者组中的每个consumer,因此,无论消费者组中有多少个consumer,每个consumer都会收到相同的消息集合,这种机制确保了消息的全面覆盖,使得每个consumer都能够对消息进行独立处理。
广播消费场景通常用在状态同步和更新场景,比如:当系统中某个组件的状态发生变化时,可能需要通知其他所有相关的组件进行相应的调整或处理。例如,在微服务架构中,一个服务实例的上下线可能需要通知服务注册中心和所有依赖该服务的其他实例,广播消费可以高效地完成这种状态通知的传递。
由于每个consumer都会消费topic中的所有消息,所以在消费者数量较多的情况下,可能会存在消息处理的重复性和资源浪费的问题。因此,在使用广播消费模式时,需要权衡消息处理效率和系统性能之间的关系。
设置广播消费或者集群消费需要在Consumer端设置,具体代码为consumer.setMessageModel(MessageModel.BROADCASTING);
,如下代码:
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
/**
* @创建人 程序员古德 <br>
* @创建时间 2024/1/17 10:00 <br>
* @修改人 暂无 <br>
* @修改时间 暂无 <br>
* @版本历史 暂无 <br>
*/
public class ConsumerExample_C1 {
private static String groupName = "group1";
public static void main(String[] args) throws Exception {
// 1. 创建一个消息消费者consumer,指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
// 2. 指定Nameserver地址
consumer.setNamesrvAddr("192.168.109.109:9876");
// 3. 订阅主题Topic和Tag来过滤需要消费的消息
consumer.subscribe("TestTopic", "*");
// 4. 设置消费者消费消息的位置
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// 5. 设置消费者模式,集群模式或者广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
// 6. 设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
// 打印消息内容
System.out.printf("%s,,,%s Receive New Messages: %s %n", groupName,Thread.currentThread().getName(), new String(msg.getBody()));
}
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 5. 启动消费者consumer
consumer.start();
System.out.println("Consumer Started.");
}
}
END!