万字详解RocketMq集群搭建步骤

发布时间:2023年12月17日

1.配置三台主机的IP与主机名的映射

三台机器都使用vim /etc/hosts将上边的ip与主机名称放到hosts文件里边。
cat /etc/hosts可以显示hosts文件里边的内容

192.168.150.139  mq1
192.168.150.137  mq2
192.168.150.138  mq3

2. 安装JDK 1.8和RocketMQ

在三台机器/usr/local/目录下mkdir /jdk创建放置压缩包的目录

wget --no-cookies --no-check-certificate --header "Cookie: gpw_e24=http%3A%2F%2Fwww.oracle.com%2F; oraclelicense=accept-securebackup-cookie" "http://download.oracle.com/otn-pub/java/jdk/8u141-b15/336fa29ff2bb4ef291e347e091f7f4a7/jdk-8u141-linux-x64.tar.gz"

得到jdk-8u141-linux-x64.tar.gz压缩文件

tar xzf jdk-8u141-linux-x64.tar.gz

得到jdk1.8.0_141文件目录

在三台机器/usr/local/目录下mkdir /rocketmq创建放置压缩包的目录。

wget https://archive.apache.org/dist/rocketmq/4.9.1/rocketmq-all-4.9.1-bin-release.zip

unzip /rocketmq/rocketmq-all-4.9.1-bin-release.zip解压文件到rocketmq目录。

3.配置JDK和RocketMQ环境变量

打开/etc/profile文件

vim /etc/profile

在文件末尾加上下边的内容:

JAVA_HOME=/usr/local/jdk/jdk1.8.0_141
JRE_HOME=/usr/local/jdk/jdk1.8.0_141/jre
CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
ROCKETMQ_HOME=/usr/local/rocketmq/rocketmq-all-4.9.1-bin-release
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin:$ROCKETMQ_HOME/bin
export JAVA_HOME JRE_HOME CLASS_PATH PATH ROCKETMQ_HOME
export NAMESRV_ADDR='mq1:9876;mq2:9876;mq3:9876'

4.集群部署模式

