这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党
当前的消费负载均衡策略是以队列的维度来进行,所有行为全部是由客户端主动来完成,主要分为三步:
每个consumer定时去获取消费的topic的队列总数,以及consumer总数
将队列按编号、consumer按ip排序,用统一的分配算法计算该consumer分配哪些消费队列
每个consumer去根据算法分配出来的队列,拉取消息消费
存在的问题:
新方案需要保证两点:
高可用:单一队列的消费能力不受某个消费客户端异常的影响
高性能:POP订阅对消息消费的延迟和吞吐的影响在10%以内
当前pull
模式架构下的消费模型如下
pop的消费模型
举个简单🌰
topic名字: xiaozou
queue数量: 6(brokerA-Q1、brokerA-Q2、brokerB-Q1、brokerB-Q2、brokerC-Q1、brokerC-Q2)
gid: gid_xiaozou
消费者数量: 3(consumer-a、consumer-b、consumer-c)
push消费模式
consumer-a:brokerA-Q1、brokerB-Q1、brokerC-Q1
consumer-b: brokerA-Q2、okerB-Q2、bkerC-Q2
consumer-a:brokerA-Q1、brokerB-Q1、brokerC-Q1、brokerA-Q2、okerB-Q2、bkerC-Q2
consumer-b:brokerA-Q1、brokerB-Q1、brokerC-Q1、brokerA-Q2、okerB-Q2、bkerC-Q2
单个客户端消费所有queue
,避免单个client
假死造成消息堆积
Rebalance
在broker
实现,方便多语言client
实现维护
目前提供了两种方式切换
mqadmin setConsumeMode -c cluster -t topic -g group -m POP -q 8
public class PopPushConsumer {
public static final String CONSUMER_GROUP = "gid_pop_xiao-zou-topic";
public static final String TOPIC = "xiao-zou-topic";
// Or use AdminTools directly: mqadmin setConsumeMode -c cluster -t topic -g group -m POP -n 8
private static void switchPop() throws Exception {
DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
mqAdminExt.setNamesrvAddr("http://localhost:9876");
mqAdminExt.start();
ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
Set<String> brokerAddrs = clusterInfo.getBrokerAddrTable().values().stream().map(BrokerData::selectBrokerAddr).collect(Collectors.toSet());
for (String brokerAddr : brokerAddrs) {
mqAdminExt.setMessageRequestMode(brokerAddr, TOPIC, CONSUMER_GROUP, MessageRequestMode.POP, 8, 3_000);
}
}
public static void main(String[] args) throws Exception {
switchPop();
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
consumer.subscribe(TOPIC, "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.setClientRebalance(false);
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
pop
消费方式的出现主要是为了解决两个核心问题:
Rebalance
在broker
实现,方便多语言client
实现维护.更符合云原生queue
,避免单个client
假死造成消息堆积consumer
提升消费能力异常consumer
数量比例不均衡时导致的某些consumer
承担过多的消息后续有机会再分析下pop
消费时间的源码以及更多细节