返回ack,怎么感觉这么像某个tcp的3次握手。
使用资料提供的案例工程.
在图形化界面创建一个simple.queue的队列,虚拟机要和配置文件里面的一样。
AMQP里面支持多种生产者确认的类型。
simple是同步等待模式,发了消息之后就一直等待结果,可能会导致代码阻塞。
correlated是异步回调模式,像前段的ajax请求的回调函数。
ApplicationContextAware是bean工厂通知。会在Spring容器创建完后来通知并传一个spring容器到下面的方法。然后从中取到rabbitTemplate的bean并设置ReturnCallback。?
ReturnCallback:消息到了交换机,路由时失败了没有到达消息队列
ConfirmCallback:消息连交换机都没到。
这个不像ReturnCallback只能配置一个,这个可以在每次发消息时设置。
这里在发送消息时多了一个correlationData,这是在配置开关选择的confirm类型为correlated。里面封装了消息的唯一id和callback.
callback里面的result是成功的回调函数,ex是失败的回调函数。这里的失败是指回调都没收到。
先是在生产者的配置文件里要加上前面的配置j
编写returnCallback
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
//获取RabbitTemplate对象
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
//配置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
//记录日志
log.error("消息发送到队列失败,响应码:{},失败原因:{},交换机:{},路由key:{},消息:{}",
replyCode,replyText,exchange,routingKey,message.toString());
//如果有需要的话,可以重发消息
});
}
}
编写ConfirmCallback
这里先要在图形界面手动将交换机和消息队列做绑定?
@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {
//1.准备消息
String message = "hello, spring amqp!";
//2.准备correlationData
//2.1消息ID
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//2.2准备ConfirmCallback
correlationData.getFuture().addCallback(result -> {
//判断结果
if(result.isAck()){
//ACK
log.debug("消息成功投递到交换机!消息ID:{}",correlationData.getId());
}else{
//NACK
log.error("消息投递到交换机失败!消息ID:{}",correlationData.getId());
}
}, ex -> {
//记录日志
log.error("消息发送失败!",ex);
//重发消息
});
//3.发送消息
rabbitTemplate.convertAndSend("camq.topic", "simple.test", message,correlationData);
}
测试得到
成功的测试情况
?
失败的测试情况
投递交换机失败,交换机不存在
投递队列失败,队列不存在
?
这里通过重启rabbitmq容器发现消息都不见了可以确认,rabbitmq和redis一样都是内存运行的。
甚至我手动加上的消息队列和绑定关系都不见了。这里消息队列不见是因为前面创建队列时选择的是Transient,不持久化。系统默认的交换机都还在,是因为durable为true,持久化。
创建队列或交换机的时候可以设置Durability为Durable即可持久化。
在消费者代码中进行交换机和队列的创建,然后可以看见如下持久化的交换机和队列.
@Configuration
public class CommonConfig {
@Bean
public DirectExchange simpleExchange(){
return new DirectExchange("simple.direct",true,false);
}
@Bean
public Queue simpleQueue(){
return QueueBuilder.durable("simple.queue").build();
}
}
手动发送一条消息进行测试
重启之后消息还是消失了。
要想让消息持久化,需要在发送消息时指定。
@Test
public void testDurableMessage(){
//1.准备消息
Message message = MessageBuilder.withBody("hello,pop".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) //持久的
.build();
//2.发送消息
rabbitTemplate.convertAndSend("simple.queue",message);
}
?重启之后消息就持久化了。
通常在springamqp中这些都是持久化的。
在none模式下,消费者拿到消息都就报异常了,然后消息也没了。
在auto模式下,消费者拿到消息后给mq报了个unack,然后消息会重新投递,消费者继续拿消息,tmd,死循环了。?但是这里消息就不会消失了。
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");
System.out.println(1/0);
log.info("消费者处理消息成功!");
}
重试次数耗尽之后会将消息丢弃。
?在消费者代码中
@Configuration
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue",true);
}
@Bean
public Binding errorBinding(){
return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
}
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");
}
}
?重新发送消息进行测试,可以看见重试次数耗尽之后就送到了死信队列了。
在里面将异常的堆栈信息也包含了.?
?
区别在于,上一个是消费者失败之后寻找交换机路由到error队列,这个是退回到队列,再指定交换机,最后路由。
这个的应用场景比如说订单超时未支付然后自动取消。
实现??
? ? ? ? ??
准备 代码部分
@RabbitListener(bindings = @QueueBinding(
value=@Queue(name = "dl.queue",durable = "true"),
exchange=@Exchange(name="dl.direct"),
key = "dl"
))
public void listenDlQueue(String msg){
log.info("接收到 dl.queue的延迟消息:{}",msg);
}
@Configuration
public class TTLMessageConfig {
@Bean
public DirectExchange ttlExchange(){
return new DirectExchange("ttl.direct");
}
@Bean
public Queue ttlQueue(){
return QueueBuilder.durable("ttl.queue")
.ttl(10000)
.deadLetterExchange("dl.direct")
.deadLetterRoutingKey("dl")
.build();
}
@Bean
public Binding simpleBinging(){
return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}
}
?测试代码
@Test
public void testTTLMessage(){
//1.准备消息
Message message = MessageBuilder
.withBody("hello,ttl".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) //持久的
.build();
//2.发送消息
rabbitTemplate.convertAndSend("ttl.direct","ttl",message);
//3.记录日志
log.info("消息成功发送!");
}
10s之后在消费者那里就可以看见
?
?然后这里会以短的优先,5s后消费者就可以收到消息。
这个插件需要找到mq内部的插件文件夹,所以需要在创建容器的时候进行数据卷挂载。
docker run \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3.8-management
官方的安装指南地址为:Scheduling Messages with RabbitMQ | RabbitMQ - Blog
上述文档是基于linux原生安装RabbitMQ,然后安装插件。
RabbitMQ有一个官方的插件社区,地址为:Community Plugins — RabbitMQ
大家可以去对应的GitHub页面下载3.8.9版本的插件,地址为Release v3.8.9 · rabbitmq/rabbitmq-delayed-message-exchange · GitHub这个对应RabbitMQ的3.8.5以上版本。?
查看挂载的数据卷.
docker volume inspect mq-plugins
接下来的看着好麻烦,以后看文档吧.
还真的麻烦的一批,真不想再搞这玩意,文件搞来搞去。
不知道为什么,挂载数据卷时一直报错,不能用自己定义的文件夹来挂载。
?
?
在消费者中如下声明
@RabbitListener(bindings = @QueueBinding(
value=@Queue(name = "delay.queue",durable = "true"),
exchange=@Exchange(name="delay.direct",delayed = "true"),
key = "delay"
))
public void listenDelayQueue(String msg){
log.info("接收到 delay.queue的延迟消息:{}",msg);
}
?在生产者中如下定义
@Test
public void testSendDelayMessage(){
//1.准备消息
Message message = MessageBuilder
.withBody("hello,ttl".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) //持久的
.setHeader("x-delay",5000)
.build();
//2.准备correlationData
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//3.发送消息
rabbitTemplate.convertAndSend("delay.direct", "delay", message,correlationData);
log.info("发送消息成功");
}
测试结果如下 成功实现延迟5秒。但是会被报错,理论上说交换机应该立即转发,不会延迟,但是这里的延迟交换机可以帮忙保存消息延迟发送,所以这里才会报错,not_router,消息没有到达队列
?为了解决这个报错,需要修改生产者代码
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
//获取RabbitTemplate对象
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
//配置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
//判断是否是延迟消息
if (message.getMessageProperties().getReceivedDelay()>0) {
//是一个延迟消息,忽略错误提示
return;
}
//记录日志
log.error("消息发送到队列失败,响应码:{},失败原因:{},交换机:{},路由key:{},消息:{}",
replyCode,replyText,exchange,routingKey,message.toString());
//如果有需要的话,可以重发消息
});
}
}
消费者中声明两个队列。?
@Configuration
public class LazyConfig {
@Bean
public Queue lazyQueue(){
return QueueBuilder.durable("lazy.queue")
.lazy()
.build();
}
@Bean
public Queue normalQueue(){
return QueueBuilder.durable("normal.queue")
.build();
}
}
?测试,准备两个队列之后分别向两个队列发消息。
@Test
public void testLazyMessage(){
for(int i=0;i<1000000;i++){
//1.准备消息
Message message = MessageBuilder
.withBody("hello,ttl".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) //持久的
.build();
//3.发送消息
rabbitTemplate.convertAndSend("lazy.queue", message);
}
}
@Test
public void testnormalMessage(){
for(int i=0;i<1000000;i++){
//1.准备消息
Message message = MessageBuilder
.withBody("hello,ttl".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) //持久的
.build();
//3.发送消息
rabbitTemplate.convertAndSend("normal.queue", message);
}
}
可以看见惰性队列的消息全部到paged out 刷出磁盘了?????、,为什么非惰性队列的也是刷出磁盘了。
?
集群个屁,不搞了.