1.单 master 模式
只有一个 master 节点,称不上是集群,一旦这个 master 节点宕机,那么整个服务就不可用。
2.多 master 模式
多个 master 节点组成集群,单个 master 节点宕机或者重启对应用没有影响。
优点:所有模式中性能最高(一个Topic的可以分布在不同的master,进行横向拓展)在多主多从的架构体系下,无论使用客户端还是管理界面创建主题,一个主题都会创建多份队列在多主中(默认是4个的话,双主就会有8个队列,每台主4个队列,所以双主可以提高性能,一个Topic的分布在不同的master,方便进行横向拓展。
缺点:单个 master 节点宕机期间,未被消费的消息在节点恢复之前不可用,消息的实时性就受到影响。
3. 多master 多 slave 异步复制模式
而从节点(Slave)就是复制主节点的数据,对于生产者完全感知不到,对于消费者正常情况下也感知不到。(只有当Master不可用或者繁忙的时候,Consumer会被自动切换到从Slave 读。)
在多 master 模式的基础上,每个 master 节点都有至少一个对应的 slave。master节点可读可写,但是 slave只能读不能写,类似于 mysql 的主备模式。
优点: 一般情况下都是master消费,在 master 宕机或超过负载时,消费者可以从 slave 读取消息,消息的实时性不会受影响,性能几乎和多 master 一样。
缺点:使用异步复制的同步方式有可能会有消息丢失的问题。(Master宕机后,生产者发送的消息没有消费完,同时到Slave节点的数据也没有同步完)
4. 多master多 slave主从同步复制+异步刷盘
优点:主从同步复制模式能保证数据不丢失。
缺点:发送单个消息响应时间会略长,性能相比异步复制低10%左右。
对数据要求较高的场景,主从同步复制方式,保存数据热备份,通过异步刷盘方式,保证rocketMQ高吞吐量。
5. Dlegder
在RocketMQ4.5版本之后推出了Dlegder模式,但是这种模式一直存在严重BUG,同时性能有可能有问题,包括升级到了4.8的版本后也一样(类似于Zookeeper的集群选举模式)

5.刷盘模式

1. SYNC_FLUSH(同步刷盘):生产者发送的每一条消息都在保存到磁盘成功后才返回告诉生产者成功。这种方式不会存在消息丢失的问
题,但是有很大的磁盘IO开销,性能有一定影响。
异步刷盘
2. ASYNC_FLUSH(异步刷盘):生产者发送的每一条消息并不是立即保存到磁盘,而是暂时缓存起来,然后就返回生产者成功。随后再异步的将缓存数据保存到磁盘,有两种情况:1是定期将缓存中更新的数据进行刷盘,2是当缓存中更新的数据条数达到某一设定值后进行刷盘。这种异步的方式会存在消息丢失(在还未来得及同步到磁盘的时候宕机),但是性能很好。默认是这种模式

6.开始搭建双主双从异步复制+异步刷盘

集群规划情况如下

机器名nameServer部署broker部署
mq1nameServerbroker-a和broker-b-s
mq2nameServerbroker-b和broker-a-s
mq3nameServer

配置RocketMQ启动文件和配置文件

  1. 配置RocketMQ启动文件和配置文件(机器内存不够修改此配置可以正常启动)
    在三台机器修改runserver.sh和runbroker.sh的启动内存
    vim /rocketmq/rocketmq-all-4.9.1-bin-release/bin/runserver.sh,把NameServer启动内存从默认的4G调整为512M。
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

vim /rocketmq/rocketmq-all-4.9.1-bin-release/bin/runbroker.sh把Broker的8G默认预设内存调整为如下:

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m"

第一个节点配置
vim /rocketmq/rocketmq-all-4.9.1-bin-release/conf/2m-2s-async/broker-a.properties然后把下边的RocketMQ启动文件内容配置进去。

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=rocketmq-cluster
brokerIP1=mq1
brokerIP2=mq1
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-a
#brokerid,0就表示是Master,>0的都是表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=mq1:9876;mq2:9876;mq3:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128


vim /rocketmq/rocketmq-all-4.9.1-bin-release/conf/2m-2s-async/broker-b-s.properties,把下边的内容配置进去。

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=rocketmq-cluster
brokerIP1=mq1
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-b
#brokerid,0就表示是Master,>0的都是表示 Slave
brokerId=100
#nameServer地址,分号分割
namesrvAddr=mq1:9876;mq2:9876;mq3:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/rocketmq/storeSlave
#commitLog 存储路径
storePathCommitLog=/rocketmq/storeSlave/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/rocketmq/storeSlave/consumequeue
#消息索引存储路径
storePathIndex=/rocketmq/storeSlave/index
#checkpoint 文件存储路径
storeCheckpoint=/rocketmq/storeSlave/checkpoint
#abort 文件存储路径
abortFile=/rocketmq/storeSlave/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

第二个节点配置
vim /rocketmq/rocketmq-all-4.9.1-bin-release/conf/2m-2s-async/broker-b.properties,把下边的内容配置进去。

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=rocketmq-cluster
brokerIP1=mq2
brokerIP2=mq2
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-b
#brokerid,0就表示是Master,>0的都是表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=mq1:9876;mq2:9876;mq3:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

vim /rocketmq/rocketmq-all-4.9.1-bin-release/conf/2m-2s-async/broker-a-s.properties,把下边的内容配置进去。

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=rocketmq-cluster
#暴露外网的IP
brokerIP1=mq2
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-a
#brokerid,0就表示是Master,>0的都是表示 Slave
brokerId=100
#nameServer地址,分号分割
namesrvAddr=mq1:9876;mq2:9876;mq3:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/rocketmq/storeSlave
#commitLog 存储路径
storePathCommitLog=/rocketmq/storeSlave/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/rocketmq/storeSlave/consumequeue
#消息索引存储路径
storePathIndex=/rocketmq/storeSlave/index
#checkpoint 文件存储路径
storeCheckpoint=/rocketmq/storeSlave/checkpoint
#abort 文件存储路径
abortFile=/rocketmq/storeSlave/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
这里对几个需要重点关注的属性,做下简单介绍:
brokerClusterName: 集群名。RocketMQ会将同一个局域网下所有brokerClusterName相同的服务自动组成一个集群,这个集群可以作为一个整体对外提供服务

brokerName: Broker服务名。同一个RocketMQ集群当中,brokerName相同的多个服务会有一套相同的数据副本。同一个RocketMQ集群中,是可以将消息分散存储到多个不同的brokerName服务上的。

brokerId: RocketMQ中对每个服务的唯一标识。RocketMQ对brokerId定义了一套简单的规则,master节点需要固定配置为0,负责响应客户端的请求。slave节点配置成其他任意数字,负责备份master上的消息。

brokerRole: 服务的角色。这个属性有三个可选项:ASYNC_MASTER,SYNC_MASTER和SLAVE。其中,ASYNC_MASTER和SYNC_MASTER表示当前节点是master节点,目前暂时不用关心他们的区别。SLAVE则表示从节点。

namesrvAddr: nameserver服务的地址。nameserver服务默认占用9876端口。多个nameserver地址用;隔开

启动
先启动三个节点的nameserver,在三个节点分别执行
nohup mqnamesrv &
去/root/logs/rocketmqlogs目录下查看namesrv.log输出如下内容:

2023-12-14 15:10:57 INFO main - The Name Server boot success. serializeType=JSON

启动mq1和mq2节点的broker,先启动主节点再启动从节点。
启动broker-a的主从节点:

1.执行 nohup mqbroker -c /usr/local/rocketmq/rocketmq-all-4.9.1-bin-release/conf/2m-2s-async/broker-a.properties &启动mq1的broker-a节点
去/root/logs/rocketmqlogs目录下查看broker.log输出如下内容:

2023-12-14 15:25:58 INFO main - The broker[broker-a, mq1:10911] boot success. serializeType=JSON and name server is mq1:9876;mq2:9876;mq3:9876

  1. 执行nohup mqbroker -c /usr/local/rocketmq/rocketmq-all-4.9.1-bin-release/conf/2m-2s-async/broker-a-s.properties &启动mq2的broker-a-s节点,去/root/logs/rocketmqlogs目录下查看broker.log输出如下内容:
2023-12-14 15:28:46 INFO main - The broker[broker-a, mq2:11011] boot success. serializeType=JSON and name server is mq1:9876;mq2:9876;mq3:9876

启动broker-b的主从节点:
2. 执行nohup mqbroker -c /usr/local/rocketmq/rocketmq-all-4.9.1-bin-release/conf/2m-2s-async/broker-b.properties &启动mq2的broker-b节点,去/root/logs/rocketmqlogs目录下查看broker.log输出如下内容:

2023-12-14 15:31:55 INFO main - The broker[broker-b, mq2:10911] boot success. serializeType=JSON and name server is mq1:9876;mq2:9876;mq3:9876

2.执行nohup mqbroker -c /usr/local/rocketmq/rocketmq-all-4.9.1-bin-release/conf/2m-2s-async/broker-b-s.properties &启动mq1的broker-b-s节点,去/root/logs/rocketmqlogs目录下查看broker.log输出如下内容:

2023-12-14 15:34:07 INFO main - The broker[broker-b, mq1:11011] boot success. serializeType=JSON and name server is mq1:9876;mq2:9876;mq3:9876

两主两从启动需要新建以下文件目录:

/rocketmq/store
/rocketmq/store/commitlog
/rocketmq/store/consumequeue
/rocketmq/store/index
/rocketmq/store/checkpoint
/rocketmq/store/abort

/rocketmq/storeSlave
/rocketmq/storeSlave/commitlog
/rocketmq/storeSlave/consumequeue
/rocketmq/storeSlave/index
/rocketmq/storeSlave/checkpoint
/rocketmq/storeSlave/abort

在mq1,mq2,mq3分别执行jps看启动的节点:

[root@localhost rocketmqlogs]# jps
2960 NamesrvStartup
3010 BrokerStartup
3239 Jps
3161 BrokerStartup

[root@mq2 logs]# jps
3908 BrokerStartup
4036 Jps
3765 NamesrvStartup
3822 BrokerStartup

[root@mq3 logs]# jps
3099 NamesrvStartup
3182 Jps

执行 mqadmin clusterlist 查看集群状态:

[root@mq2 logs]# mqadmin clusterlist
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
RocketMQLog:WARN Please initialize the logger system properly.
#Cluster Name     #Broker Name            #BID  #Addr                  #Version                #InTPS(LOAD)       #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE
rocketmq-cluster  broker-a                0     mq1:10911              V4_9_1                   0.00(0,0ms)         0.00(0,0ms)          0 50.07 0.1270
rocketmq-cluster  broker-a                100   mq2:11011              V4_9_1                   0.00(0,0ms)         0.00(0,0ms)          0 50.07 0.1603
rocketmq-cluster  broker-b                0     mq2:10911              V4_9_1                   0.00(0,0ms)         0.00(0,0ms)          0 49.97 0.1603
rocketmq-cluster  broker-b                100   mq1:11011              V4_9_1                   0.00(0,0ms)         0.00(0,0ms)          0 49.97 0.1270

进行启动后的测试
在第一个节点上使用示例代码tools.sh org.apache.rocketmq.example.quickstart.Producer使用发送消息。
消息发送成功会显示如下内容:

SendResult [sendStatus=SEND_OK, msgId=7F0000010CF91B6D35864651356A03DC, offsetMsgId=C0A8968C00002A9F0000000000104D42, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=1373]
SendResult [sendStatus=SEND_OK, msgId=7F0000010CF91B6D35864651356B03DD, offsetMsgId=C0A8968900002A9F00000000000D5008, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=0], queueOffset=1124]
SendResult [sendStatus=SEND_OK, msgId=7F0000010CF91B6D35864651356D03DE, offsetMsgId=C0A8968900002A9F00000000000D50CA, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=1], queueOffset=1123]
SendResult [sendStatus=SEND_OK, msgId=7F0000010CF91B6D35864651357003DF, offsetMsgId=C0A8968900002A9F00000000000D518C, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=2], queueOffset=1123]
SendResult [sendStatus=SEND_OK, msgId=7F0000010CF91B6D35864651357203E0, offsetMsgId=C0A8968900002A9F00000000000D524E, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=3], queueOffset=1124]
SendResult [sendStatus=SEND_OK, msgId=7F0000010CF91B6D35864651357403E1, offsetMsgId=C0A8968C00002A9F0000000000104E04, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=1374]
SendResult [sendStatus=SEND_OK, msgId=7F0000010CF91B6D35864651357603E2, offsetMsgId=C0A8968C00002A9F0000000000104EC6, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=1375]
SendResult [sendStatus=SEND_OK, msgId=7F0000010CF91B6D35864651357803E3, offsetMsgId=C0A8968C00002A9F0000000000104F88, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=1374]
SendResult [sendStatus=SEND_OK, msgId=7F0000010CF91B6D35864651357903E4, offsetMsgId=C0A8968C00002A9F000000000010504A, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=1374]
SendResult [sendStatus=SEND_OK, msgId=7F0000010CF91B6D35864651357B03E5, offsetMsgId=C0A8968900002A9F00000000000D5310, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=0], queueOffset=1125]
SendResult [sendStatus=SEND_OK, msgId=7F0000010CF91B6D35864651357E03E6, offsetMsgId=C0A8968900002A9F00000000000D53D2, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=1], queueOffset=1124]
SendResult [sendStatus=SEND_OK, msgId=7F0000010CF91B6D35864651358003E7, offsetMsgId=C0A8968900002A9F00000000000D5494, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=2], queueOffset=1124]
15:42:07.273 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[192.168.150.140:11011] result: true
15:42:07.280 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[192.168.150.137:10911] result: true
15:42:07.280 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[192.168.150.140:10911] result: true
15:42:07.281 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[192.168.150.137:11011] result: true
15:42:07.281 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[192.168.150.138:9876] result: true

