交换机的持久化
、队列的持久化
、消息的持久化
。出现消息丢失的场景有:
直接实例化对应的 Exchange
实现类即可,默认:持久化的、非自动删除的。
@Bean
public DirectExchange directExchange() {
return new DirectExchange(directExchange);
}
Exchange
源码:
package org.springframework.amqp.core;
public abstract class AbstractExchange extends AbstractDeclarable implements Exchange {
...
public AbstractExchange(String name) {
// 默认是持久化的、非自动删除的。
this(name, true, false);
}
...
}
在 RabbitMQ 的原生 API 中,例如:Java 客户端 API,声明持久化交换机时,需要将 durable
参数设置为 true
:
// 声明交换机:(交换机名称,交换机类型,持久化)
channel.exchangeDeclare("myExchange", "direct", true);
在 Spring AMQP 中,我们可以使用 ExchangeBuilder
来创建持久化的交换机:
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
Exchange exchange = ExchangeBuilder.directExchange("myExchange")
.durable(true) // 交换机持久化
.build();
直接实例化 Queue
类即可,默认:持久化的、非独占的、非自动删除的。
@Bean
public Queue myQueue() {
return new Queue("myQueue");
}
Queue
源码:
package org.springframework.amqp.core;
public Queue(String name) {
// 默认是持久化的、非独占的、非自动删除的。
this(name, true, false, false);
}
在原生 RabbitMQ API 中,例如 Java 客户端 API,声明持久化队列时,需要将 durable
参数设置为 true
:
// 声明队列:(队列名称,持久化,非独占,非自动删除,可选队列参数)
channel.queueDeclare("myQueue", true, false, false, null);
在 Spring AMQP 中,可以使用 QueueBuilder
来创建持久化的队列:
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
Queue queue = QueueBuilder.durable("myQueue").build();
即使队列时持久化的,发送到队列中的消息默认情况下 可能仍然是非持久化的。要使消息持久化,需要在发送消息时设置消息属性为持久化。
在原生 RabbitMQ API 中,例如 Java 客户端 API,声明持久化消息时,需要将 deliveryMode
参数设置为 2
:
import com.rabbitmq.client.AMQP;
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 这里2对应MessageDeliveryMode.PERSISTENT
.build();
// 参数:(交换机,路由键,消息的其他参数,消息体)
channel.basicPublish("myExchange", "myRoutingKey", props. messageBytes);
或者,我们可以直接使用 MessageProperties.PERSISTENT_TEXT_PLAIN
来进行指定消息的持久化:
import com.rabbitmq.client.MessageProperties;
// 参数:(交换机,路由键,消息的其他参数,消息体)
channel.basicPublish("myExchange", "myRoutingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
在 Spring AMQP 中,可以使用 rabbitTemplate.convertAndSend()
来创建持久化的消息:
rabbitTemplate.convertAndSend("myQueue", body, message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
});
通过上述步骤,消息会被持久化存储,只要队列也被正确配置为持久化,即使 RabbitMQ 服务器重启,消息也将被保留下来。
补充:
然而,即使队列和消息都是持久化的,也不能完全保证消息的 100% 不丢失。例如:在消息尚未被刷写到磁盘时,RabbitMQ 服务器突然崩溃,这种情况下的消息仍然可能会丢失。此外,RabbitMQ 不保证消息的立即持久化,而是尽可能快地将消息保存到磁盘。
思考:将交换机、队列、消息都设置持久化之后就能保证数据不会丢失吗?
答:当然不能,需要从多方面考虑:
场景1: 如果消费者订阅队列的时候将 autoAck
(自动确认)设置为 true,虽然消费者接收到了消息,但是没有来得及处理就宕机了,那消息也会丢失。
解决方案: 将消费者的消息确认机制改为手动确认,带处理完消息之后,手动删除消息。
场景2: 在 RabbitMQ 服务器,如果消息正确被发送,但是 RabbitMQ 服务器中的消息还没来得及持久化,即没有将数据写入磁盘时,如果服务器发生异常,消息也会丢失。
解决方案: 可以 通过 RabbitMQ 集群的方式实现消息中间件的高可用。
因此,还需要做好生产者和消费者的 消息确认机制
以及通过 集群
的方式来实现 RabbitMQ 的高可用。
整理完毕,完结撒花~ 🌻
参考地址:
1.RabbitMQ保证消息可靠性,https://www.cnblogs.com/auguse/articles/17712620.html