还是先把消费者接收消息的流程图贴出来,再细说代码流程:
首先先从消费者的业务调用出发
// 创建消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_group");
// ...
// 注册监听消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
那么我们就从 consumer.start()
进入,看一下消费者的启动逻辑,该方法的核心代码也就是:
this.defaultMQPushConsumerImpl.start();
那么进入到这个 start 方法,这里进行了一些配置以及客户端的启动:
checkConfig()
检查消费组的一些配置:名称是否符合规范、消费者的线程数、消费者的监听等等mQClientFactory.start()
启动客户端那么我们进入到启动客户端这个逻辑,我们猜测这里 start 之后,可能就可以进行消息的拉取了,那么在 start 这个方法中,看到了有下边这一行:
this.pullMessageService.start();
这不正是拉取消息的服务吗?点进去之后,发现就是启动了一个线程,这个线程呢就是 this
,那么我们点进去这个 start 方法是定义在 ServiceThread
类中,这个类并没有定义 run 方法,因此呢,这个 run 方法应该是定义在了子类 PullMessageService
类中,点进去找到 run 方法,可以看到在 run 方法中就会不停地去 messageRequestQueue
中拉取数据:
MessageRequest messageRequest = this.messageRequestQueue.take();
既然在这里拉取数据了,那么数据是什么时候放到 messageRequestQueue
中的呢?
只需要搜一下哪里调用到了 this.messageRequestQueue.put
就可以知道了,找到之后呢,我们在这一行打个断点,再去启动生产者,就可以知道整个调用链了,
那么根据栈调用情况呢,可以发现这一行是通过 RebalanceService
的 run 方法进入的,那么这个 RebalanceService
一定是在哪里作为一个线程被启动了
那么呢,我们之前说了在启动客户端的时候,调用 this.pullMessageService.start()
启动了这个线程,那么在下一行就启动了 rebalanceService
这个线程:
因此呢,就通过 debug 的方式找到了向 messageRequestQueue
中存放消息就是在 RebalanceService 这个线程中做的