按下Ctrl+C按键,可以停止。

在第二个节点执行tools.sh org.apache.rocketmq.example.quickstart.Consumer显示下边类似的内容就是成功:

ConsumeMessageThread_8 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=194, queueOffset=1372, sysFlag=0, bornTimestamp=1702539727192, bornHost=/192.168.150.140:39784, storeTimestamp=1702539727192, storeHost=/192.168.150.140:10911, msgId=C0A8968C00002A9F0000000000104A3A, commitLogOffset=1067578, bodyCRC=1187919614, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1375, CONSUME_START_TIME=1702539742283, UNIQ_KEY=7F0000010CF91B6D35864651355803D4, CLUSTER=rocketmq-cluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 56, 48], transactionId='null'}]] 
ConsumeMessageThread_6 Receive New Messages: [MessageExt [brokerName=broker-b, queueId=0, storeSize=194, queueOffset=1123, sysFlag=0, bornTimestamp=1702539727193, bornHost=/192.168.150.140:47668, storeTimestamp=1702539727176, storeHost=/192.168.150.137:10911, msgId=C0A8968900002A9F00000000000D4D00, commitLogOffset=871680, bodyCRC=835257960, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1126, CONSUME_START_TIME=1702539742477, UNIQ_KEY=7F0000010CF91B6D35864651355903D5, CLUSTER=rocketmq-cluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 56, 49], transactionId='null'}]] 
ConsumeMessageThread_20 Receive New Messages: [MessageExt [brokerName=broker-b, queueId=0, storeSize=194, queueOffset=1121, sysFlag=0, bornTimestamp=1702539727161, bornHost=/192.168.150.140:47668, storeTimestamp=1702539727144, storeHost=/192.168.150.137:10911, msgId=C0A8968900002A9F00000000000D46F0, commitLogOffset=870128, bodyCRC=673705983, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1126, CONSUME_START_TIME=1702539742477, UNIQ_KEY=7F0000010CF91B6D35864651353903C5, CLUSTER=rocketmq-cluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 54, 53], transactionId='null'}]] 
ConsumeMessageThread_15 Receive New Messages: [MessageExt [brokerName=broker-b, queueId=0, storeSize=194, queueOffset=1119, sysFlag=0, bornTimestamp=1702539727132, bornHost=/192.168.150.140:47668, storeTimestamp=1702539727114, storeHost=/192.168.150.137:10911, msgId=C0A8968900002A9F00000000000D40E0, commitLogOffset=868576, bodyCRC=329761110, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1126, CONSUME_START_TIME=1702539742477, UNIQ_KEY=7F0000010CF91B6D35864651351C03B5, CLUSTER=rocketmq-cluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 52, 57], transactionId='null'}]] 
ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=broker-b, queueId=0, storeSize=194, queueOffset=1118, sysFlag=0, bornTimestamp=1702539727117, bornHost=/192.168.150.140:47668, storeTimestamp=1702539727100, storeHost=/192.168.150.137:10911, msgId=C0A8968900002A9F00000000000D3DD8, commitLogOffset=867800, bodyCRC=494684516, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1126, CONSUME_START_TIME=1702539742477, UNIQ_KEY=7F0000010CF91B6D35864651350D03AD, CLUSTER=rocketmq-cluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 52, 49], transactionId='null'}]] 
ConsumeMessageThread_9 Receive New Messages: [MessageExt [brokerName=broker-b, queueId=0, storeSize=194, queueOffset=1108, sysFlag=0, bornTimestamp=1702539726944, bornHost=/192.168.150.140:47668, storeTimestamp=1702539726927, storeHost=/192.168.150.137:10911, msgId=C0A8968900002A9F00000000000D1F88, commitLogOffset=860040, bodyCRC=780681681, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1126, CONSUME_START_TIME=1702539742476, UNIQ_KEY=7F0000010CF91B6D358646513460035D, CLUSTER=rocketmq-cluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 56, 54, 49], transactionId='null'}]] 
ConsumeMessageThread_4 Receive New Messages: [MessageExt [brokerName=broker-b, queueId=0, storeSize=194, queueOffset=1104, sysFlag=0, bornTimestamp=1702539726875, bornHost=/192.168.150.140:47668, storeTimestamp=1702539726858, storeHost=/192.168.150.137:10911, msgId=C0A8968900002A9F00000000000D1368, commitLogOffset=856936, bodyCRC=1144982759, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1126, CONSUME_START_TIME=1702539742476, UNIQ_KEY=7F0000010CF91B6D35864651341B033D, CLUSTER=rocketmq-cluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 56, 50, 57], transactionId='null'}]] 
ConsumeMessageThread_2 Receive New Messages: [MessageExt [brokerName=broker-b, queueId=0, storeSize=194, queueOffset=1101, sysFlag=0, bornTimestamp=1702539726815, bornHost=/192.168.150.140:47668, storeTimestamp=1702539726798, storeHost=/192.168.150.137:10911, msgId=C0A8968900002A9F00000000000D0A50, commitLogOffset=854608, bodyCRC=2143232590, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1126, CONSUME_START_TIME=1702539742476, UNIQ_KEY=7F0000010CF91B6D3586465133DF0325, CLUSTER=rocketmq-cluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 56, 48, 53], transactionId='null'}]] 
ConsumeMessageThread_18 Receive New Messages: [MessageExt [brokerName=broker-b, queueId=0, storeSize=194, queueOffset=1090, sysFlag=0, bornTimestamp=1702539726540, bornHost=/192.168.150.140:47668, storeTimestamp=1702539726523, storeHost=/192.168.150.137:10911, msgId=C0A8968900002A9F00000000000CE8F8, commitLogOffset=846072, bodyCRC=66467102, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1126, CONSUME_START_TIME=1702539742475, UNIQ_KEY=7F0000010CF91B6D3586465132CC02CD, CLUSTER=rocketmq-cluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55, 49, 55], transactionId='null'}]] 
ConsumeMessageThread_12 Receive New Messages: [MessageExt [brokerName=broker-b, queueId=0, storeSize=194, queueOffset=1078, sysFlag=0, bornTimestamp=1702539726293, bornHost=/192.168.150.140:47668, storeTimestamp=1702539726277, storeHost=/192.168.150.137:10911, msgId=C0A8968900002A9F00000000000CC498, commitLogOffset=836760, bodyCRC=1081780703, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1126, CONSUME_START_TIME=1702539742475, UNIQ_KEY=7F0000010CF91B6D3586465131D5026D, CLUSTER=rocketmq-cluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 54, 50, 49], transactionId='null'}]] 
ConsumeMessageThread_17 Receive New Messages: [MessageExt [brokerName=broker-b, queueId=1, storeSize=194, queueOffset=1118, sysFlag=0, bornTimestamp=1702539727133, bornHost=/192.168.150.140:47668, storeTimestamp=1702539727117, storeHost=/192.168.150.137:10911, msgId=C0A8968900002A9F00000000000D41A2, commitLogOffset=868770, bodyCRC=1935689907, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1125, CONSUME_START_TIME=1702539742472, UNIQ_KEY=7F0000010CF91B6D35864651351D03B6, CLUSTER=rocketmq-cluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 53, 48], transactionId='null'}]] 

