有3大重要作用:
解耦: 例如用户完成下单除了必要的库存扣减和订单状态更新外,我们还需要处理一些积分系统、推送系统的无关紧要的业务处理,如果全部顺序执行,等待时间就会变得很漫长,所以我们需要借助MQ
将边角业务从业务模块中解耦开来。
异步: 这点不必多说,上述的解耦方案就会使得积分系统、促销系统、推送系统任务异步执行。
削峰: 可以理解为一个漏斗,例如我们的某个服务只能抗住10wQPS
,可是当前请求却达到20w
的QPS
,那么我们就可以将请求全部先扔到MQ
中,让服务慢慢消化处理。
相比于市场上的各种消息队列,它有如下优势:
当然缺点也是有那么一些些的:
兼容性确实不太行。
优点:
10w
级。Java
写的,对于Java
程序员来说非常方便改造。缺点:
Java
、C++
客户端,而且C++
还不算完善。MQ
核心实现JMS
相关接口,有些迁移改造就比较麻烦了。首先我们先到RocketMQ
官网下载软件(PS:笔者这里下载的是rocketmq-all-4.8.0-bin-release
这个版本)
https://rocketmq.apache.org/download
完成后为了能够正常启动并运行,我们必须配置一下环境变量,如下图配置一个名为ROCKETMQ_HOME
的环境变量,路径为MQ
安装路径:
准备就绪,我们就可以启动项目了,我们先到bin
目录启动mqnamesrv.cmd
start mqnamesrv.cmd
看到success
后就说明启动成功了
完成后,我们再运行mqbroker.cmd
,同理弹窗输出成功说明启动完成
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
接下来就是编码实践了,本次的示例是关于订单系统改造,都知道用户下单完成后,服务器需要进行库存扣减、订单状态更新、以及优惠券、积分等边边角角的业务,如果顺序执行这些逻辑+网络开销,等待时长对用户是非常不友好的。
所以我们在将边角逻辑抽出来,下单完成我们进行库存扣减、订单状态更新就行了,剩下的业务用MQ发个消息给积分系统、促销系统告知他们自己处理一下就行了。
说完了,首先我们创建一个Spring Boot
项目,配置yml
文件,内容如下:
server:
port: 8088
#rocketmq配置
rocketmq:
name-server: 127.0.0.1:9876
# 生产者配置
producer:
isOnOff: on
# 发送同一类消息的设置为同一个group,保证唯一
group: hyh-rocketmq-group
groupName: hyh-rocketmq-group
# 服务地址
namesrvAddr: 127.0.0.1:9876
# 消息最大长度 默认1024*4(4M)
maxMessageSize: 4096
# 发送消息超时时间,默认3000
sendMsgTimeout: 3000
# 发送消息失败重试次数,默认2
retryTimesWhenSendFailed: 2
完成后,我们就写一个MQ
的工具类
@Component
public class RocketMqHelper {
/**
* 日志
*/
private static final Logger logger = LoggerFactory.getLogger(RocketMqHelper.class);
/**
* rocketmq模板注入
*/
@Autowired
private RocketMQTemplate rocketMQTemplate;
@PostConstruct
public void init() {
logger.info("RocketMq助手初始化,完成我们自定义的特殊处理");
}
/**
* 发送异步消息
*
* @param topic 消息Topic
* @param message 消息实体
*/
public void asyncSend(Enum topic, Message<?> message) {
asyncSend(topic.name(), message, getDefaultSendCallBack());
}
/**
* 发送异步消息
*
* @param topic 消息Topic
* @param message 消息实体
* @param sendCallback 回调函数
*/
public void asyncSend(Enum topic, Message<?> message, SendCallback sendCallback) {
asyncSend(topic.name(), message, sendCallback);
}
/**
* 发送异步消息
*
* @param topic 消息Topic
* @param message 消息实体
*/
public void asyncSend(String topic, Message<?> message) {
logger.info("MQ工具类发送异步消息,主题={},消息内容={}", topic, JSON.toJSONString(message));
rocketMQTemplate.asyncSend(topic, message, getDefaultSendCallBack());
}
/**
* 发送异步消息
*
* @param topic 消息Topic
* @param message 消息实体
* @param sendCallback 回调函数
*/
public void asyncSend(String topic, Message<?> message, SendCallback sendCallback) {
rocketMQTemplate.asyncSend(topic, message, sendCallback);
}
/**
* 发送异步消息
*
* @param topic 消息Topic
* @param message 消息实体
* @param sendCallback 回调函数
* @param timeout 超时时间
*/
public void asyncSend(String topic, Message<?> message, SendCallback sendCallback, long timeout) {
rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout);
}
/**
* 发送异步消息
*
* @param topic 消息Topic
* @param message 消息实体
* @param sendCallback 回调函数
* @param timeout 超时时间
* @param delayLevel 延迟消息的级别
*/
public void asyncSend(String topic, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel) {
rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout, delayLevel);
}
/**
* 发送顺序消息
*
* @param message
* @param topic
* @param hashKey
*/
public void syncSendOrderly(Enum topic, Message<?> message, String hashKey) {
syncSendOrderly(topic.name(), message, hashKey);
}
/**
* 发送顺序消息
*
* @param message
* @param topic
* @param hashKey
*/
public void syncSendOrderly(String topic, Message<?> message, String hashKey) {
logger.info("发送顺序消息,topic:" + topic + ",hashKey:" + hashKey);
rocketMQTemplate.syncSendOrderly(topic, message, hashKey);
}
/**
* 发送顺序消息
*
* @param message
* @param topic
* @param hashKey
* @param timeout
*/
public void syncSendOrderly(String topic, Message<?> message, String hashKey, long timeout) {
logger.info("发送顺序消息,topic:" + topic + ",hashKey:" + hashKey + ",timeout:" + timeout);
rocketMQTemplate.syncSendOrderly(topic, message, hashKey, timeout);
}
/**
* 默认CallBack函数
*
* @return
*/
private SendCallback getDefaultSendCallBack() {
return new SendCallback() {
@Override
public void onSuccess(org.apache.rocketmq.client.producer.SendResult sendResult) {
logger.info("MQ消息发送成功。result={}",JSON.toJSONString(sendResult));
}
@Override
public void onException(Throwable throwable) {
logger.error("MQ消息发送失败,失败原因={}" + throwable.getMessage());
}
};
}
@PreDestroy
public void destroy() {
logger.info("---RocketMq助手注销---");
}
}
这时候,我们就可以编写一个请求,将根据传入的订单信息进行库存、订单状态更新,然后MQ
发送一个异步消息
@RestController
public class TestController {
private static final Logger logger = LoggerFactory.getLogger(TestController.class);
@Autowired
private RocketMqHelper rocketMqHelper;
@GetMapping("/orderPostProcessor")
public String orderPostProcessor() {
//这里为了省事模拟了一个订单对象,模拟前端传来的订单信息
Order order=new Order();
order.setOrederNo("20221217001002003");
order.setUserId("xxxjheqk78943165431548464");
order.setMoney(500);
logger.info("用户下单完成,进行库存系统、订单系统业务逻辑,用户id={},订单流水号={},订单价格={}",order.getUserId(),order.getOrederNo(),order.getMoney());
logger.info("用户下单完成,进行库存系统、订单系统业务逻辑执行结束");
logger.info("核心逻辑完成,发送异步消息告知积分系统、促销系统、推送系统业务处理....");
rocketMqHelper.asyncSend("ORDER_ADD", MessageBuilder.withPayload(order).build());
logger.info("订单异步消息发送成功");
return "success";
}
}
然后我们编写相关监听器监听ORDER_ADD
这个topic
,一旦收到这个topic
就进行各种边角业务逻辑处理
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}", topic = "ORDER_ADD")
public class OrderMqListener implements RocketMQListener<Order> {
private static Logger logger = LoggerFactory.getLogger(OrderMqListener.class);
@Override
public void onMessage(Order order) {
logger.info("收到订单,订单信息:[{}],进行积分系统、促销系统、推送系统业务处理.....", JSON.toJSONString(order));
}
}
笔者为了省事直接用控制台调用这个请求
# cmd输入下面的指令
curl http://localhost:8088/orderPostProcessor
# 响应结果
success
控制台输出,可以看到边角非核心逻辑被我们解耦,我们处理完核心逻辑直接返回结果给前端时,他们才开始进行积分系统、促销系统、推送系统业务处理。
2022-12-17 14:33:14.729 INFO 15956 --- [nio-8088-exec-6] com.example.RocketMQdemo.TestController : 用户下单完成,进行库存系统、订单系统业务逻辑,用户id=xxxjheqk78943165431548464,订单流水号=20221217001002003,订单价格=500
2022-12-17 14:33:14.729 INFO 15956 --- [nio-8088-exec-6] com.example.RocketMQdemo.TestController : 用户下单完成,进行库存系统、订单系统业务逻辑执行结束
2022-12-17 14:33:14.729 INFO 15956 --- [nio-8088-exec-6] com.example.RocketMQdemo.TestController : 核心逻辑完成,发送异步消息告知积分系统、促销系统、推送系统业务处理....
2022-12-17 14:33:14.729 INFO 15956 --- [nio-8088-exec-6] com.example.RocketMQdemo.RocketMqHelper : MQ工具类发送异步消息,主题=ORDER_ADD,消息内容={"headers":{"id":"4c687807-20a8-353c-4645-75cfcc2eb152","timestamp":1671258794729},"payload":{"money":500,"orederNo":"20221217001002003","userId":"xxxjheqk78943165431548464"}}
2022-12-17 14:33:14.730 INFO 15956 --- [nio-8088-exec-6] com.example.RocketMQdemo.TestController : 订单异步消息发送成功
2022-12-17 14:33:14.731 INFO 15956 --- [blicExecutor_14] com.example.RocketMQdemo.RocketMqHelper : MQ消息发送成功。result={"messageQueue":{"brokerName":"LAPTOP-J398LO2Q","queueId":3,"topic":"ORDER_ADD"},"msgId":"7F0000013E5418B4AAC255853AEA0007","offsetMsgId":"C0A82BCB00002A9F0000000000003B0D","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}
2022-12-17 14:33:14.732 INFO 15956 --- [MessageThread_2] c.example.RocketMQdemo.OrderMqListener : 收到订单,订单信息:[{"money":500,"orederNo":"20221217001002003","userId":"xxxjheqk78943165431548464"}],进行积分系统、促销系统、推送系统业务处理.....
使用回调
答: 主要有四种方式吧
/**
* 基于反射实现回调1
*/
public class Demo1 {
public static void main(String[] args) {
Request request=new Request();
new Thread(()->{
try {
request.send(CallBackClass.class,CallBackClass.class.getMethod("process"));
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
class Request {
private static Logger logger = LoggerFactory.getLogger(Request.class);
public void send(Class clazz, Method method) throws Exception {
// 模拟等待响应
Thread.sleep(3000);
logger.info("收到用户的请求,处理业务逻辑后发起回调");
method.invoke(clazz.newInstance());
logger.info("回调结束");
}
}
class CallBackClass {
private static Logger logger = LoggerFactory.getLogger(CallBackClass.class);
public void process() {
logger.info("这个是回调方法哦");
}
}
/**
* 直接调用,耦合度比较高
*/
public class Demo2 {
public static void main(String[] args) {
Request1 request=new Request1();
new Thread(()->{
try {
request.send(new CallBackClass1());
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
class Request1 {
private static Logger logger = LoggerFactory.getLogger(Request.class);
public void send(CallBackClass1 callBackClass) throws Exception {
// 模拟等待响应
Thread.sleep(3000);
logger.info("收到用户的请求,处理业务逻辑后发起回调");
callBackClass.process();
logger.info("回调结束");
}
}
class CallBackClass1 {
private static Logger logger = LoggerFactory.getLogger(CallBackClass.class);
public void process() {
logger.info("这个是回调方法哦");
}
}
/**
* 接口调用
*/
public class Demo3 {
public static void main(String[] args) {
CallBackInterface callBackInterface=new CallBackImpl();
Request2 request2=new Request2();
new Thread(()->{
try {
request2.send(callBackInterface);
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
class Request2 {
private static Logger logger = LoggerFactory.getLogger(Request.class);
public void send(CallBackInterface CallBackInterface) throws Exception {
// 模拟等待响应
Thread.sleep(3000);
logger.info("收到用户的请求,处理业务逻辑后发起回调");
CallBackInterface.process();
logger.info("回调结束");
}
}
interface CallBackInterface{
void process();
}
class CallBackImpl implements CallBackInterface{
private static Logger logger = LoggerFactory.getLogger(CallBackImpl.class);
@Override
public void process() {
logger.info("处理回调");
}
}
/**
* lambda处理回调
*/
public class Demo4 {
private static Logger logger = LoggerFactory.getLogger(Demo4.class);
public static void main(String[] args) {
CallBackInterface callBackInterface=new CallBackImpl();
Request2 request2=new Request2();
new Thread(()->{
try {
request2.send(()->{logger.info("lambda处理回调");});
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
解耦、失败重试机制确保系统可用性
有两种,一种是队列模型
,如下图,生产者负责把消息扔到消息队列中,消费者去消息队列中抢消息,消息只有一个,先到者先得。
还有一种就是发布/订阅模型
了,和队列模型区别就是发布订阅模型的消息只要消费者有订阅就能消费一次。
发布订阅模式,可以有多个订阅者消费同一个消息,如下所示,笔者基于上述代码新建一个监听者,用的是不同的consumerGroup
,调试时可以发现,一个消息被不同的组都消费过一次。
@Component
@RocketMQMessageListener(consumerGroup = "gourp2", topic = "ORDER_ADD")
public class OrderMqListener2 implements RocketMQListener<Order> {
private static Logger logger = LoggerFactory.getLogger(OrderMqListener2.class);
@Override
public void onMessage(Order order) {
logger.info("订阅者2收到消息,订单信息:[{}],进行新春福利活动.....", JSON.toJSONString(order));
}
}
键入命令 curl http://localhost:8088/orderPostProcessor
输出结果
2022-12-17 15:42:58.638 INFO 17080 --- [blicExecutor_12] com.example.RocketMQdemo.RocketMqHelper : MQ消息发送成功。result={"messageQueue":{"brokerName":"LAPTOP-J398LO2Q","queueId":3,"topic":"ORDER_ADD"},"msgId":"7F00000142B818B4AAC255C512480011","offsetMsgId":"C0A82BCB00002A9F0000000000005963","queueOffset":1,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}
2022-12-17 15:42:58.638 INFO 17080 --- [MessageThread_8] c.example.RocketMQdemo.OrderMqListener2 : 订阅者2收到消息,订单信息:[{"money":500,"orederNo":"20221217001002003","userId":"xxxjheqk78943165431548464"}],进行新春福利活动.....
2022-12-17 15:42:58.638 INFO 17080 --- [MessageThread_1] c.example.RocketMQdemo.OrderMqListener : 收到订单,订单信息:[{"money":500,"orederNo":"20221217001002003","userId":"xxxjheqk78943165431548464"}],进行积分系统、促销系统、推送系统业务处理.....
知道,大概有下面这几个吧:
我们生产者发送的消息对象,以笔者为例,使用的就是Spring
框架的GenericMessage
。
public class GenericMessage<T> implements Message<T>, Serializable {
private static final long serialVersionUID = 4268801052358035098L;
private final T payload;
private final MessageHeaders headers;
.....
}
每个message
都必须有一个topic
,消费者就是通过这个Topic
订阅自己需要的消息。
有时候我们希望Topic
下的message
可以进行进一步分类,我们就可以对message
标识一个tag
进行区分。
从源码中我们也能看出来(destination formats: topicName:tags
),消息需要添加Tag
的,只需在topic:tag
这种格式去声明destination
即可。
// @param destination formats: `topicName:tags`
public void asyncSend(String destination, Message<?> message, SendCallback sendCallback) {
asyncSend(destination, message, sendCallback, producer.getSendMsgTimeout());
}
所以我们如果我们希望前文的两个监听器可以基于tag进行监听的话可以这样改(注:没有标tag则当前topic都能监听)
@Component
//监听ORDER_ADD下tag为LISTENER1的消息
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}", topic = "ORDER_ADD:LISTENER1")
public class OrderMqListener implements RocketMQListener<Order> {
........
}
由于RocketMQ
是发布订阅模式,所以一个消息可以被多个消费者消费,注意这里的多个消费者指的就是不同的消费者组。而每个消费者组中可以有多个消费者,总结一下,一个消息可以发给多个消费者组中的一个消费者消费一次。
我们发送的消息就会存放在queue
中,一个topic
下可以有多个queue
被消费过的消息不会从队列中移除,而是通过offset
进行标识,在offset
指针前面的就是消费过的消息,后面就是还没被消费的消息。
可以看到一个topc下有多个queue,每个queue都存放着message,每个message可能还会有tag,按照顺序发送给消费者consumer,发过的消息都在offset后面。
有两种消费模式:
这种是RocketMQ
默认模式,一个主题下的多个队列都会被消费者组中的某个消费者消费掉。
广播消费模式会让每个消费者组中的每个消费者都能使用这个消息。
从功能上我们就可以知晓,RocketMQ
大抵可以分为下面这个4个部分:
NameServer
:这是一个无状态的服务器,Producer
发送对应Topic
消息时会通过它查找路由获取Broker
信息并发送消息,Broker
会定期向它发送心跳连接,并定时同步告知其下的Topic
信息,Consumer
也是通过它轮询获取topic
的路由信息。Producer
:生产者,负责发送业务端的消息,通过负载均衡模式向broker
发送消息,支持同步、异步、单向3种消息发送模式。Broker
:负责存储和转发消息,维护一个consumer queue
,实际上就是存在一个CommitLog
文件中,注意Broker
和NameServer
的心跳连接底层是基于心跳机制。Consumer
:负责消费消息,消费方式有pull
和push
两种,前者是主动去批量拉取消息,只要拉取到消息就会启动消费进程。而后者则是将拉取到消息的回调给用户实现,同样的consumer
拉取到消息时监听就会触发用户的回调进行消费。如下图所示,上述的每一个组件都是集群方式部署的
这个问题我们要从3个角度考虑:
生产者发送消息要想确保可靠必须遵循以下3点:
SendResult result = producer.send(message);
if (!"SEND_OK".equals(result.getSendStatus().name())){
logger.warn("消息发送失败,执行重试的逻辑");
}
API
中查看是否存到Broker
中。存储阶段要保证可靠性就需要从以下几个角度保证:
Master
挂了还有Slave
可以用。CommitLog
中再返回成功。这里补充一下同步刷盘和异步刷盘的区别:
同步刷盘,可以看到必须真正写到磁盘才会返回成功
异步刷盘,如下图所示,可以仅仅是存到page cache
就返回成功,所以比较之下我们建议使用同步刷盘
要想实现同步刷盘,我们只需修改broker.conf
的配置文件即可
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
消费者编码逻辑一定要确保消费成功了再返回消费成功:
consumer.registerMessageListener((List<MessageExt> msgs,
ConsumeConcurrentlyContext context) -> {
String msg = new String(msgs.stream().findFirst().get().getBody());
logger.info("消费收到消息,消息内容={}", msg);
//消费完全成功再返回成功状态
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
这个我们可以分不同的情况讨论,有些场景下,我们只需保证业务幂等即可,例如:我们需要给订单服务发送一个用户下单成功的消息,无论发送多少次订单服务只是将订单表状态设置成已完成。
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}", topic = "ORDER_ADD")
public class OrderMqListener implements RocketMQListener<Order> {
private static Logger logger = LoggerFactory.getLogger(OrderMqListener.class);
@Override
public void onMessage(Order order) {
logger.info("消费者收到订单,订单信息:[{}],进行积分系统、促销系统、推送系统业务处理.....", JSON.toJSONString(order));
updateOrderFinish(order);
}
private void updateOrderFinish(Order order){
logger.info("执行dao层逻辑,将订单设置下单完成,无论多少次,执行到这个消费逻辑都是将订单设置为处理完成");
}
}
还有一种方式就是业务去重,例如我们现在要创建订单,每次订单创建完都会往一张记录消费信息表中插入数据。一旦我们收到重复的消息,只需带着唯一标识去数据库中查,如果有则直接返回成功即可。
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}", topic = "ORDER_ADD")
public class OrderMqListener implements RocketMQListener<Order> {
private static Logger logger = LoggerFactory.getLogger(OrderMqListener.class);
@Override
public void onMessage(Order order) {
logger.info("消费者收到订单,订单信息:[{}],进行积分系统、促销系统、推送系统业务处理.....", JSON.toJSONString(order));
//消费者消费时判断订单是否存在,如果存在则直接返回
if (isExist(order)){
return;
}
updateOrderFinish(order);
}
private void updateOrderFinish(Order order){
logger.info("执行dao层逻辑,将订单设置下单完成,无论多少次,执行到这个消费逻辑都是将订单设置为处理完成");
}
private boolean isExist(Order order){
return false;
}
}
如果是rocketMQ,每个小时定时, 查询%DLQ%
开头的消息, 一般会对接OA和企业微信做通知。
分两种情况讨论,如果是消费者少,message queue
多的情况,我们增加消费者数量即可。
如果是message queue
少而消费者多的情况,那么增加多少个消费者都没什么用了,那么就需要考虑将queue
中的内容挪到一个临时目录,然后增加一些消费者进行消费。
还是从两个角度考虑,先来说说全局消息有序吧,要想保证所有消息都有序的话,那么我们就干脆只设置一个生产者,一个队列,一个消费者。如下代码所示,可以看到笔者的send
无脑使用第一个队列
@GetMapping("/producer")
public String producer() {
for (int i = 0; i < 10; i++) {
Message message = new Message("topic1", "tagA", ("Hello MQ" + i).getBytes());
try {
//同步发送
SendResult result = producer.send(message, (List<MessageQueue> mqs, Message msg, Object arg)-> mqs.get(0),"");
if (!"SEND_OK".equals(result.getSendStatus().name())){
logger.warn("消息发送失败");
}
logger.info("消息{} 发送结果={}", i, JSON.toJSONString(result));
} catch (Exception e) {
logger.info("MQ基础示例消息发送失败,消息{},失败原因={}", i, e.getMessage(), e);
}
}
return "success";
}
然后消费者设置为按序消费
@GetMapping("/consumer/{topic}")
public String consumer(@PathVariable("topic") String topic) throws Exception {
createConsumer(topic);
return "success";
}
private void createConsumer(String topic) throws MQClientException {
// 实例化消息生产者,指定组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topic
consumer.subscribe(topic, "*");
//负载均衡模式消费
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//广播模式
//consumer.setMessageModel(MessageModel.BROADCASTING);
// 注册回调函数,处理消息
consumer.registerMessageListener((List<MessageExt> msgs,
ConsumeOrderlyContext context) -> {
msgs.stream().forEach(msg-> logger.info("消费收到消息,消息内容={}", new String(msg.getBody())));
//消费完全成功再返回成功状态
return ConsumeOrderlyStatus.SUCCESS;
});
//启动消息者
consumer.start();
}
从输出结果来看确实保证了消息有序
2022-12-18 17:39:34.091 INFO 12412 --- [MessageThread_1] c.e.R.u.producer.TestController : 消费收到消息,消息内容=Hello MQ0
2022-12-18 17:39:34.091 INFO 12412 --- [MessageThread_1] c.e.R.u.producer.TestController : 消费收到消息,消息内容=Hello MQ1
2022-12-18 17:39:34.091 INFO 12412 --- [MessageThread_1] c.e.R.u.producer.TestController : 消费收到消息,消息内容=Hello MQ2
2022-12-18 17:39:34.091 INFO 12412 --- [MessageThread_1] c.e.R.u.producer.TestController : 消费收到消息,消息内容=Hello MQ3
2022-12-18 17:39:34.091 INFO 12412 --- [MessageThread_1] c.e.R.u.producer.TestController : 消费收到消息,消息内容=Hello MQ4
2022-12-18 17:39:34.091 INFO 12412 --- [MessageThread_1] c.e.R.u.producer.TestController : 消费收到消息,消息内容=Hello MQ5
2022-12-18 17:39:34.091 INFO 12412 --- [MessageThread_1] c.e.R.u.producer.TestController : 消费收到消息,消息内容=Hello MQ6
2022-12-18 17:39:34.091 INFO 12412 --- [MessageThread_1] c.e.R.u.producer.TestController : 消费收到消息,消息内容=Hello MQ7
2022-12-18 17:39:34.091 INFO 12412 --- [MessageThread_1] c.e.R.u.producer.TestController : 消费收到消息,消息内容=Hello MQ8
2022-12-18 17:39:34.092 INFO 12412 --- [MessageThread_1] c.e.R.u.producer.TestController : 消费收到消息,消息内容=Hello MQ9
部门有序和上面思路同理,已订单创建、付款、推送、完成四个消息为例,要想实现消息有序,执行按照订单号hash
计算到到队列的索引位置然后按序存放到队列中即可。
例如有个用户订单号为123
,结果我们的哈希算法得到一个值3,那么它的创建、付款、推送、完成4个消息都按需存放到队列3中。
/**
* 保证队列有序的发送
*
* @throws Exception
*/
@GetMapping("orderlySend")
public void orderlySend() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("unique_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
//设置标签
String[] tags = new String[]{"TagA", "TagC", "TagD"};
// 订单列表,这个列表相同订单好处理是有序的,遵循创建、付款、推送(不一定存在)、完成
List<OrderStep> orderList = OrderStep.buildOrders();
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < 10; i++) {
OrderStep orderStep = orderList.get(i);
// 加个时间前缀
String body = dateStr + " Order Detail " + orderStep;
Message msg = new Message("order_topic", tags[i % tags.length], "KEY" + i, body.getBytes());
SendResult sendResult = producer.send(msg, (List<MessageQueue> mqs, Message message, Object orderId) -> {
//根据订单id选择发送queue
Long id = (Long) orderId;
long index = id % mqs.size();
return mqs.get((int) index);
}, orderList.get(i).getOrderId());//使用订单id作为参数决定队列值
logger.info("生产者发送消息完成 status={}, queueId={}, body={}", sendResult.getSendStatus(), sendResult.getMessageQueue().getQueueId(), body);
}
producer.shutdown();
}
有三种,一种是Tag
过滤,笔者上文演示过
consumer.subscribe("order_topic", "TagA || TagC || TagD");
还有一种用sql
表达式
consumer.subscribe("order_topic", MessageSelector.bySql("a >=0 and a <= 3"));
还有一种用Filter Server
,比较复杂,也比较常用。
这个一般用于需要晚些进行确认的场景,例如你发起某些委托的请求,需要10s
后查看回报结果,那么我们就可以使用延迟消息。
如下代码所示
@GetMapping("sendDelayMsg")
public String sendDelayMsg() {
Message delayMsg = new Message();
//10s 后发送
delayMsg.setDelayTimeLevel(3);
delayMsg.setBody("这是个延迟消息".getBytes());
delayMsg.setTopic("delay_topic");
try {
logger.info("延迟消息发送开始");
producer.send(delayMsg);
logger.info("延迟消息发送结束");
} catch (Exception e) {
logger.error("延迟消息发送失败,失败原因={}", e.getMessage(), e);
return "fail";
}
return "success";
}
消费者代码
GetMapping("consumerDelayMsg")
public String consumerDelayMsg() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_group");
try {
consumer.subscribe("delay_topic", "*");
consumer.registerMessageListener((List<MessageExt> messages, ConsumeConcurrentlyContext context) -> {
for (MessageExt message : messages) {
// Print approximate delay time period
logger.info("收到延迟消息={}", message.getBody());
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
} catch (Exception e) {
logger.error("延迟消息订阅失败,失败原因={}", e.getMessage(), e);
return "fail";
}
return "success";
}
输出结果,可以看到确实是10s后消费者采纳看到消息并消费
/**
* 2022-12-18 14:39:35.166 INFO 6224 --- [nio-8088-exec-4] c.e.R.u.producer.TestController : 延迟消息发送开始
* 2022-12-18 14:39:35.168 INFO 6224 --- [nio-8088-exec-4] c.e.R.u.producer.TestController : 延迟消息发送结束
* 2022-12-18 14:39:45.170 INFO 6224 --- [MessageThread_2] c.e.R.u.producer.TestController : 收到延迟消息[msgId=7F000001185018B4AAC25AB164FE0003] 3ms later
*/
我们发送延时消息的时候,broker
会将其放到一个SCHEDULE_TOPIC_XXXX
的message
队列中,然后scheduleMessageService
不断轮询这个队列中每个消息的状态,到期了就投放到目标topic
中给消费者消费。
分为以下几步:
producer
发送half消息
给broker
。broker
收到消息,给producer
回复OK
。producer
收到OK
,继续执行自己的业务,提交本地事务,向broker
发起commit或者rollback
。broker
收到commit
则提交消息,消费者就可以消费,反之就丢弃。注意,如broker
在指定时间内没有收到消息则回主动去找producer
查看half消息当前的情况。下面我们就延时一个broker
长时间未收到二次确认消息的示例
首先我们自定义实现一个MQ
事务监听器,如下所示,可以看到笔者为了方便延时遇到TagC
就直接返回未知(LocalTransactionState.UNKNOW)
public class TransactionListenerImpl implements TransactionListener {
private static Logger logger = LoggerFactory.getLogger(TransactionListenerImpl.class);
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
logger.info("执行本地事务开始,msg={},tags={}", new String(msg.getBody()), msg.getTags());
//使用不同标签模拟成功或者失败的案例
if (StringUtils.equals("TagA", msg.getTags())) {
logger.info("事务A提交成功");
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.equals("TagB", msg.getTags())) {
logger.info("事务B回滚");
return LocalTransactionState.ROLLBACK_MESSAGE;
} else {
//事务C返回未知,这个回重新调用checkLocalTransaction再次查看事务状态,若成功则再次发送
logger.info("事务C返回未知");
return LocalTransactionState.UNKNOW;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
logger.info("MQ检查消息Tag【" + msg.getTags() + "】的本地事务执行结果");
//模拟返回成功
return LocalTransactionState.COMMIT_MESSAGE;
}
}
然后编写一个生产者代码
@GetMapping("sendTransaction")
public String sendTransaction() throws Exception {
//创建事务监听器
TransactionListener transactionListener = new TransactionListenerImpl();
//创建消息生产者
TransactionMQProducer producer = new TransactionMQProducer("transaction_topic");
producer.setNamesrvAddr("127.0.0.1:9876");
//生产者这是监听器
producer.setTransactionListener(transactionListener);
//启动消息生产者
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC"};
for (int i = 0; i < 3; i++) {
try {
Message msg = new Message("transaction_topic", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
logger.info("发送事务消息,内容={},tag={}", new String(msg.getBody()), msg.getTags());
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
logger.info("事务消息发送结果={}", sendResult.getSendStatus());
TimeUnit.SECONDS.sleep(1);
} catch (MQClientException | UnsupportedEncodingException e) {
logger.error("事务消息处理失败,失败原因={}", e.getMessage(), e);
}
}
//producer.shutdown();
return "success";
}
当我们使用上文的消费者进行消费时从日志可以看到本次事务C发送了half消息,消费者消费时看到长时间返回unknown
就会询问情况,然后我们模拟的代码返回commit
状态,最终被消费者消费了
c.e.R.u.utils.TransactionListenerImpl : 执行本地事务开始,msg=Hello RocketMQ 2,tags=TagC
* 2022-12-18 15:13:36.208 INFO 23520 --- [nio-8088-exec-1] c.e.R.u.utils.TransactionListenerImpl : 事务C返回未知
* 2022-12-18 15:14:34.211 INFO 23520 --- [pool-2-thread-1] c.e.R.u.utils.TransactionListenerImpl : MQ检查消息Tag【TagC】的本地事务执行结果
#二次确认得到commitLocalTransactionState.COMMIT_MESSAGE,消费者进行消费。
消费收到消息,消息内容=Hello RocketMQ 2
通俗来说一个消息消费失败并重试达到最大次数后,MQ
就会将其放到死信队列中。超过三天该消息就会被销毁。
需要补充的时死信队列是针对一个group id
为单位创建的队列,如果一个gourp
中都没有死信的话,那么MQ
就不会为这个组创建死信队列。
先说说NameServer
吧,它是无状态的,所以我们通过集群就可以保证它的高可用。
接下来受说broker
,它就是保证高可用的重点所在,我们建议broker
采用集群+主从复制
的结构,如下图,这种集群+主从的方式,生产者向其中某个master
写,一个master
挂了还有另一个master
。则消费者可以向master
或者slave
读。
注意rocketMQ
目前不支持slave
转master
,所以一旦集群所有master
挂了我们只能手动修改slave
为master
重启一下使其变成master
。
broker
会在启动时向NameServer
注册信息,每30s
发送一次心跳,告知其还活着。producer
通过NameServer
获取broker
信息,根据均衡算法找到一台broker
写数据。consumer
从NameServer
中获取到broker
信息主动拉取消息并消费。要想了解Broker
如何保存数据,我们必须了解RocketMQ
三大文件:
producer
发送的消息最终都会通过刷盘机制存到commitlog
文件夹下。commitlog
下一个文件名为00000000000000000000
一旦写满,就会再创建一个文件写,一般来说第二个文件名为00000000001073741824
,名称即是第一个文件的字节数。文件大小一般是1G
。
这个文件夹下记录的都是commitlog
中每个topic
下的队列信息物理偏移量、消息大小、hashCode
值,如下图,consumequeue
文件夹下会为每个topic
创建一个文件夹
打开任意一个文件夹就会看到这样一个文件。
而这个文件内部最多维护30w
个条目,注意文件中每个条目大约20
字节,8字节代表当前消息在commitLog
中的偏移量,4字节存放消息大小,8字节存放tag
和hashCode
的值。
维护消息的索引,基于HashMap
结构,这个文件使得我们可以通过key
或者时间区间查询消息,文件名基本用时间戳生成的,大小一般为400M(差不多维护2000w个索引)
。
MQ
会为每个broker
维护一个commitlog
,一旦文件存放到commitlog
,消息就不会丢失。当无法拉取消息时,broker
允许producer
在30s内发送一个消息,然后直接给消费者消费。
后两个索引文件的维护是基于一个线程ReputMessageService
去异步维护后两个索引文件。
有两种方式:
在RocketMQ
中,ConsumeQueue
存储数据较少,并且是顺序读取,在pageCache
预读的机制下读取速率是非常客观的(即使有大量的消息堆积)
。
操作系统会将一部分内存用作pageCache
,当数据写入磁盘会先经过pageCache
然后通过内核线程pdflush
写入物理磁盘。
查询时,会先去pageCache
查询是否有数据,若有则直接返回。若没有则去ConsumeQueue
文件中读取需要的数据以及这个数据附近的数据一起加载到pageCache
中,这样后续的读取就是走缓存,效率自然上去了,这种磁盘预读目标数据的附近数据就是我们常说的局部性原理。
这是MQ
基于NIO
的FileChannel
模型的一种直接将物理文件映射到用户态内存地址的一种技术,通过MappedByteBuffer
,它的工作机制是直接建立内存映射,文件数据并没有经过JVM
和操作系统直接复制的过程,相当于直接操作内存,所以效率就非常高。
如下图就是零拷贝的工作机制,可以看到通过零拷贝技术,用户态缓存区直接和内核缓存区直接建立映射关系,避免了用户态到内存态来回复制文件的开销。
关于更多零拷贝的技术可以参考笔者这篇文章(内存映射文件部分):
两种方式分别是同步刷盘和异步刷盘
producer
发送的消息经过broker
后必须写入到物理磁盘commitLog
后才会返回成功。producer
发送的消息到达broker
之后,直接返回成功,刷盘的逻辑交给一个异步线程实现。而上面说的刷盘都是通过MappedByteBuffer.force()
这个方法完成的。
需要补充异步刷盘调用MappedByteBuffer.force()
,是通过一个异步线程FlushCommitLogService
实现的。
MQ中负载均衡的地方有很多,我们不妨一个个说吧
为了更好的讲述,我们不妨基于代码来debug
了解一下
Message message = new Message("topic1", "tagA", ("Hello MQ" + i).getBytes());
SendResult result = producer.send(message);
当我们producer
发送消息时,代码会走到DefaultMQProducer
,核心代码如下,设置完topic
直接调用defaultMQProducerImpl
发送消息,我们继续步进
@Override
public SendResult send(
.....略
//设置topic
msg.setTopic(withNamespace(msg.getTopic()));
//发送消息
return this.defaultMQProducerImpl.send(msg);
}
经过defaultMQProducerImpl
层层调用我们终于走到了核心逻辑
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
if (topicPublishInfo != null && topicPublishInfo.ok()) {
..........
for (; times < timesTotal; times++) {
........
//调用轮询算法找到合适的MessageQueue
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
}
............
}
步入源码我们就可以看到轮询查找messageQueue
的核心逻辑,可以看到无论sendLatencyFaultEnable
这个延迟开关是否开启,算法都是从tpInfo.getSendWhichQueue()
或者到一个索引值(注意这是一个ThreadLocal变量,底层源码实现是如果没有值则随机生成一个)
,然后拿着这个索引值和队列长度进行取模运算,最终得到队列的索引值。
而开启延迟开关的逻辑无非就是多一步判断是否有效的isAvailable
方法而已。
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
//如果开启延迟故障,则走下面的逻辑
if (this.sendLatencyFaultEnable) {
try {
//逻辑大体是按照sendWhichQueue这个ThreadLocal变量获取索引,然后按顺序CAS自增和messageQueueList进行与取模运算,最后得到队列索引返回
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
return mq;
}
........
}
//默认的负载均衡算法,和上述差不多,只不过少了一个过滤
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
我们从源码PullMessageService
了解一下全流程,因为它是一个线程,所以我们不妨看看它的run方法,可以看到逻辑很简单,就是从pullRequestQueue
这个阻塞队列中拉取一个消息,然后调用pullMessage
存消息,所以我们就必须知道pullRequestQueue
里的消息从哪来。
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
//拉取消息
this.pullMessage(pullRequest);
} .....
}
........
}
我们在RebalanceService
找到的答案,它也是一个线程,我们不妨看看run方法的逻辑,可以看到一个doRebalance
@Override
public void run() {
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
//核心逻辑,执行负载均衡
this.mqClientFactory.doRebalance();
}
}
经过重重debug
,我们来到了RebalanceImpl
,这里就是负载均衡实现的核心所在了,它首先会查询所有的MessageQueue
和ConsumerIdList
,然后使用默认策略allocateMessageQueueStrategy
进行消息分配,完成后调用updateProcessQueueTableInRebalance
更新结果,并通知PullMessageService
有新消息了
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
.....
case CLUSTERING: {
//获取MessageQueue和ConsumerIdList
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
..........
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
//排序MessageQueue和ConsumerIdList
Collections.sort(mqAll);
Collections.sort(cidAll);
//默认是AllocateMessageQueueAveragely
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
//负载均衡分配
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
.......
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
//通知PullMessageService有消息进行消费了
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
........
break;
}
default:
break;
}
}
了解负载均衡的整体流程接下来就来探讨一下负载均衡的核心算法了,我们debug上文所有的allocate方法,可以看到下面这段逻辑,其实算法也很简单,就是让队列数和消费者数进行取模,若mod大于0说明不能平均分配,最前面的几个得多配几个,后面的正常分配就好了。
如果读者不理解可以将mqAll.size()假设为6,cidAll.size()设置为4思考一下,最后可以发现最终结果是2、2、1、1
即索引小于mod的会多分配一些。
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
然后我们再看看updateProcessQueueTableInRebalance
的核心逻辑this.dispatchPullRequest(pullRequestList);
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
........
this.dispatchPullRequest(pullRequestList);
return changed;
}
经过debug
后发现RebalancePushImpl
会将上述结果存到pullMessageService
的队列中,这么一来逻辑就回到我们最开始的pullRequest
的获取了。
@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
for (PullRequest pullRequest : pullRequestList) {
//这里会拿到pullMessageService的executePullRequestImmediately方法往队列里存放上文获取到的并组装为pullRequestList的消息
this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
}
}
就是consumer
去拉取消息时,broker
发现没有消息,不返回于是就把PullReuqest
hold挂起来,规定时间内如果有结果了在组装并返回。
我们可以看看源码PullMessageProcessor
的processRequest
方法
//如果没有拉取到消息
case ResponseCode.PULL_NOT_FOUND:
// 且挂起标识都为true,则组装并将请求挂起来
if (brokerAllowSuspend && hasSuspendFlag) {
long pollingTimeMills = suspendTimeoutMillisLong;
if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
}
String topic = requestHeader.getTopic();
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
//封装一个PullRequest
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
//把PullRequest挂起来
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
response = null;
break;
}
suspendPullRequest
逻辑也很简单,就是将其pullRequestList
中
public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
String key = this.buildKey(topic, queueId);
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (null == mpr) {
mpr = new ManyPullRequest();
ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
if (prev != null) {
mpr = prev;
}
}
mpr.addPullRequest(pullRequest);
}
然后PullRequestHoldService
这个线程会不断轮询检查,调用notifyMessageArriving
检查并唤醒某些有消息的请求。
@Override
public void run() {
log.info("{} service started", this.getServiceName());
while (!this.isStopped()) {
try {
//如果是长轮询则等待5s
if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
this.waitForRunning(5 * 1000);
} else {
this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
}
long beginLockTimestamp = this.systemClock.now();
//检查hold住的请求
this.checkHoldRequest();
long costTime = this.systemClock.now() - beginLockTimestamp;
if (costTime > 5 * 1000) {
log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
}
} catch (Throwable e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}
log.info("{} service end", this.getServiceName());
}
我们不断步进查看一下checkHoldRequest核心逻辑,无非遍历各个topic中的queue查看最新的偏移量是否大于挂起队列中的偏移量,若大于则说明有新消息入队,然后调用notifyMessageArriving唤醒上文被挂起的pullRequest
private void checkHoldRequest() {
//遍历pullRequestTable中的key
for (String key : this.pullRequestTable.keySet()) {
String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
if (2 == kArray.length) {
String topic = kArray[0];
int queueId = Integer.parseInt(kArray[1]);
//查看当前topic的队列最大偏移量
final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
try {
//将偏移量等参数传入,如果发现当前队列偏移量大于上文pullRequestTable的偏移量,说明有新消息进来,可以唤醒pullRequest
this.notifyMessageArriving(topic, queueId, offset);
} catch (Throwable e) {
log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
}
}
}
}
总结一下时序图
这篇文章笔者大概花费两天整理的,关于更多实践可以参考下面的文章: