RocketMQ集群方式有好几种
官网地址 https://rocketmq.apache.org/zh/docs/4.x/deployment/01deploy
dledger搭建参考文档 https://rocketmq.apache.org/zh/docs/4.x/bestPractice/02dledger
MQ安装部署请看这篇:https://blog.csdn.net/HBliucheng/article/details/135357998
搭建过程中踩过的坑也也会记录下来
## 启动
nohup sh bin/dledger/fast-try.sh start
## 关闭
nohup sh bin/dledger/fast-try.sh stop
先启动 fast-try.sh start
启动时发现权限不足
nohup: 无法运行命令"bin/mqbroker": 权限不够
查看启动脚本
cat bin/dledger/fast-try.sh
那我们就修改下nohup 后面加上sh
修改后如下
function startNameserver() {
export JAVA_OPT_EXT=" -Xms512m -Xmx512m "
nohup sh bin/mqnamesrv &
}
function startBroker() {
export JAVA_OPT_EXT=" -Xms1g -Xmx1g "
conf_name=$1
nohup sh bin/mqbroker -c $conf_name &
}
再次启动发现可以了
执行命令 查看集群情况 BID =0的是主节点
sh bin/mqadmin clusterList -n 127.0.0.1:9876
再看看dashboarb
启动之前请先开放6个端口 如果还有端口访问不了的请自行开放出来
firewall-cmd --zone=public --add-port=30909/tcp --permanent
firewall-cmd --zone=public --add-port=30911/tcp --permanent
firewall-cmd --zone=public --add-port=30919/tcp --permanent
firewall-cmd --zone=public --add-port=30921/tcp --permanent
firewall-cmd --zone=public --add-port=30929/tcp --permanent
firewall-cmd --zone=public --add-port=30931/tcp --permanent
### 如果不想一次次开放下面命令也可以
firewall-cmd --zone=public --add-port=30900-30930/tcp --permanent
## 重启防火墙
systemctl reload firewalld
## 查看开放的端口
firewall-cmd --list-ports
## 其它命令
### 关闭端口
firewall-cmd --zone=public --remove-port=30909/tcp --permanent
启动生产者和消费者再看 master消费一个
停止master
lsof -i:30911
## 找到pid杀死 我的是118276
kill 118276
我们再启动 被杀死的broker
nohup sh bin/mqbroker -c conf/dledger/broker-n0.conf &
发现30911作为slave回来了
先准备三台机器
192.168.141.101
192.168.141.102
192.168.141.103
192.168.141.101修改如下
profile不修改也可以
vim /etc/profile
## 加入192.168.141.102 192.168.141.103 同理102 103也改成这样
export NAMESRV_ADDR=192.168.141.101:9876
source /etc/profile
修改 broker.conf 后面我们启动哪个就修改哪个 我是把 broker-n0.conf复制一份到broker.conf,也可以直接修改broker-n0.conf,启动时启动自己配置的conf文件就可以
cd /bsoft/mdt/rocketmq/rocketmq-4.8.0/rocketmq-4.8.0/conf/dledger
cp broker-n0.conf broker.conf
vim broker.conf
## 修改的配置如下改动地方namesrvAddr dLegerPeers dLegerSelfId
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30911
namesrvAddr=192.168.141.101:9876;192.168.141.102:9876;192.168.141.103:9876
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-192.168.141.101:40911;n1-192.168.141.102:40911;n2-192.168.141.103:40911
## must be unique
dLegerSelfId=n0
sendMessageThreadPoolNums=16
192.168.141.102修改如下
profile不修改也可以
vim /etc/profile
## 加入192.168.141.102 192.168.141.103 同理102 103也改成这样
export NAMESRV_ADDR=92.168.141.102:9876
source /etc/profile
cd /bsoft/mdt/rocketmq/rocketmq-4.8.0/rocketmq-4.8.0/conf/dledger
cp broker-n0.conf broker.conf
vim broker.conf
## 修改的配置如下改动地方namesrvAddr dLegerPeers dLegerSelfId
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30911
namesrvAddr=192.168.141.101:9876;192.168.141.102:9876;192.168.141.103:9876
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-192.168.141.101:40911;n1-192.168.141.102:40911;n2-192.168.141.103:40911
## must be unique
dLegerSelfId=n1
sendMessageThreadPoolNums=16
192.168.141.103修改如下
profile不修改也可以
vim /etc/profile
## 加入192.168.141.102 192.168.141.103 同理102 103也改成这样
export NAMESRV_ADDR=192.168.141.103:9876
cd /bsoft/mdt/rocketmq/rocketmq-4.8.0/rocketmq-4.8.0/conf/dledger
cp broker-n0.conf broker.conf
vim broker.conf
## 修改的配置如下改动地方namesrvAddr dLegerPeers dLegerSelfId
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30911
namesrvAddr=192.168.141.101:9876;192.168.141.102:9876;192.168.141.103:9876
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-192.168.141.101:40911;n1-192.168.141.102:40911;n2-192.168.141.103:40911
## must be unique
dLegerSelfId=n2
sendMessageThreadPoolNums=16
开放端口
每台机器都要开放
firewall-cmd --zone=public --add-port=30911/tcp --permanent
firewall-cmd --zone=public --add-port=40911/tcp --permanent
systemctl reload firewalld
如果还有端口没开放,请自行开放
启动
每台机器都要启动
cd /bsoft/mdt/rocketmq/rocketmq-4.8.0/rocketmq-4.8.0
nohup sh bin/mqnamesrv &
nohup sh bin/mqbroker -c conf/dledger/broker.conf &
查看日志
发现未创建文件夹,创建文件夹
mkdir -p /tmp/rmqstore/node00/commitlog
## 关掉再启动
sh bin/mqshutdown broker
## 启动broker
nohup sh bin/mqbroker -c conf/dledger/broker.conf &
查看集群情况
sh bin/mqadmin clusterList -n 127.0.0.1:9876
踩坑 这个值不要随便写,这里从0开始递增 ,不然选举会有问题
修改配置
## 根据自己的服务器地址修改,注意中间是分号不是逗号
rocketmq.config.namesrvAddr=192.168.141.101:9876;192.168.141.102:9876;192.168.141.103:9876
启动访问
关闭master再查看集群情况,然后再重启,和前面的单机集群一样的,大家可自行测试
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
rocketmq:
# 集群中间以分号隔开
name-server: 192.168.141.101:9876;192.168.141.102:9876;192.168.141.103:9876
producer:
group: my_group_test
package com.study.config.rocketmq;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.nio.charset.Charset;
/**
* @author:
* @time: 2024/1/5 10:00
*/
@Component
@RocketMQMessageListener(consumerGroup = "my_group_test",
topic = "topic_test",
selectorType = SelectorType.TAG,
selectorExpression = "tagA")
@Slf4j
public class MQMsgListener implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
String msgId = message.getMsgId();
String msg = new String(message.getBody(), CharsetUtil.UTF_8);
log.info("msgId={} msg={}",msgId,msg);
}
}
@RocketMQMessageListener 注解参数如下:
package com.study.controller;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
/**
* @author:
* @time: 2024/1/5 10:17
*/
@RestController
@RequestMapping("/mq")
@Slf4j
public class RocketMQProducerController {
@Resource
RocketMQTemplate rocketMQTemplate;
@PostMapping("/sendMessage")
@ResponseBody
public void sendMessage(String msg){
rocketMQTemplate.asyncSend("topic_test", "hello mq", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("msgId={}",sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
}
});
}
}
同步会有一点小问题,第一次启动不会消费,直接写成异步
发现没有主题
追踪源码发现主题和过滤消息的表达式按照冒号分割
topic取第一位,过滤表达式取第二位
修改再试下
rocketMQTemplate.asyncSend("topic_test:tagA", "hello mq", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("msgId={}",sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
}
});
发现可以了
前面写了个java客户端的消费者,改下消费组发现也可以消费
java客户端代码
package com.bsoft;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
/**
* @author: liucheng
* @time: 2023/12/29 15:39
*/
public class MQConsumer {
private final static String nameServer = "192.168.141.101:9876";
private final static String consumerGroup = "my_group_test02";
private final static String topic = "topic_test";
public static void main(String[] args) throws MQClientException, IOException, InterruptedException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
// 设置NameServer的地址
consumer.setNamesrvAddr(nameServer);
// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
consumer.subscribe(topic, "tagA");
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
msgs.forEach((msg)->{
byte[] body = msg.getBody();
String s = new String(body, Charset.defaultCharset());
System.out.println("msg=================> " +s);
});
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
consumer.start();
System.out.printf("Consumer Started......");
// Thread.sleep(5000);
// consumer.shutdown();
System.in.read();
}
}
到此集群搭建完成,大家搭建过程中有遇到问题可以交流
整个搭建过程不难就是有点繁琐,需要配置多台服务器
其中配置brocker.conf时dLegerSelfId值这块要注意 ,dLegerSelfId是节点 id, 必须属于 dLegerPeers 中的一个;同 Group 内各个节点要唯一。这个值从0开始递增
同一台服务器上启动时先启动 namesrv 再启动 broker