如图所示,发生消息丢失的可能阶段也就是生产者发送消息,时rabbitmq存储消息时,消费者消费消息时。
项目源码:gitee
默认情况下rabbitmq会把消息存储到内存中,如果在消费者消费消息之前,rabbitmq服务器宕机了,内存就会被释放,消息就会丢失
消费者在获取到消息以后,就会自动给rabbitmq服务端返回一个ack标志,rabbitmq服务端就会把这个消息从队列中删除。但当消费者获取到消息以后,准备进行业务逻辑处理时消费者宕机了,相当于该消息没有被消费成功,即消息丢失。
因此,我们就针对以上3个阶段,分别解决
package com.example.rabbitmqreliable.demos;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
public static final String CONFIRM_ROUTING_KEY = "key1";
@Bean
public DirectExchange directExchange() {
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}
@Bean
public Queue confirmQueue() {
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
@Bean
public Binding confirmBind(@Qualifier("directExchange") DirectExchange confirmExchange,
@Qualifier("confirmQueue") Queue confirmQueue) {
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
}
}
spring.rabbitmq.host=101.133.141.75
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=ConFirm
# 开启生产者确认机制,当消费者成功处理这个消息时,会向生产者发送一个确认信号,
# 告诉生产者这个消息已经被成功消费了。
# 如果生产者在一定时间内没有收到确认信号,就会重新发送这个消息。
spring.rabbitmq.publisher-confirm-type=correlated
package com.example.rabbitmqreliable;
import com.example.rabbitmqreliable.demos.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class ProviderTests {
/**
* 目标:让生产者获取到rabbitmq服务返回的ack或nack
* 做法:rabbitTemplate需要绑定对应的回调函数
* 分析:目前的rabbitTemplate是spring托管的,并没有对应的回调函数,需要自定义
* 实施:需要自定义rabbitTemplate,并注入到spring容器中
* 一旦我们在spring容器中配置了一个rabbitTemplate,
* 那么spring boot就不会对rabbitTemplate进行自动化配置
*/
@Autowired
private RabbitTemplate rabbitTemplate;
public void test1() {
rabbitTemplate.convertAndSend(RabbitMQConfig.CONFIRM_EXCHANGE_NAME, RabbitMQConfig.CONFIRM_ROUTING_KEY, "HELLO CONFIRM");
}
}
/**
* ConnectFactory由spring boot根据配置文件中的连接信息实现自动化配置
* 即在spring容器中直接存在了ConnectionFactory对象
* @param connectionFactory
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 设置回调函数
// 而ConfirmCallBack是一个接口,需要一个类去实现他
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* 当rabbitmq服务端给生产者放回ack/nack时会执行该方法
* @param correlationData 消息的id,内容
* @param ack 消息是否发送成功
* @param cause 原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack) {
System.out.println("消息正常发送给交换机");
}else {
System.out.println("消息没有正常发送给交换机,cause" + cause);
// TODO 处理方案:再次发送消息给rabbitmq,需要获取消息内容
}
}
});
return rabbitTemplate;
}
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* 当rabbitmq服务端给生产者放回ack/nack时会执行该方法
* @param correlationData 消息的id,内容
* @param ack 消息是否发送成功
* @param cause 原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack) {
System.out.println("消息正常发送给交换机");
}else {
System.out.println("消息没有正常发送给交换机,cause" + cause);
// 处理方案:再次发送消息给rabbitmq,需要获取消息内容
//方案1: 立马拿着id去数据库查消息
// 方案2:通过定时任务重新发送
String msgId = correlationData.getId();
System.out.println("msgId" + msgId);
// 规定消息的最大发送次数3次,发送消息前判断实际发送次数是否大于最大发送次数,如果大于就不进行重新发送,并设置status=2
}
}
});
@Test
public void test1() {
// 发送消息前把消息写入数据库,并分配唯一id(如果发送失败,可以拿这个id去查数据库,重新发送)
// 并且还要记录消息实际发送次数,以及消息状态。当超过发送次数超过了规定值,就设置消息的status为2,发送成功设置消息状态为1
String msgId = UUID.randomUUID().toString().replace("-","");
CorrelationData correlationData =new CorrelationData(msgId);
rabbitTemplate.convertAndSend(RabbitMQConfig.CONFIRM_EXCHANGE_NAME, RabbitMQConfig.CONFIRM_ROUTING_KEY + "error", "HELLO CONFIRM", correlationData);
}
# 开启生产者确认机制,
spring.rabbitmq.publisher-returns=true
/**
* 给rabbitTemplate绑定回退机制的回调函数
* ReturnCallback是一个接口,使用匿名内部类实现
* 该方法被调用的概率极低,因为从交换机到队列的过程是rabbitmq内部实现的
* 如果会出错,咱们也不会用他
*/
rabbitTemplate.setMandatory(true);//让rabbitmq服务把失败信息回传给生产者
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
// 当消息没有正常转发给队列的时候被调用
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
byte[] body = returnedMessage.getMessage().getBody();
String msg = new String(body);
System.out.println("msg:" + msg);
}
});
package com.example.rabbitmqreliable;
import com.example.rabbitmqreliable.demos.RabbitMQConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.UUID;
@SpringBootTest
public class ProviderTests {
/**
* 目标:让生产者获取到rabbitmq服务返回的ack或nack
* 做法:rabbitTemplate需要绑定对应的回调函数
* 分析:目前的rabbitTemplate是spring托管的,并没有对应的回调函数,需要自定义
* 实施:需要自定义rabbitTemplate,并注入到spring容器中
* 一旦我们在spring容器中配置了一个rabbitTemplate,
* 那么spring boot就不会对rabbitTemplate进行自动化配置
*/
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test1() {
// 发送消息前把消息写入数据库,并分配唯一id(如果发送失败,可以拿这个id去查数据库,重新发送)
// 并且还要记录消息实际发送次数,以及消息状态。当超过发送次数超过了规定值,就设置消息的status为2,发送成功设置消息状态为1
String msgId = UUID.randomUUID().toString().replace("-","");
CorrelationData correlationData =new CorrelationData(msgId);
// 把routingKey写错
rabbitTemplate.convertAndSend(RabbitMQConfig.CONFIRM_EXCHANGE_NAME, RabbitMQConfig.CONFIRM_ROUTING_KEY + "404", "HELLO CONFIRM", correlationData);
}
}
Spring Boot整合RabbitMQ消费者的应答模式:
# 消费者的应答模式
spring.rabbitmq.listener.simple.acknowledge-mode=none
@Component
public class Consumer {
@RabbitListener(queues = RabbitMQConfig.CONFIRM_QUEUE_NAME) // 监听的队列
public void consumerListener(Message message) {
byte[] body = message.getBody();;
String msg = new String(body);
// 进行业务处理
int a = 1 / 0; //产生异常
System.out.println("[consumerListener],msg:" + msg);
}
}
由于我们使用none
自动应答模式,消费者给rabbitmq返回ack,rabbitmq直接把消息从队列中删除,导致消息丢失
修改配置文件中的应答方式为auto,以及引伸出来的其他配置项
# 消费者的应答模式
spring.rabbitmq.listener.simple.acknowledge-mode=auto
# 开启重试机制
spring.rabbitmq.listener.simple.retry.enabled=true
# 最大重试次数,否则会无限重试下去
spring.rabbitmq.listener.simple.retry.max-attempts=3
# 初始化的重试时间间隔
spring.rabbitmq.listener.simple.retry.initial-interval=1000
# 最大的重试时间间隔
spring.rabbitmq.listener.simple.retry.max-interval=5000
# 乘子(计算每一次时间间隔):1s->2s->4s->5s
spring.rabbitmq.listener.simple.retry.multiplier=2
此时控制台报了3次错误,队列没有消息,消息还是丢失了。
auto模式需要设置最大重试次数,否则会死循环,但是又无法判断最大重试次数是多少
spring.rabbitmq.listener.simple.acknowledge-mode=manual
package com.example.rabbitmqreliable.demos;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class Consumer {
@RabbitListener(queues = RabbitMQConfig.CONFIRM_QUEUE_NAME) // 监听的队列
public void consumerListener(Message message, Channel channel) {
byte[] body = message.getBody();;
String msg = new String(body);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 进行业务处理
int a = 1 / 0; //产生异常
System.out.println("[consumerListener],msg:" + msg);
// 没有产生异常,给服务端返回ack
// 第一个参数:表示消息的标签,保证消息唯一性
// 第二个数:表示是否需要进行批量应答
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
e.printStackTrace();
// 产生异常
// 给rabbitmq返回nack
// 第三个参数:表示是否将消息重新放入队列中
try {
channel.basicNack(deliveryTag, true, true);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
}
}
启动项目后执行测试代码,控制台会不断报红,死循环。因为没有设置最大重试次数,因此我们需要统计消息的实际消费次数,可以借助redis计算。一旦消息的实际消费次数大于最大消费次数,那么此时需要给rabbitmq返回ack删除该消息,返回之前要将该消息记录数据库中,后期人工处理