在 Broker 启动时,通过 BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
向 NameServer 中注册自己
那么 NameServer 中,注册 Broker 信息的入口在: DefaultRequestProcessor # processRequest
判断请求码,如果是 Broker 注册,则进行注册 Broker 信息
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
if (ctx != null) {
log.debug("receive request, {} {} {}",
request.getCode(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
request);
}
switch (request.getCode()) {
// ... 省略
// 如果是 Broker 注册
case RequestCode.REGISTER_BROKER:
return this.registerBroker(ctx, request);
// ... 省略
}
}
?
this.registerBroker
真正开始注册 Broker 信息
在注册信息之前,会先使用 crc32
来检验消息的正确性(安全检查)
之后会调用 this.namesrvController.getRouteInfoManager().registerBroker()
来注册 Broker 的信息,这个 Broker 的信息是 BrokerController 启动时通过 Netty 发送过来的
通过 getRouteInfoManager
获取 RouteInfoManager,在该类中注册 Broker 信息,那么 RouteInfoManager 肯定是管理了 Broker 的信息
可以点进去 RouteInfoManager,可以发现其中管理了很多路由的信息
?
其中 brokerLiveTable 存储的是存活的 Broker 列表,那么可以查看该变量的引用链,来判断 Nameserver 在哪里进行心跳扫描
可以看到在 scanNotActiveBroker 方法中,会将 brokerLiveTable 中不活跃的 Broker 给剔除掉
?
下面会将整体的一个发送消息的流程图片先展示出来,再通过代码进行一步一步梳理:
既然要看生产者的发送消息流程,就先通过方法的调用作为入口,一步一步探究流程:
那么通过这个 send 方法点进去,入口为:DefaultMQProducer # send(Message msg)
方法,从该方法点击进入,调用链如下:
如果你在看源码的话,可以从上边的调用链一步一步点击,最后发送消息的逻辑就在 this.sendDefaultImpl
方法中展开
重试次数
,同步情况下重试次数为预设次数 +1,异步情况下默认重试次数为 1重试次数
循环发送消息,为 Topic 选择要发送的队列 MessageQueue 进行消息发送选择队列之后,就进入到发送消息的核心逻辑:this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
禁止发送
或 前置拦截
的钩子函数,进行一些消息的拦截处理SYNC
为例如果是同步的话,会通过 this.mQClientFactory.getMQClientAPIImpl().sendMessage()
方法将消息发送出去,接下来又是层层的调用,最后真正通过 Netty 将消息发送出去的地方在 NettyRemotingClient # invokeSync()
的方法中
在这个方法中,还会对消息进行前置拦截和后置拦截,为开发者的使用提供了很多的扩展点,在这里就 真正通过 Netty 将消息发送出去了