启动RocketMQ Dashboard

RocketMQ Dashboard下载地址
修改rocketmq-dashboard-1.0.0-source-release\rocketmq-dashboard-1.0.0\src\main\resources\application.properties内容如下:

rocketmq.config.namesrvAddr=192.168.150.140:9876;192.168.150.138:9876;192.168.150.137:9876

maven打可执行jar包:rocketmq-dashboard-1.0.0.jar,上传到服务器,mq3节点。
启动jar:

nohup java -jar rocketmq-dashboard-1.0.0.jar > dashboard.log 2>&1 &

查看dashboard.log日志文件输出:

[2023-12-14 15:53:29.543]  INFO Starting Servlet engine: [Apache Tomcat/9.0.29]
[2023-12-14 15:53:29.643]  INFO Initializing Spring embedded WebApplicationContext
[2023-12-14 15:53:29.644]  INFO Root WebApplicationContext: initialization completed in 2460 ms
[2023-12-14 15:53:30.615]  INFO Initializing ExecutorService 'applicationTaskExecutor'
[2023-12-14 15:53:30.736]  INFO Adding welcome page: class path resource [static/index.html]
[2023-12-14 15:53:30.951]  INFO Initializing ExecutorService 'taskScheduler'
[2023-12-14 15:53:30.968]  INFO Exposing 2 endpoint(s) beneath base path '/actuator'
[2023-12-14 15:53:31.032]  INFO Starting ProtocolHandler ["http-nio-0.0.0.0-8080"]
[2023-12-14 15:53:31.095]  INFO Tomcat started on port(s): 8080 (http) with context path ''
[2023-12-14 15:53:31.098]  INFO Started App in 4.551 seconds (JVM running for 5.294)
[2023-12-14 15:53:52.145]  INFO Initializing Spring DispatcherServlet 'dispatcherServlet'
[2023-12-14 15:53:52.146]  INFO Initializing Servlet 'dispatcherServlet'
[2023-12-14 15:53:52.168]  INFO Completed initialization in 21 ms

访问:管理工具
在这里插入图片描述

关闭服务

先停止broker在停止nameserver

mqshutdown broker
mqshutdown namesrv

通过控制台看出集群全部关闭
在这里插入图片描述

启动Dlegder模式

在Dledger集群中,就不再单独指定各个broker的服务,而是由这些broker服务自行进行选举,产生一个Leader角色的服务,响应客户端的各种请求。而其他的broker服务,就作为Follower角色,负责对Leader上的数据进行备份。当然,Follower所要负责的事情,比主从架构中的SLAVE角色会要复杂一点,因为这种节点选举是在后端不断进行的,他们需要随时做好升级成Leader的准备。
Dledger集群的选举是通过Raft协议进行的,Raft协议是一种多数同意机制。也就是每次选举需要有集群中超过半数的节点确认,才能形成整个集群的共同决定。同时,这也意味着在Dledger集群中,只要有超过半数的节点能够正常工作,那么整个集群就能正常工作。因此,在部署Dledger集群时,通常都是部署奇数台服务,这样可以让集群的容错性达到最大。
? 接下来,我们就用之前准备的3台服务器,搭建一个3个节点的Dledger集群。在这个集群中,只需要有2台Broker服务正常运行,这个集群就能正常工作。

第一步:部署nameserver
直接在三台服务器上启动nameserver服务即可
第二步:对Broker服务进行集群配置
在conf/dledger目录下,RocketMQ默认给出了三个配置文件,这三个配置文件可以在单机情况下直接部署成一个具有三个broker服务的Dledger集群,我们只需要按照这个配置进行修改即可。
mq1节点conf/dledger目录下broker-n0配置文件修改如下:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30911
namesrvAddr=mq1:9876;mq2:9876;mq3:9876
storePathRootDir=/rocketmq/store/storeDledger/
storePathCommitLog=/rocketmq/store/storeDledger/commitlog
storePathConsumeQueue=/rocketmq/store/storeDledger/consumequeue
storePathIndex=/rocketmq/store/storeDledger/index
storeCheckpoint=/rocketmq/store/storeDledger/checkpoint
abortFile=/rocketmq/store/storeDledger/abort
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-mq1:40911;n1-mq2:40911;n2-mq3:40911
## must be unique
dLegerSelfId=n0
sendMessageThreadPoolNums=16

