既然要分析 Broker 接收消息,那么如何找到 Broker 接收消息并进行处理的程序入口呢?
那么消息既然是从生产者开始发送,消息是有单条消息和批量消息之分的,那么消息肯定是有一个标识,当 Broker 接收到消息之后,肯定是需要通过判断消息的标识来区分单条消息和批量消息,那么只需要找到发送消息的标识,再全局搜索,就可以找到这个标识在哪里被处理,被处理的地方一定就是 Broker 接收消息处理的位置了!
那么还是先找到发送消息的位置:DefaultMQProducer # send(Message msg)
,通过层层调用(这里在生产者发送消息流程中讲了)到达了 DefaultMQProducerImpl # this.sendKernelImpl()
在这个方法中就调用到了 MQ 客户端的发送消息的方法 this.mQClientFactory.getMQClientAPIImpl().sendMessage()
在这里真正的通过 Netty 去发送消息到 Broker 中去:
通过判断消息的类型构造一个 RemotiongCommand 类型的 request 参数
这里有 4 个构造 request 参数的方法,如下图会走到第三个方法中,那么这里的请求标识为 RequestCode.SEND_MESSAGE_V2
在 this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request)
方法中通过 Netty 将消息发送出去,那么这个方法需要传入一个 request 参数
在上边构造了 request 并且通过 Netty 发送出去,request 的标识为 RequestCode.SEND_MESSAGE_V2
,那么我们只需要找到处理该标识的 request 的位置,那就是 Broker 处理消息的位置,在 IDEA 中通过 Ctrl+Shift+F
全局搜索这个标识即可:
可以发现有三个进行 case 判断的地方:
PlainAccessResource
类中SendMessageActivity
类中SendMessageRequestHeader
类中这里第三个 case 判断的地方就是 Broker 处理消息的位置(可以在三个 case 中都 debug,看断点走到哪里就知道了)
那么我们就在第三个 case 判断的位置打上断点
接下来启动 NameServer,再以 Debug 的方式启动 Broker,再启动生产者,根据调用堆栈信息来找到 Broker 处理消息的整个调用链:
根据这个堆栈信息,可以发现,调用链是从 NettyServerHandler 的 channelRead0 转移过来的,那么也就是再 NettyServerHandler 这个 Netty 的服务端接收到消息并进行处理,那么我们就在这个堆栈信息中找 Broker 是在哪里对消息进行处理了呢?
就是在 SendMessageProcessor # processRequest
方法中(也就是堆栈顶第3个方法),在这个方法中:
parseRequestHeader(request)
先对请求头进行解码,也就是根据请求头 RequestCode.SEND_MESSAGE_V2
的类型做一些相应的处理buildMsgContext(ctx, requestHeader, request)
创建消息的上下文对象this.executeSendMessageHookBefore(sendMessageContext)
执行一些消息发送前的钩子(扩展点)this.sendMessage()
真正去发送消息那么在 this.sendMessage()
中就是真正发送消息的逻辑了:
首先是 preSend(ctx, request, requestHeader)
进行预发送,这里其实就是对发送的消息进行一些检查(Topic 是否合法?Topic 是否与系统默认 Topic 冲突?Topic 的一些配置是否存在?等等信息)
如果 queueIdInt < 0
是 true 的话,表明生产者没有指定要发送到哪个队列,那么就通过 99999999 % 队列个数
来选择一个队列发送
将超过最大重试次数的消息发送到 DLQ 死信队列中去
if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig, oriProps)) {
return response;
}
接下来判断 Broker 是否开启了 异步模式
,如果开启的话,通过 asyncPutMessage()
处理
如果没有开启 异步模式
,通过 putMessage()
处理,这里其实还是调用了 asyncPutMessage()
,只不过通过 get()
阻塞等待结果(复用代码)
那么在发送消息的时候,无论是否异步,都会进入到 DefaultMessageStore # asyncPutMessage()
方法中,我们就点进去看看进行了哪些处理:
执行一些钩子函数,作为扩展点:putMessageHook.executeBeforePutMessage(msg)
提交文件的写请求:CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg)
在这个写文件的方法中,主要做一些文件的写操作,以及将文件写入到磁盘中
this.mappedFileQueue.getLastMappedFile()
mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext)
handleDiskFlushAndHA(putMessageResult, msg, needAckNums, needHandleHA)
打印写文件消耗的时间 this.getSystemClock().now() - beginTime
那么 Broker 总体的接收消息的处理流程就是上边将的这么多了,当然还有一些边边角角的内容没有细说,先了解整体的处理流程,不要提前去学习太多的细节!