分析 RocketMQ 之前,先确保可以成功启动起来
在 Idea 中配置 ROCKETMQ_HOME
环境变量为自己安装 RocketMQ 的位置即可
ROCKETMQ_HOME=D:\Project\IDEA\Rocket-MQ\rocketmq-all-5.1.4-source-release
启动 Broker 指定 NameServer 地址以及配置文件地址,以及 ROCKETMQ_HOME 变量即可
上边的 custom.conf 配置文件就是 broker.conf
多加了 3 行配置,如下:
既然需要分析 Broker 启动流程,先下载 RocketMQ 源码
https://rocketmq.apache.org/download
Broker 启动的入口为 broker 模块的 BrokerStartup 启动类
:
Broker 启动有两个方法:
createBrokerController(args)
:先创建 BrokerController 控制器,BrokerController 控制器对象包含了各种 Config 配置对象以及 Manager 管理对象
在该方法中,主要通过 buildBrokerController()
方法来创建 BrokerController 控制器
创建 四大配置类
:
解析 命令行参数
:
/**
* 这里解析命令行或 Idea Arguments 的参数
* 如果自己开发插件需要接收命令启动参数的话,可以参考
* Broker 启动命令为:./mqbroker -n localhost:9876 -c D:/RocketMQ/conf/custom.conf autoCreateTopicEnable=true
*/
Options options = ServerUtil.buildCommandlineOptions(new Options());
CommandLine commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options), new DefaultParser());
将四大配置类装入 BrokerController:
/**
* 将 4 大配置,装进 BrokerController 中
* 在 BrokerController 构造方法还创建了:
* 1. 各种 Manager 管理对象
* 2. 各种 Processor 处理对象
* 3. 各种 Queue 队列对象
*/
final BrokerController controller = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, messageStoreConfig);
创建完 BrokerController 控制器之后,还会判断是否创建成功,以及注册钩子
判断是否创建成功,如果创建失败,就尝试关闭 BrokerController
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
注册 JVM 进程关闭的钩子,在进程关闭时,回收一些资源
// 添加 JVM 钩子,在 JVM 关闭时,会触发钩子,做一些回收动作
Runtime.getRuntime().addShutdownHook(new Thread(buildShutdownHook(controller)));
start(brokerController)
:创建完 BrokerController 后,启动 BrokerController
在该方法中,通过 controller.start();
来启动 BrokerController
调用 NameServer 的通信组件启动
// 调用 NameServer 的通信组件启动
if (this.brokerOuterAPI != null) {
this.brokerOuterAPI.start();
}
向所有的 NameServer 注册 Broker 自己
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
发送心跳
if (this.brokerConfig.isEnableControllerMode()) {
scheduleSendHeartbeat();
}