mq2节点conf/dledger目录下broker-n1配置文件修改如下:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30911
namesrvAddr=mq1:9876;mq2:9876;mq3:9876
storePathRootDir=/rocketmq/store/storeDledger/
storePathCommitLog=/rocketmq/store/storeDledger/commitlog
storePathConsumeQueue=/rocketmq/store/storeDledger/consumequeue
storePathIndex=/rocketmq/store/storeDledger/index
storeCheckpoint=/rocketmq/store/storeDledger/checkpoint
abortFile=/rocketmq/store/storeDledger/abort
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-mq1:40911;n1-mq2:40911;n2-mq3:40911
## must be unique
dLegerSelfId=n1
sendMessageThreadPoolNums=16

mq3节点conf/dledger目录下broker-n2配置文件修改如下:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30911
namesrvAddr=mq1:9876;mq2:9876;mq3:9876
storePathRootDir=/rocketmq/store/storeDledger/
storePathCommitLog=/rocketmq/store/storeDledger/commitlog
storePathConsumeQueue=/rocketmq/store/storeDledger/consumequeue
storePathIndex=/rocketmq/store/storeDledger/index
storeCheckpoint=/rocketmq/store/storeDledger/checkpoint
abortFile=/rocketmq/store/storeDledger/abort
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-mq1:40911;n1-mq2:40911;n2-mq3:40911
## must be unique
dLegerSelfId=n2
sendMessageThreadPoolNums=16

