RabbitMq是目前主流的消息服务,其他常用的有RocketMQ、kafka等。还有比较古老的ActiveMQ,现在使用的比较少了,基本都是前面三种,一般会根据自己业务需要选择不同的消息中间件。这一篇总结RabbitMQ。
1.MQ的主要作用
削峰:流量突然爆发针对超出容量的消息,进行入队等待消费。
异步:异步处理有助于给请求端更快的响应,更好的体验。
解耦:系统间调用使用OpenFeign或者RPC都是同步的,很多场景是可以使用异步的,此时使用MQ就可以实现系统间的解耦
2.MQ遵循的协议
JMS:java message service,是java定义了api接口,实现者根据接口进行实现,典型的是ActiveMQ
AMQP:AMQP是一种消息协议,他定义的是一种规范,因此可以实现跨平台和语言,典型的是RabbitMQ
其他协议:kafka则是自定义的协议,并不是以上两种,而且kafka目前占有率已经越来越高了
3.MQ是典型的生产者消费者模型
在MQ中主要有三大角色:生产者、MQ、消费者,他是典型的生产者消费者模型,生产者生产消息交给MQ,MQ内部通过一系列动作将其放入队列,消费者监听队列获取消息。
4.RabbitMQ内部都有什么
Broker:中间人的意思,其实就是指MQ服务本身
Host:Host相当于在Broker内部的虚拟机,一个Broker内部可以有多个Host,每个Host内的交换机和队列都是隔离的,通常用于区分系统
Exchange:交换机,生产者将消息发送到Broker时,是先将消息发送到交换机的,交换机根据routingkey进行发送到队列
Queue:消息队列,发送方发送消息的最终目的地,与消费方监听的对象,也是消息的存储之地
5.发送方如何通过RabbitMQ发送消息
1.生产者和Broker通过三次握手建立TCP连接,形成connection,这个在Management中会有展示
2.生产者和Broker基于connection的TCP长连接建立逻辑通道channel,这个在管理页面也有展示
connection因为是TCP连接,建立起来属于重量级的动作,而channel是在TCP长连接内部的逻辑连接,相对比较轻量级,所以一般connection是不会被主动销毁的,除非是一方主动断开连接,而channel若是长时间不使用是会被销毁的。
3.生产者发送消息到Broker,Broker根据指定的Host(不指定默认进入默认Host:/),然后交给他的交换机,交换机根据routingkey进行路由到指定的队列,到底消息发送成功
6.消费方如何通过RabbitMQ消费消息
7.RabbitMQ工作模型
8.RabbitMQ与主流MQ的对比
direct 直连交换机
该交换机通过routing key和队列建立关系,且只有routing key完全匹配时才可以将消息路由到相应的队列,否则路由失败,不支持通配符,且必须是完全匹配的routing key,所以叫直连交换机。一个直连交换机可以绑定多个队列,每个队列都可以声明不同的routing key,若是所有队列都是用相同的路由key或者使用空或者null进行绑定,那么直连就变成了广播交换机。
fanout 广播交换机
广播交换机,消息收到后会发给所有绑定到交换机的队列,不会区分routing key,即使队列绑定广播交换机时声明了routing key,也不会遵循routing key,这里的模式永远是全部分发给所有绑定的队列,忽略rouint key。
topic 主题模式
主体模式根据routing key进行消息分发,与direct交换机的区别是,topic支持通配符。因此topic成为了最为灵活的交换机,他既能做direct的事,也可以做fanout的事,所以topic也是使用最为广泛的交换机。他的通配符支持两种,一种是#(匹配0个或者多个任意词),一种是*(匹配任意一个词)。比如有以下四个队列绑定了topic交换机,队列之后是各自的routing key
如果发送一条消息他的rouking key是key.one 则消息只会进入到queue.one中
如果发送一条消息他的rouking key是key.one.msg 则消息只会进入到queue.two中
如果发送一条消息他的rouking key是key.three.msg 则消息只会进入到queue.two、queue.three、queue.thrff
如果发送一条消息他的rouking key是key.three.msg.action 则消息只会进入到queue.thrff
这就是主体交换机了,记住一点:*表示可以匹配任何一个词,这个词表示的是点之间间隔的词,而不是一个字母,比如key.one.msg这里的key或者one或者msg是一个词,*可以代表这种一个词。#,代表0或者多个词。
headers 头部交换机
头部交换机,就是使用头部信息进行路由的交换机,该交换机不支持使用routing key进行路由,只能通过头部信息进行路由,且头部交换机在绑定队列时应该声明监听的头部属性,同时声明x-match,x-math有两个值,一个是any一个是all。any表示匹配任何一个键值对就会进行路由,all表示必须满足全部的键值对才会路由,如果在进行交换机和队列的绑定时没有声明x-match,则默认是all。
如果一个头部交换机绑定了这三个队列,则headers.one 必须头部信息包含 key=headers.one 和key2=headers.two两个键值对,才会成功路由到headers.one队列。而headers.three则只需要包含一种一个就可以路由,因为他的x-match是any。
还有一点需要注意发送消息时,属性声明的是头部信息,而不是属性,这个需要注意。
上面了解了一些基本概念,这里需要开始实操了,首先需要搭建一个MQ的服务端和管理控制台,如果手动安装下载的话这俩是分开的,这里使用docker进行安装,我们可以在docker 仓库中选择后缀带有management的镜像,这些都是集成了MQ服务和管理控制台的镜像,只需要一次安装即可,下面是安装命令:
这里对外暴露了两个端口5672,用于MQ服务对外的连接端口,还有一个15672,这是管理控制台的访问端口,不过MQ不是只用了这俩端口,只是其他端口可以不向外暴露而已。
# -d 后台运行
# -u 指定运行用户
# --name 指定容器名
# -p 端口映射
docker run -d -u root --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.11.2-management
安装完成后,使用guest/guest进行登录管理控制台:
到此服务端已经搭建完成了。
上面的搭建命令会存在一个问题,若是mq因为问题需要重建,那么mq中的交换机、队列、消息、用户等等信息将全部会丢失,这种问题是不能容忍的,因为这种信息丢失是很大的生产问题,那么问题来了,怎么解决呢,先上命令
docker run -d
-u root \
--name rabbitmq \
--hostname rabbit-1 \
-v /apps/rabbitmq/data:/var/lib/rabbitmq/mnesia
-p 5672:5672 -p 15672:15672 \
rabbitmq:3.11.2-management
使用这个命令,无论容器意外损坏需要重新建立rabbitmq容器,只要还是使用了这个数据目录,数据就依然还在,这里需要注意hostname的指定,一定是不可以省略的,因为rabbitmq的持久化文件会以容器的hostname作为持久化文件名的一部分,如下所示:
如果不手动指定hostname,那么即使数据文件映射没问题,数据依然是不可用的,这点很容易被忽略,其次就是数据映射了,rabbitmq的数据存储目录是/var/lib/rabbitmq/mnesia ,所以需要我们将他映射到本地。通过这两部操作就可以将容器内的持久化数据永久留存到本地了,即使更换容器信息也不回丢失。
服务端已经搭建完成,那么就可以开始验证消息的发送了,这里环境使用SpringBoot2.6.11 进行测试。
这里进行简单的消息发送的展示
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
# host: 192.168.150.199
# port: 5672
addresses: 192.168.150.199:5672
virtual-host: /
username: guest
password: guest
package com.ebbing.task.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author pcc
*/
@Slf4j
@RestController
public class TestController {
@Autowired
RabbitTemplate rabbitTemplate;
@PostMapping("/testMQ")
public void testMQ(){
log.info("收到信息");
rabbitTemplate.convertAndSend("amq.topic","one","hello amqp");
log.info("发送完成");
}
}
这里是通过指定交换机、路由key、消息三者来去发送消息的,需要我们在管理页面,正确配置交换机和队列的绑定,其他就没了。
amq.topic:交换机
one:路由key
hello amqp:发送的message
消费者同样需要引入相同的依赖和配置,这个和生产者相同,这里不重复展示了。唯一需要展示的是简单的一点代码:
package com.cheng.ebbing.message.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author pcc
*/
@Slf4j
@Component
public class TestMQ {
@RabbitListener(queues = "Test-queue-one")
public void test(String msg){
log.info("收到消息:{}",msg);
}
}
代码很简单,只需要借助注解RabbitListener轻松实现队列的监听,然后获取到队列的消息,至于监听方法使用的参数类型,Spring也是可以自动帮我们转化的。
在SpringBoot中发送消息时,SpringBoot已经为我们封了一个Template为我们使用,他可以方便我们处理消息,大致列举了以下几类消息可能会用到,但是RabbitTemplate并不是只提供了这些,还有很多。下面每一个方法基本都是有很多重载方法,这里只说这类方法的作用,无法一一列举。
send:原始的消息发送api,与convertAndSend的区别是未使用转换器,但也可以做为消息发送的api
TbDict tbDict = new TbDict();
tbDict.setDictId(111L);
tbDict.setDictDesc("测试desc");
tbDict.setTypeCode("test_code");
Message message = MessageBuilder.withBody(JSON.toJSON(tbDict).toString()
.getBytes("UTF-8"))
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.build();
rabbitTemplate.send("amq.topic","one",message,new CorrelationData());
下面是mq的管理端获取到的消息:
下面是使用java监听接收到的消息:
send系列api都差不多。
sendAndReceive:官方注释解释是基于RPC模式的调用,会尝试获取接收结果,等带一段时间(直到超时)没有结果就返回null,我尝试了多次,无论信息有没有被接收到获取到的返回信息都是null,感觉不推荐使用。
convertAndSend: 这个api是使用频率最高的了,基本使用RabbitTemplate都是会使用这个api进行消息发送,他相对于send来说该api多的功能是消息转换。该api支持直接传入消息内容,默认的消息转换器SimpleMessageConverter会将消息内容根据String、Byte[]、Serializable、null进行不同的转换。但是如果使用send方法,则不会有这种自动操作,需要我们手动设置Message,这里需要我们声明消息头已经内容编码,内容格式等信息。
rabbitTemplate.convertAndSend("amq.topic", "one", JSON.toJSON(tbDict).toString(), new CorrelationData());
下面是收到消息的截图:
convertSendAndReceive:这个方法不言而喻了,就是会等待接收结果和上面的sendReceive差不多
receive:可以用来做消息接收,不过一般都是使用@RabbitListener来进行接收消息,不过使用receive也是同样可以的,不过receive触发一次只会接收一条消息。
Message message = MessageBuilder.withBody(JSON.toJSON(tbDict).toString()
.getBytes("UTF-8"))
// 当使用默认的转换器时,即使这么设置在mq中的消息仍然是text/plain
// 使用Jackson2json转换器时,则无需这么设置也会是json格式
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.build();
CorrelationData correlationData = new CorrelationData();
correlationData.setReturned(new ReturnedMessage(message,0,null,"amq.topic","one"));
rabbitTemplate.convertAndSend("amq.topic", "one", JSON.toJSON(tbDict).toString(), correlationData);
log.info("发送完成");
Message receive = rabbitTemplate.receive("Test-queue-one");
String content = new String(receive.getBody(), StandardCharsets.UTF_8);
log.info("接收到消息:{}",content);
log.info("接收到消息-转换json:{}",JSON.parse(content).toString());
需要说的是,若是使用默认的消息转换器,则我们发送String类型的消息时,消息内容类型会被设置为:text/plain,此时我们接收到消息时这种样式:
若是使用了消息转换器,比如Jackson2JsonMessageConverter,则会帮我们自动设置application/json格式,则java中接收到如下:
他们在管理台的展示如下:
不过无论哪种,在Java中使用JSon工具类都是可以正常转换的,如上面的JSON.parse。
addBeforePublishPostProcessors
该方法顾名思义就是在消息发送之前对消息进行改造的方法,如果有统一处理,我们是可以放在这里的,比如消息的id处理,格式处理,请求头处理等,都可以放入到这里,这样即可与业务进行解耦了,下面只是展示,如果使用可以直接将这块代码写入配置。
rabbitTemplate.addBeforePublishPostProcessors(message1->{
log.info("发送前处理开始-message1:{}",message1);
message1.getMessageProperties().setContentType("application/xml");
log.info("发送前处理完成-message1:{}",message1);
return message1;
});
代码比较简单就是将消息的样式改成了"application/xml"了,下面是日志输出:
下面是管理台的截图,可以看到消息内容变成了我们已经更改的"application/xml"了。
addAfterReceivePostProcessors
这个与上面类似,是消息接收之后的处理方法,不过需要注意的是这里的收到之后执行是指channel收到,而不是指exchange或者queue,更不是指消费者。这个很容易误解,同时发送到了channel并不表示进入了exchange,更不表示进入了队列
rabbitTemplate.setAfterReceivePostProcessors(message2->{
return message2;
});
注意postProcessors此类方法只允许全局设置一次,而且需要设置在配位文件中,不可设置到业务代码里,否则每次执行业务代码都会新增一个postprocessors,因为此类方法都是允许多个processor的。
上面的demo里只使用了这个注解指定队列,就实现了消息的消费。实际上绝大部分场景使用这一个注解就够了,少数特殊场景需要的注解可以看5中的使用,这部分提供了额外的一些支持。这里详细介绍下RabbitListener的所有属性,RabbitListener可用于类、方法、注解等上面,且一个方法上支持声明多次该注解。
@RabbitListener(queues = {"Test-queue-one","test_queue"})
public void test(String msg){
log.info("收到消息:{}",msg);
}
package com.cheng.ebbing.message.mq;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author pcc
* @version 1.0.0
* @description 多mq配置类
*/
@Configuration
public class anothermqconfig {
@Bean("mySimpleRabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory authorityListenerContainerFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("192.168.150.202");
connectionFactory.setPort(5673);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(false);
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setConnectionFactory(connectionFactory);
factory.setMissingQueuesFatal(false);
return factory;
}
}
然后我们只需要在使用RabbitListener注解时声明containerFactory即可,如下:package com.cheng.ebbing.message.mq;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author pcc
*/
@Slf4j
@Component
public class TestMQ {
@RabbitListener(queues = {"Test-queue-one","test_queue"})
public void test(String msg){
log.info("test1收到消息:{}",msg);
}
@RabbitListener(queues = "test_queue_two",containerFactory = "mySimpleRabbitListenerContainerFactory")
public void test2(String msg, Channel channel) throws IOException {
log.info("test2收到消息:{}",msg);
channel.basicAck(1,false);
}
}
如上,test方法使用的是默认配置的Rabbit,test2使用的是自定义配置的另一个Rabbit,此时从两个mq分别往对应队列发消息,就可以实现一个服务监听多个Rabbit的队列:@RabbitListener(queues = "test_queue_two",containerFactory = "mySimpleRabbitListenerContainerFactory",concurrency = "5")
public void test2(String msg, Channel channel) throws IOException {
log.info("test2收到消息:{}",msg);
channel.basicAck(1,false);
}
下面截图可以看出,客户端确实启动了五个消费者:@RabbitListener(queues = "test_queue_two",containerFactory = "mySimpleRabbitListenerContainerFactory",exclusive = true)
public void test2(String msg, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
log.info("test2收到消息:{}",msg);
channel.basicAck(deliveryTag,false);
}
@RabbitListener(queues = "test_queue_two",containerFactory = "mySimpleRabbitListenerContainerFactory",exclusive = true)
public void test3(String msg, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
log.info("test3收到消息:{}",msg);
channel.basicAck(deliveryTag,false);
}
如上,test2和test3将只有一个可以接收到消息,且是一直是那一个接收,另一个不接收。@RabbitListener(queues = "test_queue_two",containerFactory = "mySimpleRabbitListenerContainerFactory",exclusive = true,ackMode = "NONE")
public void test3(String msg, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
log.info("test3收到消息:{}",msg);
channel.basicAck(deliveryTag,false);
}
这就是RabbitListener支持的大部分属性了,基本已经可以满足大部分需要了,但却不是全部,有些场景还是需要配合其他信息一起使用的。这里介绍下可以配合RabbitListener注解使用的一些注解。
@RabbitListener(queues = "Test-queue-one",ackMode = "MANUAL")
public void testRabbitHandler2(@Payload String s,@Header(AmqpHeaders.DELIVERY_TAG) long delivery ,Channel channel) throws IOException {
log.info("testRabbitHandler2: {},delivery:{} , channel:{} ",s,delivery,channel);
channel.basicAck(delivery,false);
}
不过这里的Payload完全可以不用,是不会有任何影响的,默认会将消息转换到这个参数上,不过这需要建立在只有一个这种参数的基础上,如果此时delivery不加注解,s也不加注解肯定会报错,因为springamqp不知道转换到哪个参数上,channel本身就是特殊的类型,不会影响消息转换。
@RabbitListener(queues = "Test-queue-one",ackMode = "MANUAL")
@SendTo("test_queue")
public String testRabbitHandler2(String s,@Header(AmqpHeaders.DELIVERY_TAG) long delivery ,Channel channel) throws IOException {
log.info("testRabbitHandler2: {},delivery:{} , channel:{} ",s,delivery,channel);
channel.basicAck(delivery,false);
return "hahahh";
}
如上代码,会在执行完之后发送test_queue队列中消息,发送的内容就是方法的返回内容,这里sendTo的使用和RabbitListener没有什么关系。
消息可靠性是mq必须要关注的一点,因为是异步通信所有中间任何环境都有可能出现问题,所以消息可靠性就更为重要了,一般需要从以下几个方面考虑:
需要增加两个配置项,打开消息发送确认,如下publisher-confirm-type: correlated表示消息到达交换机时,会有回调,publisher-returns: true 则表示消息达到最列时会有回调,队列回调一般只有routing key错误才会触发。
spring:
rabbitmq:
# host: 192.168.150.199
# port: 5672
addresses: 192.168.150.202:5672
virtual-host: /
username: guest
password: guest
publisher-confirm-type: correlated # 交换机回调 none/simple/correlated
publisher-returns: true # 队列回调
publisher-confirm-type 用于消息发送到交换机的回调,对消息可靠性要求比较高的话建议开启,且使用correlated模式,不过开启回调后mq的性能会有大幅下降,虽然大幅下降但是每秒接近上万的并发还是可以支持的(无回调配置允许情况下可到10w),这个配置他支持三种模式:
boolean flag = rabbitTemplate.waitForConfirms(1000);
publisher-returns 用于消息发送到队列的回调,一般只有routing key的错误才会导致消息到达交换机成功而到达队列失败,所以这个配置一般可以不开启,因为意义不大,触发发送消息时写错了routing key。因为mq内部可以保证routing key正确的情况下,消息到达交换机后一定可以到达队列。
上面的两种回调(交换机回调、队列回到),需要我们手动设置回调方法才可以实现真正的回调,而且这两个回调方法都是全局共用的,而且只能设置一次,所以回调方法一般会设置在配置文件中。
// 这里使用correlationData 记录数据,要求发送方必须传递correlationData,否则异常
rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{
});
切记这俩回调方法只允许设置一次, 交换机回调的代码中的correlationData 就是我们发送消息时传递给mq的CorrelationData,这个是原封不动传回来的,注意是原封不动传回来的,所以可以利用这一点做很多事,比如失败重试时就需要这里的信息进行重新发送。ack是一个boolean值,true表示消息到达交换机成功,false表示消息到达交换机失败了。cause在ack为false时,会给出造成ack失败的可能原因。
// 队列回调
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 获取消息内容
String msgContent = new String(message.getBody(), StandardCharsets.UTF_8);
// 这里只用来做日志输出,一般只有路由key错误才会触发
log.error("RabbitTemplate 发送消息到达队列失败,请检查routing key, " +
"exchange: {}, " +
"routingKey: {}, " +
"replyCode: {}, " +
"replyText: {}, " +
"message: {} ",
exchange,
routingKey ,
replyCode,
replyText,
msgContent);
});
这个回调一般只有routing key错误才会出现,所以无需我们来做什么,最多记录下信息即可。不过需要注意的是队列回调在成功时是不回调的也就是说消息成功到达队列不会触发该方法,只有消息到达队列失败了才会触发这个回调,所以这里使用的是log.error。message是消息对象,replyCode是错误编码,replyText是错误信息,exchange交换机,routinKey路由键。
当消息发送失败以后,发送方开启了交换机回调我们就可以获知到消息发送失败了,若是失败则需要我们进行尝试重新发送,也就是失败重试了,如果对消息有更高的要求还需要考虑失败重试最后依然失败的处理机制,如果需要可以将消息进行入库然后另起线程进行处理,这里只展示失败重试的操作。
失败重试这种机制是很常见的,一般的失败重试基本参数也是类似,这里在rabbit配置中增加以下自定义配置,这个重试是参照网关的重试过滤器的重试机制来设置的。
spring:
rabbitmq:
publisher: # 注意:这个配置是自定义配置,用于发送方交换机失败重试,并不是官方配置
retry:
enabled: true # 开启失败重试
max-retries: 10 # 最大重试次数
first-interval-millis: 2000 # 首次重试的间隔时间ms
max-interval-millis: 10000 # 最大重试间隔时间ms,间隔时间超过10s将以10s作为最大间隔时间
factor: 2 # 乘子:重试间隔的倍数
based-previous-value: true # 是否根据前一次的时间进行计算间隔时间,fase的话根据第一次间隔时间进行计算
这个配置类为了获取到rabbit的配置信息,方便在项目中使用
package com.cheng.common.mq.api.config;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
/**
* @author pcc
* @version 1.0.0
* @description rabbit配置类
*/
@Data
@Configuration
public class RabbitConfig {
// mq基础配置
@Value("${spring.rabbitmq.host:localhost}")
private String host;
@Value("${spring.rabbitmq.port:5672}")
private Integer port;
@Value("${spring.rabbitmq.addresses:localhost:5672}")
private String addresses;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
// 发送方ack 配置
@Value("${spring.rabbitmq.publisher-confirm-type}")
private String publisherConfirms;
@Value("${spring.rabbitmq.publisher-returns}")
private boolean publisherReturns;
// 发送方nack重试配置
@Value("${spring.rabbitmq.publisher.retry.enabled:false}")
private Boolean retryEnabled;
@Value("${spring.rabbitmq.publisher.retry.max-retries:3}")
private Integer retryMaxRetries;
@Value("${spring.rabbitmq.publisher.retry.first-interval-millis:200}")
private Integer retryFirstIntervalMillis;
@Value("${spring.rabbitmq.publisher.retry.max-interval-millis:1000}")
private Integer retryMaxIntervalMillis;
@Value("${spring.rabbitmq.publisher.retry.factor:2}")
private Integer retryFactor;
@Value("${spring.rabbitmq.publisher.retry.based-previous-value:false}")
private Boolean retryBasedPreviousValue;
// mq重连(失败重试)配置
@Value("${spring.rabbitmq.template.retry.enabled}")
private Boolean templateEnabled;
@Value("${spring.rabbitmq.template.retry.max-attempts}")
private Integer templateMaxAttempts;
@Value("${spring.rabbitmq.template.retry.initial-interval}")
private Integer templateInitialInterval;
@Value("${spring.rabbitmq.template.retry.max-interval}")
private Integer templateMaxInterval;
}
这里要去消息发送时必须携带CorrelationData,以下代码主要通过relayCode来实现回调,注意这里的relayCode在交换机回调期间无论是ack还是nack都是我们设置的值,所以我们可以利用这个值来标识回调的次数而不用借助其他工具。
package com.cheng.common.mq.api.config;
import com.cheng.common.mq.api.processor.DealReturnMessagePostProcessor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
/**
* @author pcc
* @version 1.0.0
* @description Rabbit 回调配置
*/
@Slf4j
@Configuration
public class RabbitConfirmConfiguration {
@Resource
private RabbitConfig rabbitConfig;
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void initRabbitTemplate(){
// 更换消息转换器,json类型的消息,接收到以后会有转义符
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
// 消息前置处理器
rabbitTemplate.setBeforePublishPostProcessors(new DealReturnMessagePostProcessor());
}
//回调
@PostConstruct
@ConditionalOnProperty(name = "spring.rabbitmq.publisher-confirm-type",havingValue = "correlated")
public void initConfirmCallBack(){
// 这里使用correlationData 记录数据,要求发送方必须传递correlationData,否则异常
rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{
if(ack){
log.info("RabbitTemplate 发送消息到达交换机成功 " +
"exchange: {} , " +
"routing key: {} ," +
"correlationId: {}, " +
"replyCode: {}, " +
"message: {} ",
correlationData.getReturned().getExchange(),
correlationData.getReturned().getRoutingKey(),
correlationData.getId(),
correlationData.getReturned().getReplyCode(),
new String(correlationData.getReturned().getMessage().getBody(),StandardCharsets.UTF_8));
}else{
log.error("RabbitTemplate 发送消息到达交换机失败, 原因:{}, " +
"exchange: {}, " +
"routing key: {}," +
"correlationId: {}," +
"replyCode: {}, " +
"message: {}",
cause,
correlationData.getReturned().getExchange(),
correlationData.getReturned().getRoutingKey(),
correlationData.getId(),
correlationData.getReturned().getReplyCode(),
new String(correlationData.getReturned().getMessage().getBody(),StandardCharsets.UTF_8));
// 重试
try {
retry(rabbitTemplate,rabbitConfig,correlationData);
} catch (InterruptedException e) {
log.info("RabbitTemplate 消息重试期间异常:{} ",e.getMessage());
}
}
});
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 获取消息内容
String msgContent = new String(message.getBody(), StandardCharsets.UTF_8);
// 这里只用来做日志输出,一般只有路由key错误才会触发
log.error("RabbitTemplate 发送消息到达队列失败,请检查routing key, " +
"exchange: {}, " +
"routingKey: {}, " +
"replyCode: {}, " +
"replyText: {}, " +
"message: {} ",
exchange,
routingKey ,
replyCode,
replyText,
msgContent);
});
}
// 交换机nack重试,要求发送前CorrelationData必须不为空
private void retry(RabbitTemplate rabbitTemplate, RabbitConfig rabbitConfig, CorrelationData correlationData) throws InterruptedException {
// 重构message
Message message = MessageBuilder.withBody(correlationData.getReturned().getMessage().getBody())
.setContentEncoding("UTF-8")
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setMessageId(correlationData.getReturned().getMessage().getMessageProperties().getMessageId())
.build();
// 重构correlationData
correlationData.setReturned(
new ReturnedMessage(correlationData.getReturned().getMessage(),
correlationData.getReturned().getReplyCode()+1,
correlationData.getReturned().getReplyText(),
correlationData.getReturned().getExchange(),
correlationData.getReturned().getRoutingKey())
);
// 计算暂停时间
if(rabbitConfig.getRetryEnabled()){
// 首次推送
if (correlationData.getReturned().getReplyCode() == 1) {
TimeUnit.MILLISECONDS.sleep(rabbitConfig.getRetryFirstIntervalMillis());
} else if(correlationData.getReturned().getReplyCode()<=rabbitConfig.getRetryMaxRetries()){
// 非首次推送
if (rabbitConfig.getRetryBasedPreviousValue()) {
// 基于上次推送时间计算下次推送时间,若是时间大于最大等待时间,则使用最大等待时间
TimeUnit.MILLISECONDS.sleep(
(long) ((rabbitConfig.getRetryFirstIntervalMillis() * Math.pow(rabbitConfig.getRetryFactor(),correlationData.getReturned().getReplyCode() - 1))<=rabbitConfig.getRetryMaxIntervalMillis()?
(rabbitConfig.getRetryFirstIntervalMillis() * Math.pow(rabbitConfig.getRetryFactor(),correlationData.getReturned().getReplyCode() - 1)):
rabbitConfig.getRetryMaxIntervalMillis())
);
} else {
// 基于首次推送时间计算下次推送时间,若是时间大于最大等待时间,则使用最大等待时间
TimeUnit.MILLISECONDS.sleep(
rabbitConfig.getRetryFactor() * rabbitConfig.getRetryFirstIntervalMillis()>=rabbitConfig.getRetryMaxIntervalMillis()?
rabbitConfig.getRetryMaxIntervalMillis():
rabbitConfig.getRetryFactor() * rabbitConfig.getRetryFirstIntervalMillis());
}
}
if(correlationData.getReturned().getReplyCode()<=rabbitConfig.getRetryMaxRetries()){
log.info("RabbitTemplate 已开启交换机发送失败重试,最大重推次数:{} ,开始第:{} 次重推, " +
"exchange: {}, " +
"routing key: {}, " +
"correlationId: {}, " +
"replyCode: {}, " +
"message: {} ",
rabbitConfig.getRetryMaxRetries(),
correlationData.getReturned().getReplyCode(),
correlationData.getReturned().getExchange(),
correlationData.getReturned().getRoutingKey(),
correlationData.getId(),
correlationData.getReturned().getReplyCode(),
new String(correlationData.getReturned().getMessage().getBody(),StandardCharsets.UTF_8));
rabbitTemplate.convertAndSend(
correlationData.getReturned().getExchange(),
correlationData.getReturned().getRoutingKey(),
new String(correlationData.getReturned().getMessage().getBody(),StandardCharsets.UTF_8),
correlationData
);
}
}else{
// 默认为0,上方构建的correlationData对象中没有设置重试次数+1
if(1==correlationData.getReturned().getReplyCode()){
log.info("RabbitTemplate 未开启交换机发送失败重试,尝试重推一次开始, " +
"exchange: {}, " +
"routing key: {}, " +
"correlationId: {}, " +
"replyCode: {}, " +
"message: {} ",
correlationData.getReturned().getExchange(),
correlationData.getReturned().getRoutingKey(),
correlationData.getId(),
correlationData.getReturned().getReplyCode()+1,
new String(correlationData.getReturned().getMessage().getBody(),StandardCharsets.UTF_8));
rabbitTemplate.convertAndSend(
correlationData.getReturned().getExchange(),
correlationData.getReturned().getRoutingKey(),
new String(correlationData.getReturned().getMessage().getBody(),StandardCharsets.UTF_8),
correlationData
);
}
}
}
}
前面已经说了失败重试需要依赖CorrelationData的信息,依赖的就是他的ReturnedMessage信息,ReturnedMessage含有消息体Message,交换机、路由key,以及relayCode(可以用来标识重试次数),所以这部分信息我们可以统一放在前置处理器中进行封装,而不用每次发送消息时进行填充,当然不用前置处理器也是完全可以的。这里有个前提是CorrelationData不可以为空,也就是使用RabbitTemplate发送消息时要传入CorrelationData。
package com.cheng.common.mq.api.processor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import java.util.UUID;
/**
* @author pcc
* @version 1.0.0
* @description 消息前置处理器
* 注意spring实际执行的是第二个方法
*/
@Slf4j
public class DealReturnMessagePostProcessor implements MessagePostProcessor {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
return message;
}
@Override
public Message postProcessMessage(Message message, Correlation correlation, String exchange, String routingKey) {
// correlationID / messageId
String id = UUID.randomUUID().toString().replace("-","").toLowerCase();
Message msg = message;
// 这里只有CorrelationData AsyncCorrelationData,后者基本不用,此处不做适配
if(correlation !=null && correlation instanceof CorrelationData ){
CorrelationData correlationData = (CorrelationData) correlation;
// 构建message,只有重试的时候才会从新构建消息
if(correlationData.getReturned()!=null && correlationData.getReturned().getReplyCode()!=0){
msg = MessageBuilder.fromMessage(correlationData.getReturned().getMessage())
.build();
}else{
// 首次推送,共用messageid与correlationid
correlationData.setId(id);
msg = MessageBuilder.fromMessage(msg)
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setContentEncoding("UTF-8")
.setTimestamp(Date.from(LocalDateTime.now().atZone( ZoneId.systemDefault()).toInstant()))
.setMessageId(id)
.build();
}
// 这里构建的ReturnedMessage,是为了方便在回调时获取到发送时的信息,方便进行重试
correlationData.setReturned(new ReturnedMessage(
msg,
correlationData.getReturned()!=null?correlationData.getReturned().getReplyCode():0,
correlationData.getReturned()!=null?correlationData.getReturned().getReplyText():"",
exchange,
routingKey)
);
log.info("RabbitTemplate 前置处理器:消息发送成功, exchange: {}, routingKey: {}, correlationId: {}, messageId: {}, message: {}", exchange, routingKey, correlationData.getId(),msg.getMessageProperties().getMessageId(),new String(msg.getBody(), StandardCharsets.UTF_8));
}
return postProcessMessage(msg);
}
}
将前置处理器交给RabbitTemplate,这样每次RabbitTemplate发型消息前都会帮我们封装CorrelationData了。这块代码其实写在了回调配置类里了,这里重复展示下。
@PostConstruct
public void initRabbitTemplate(){
// 更换消息转换器,json类型的消息,接收到以后会有转义符
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
// 消息前置处理器
rabbitTemplate.setBeforePublishPostProcessors(new DealReturnMessagePostProcessor());
}
消息发送没有什么特殊的,需要关注的就是CorrelationData不为空即可。
package com.ebbing.task.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* @author pcc
* @version 1.0.0
* @description 测试Rabbit
*/
@Slf4j
@RestController
public class TestRabbitController {
@Resource
private RabbitTemplate rabbitTemplate;
@RequestMapping(value ="/send")
public void testSend(){
// 开启回调的话,CorrelationData不可为空
rabbitTemplate.convertAndSend("amq.topic1", "one", "hello ampq",new CorrelationData());
}
}
下面是发送信息失败的重试(只需要写个错误的交换机名称就会触发),可以清晰的看到消息在正常的重试了(省略了重复信息)。
2024-01-09 16:05:13.496 INFO 43776 --- [nio-8808-exec-3] c.c.m.a.p.DealReturnMessagePostProcessor : RabbitTemplate 前置处理器:消息发送成功, exchange: amq.topic1, routingKey: one, correlationId: 853f32dc470c457f9da691aa9c51f5a6, messageId: 853f32dc470c457f9da691aa9c51f5a6, message: "hello ampq"
2024-01-09 16:05:13.512 ERROR 43776 --- [68.150.204:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'amq.topic1' in vhost '/', class-id=60, method-id=40)
2024-01-09 16:05:13.515 ERROR 43776 --- [nectionFactory2] c.c.c.m.a.c.RabbitConfirmConfiguration : RabbitTemplate 发送消息到达交换机失败, 原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'amq.topic1' in vhost '/', class-id=60, method-id=40), exchange: amq.topic1, routing key: one,correlationId: 853f32dc470c457f9da691aa9c51f5a6,replyCode: 0, message: "hello ampq"
2024-01-09 16:05:15.516 INFO 43776 --- [nectionFactory2] c.c.c.m.a.c.RabbitConfirmConfiguration : RabbitTemplate 已开启交换机发送失败重试,最大重推次数:10 ,开始第:1 次重推, exchange: amq.topic1, routing key: one, correlationId: 853f32dc470c457f9da691aa9c51f5a6, replyCode: 1, message: "hello ampq"
2024-01-09 16:05:15.519 INFO 43776 --- [nectionFactory2] c.c.m.a.p.DealReturnMessagePostProcessor : RabbitTemplate 前置处理器:消息发送成功, 。。。。
2024-01-09 16:05:15.525 ERROR 43776 --- [nectionFactory1] c.c.c.m.a.c.RabbitConfirmConfiguration : RabbitTemplate 发送消息到达交换机失败, 原因:。。。。
2024-01-09 16:05:19.525 INFO 43776 --- [nectionFactory1] c.c.c.m.a.c.RabbitConfirmConfiguration : RabbitTemplate 已开启交换机发送失败重试,最大重推次数:10 ,开始第:2 次重推, 。。。。
2024-01-09 16:05:19.528 INFO 43776 --- [nectionFactory1] c.c.m.a.p.DealReturnMessagePostProcessor : RabbitTemplate 前置处理器:消息发送成功,。。。。
2024-01-09 16:05:19.530 ERROR 43776 --- [nectionFactory2] c.c.c.m.a.c.RabbitConfirmConfiguration : RabbitTemplate 发送消息到达交换机失败, 原因:。。。。
2024-01-09 16:05:23.531 INFO 43776 --- [nectionFactory2] c.c.c.m.a.c.RabbitConfirmConfiguration : RabbitTemplate 已开启交换机发送失败重试,最大重推次数:10 ,开始第:3 次重推, 。。。。
2024-01-09 16:05:23.534 INFO 43776 --- [nectionFactory2] c.c.m.a.p.DealReturnMessagePostProcessor : RabbitTemplate 前置处理器:消息发送成功, 。。。。
2024-01-09 16:05:23.537 ERROR 43776 --- [nectionFactory1] c.c.c.m.a.c.RabbitConfirmConfiguration : RabbitTemplate 发送消息到达交换机失败, 原因:。。。。
。。。。。。
这个很简单,只需要我们发送时声明消息的投递模式是持久化的即可,RabbitTemplate的默认模式就是持久化的,所以这里我们可以不做任何操作,如果想要手动设置的话则在Message中设置即可,可以将这段代码放入前置处理器:
MessageBuilder.fromMessage(msg)
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setContentEncoding("UTF-8")
.setTimestamp(Date.from(LocalDateTime.now().atZone( ZoneId.systemDefault()).toInstant()))
.setMessageId(id)
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 这是默认值,可以不设置,默认就是持久化消息
.build();
交换机、队列、绑定等信息可以在Rabbit的管理后台创建也可以在Java代码中创建都是可以的。
这里创建默认也是持久化的,需要注意的是这里在项目启动阶段不会创建交换机和队列或者绑定,只有在使用mq时才会使用RabbitAdmin触发创建(版本:springboot2.6.11)。
package com.ebbing.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author pcc
* @version 1.0.0
* @description 基础信息类,交换机、队列默认是持久化的
*/
@Configuration
public class RabbitBaseConfig {
// ***************交换机***************
@Bean("test-java-exchange")
public DirectExchange createExchange(){
return ExchangeBuilder.directExchange("test-java-exchange").durable(true).build();
}
@Bean("test-java-exchange2")
public DirectExchange createExchange2(){
return ExchangeBuilder.directExchange("test-java-exchange2").durable(true).build();
}
// ***************队列***************
@Bean("test-java-queue")
public Queue createQueue(){
return QueueBuilder.durable("test-java-queue").build();
}
@Bean("test-java-queue2")
public Queue createQueue2(){
return QueueBuilder.durable("test-java-queue2").build();
}
@Bean("test-java-queue3")
public Queue createQueue3(){
return QueueBuilder.durable("test-java-queue3").build();
}
// ***************队列绑定交换机绑定***************
@Bean
public Binding createBinding(@Qualifier("test-java-queue") Queue queue, @Qualifier("test-java-exchange") DirectExchange fanoutExchange){
return BindingBuilder.bind(queue).to(fanoutExchange).with("key.test-java-exchange");
}
@Bean
public Binding createBinding2(@Qualifier("test-java-queue2") Queue queue, @Qualifier("test-java-exchange2") DirectExchange fanoutExchange){
return BindingBuilder.bind(queue).to(fanoutExchange).with("key.test-java-exchange2");
}
@Bean
public Binding createBinding3(@Qualifier("test-java-queue3") Queue queue, @Qualifier("test-java-exchange2") DirectExchange fanoutExchange){
return BindingBuilder.bind(queue).to(fanoutExchange).with("key.test-java-exchange2");
}
}
这里使用simple进行展示,direct、stream会单独抽出来说一说。
新增如下配置:
spring:
rabbitmq:
addresses: 192.168.150.204:5672
username: guest
password: guest
requested-heartbeat: 5 # 默认值
virtual-host: /
listener:
type: simple # 指定监听模式,此处可以不配置,如不配置可根据下方会自动适配
simple: # 与上方的simple对应
acknowledge-mode: manual # none不做ack、auto自动ack、manual手动ack
默认是auto模式,也就是消息收到就会自动进行ack,此时无论是消息处理成功与否都不会从新入队,这样就会造成消息丢失,所以我们需要使用手动ack,只有我们手动确认了消息,消息才会从队列中进行删除。下面是手动确认的代码:
@RabbitListener(queues = {"Test-queue-one"})
public void testManualAck(@Payload String msg,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag,Channel channel) throws Exception{
log.info("收到消息:{}, 消息顺序:{}",msg,deliveryTag);
channel.basicAck(deliveryTag,false);
}
这里使用的是 channel.basicAck(deliveryTag,false),deliveryTag表示的是消息在队列里的位置,mq的服务端需要靠这个找到具体的消息,false 表示不使用批量确认,如果使用批量确认那么在接收这条消息之前收到的消息都会被确认掉,一般不会使用批量确认,除非是可有可无的信息,丢失几条也没有影响,否则不要使用批量确认,传false即可。
其他消息确认的方法:
通过上面的举措已经基本可以保证消息的不丢失了,但是还有一种可能就是消息发送失败和监听失败的场景,此时可能消息都无法发送和接收,这种肯定是因为mq的服务异常导致的,所以一般mq是需要配置集群的,所以可以规避这种单节点突然宕机的风险。如果是单节点就需要增加重连机制了。
spring:
rabbitmq:
addresses: 192.168.150.204:5672
virtual-host: /
username: guest
password: guest
publisher-confirm-type: correlated # 交换机回调 none/simple/correlated
publisher-returns: true # 队列回调
template: # 此处用于发送消息连接mq异常时或者其他场景发生异常(exception)时进行重试,发送交换机、队列的nack并不会触发这个重试
retry:
enabled: true
max-attempts: 3
initial-interval: 1000
max-interval: 10000
上面配置的是3次重试,加上原始的一次是4次,若是4次还连接不上就会报异常。当然集群也是可以使用这个配置进行重连。SpringBoot中支持了两种消息监听器simple(SimpleMessageListenerContainer)、direct(DirectMessageListenerContainer)。在最早时只有simple,2.0之后开始添加了direct。他们的特点各有利弊,需要使用哪个需要根据自身业务场景做选择。
这是最好的监听器,也是默认的监听容器。适用于普遍大多数的场景,所以一般如果没有特殊需要直接使用simple即可,使用simple时支持以下参数的配置:
spring:
rabbitmq:
addresses: 192.168.150.204:5672
username: guest
password: guest
host: 192.168.150.204
requested-heartbeat: 5 # 默认值
virtual-host: /
port: 5672
listener:
type: simple
simple:
prefetch: 100 # 消息预取的数量,默认值是250,越小消息消费越慢
acknowledge-mode: manual
concurrency: 10 # 指定消费者数量,在simple中该值等于channel的数量。一个消费者一个channel
default-requeue-rejected: false # 消息决绝后默认不重新入队
retry: # 当消费时出现异常,默认情况会进行重试,这个重试是在消费者本地从新尝试消费,而不是从队列里重试
enabled: true
max-attempts: 3
initial-interval: 1000
如上配置了消费者数量是10,这里的消费者便是10了,且会有10各channel:
下面是channel
这是simple的消息的处理模型:
下面是direct常用的配置:
spring:
application:
name: ebbing-message
rabbitmq:
addresses: 192.168.150.204:5672
username: guest
password: guest
host: 192.168.150.204
requested-heartbeat: 5 # 默认值
virtual-host: /
port: 5672
listener:
type: direct
direct:
prefetch: 100
acknowledge-mode: manual
consumers-per-queue: 20 # 每个队列消费者的数量,与上面的concurrency类似
default-requeue-rejected: false
retry:
enabled: true
max-attempts: 3
initial-interval: 1000
consumers-per-queue 配置了20个消费者,如下会体现出20哥消费者和20个channel
下面是channel的信息
下面是direct的处理模型:
对比direct和simple可以发现他们最大的不同在于消费者对应的线程上,在simple中每个消费者都会有一个线程(这里说的消费者指的是使用concurrency指定的消费者),而direct则是共享线程。因此direct支持手动设置线程池,如果使用direct则需要我们手动设置线程池,其他则区别不大。
很简单,只需要如下即可,这样便会取代默认的CachingConnectionFactory 连接工厂了。
package com.cheng.ebbing.message.mq;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author pcc
* @version 1.0.0
* @description 多mq配置类
*/
@Configuration
public class anothermqconfig {
@Bean
public CachingConnectionFactory getConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("192.168.150.204");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
connectionFactory.setExecutor(new ThreadPoolExecutor(
3,
3,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>()));
return connectionFactory;
}
}
这里设置的线程最大是3个都是核心线程,测试下,下面是测试代码:
@RabbitListener(queues = {"test-java-queue"})
public void testManualAck2(@Payload String msg,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag,Channel channel) throws Exception{
log.info("收到消息:{}, 消息顺序:{}",msg,deliveryTag);
log.info("当前处理线程:{}",Thread.currentThread().getName());
channel.basicAck(deliveryTag,false);
}
下面是执行截图,看一看到确实只有3
如果一个项目想要同时使用simple和direct呢(不过这种场景可能不是太多,不过可能会碰到一个项目需要使用不同的rabbit,他们的ip和端口不同,都是一样的处理)。
配置一个独立的mq监听:
package com.cheng.ebbing.message.mq;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author pcc
* @version 1.0.0
* @description 多mq配置类
*/
@Configuration
public class anothermqconfig {
@Bean("directRabbitListenerContainerFactory")
public DirectRabbitListenerContainerFactory directRabbitListenerContainerFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("192.168.150.204");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setExecutor(new ThreadPoolExecutor(
3,
3,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>()));
DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setConnectionFactory(connectionFactory);
factory.setMissingQueuesFatal(false);
return factory;
}
}
为监听者声明监听工厂:
// 使用默认配置监听 simple
@RabbitListener(queues = {"dead-letter-queue"})
public void testDeadLetterQueue(@Payload String message,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag,Channel channel) throws IOException {
log.info("收到死信队列消息:{}",message);
channel.basicAck(deliveryTag,false);
}
// 使用自定义的容器工厂进行监听 direct
@RabbitListener(queues = {"test-java-queue"},containerFactory = "directRabbitListenerContainerFactory")
public void testManualAck2(@Payload String msg,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag,Channel channel) throws Exception{
log.info("收到消息:{}, 消息顺序:{}",msg,deliveryTag);
log.info("当前处理线程:{}",Thread.currentThread().getName());
channel.basicAck(deliveryTag,false);
}
如此便可以实现simple和direct共存了。
消费者的预取是指一次性从mq队列里获取的消息数量,比如mq里有500条消息,而默认的预取条数是250,则如果只有一个消费者,那么当消费者连接到mq时,应该是250条消息属于unacked的,total还是500。消费者对消息的预取之后是将消息存储到了本地,这样就可以减少消费者于mq服务端的交互次数,从而提升消息消费性能,当消息量较大时,预取的值也可以适当调高,但具体调整多少,应该根据机器的性能测试而定,而不可以随意指定。
注意消息预取并不会导致消息的乱序,预取以后得消息的顺序与mq服务器中的消息顺序还是一致的。如果想要调整预取的数量,在SpringBoot中可以如下进行设置:
spring:
rabbitmq:
addresses: 192.168.150.204:5672
username: guest
password: guest
host: 192.168.150.204
requested-heartbeat: 5 # 默认值
virtual-host: /
port: 5672
listener:
simple:
prefetch: 100 # 消息预取,默认值是250
注意:prefetch应该设置合理的值,太小容易导致消息处理过慢,太大可能会导致服务器性能下降。其次prefetch使用时应该使用manual的ack模式,虽然auto模式下野生效,不过会造成消息重复投递的问题,所以使用prefetch应使用manual模式的ack
如果一个队列有多个消费者在连接,那么默认情况下每个消费者接收到的消息数量都是一样的,无论机器性能相同与否,这个是mq服务端的负载均衡策略-轮询。这也是rabbit的默认工作模式,这个很好验证,建立两个消费者看他们的消息消费数量即可。这里不细说了,很容易验证且没有任何难度。主要说一下如何破坏这种模式,然后思考下破坏好还是不破坏好。
如何破坏work
spring:
rabbitmq:
addresses: 192.168.150.204:5672
username: guest
password: guest
host: 192.168.150.204
requested-heartbeat: 5 # 默认值
virtual-host: /
listener:
simple:
prefetch: 100 # 消息预取的数量,默认值是250,越小消息消费越慢
在消费者代码中增加channel.basicQos代码:@RabbitListener(queues = {"Test-queue-one"})
public void testManualAck(@Payload String msg,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag,Channel channel) throws Exception{
log.info("收到消息:{}, 消息顺序:{}",msg,deliveryTag);
count1.getAndIncrement();
channel.basicQos(100);
TimeUnit.MILLISECONDS.sleep(100L);
channel.basicAck(deliveryTag,false);
System.out.println("消费者1处理了几条:"+count1);
}
@RabbitListener(queues = {"Test-queue-one"})
public void testManualAck2(@Payload String msg,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag,Channel channel) throws Exception{
log.info("收到消息:{}, 消息顺序:{}",msg,deliveryTag);
channel.basicQos(100);
count2.getAndIncrement();
channel.basicAck(deliveryTag,false);
System.out.println("消费者2处理了几条:"+count2);
}
此时我往队列里发了500条消息,然后处理结果是消费者1处理了:112,消费者2处理了:388。可见work模式被破坏了。破坏work好还是不破坏好?
这个需要根据机器来判断,如果消费者性能都差不多,则不需要破坏,我们只需要寻找到预取数量得合理值即可,若是机器性能存在差异,则需要我们破坏work了,从而使得消费者集群拥有更好地消费能力,不至于快的等慢的出现资源浪费的情况。
死信队列就是死亡的消息该去的地方,那何种消息才是死亡的消息呢?
以上三种场景会导致消息死亡,死亡的消息则需要一个地方进行存储就是死信队列了,其实死信队列就是普通队列,还有死信交换机也是普通交换机,只是他们用于处理死信所以才叫死信交换机、死信队列,所以重点不是死信交换机和队列,而是死信。
下面是死信队列和交换机以及绑定的创建,和普通交换机、队列、绑定并无区别。
// 死信交换机-直连
@Bean("dead-letter-exchange")
public DirectExchange createExchange3(){
return ExchangeBuilder.directExchange("dead-letter-exchange").durable(true).build();
}
// 死信队列
@Bean("dead-letter-queue")
public Queue createDeadLetterQueue(){
return QueueBuilder.durable("dead-letter-queue").build();
}
// 死信交换机与死信队列绑定
@Bean
public Binding createDeadLetterBinding(@Qualifier("dead-letter-queue") Queue queue,
@Qualifier("dead-letter-exchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("dead");
}
后面有死信消息就会往这里发送。
为消息设置存活时间有两种方式(这里只说代码的方式,不说后台):队列设置ttl属性,这样队列中所有的消息如果在ttl内没有被消费都会变成死信,另外一种则是为消息设置ttl,这样ttl只会针对一条消息生效。先演示设置队列的ttl的方式。
这里只有队列声明时增加ttl属性即可,其他和正常的队列、exchange等都无区别。
// TTL queue
@Bean("ttl-queue")
public Queue createTtlQueue(){
return QueueBuilder.durable("ttl-queue")
.ttl(10000) // 设置队列消息存活时间
.deadLetterExchange("dead-letter-exchange") // 死信交换机
.deadLetterRoutingKey("dead") // 死信路由
.build();
}
// TTL exchange
@Bean("ttl-exchange")
public DirectExchange createTtlExchange(){
return ExchangeBuilder.directExchange("ttl-exchange").durable(true).build();
}
// TTL queue与TTL exchange绑定
@Bean
public Binding createTtlBinding(@Qualifier("ttl-queue") Queue queue,
@Qualifier("ttl-exchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("ttl");
}
下面测试发送消息到ttl-queue:
package com.ebbing.mq;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @author pcc
* @version 1.0.0
* @className TestMQ
*/
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class TestMQ {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void test3() throws Exception{
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("ttl-exchange","ttl","hello ttl-queue:"+i+"个 ", new CorrelationData());
log.info("发送消息成功:{}",i);
}
}
}
10s后消息进入死信队列了:
还可以为消息单独设置ttl,队列交换机等都是普通队列、交换机,唯一区别是为该队列声明死信交换机和路由即可。如下方式:
普通队列声明死信交换机、路由:
@Bean("test-java-queue")
public Queue createQueue(){
return QueueBuilder.durable("test-java-queue")
.deadLetterExchange("dead-letter-exchange")
.deadLetterRoutingKey("dead")
.build();
}
测试发送ttl消息:
package com.cheng.ebbing.message.mq;
import lombok.extern.slf4j.Slf4j;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
/**
* @author pcc
* @version 1.0.0
* @description 描述下这个类吧
*/
@Slf4j
@SpringBootTest
@RunWith(SpringRunner.class)
public class Test {
@Resource
RabbitTemplate rabbitTemplate;
@org.junit.Test
public void testSendTTLMsg(){
rabbitTemplate.convertAndSend("test-java-exchange","key.test-java-exchange","hello TTL message",message->{
message.getMessageProperties().setExpiration("5000");
return message;
});
log.info("消息发送完成");
}
}
5s后消息进入死信队列:
为接收消息的队列声明死信交换机和队列(交换机和绑定无区别,不贴了):
@Bean("test-java-queue")
public Queue createQueue(){
return QueueBuilder.durable("test-java-queue")
.deadLetterExchange("dead-letter-exchange")
.deadLetterRoutingKey("dead")
.build();
}
消息接收者直接拒绝消息:
@RabbitListener(queues = {"Test-queue-one"})
public void testManualAck2(@Payload String msg,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag,Channel channel) throws Exception{
log.info("收到消息:{}, 消息顺序:{}",msg,deliveryTag);
// 拒绝消息,且不重新入队-消息死信
channel.basicReject(deliveryTag,false);
}
发送消息:
package com.ebbing.mq;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @author pcc
* @version 1.0.0
* @className TestMQ
*/
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class TestMQ {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void test2() throws Exception{
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("test-java-exchange","key.test-java-exchange","hello deadLetter:"+i+"个 ", new CorrelationData());
log.info("发送消息成功:{}",i);
}
}
消息全部进入死信队列了:
队列默认无边界,消息的最大存储量依赖本地磁盘,要想要模拟队列消息满了,需要为队列设置最大长度。
创建相关队列和交换机:
// maxlength-queue
@Bean("maxlength-queue")
public Queue createMaxlengthQueue(){
return QueueBuilder.durable("maxlength-queue")
.maxLength(9)
.deadLetterExchange("dead-letter-exchange")// 死信交换机
.deadLetterRoutingKey("dead")
.build();
}
// maxlength-exchange
@Bean("maxlength-exchange")
public DirectExchange createMaxlengthExchange(){
return ExchangeBuilder.directExchange("maxlength-exchange").durable(true).build();
}
// maxlength-queue与maxlength-exchange绑定
@Bean
public Binding createMaxlengthBinding(@Qualifier("maxlength-queue") Queue queue,
@Qualifier("maxlength-exchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("maxlength");
}
发送消息没有区别,不重复贴了,期望结果是队列里面9条,死信队列里1条,下图可见与预期并无区别。
然后看下消息希望的原因,可以看到是因为队列满了:
死信消息不可能扔到私信队列就完事了,我们还需要针对死信消息做对应处理,可以根据消息的死亡类型做不通处理。
可以通过x-death 来获取到消息死亡的明细信息,这样就方便我们根据死亡的类型和原队列等信息进行不同的处理了。
// 监听死信队列-判断消息死亡类型
@RabbitListener(queues = {"dead-letter-queue"})
public void testDeadLetterQueue2(@Payload String message,
@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag,
@Header("x-death") List<Map<String, Object>> xDeathHeaders,
Channel channel) throws IOException {
log.info("收到死信队列消息:{}, 死信信息: ",message);
xDeathHeaders.forEach(item->{
item.forEach((key,value)->{
log.info("key:{},value:{}",key,value);
});
});
channel.basicAck(deliveryTag,false);
}
下面是输出:
任务超时处理,订单超时未支付等都是延迟队列的应用场景。如果不使用延时队列,一般是将其放入到任务列表定时轮询,这种做法有些low了,最好的处理方式还是使用延迟队列来解决。Rabbit中延迟队列有两种实现方式,一种是使用ttl+死信队列实现,一种是安装延时交换机插件。这里先说使用ttl+私信队列。
原理:为消息设置ttl,消息死亡后就会进入死信队列,死信队列就是延迟队列,通过监听死信队列的信息达到延迟队列的目的,这种代码就不贴了,上面3 .2 TTL-Message 已经介绍过了,代码并没有什么区别。
这里需要安装一个delay的插件:rabbitmq-delayed-message-exchange
官网位置:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/tags
注意下载的是后缀ez的文件。
将其复制到容器内部:
docker cp rabbitmq_delayed_message_exchange-3.11.1.ez 07:/plugins/
然后执行以下命令:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
有以下输出以后重启容器:
然后交换机的创建就会有延迟队列的选项:
注意:延迟消息延迟的地方是交换机到达队列这个步骤,当消息通过交换机发出去后会经过指定的延时时间才会到达队列。
下面是延迟交换机的声明代码:
// delay-queue
@Bean("delay-queue")
public Queue createDelayQueue(){
return QueueBuilder.durable("delay-queue")
.build();
}
// delay-exchange
@Bean("delay-exchange")
public CustomExchange createDelayedExchange(){
Map<String,Object> map = new HashMap<String,Object>();
map.put("x-delayed-type","direct");
return new CustomExchange("delay-exchange","x-delayed-message",true,false,map);
}
// delay-queue与delay-exchange绑定
@Bean
public Binding createDelayBinding(@Qualifier("delay-queue") Queue queue,
@Qualifier("delay-exchange") CustomExchange exchange){
// 这里会多一个noargs,因为使用的是CustomExchange
return BindingBuilder.bind(queue).to(exchange).with("delayed").noargs();
}
然后进行测试,下面是测试代码:
@org.junit.Test
public void testSendDelayMessage(){
rabbitTemplate.convertAndSend("delay-exchange","delayed","hello delay个 ", message->{
message.getMessageProperties().setDelay(10000); // 单位毫秒
return message;
});
log.info("消息发送完成");
}
如此即可实现延迟消息了,注意消息真正延迟的地方是交换机和队列之间,不是队列和消费者之间。
Rabbit从3.6开始支持了Lazy Queue,在3.12 版本中已经将Lazy Queue设置为了默认模式。
Rabbit已经默认使用了,足以说明问题,我们先看看不使用Lazy Queue时消息处理可能存在问题:
所以无论你用什么方式,消息量特别大时,系统的内存压力肯定会增加这是无法避免的,如果想要解决这个问题,只能是把信息从内存中移出去,但信息移出去也会碰到另外的问题,信息使用时加载磁盘中的信息会增加IO。所以如果移出去肯定会增加信息消费时的时间。
事实上Rabbit就是这么做的,Rabbit通过声明队列的模式来将队列声明为惰性队列,惰性队列的消息会存储在磁盘中(内存中只有少量的消息),当消息使用时再从磁盘加载出来。那么惰性队列的优缺点是什么:
下面是声明队列时,将其设置为惰性队列:
// 惰性队列
@Bean("lazy-queue")
public Queue createLazyQueue(){
return QueueBuilder.durable("lazy-queue")
.lazy() // 惰性队列
.build();
}
给这个队列发了两条持久化信息,如下,可以看出消息不存在内存而是在磁盘:
在做中台类系统时,使用Rabbit来解耦上下游系统是个很不错的选择,不过若是系统没有消息记录则会增加很多扯皮的事情,而且这个消息不是指发送的日志,也不是指发送方的发送成功确认日志,而是指消费者和Rabbit的通信日志,有了这个就可以做到全流程信息把控,防止下游系统信息接收到说没有收到了,这样的事情我可碰到的太多了。
使用trace日志的话,需要使用trace插件来实现,Rabbit安装以后trace插件就默认携带,只是没有开启,所以我们只需要开启即可:
# 查看所有插件
rabbitmq-plugins list
# 启用trace插件
rabbitmqctl trace_on
rabbitmq-plugins enable rabbitmq_tracing
操作完成后刷新mq的management就会在如下位置除夕拿Tracing:
如下图,新增一个trace:
然后就可以发送消息进行测试trace了,当消息发送成功后会有如下信息在日志里,这表示消息发送到了交换机了,注意这里的Message published表示的是消息到达交换机了
然后再看下消费者收到消息的日志,Message received 表示消费者已经接收到了该消息,下面是消息的具体信息:
trace日志的好处在于可以明确的知道下游系统是否成功接收到了消息。
trace日志好用单也存在一个明显的问题,就是日志不会自动分割,所以需要我们自己加日志的分割,这个可以使用shell编辑一个脚本来实现日志的分割,下面是分割trace文件的shell,可以使用,然后增加一个crontab即可,我用的是按日期分割,每天一个日志文件。
#!/bin/bash
cp /var/tmp/rabbitmq-tracing/trace_log.log /apps/rabbitmq_log/trace_log_$(date +%Y%m%d).log
cat /dev/null > /var/tmp/rabbitmq-tracing/trace_log.log
这里是使用MQ可能会碰到的一些问题和对应的解决思路,供参考欢迎补充。
MQ可能会出现消息重复入队或者重复发送的情况,消费者消费超时会导致消息重新入队,消费者的requeue为true也会重新入队,死信队列处理完消息也有可能重新扔进来等等都有可能出现消息重复的现象,在业务上来说消息的类型无非就是增删改查,那这些场景怎么做幂等呢?
一般消息发送肯定是想要在数据处理完成以后才会发送消息,这样可以保证消息的准确性,这就需要我们在事务完成后再进行消息发送,那该怎么做才能让消息在事务结束以后再发送呢?
Spring提供了事务的声明周期函数,我们可以利用事务的声明周期函数来实现,代码如下,表示事务完成以后再执行该逻辑,这块代码需要写在含有事务的方法内部(如使用注解Transactional)
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization(){
@Override
public void afterCompletion(int status) {
// 事务commited:0 提交完成,1回滚完成,2其他
if(Predicate.isEqual(status).equals(0)){
rabbitTemplate.convernetAndSend(...);
}
}
});
如此便可以保证消息肯定是在数据库事务完成之后才会发送,而此时数据一定是已经持久化了的。
这个错误笔者在云上的Rabbit碰到过一次,报错显示消息过大mq报错了,但是自己搭建的mq无法复现该问题,查询资料发现Rabbit只有一个总体大小限制并没有看到单条消息的限制,可能是版本不一致吧。这里记录下,解决方法很简单:只需要将消息分开进行发送即可,如果是无法拆分的消息,则需要考虑业务问题了可以只给其他系统发id,让他来你的系统通过id检索,毕竟信息太大,这么做也是可以的。
ack超时导致消息重新入队,这样可能会有一个问题,即使做了ack消息还是一直递增。而不减少,以为ack超时,所以消息并没有在unack的位置,所以即使ack了消息也不会消失。这种问题增加消费节点依然是解决不了的,需要优化消费者代码,可以先入中间表然后ack,不要做业务上的处理,减少ack的等待时间,防止消息重复入队。
这个问题应该很容易碰到,不废话直接说方案,首先需要定位消息积压的原因,是因为发送方消息量激增还是因为下游消费能力突然下降,定位号问题以后我们来看解决方案:
方案一:
这个方案旨在提升消费者的节点,和单节点的上的消费者数量。
方案二:
这个场景主要是因为单节点的系统性能瓶颈导致的,方案一中的第一点不适用方案二,其他均可以适用于方案二。