RocketMQ系统性学习-RocketMQ原理分析之Broker接收消息的处理流程

发布时间:2023年12月20日

Broker接收消息的处理流程?

既然要分析 Broker 接收消息,那么如何找到 Broker 接收消息并进行处理的程序入口呢?

那么消息既然是从生产者开始发送,消息是有单条消息和批量消息之分的,那么消息肯定是有一个标识,当 Broker 接收到消息之后,肯定是需要通过判断消息的标识来区分单条消息和批量消息,那么只需要找到发送消息的标识,再全局搜索,就可以找到这个标识在哪里被处理,被处理的地方一定就是 Broker 接收消息处理的位置了!

那么还是先找到发送消息的位置:DefaultMQProducer # send(Message msg) ,通过层层调用(这里在生产者发送消息流程中讲了)到达了 DefaultMQProducerImpl # this.sendKernelImpl()

在这个方法中就调用到了 MQ 客户端的发送消息的方法 this.mQClientFactory.getMQClientAPIImpl().sendMessage()

在这里真正的通过 Netty 去发送消息到 Broker 中去:

  1. 通过判断消息的类型构造一个 RemotiongCommand 类型的 request 参数

    这里有 4 个构造 request 参数的方法,如下图会走到第三个方法中,那么这里的请求标识为 RequestCode.SEND_MESSAGE_V2

    在这里插入图片描述

  2. this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request) 方法中通过 Netty 将消息发送出去,那么这个方法需要传入一个 request 参数

在上边构造了 request 并且通过 Netty 发送出去,request 的标识为 RequestCode.SEND_MESSAGE_V2 ,那么我们只需要找到处理该标识的 request 的位置,那就是 Broker 处理消息的位置,在 IDEA 中通过 Ctrl+Shift+F 全局搜索这个标识即可:

在这里插入图片描述

可以发现有三个进行 case 判断的地方:

  • 第一个在 PlainAccessResource 类中
  • 第二个在 SendMessageActivity 类中
  • 第三个在 SendMessageRequestHeader 类中

这里第三个 case 判断的地方就是 Broker 处理消息的位置(可以在三个 case 中都 debug,看断点走到哪里就知道了)

那么我们就在第三个 case 判断的位置打上断点

在这里插入图片描述

接下来启动 NameServer,再以 Debug 的方式启动 Broker,再启动生产者,根据调用堆栈信息来找到 Broker 处理消息的整个调用链:

在这里插入图片描述

根据这个堆栈信息,可以发现,调用链是从 NettyServerHandler 的 channelRead0 转移过来的,那么也就是再 NettyServerHandler 这个 Netty 的服务端接收到消息并进行处理,那么我们就在这个堆栈信息中找 Broker 是在哪里对消息进行处理了呢?

就是在 SendMessageProcessor # processRequest 方法中(也就是堆栈顶第3个方法),在这个方法中:

  1. 通过 parseRequestHeader(request) 先对请求头进行解码,也就是根据请求头 RequestCode.SEND_MESSAGE_V2 的类型做一些相应的处理
  2. 接下来通过 buildMsgContext(ctx, requestHeader, request) 创建消息的上下文对象
  3. this.executeSendMessageHookBefore(sendMessageContext) 执行一些消息发送前的钩子(扩展点)
  4. 核心:this.sendMessage() 真正去发送消息

那么在 this.sendMessage() 中就是真正发送消息的逻辑了:

  1. 首先是 preSend(ctx, request, requestHeader) 进行预发送,这里其实就是对发送的消息进行一些检查(Topic 是否合法?Topic 是否与系统默认 Topic 冲突?Topic 的一些配置是否存在?等等信息)

  2. 如果 queueIdInt < 0 是 true 的话,表明生产者没有指定要发送到哪个队列,那么就通过 99999999 % 队列个数 来选择一个队列发送

  3. 将超过最大重试次数的消息发送到 DLQ 死信队列中去

    if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig, oriProps)) {
        return response;
    }
    
  4. 接下来判断 Broker 是否开启了 异步模式,如果开启的话,通过 asyncPutMessage() 处理

    如果没有开启 异步模式,通过 putMessage() 处理,这里其实还是调用了 asyncPutMessage(),只不过通过 get() 阻塞等待结果(复用代码)

那么在发送消息的时候,无论是否异步,都会进入到 DefaultMessageStore # asyncPutMessage() 方法中,我们就点进去看看进行了哪些处理:

  1. 执行一些钩子函数,作为扩展点:putMessageHook.executeBeforePutMessage(msg)

  2. 提交文件的写请求:CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg)

    在这个写文件的方法中,主要做一些文件的写操作,以及将文件写入到磁盘中

    1. 获取文件对象:this.mappedFileQueue.getLastMappedFile()
    2. 追加写文件的操作: mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext)
    3. 最后进行刷盘以及高可用的一些处理:handleDiskFlushAndHA(putMessageResult, msg, needAckNums, needHandleHA)
  3. 打印写文件消耗的时间 this.getSystemClock().now() - beginTime

那么 Broker 总体的接收消息的处理流程就是上边将的这么多了,当然还有一些边边角角的内容没有细说,先了解整体的处理流程,不要提前去学习太多的细节!

文章来源:https://blog.csdn.net/qq_45260619/article/details/135119164
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。