几个需要重点关注的配置项:

enableDLegerCommitLog: 是否启动Dledger。true表示启动
namesrvAddr: 指定nameserver地址
dLedgerGroup: Dledger Raft Group的名字,建议跟brokerName保持一致。
dLedgerPeers: Dledger Group内各个服务节点的地址及端口信息。同一个Group内的各个节点配置必须要保持一致。
dLedgerSelfId: Dledger节点ID,必须属于dLedgerPeers中的一个。同一个Group内的各个节点必须不能重复。
sendMessageThreadPoolNums:dLedger内部发送消息的线程数,建议配置成cpu核心数。
store开头的一系列配置: 这些是配置dLedger集群的消息存盘目录。如果你是从主从架构升级成为dLedger架构,那么这个地址可以指向之前搭建住主从架构的地址。dLedger集群会兼容主从架构集群的消息格式,只不过主从架构的消息无法享受dLedger集群的两阶段同步功能。

在三台节点执行以下命令启动broker节点:

mq1:
nohup mqbroker -c /usr/local/rocketmq/rocketmq-all-4.9.1-bin-release/conf/dledger/broker-n0.conf &
mq2:
nohup mqbroker -c /usr/local/rocketmq/rocketmq-all-4.9.1-bin-release/conf/dledger/broker-n1.conf &
mq3:
nohup mqbroker -c /usr/local/rocketmq/rocketmq-all-4.9.1-bin-release/conf/dledger/broker-n2.conf &

自动形成了主从节点:主节点是140

在这里插入图片描述
关闭mq1-140的主节点,选举137为主节点。
在这里插入图片描述
启动Dlegder模式需要新建以下文件目录:

/rocketmq/store/storeDledger/
/rocketmq/store/storeDledger/commitlog
/rocketmq/store/storeDledger/consumequeue
/rocketmq/store/storeDledger/index
/rocketmq/store/storeDledger/checkpoint
/rocketmq/store/storeDledger/abort
文章来源:https://blog.csdn.net/qq_37200262/article/details/134994147
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。