持久化是提高RabbitMQ可靠性的基础,否则当RabbitMQ遇到异常时(如重启、断电、停机等)数据将会丢失。主要从以下几个方面保障消息的持久性:
- Exchange 持久化通过定义时设置durable参数为true来保证Exchange相关的元数据不丢失。
- Queue的持久化。也是通过定义时设置durable参数为true来保证Queue相关的元数据不丢失。
- 消息的持久化,通过将消息的投递模式(BasicProperties中的deliveryMode属性)设置为2,即可实现消息的持久化,保证消息身身不丢失。
样例代码:
依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
持久化的实现
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class Product {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@node1:5672/%2f");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel(); ) {
// 交换器元数据持久化 durable参数为true,表示要持久化
channel.exchangeDeclare(
"persistent.ex", BuiltinExchangeType.DIRECT, true, false, false, null);
// 队列元数据持久化,durable参数为true,表示要持久化
channel.queueDeclare("persistent.qu", true, false, false, null);
// 交换机与队列绑定
channel.queueBind("persistent.qu", "persistent.ex", "persistent.rk");
String msg = "hello world:" ;
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.contentType("text/plain");
// 发送的消息持久化
builder.deliveryMode(2);
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(
"persistent.ex", "persistent.rk", properties, msg.getBytes(StandardCharsets.UTF_8));
} catch (Exception e) {
e.printStackTrace();
}
}
}
运行生产者的代码,检查队列情况
[root@nullnull-os rabbitmq]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name │ type │
├────────────────────┼─────────┤
│ amq.fanout │ fanout │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic │
├────────────────────┼─────────┤
│ amq.headers │ headers │
├────────────────────┼─────────┤
│ amq.topic │ topic │
├────────────────────┼─────────┤
│ amq.direct │ direct │
├────────────────────┼─────────┤
│ persistent.ex │ direct │
├────────────────────┼─────────┤
│ │ direct │
├────────────────────┼─────────┤
│ amq.match │ headers │
└────────────────────┴─────────┘
[root@nullnull-os rabbitmq]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
┌───────────────┬─────────────┬──────────────────┬──────────────────┬───────────────┬───────────┐
│ source_name │ source_kind │ destination_name │ destination_kind │ routing_key │ arguments │
├───────────────┼─────────────┼──────────────────┼──────────────────┼───────────────┼───────────┤
│ │ exchange │ persistent.qu │ queue │ persistent.qu │ │
├───────────────┼─────────────┼──────────────────┼──────────────────┼───────────────┼───────────┤
│ │ exchange │ transient.qu │ queue │ transient.qu │ │
├───────────────┼─────────────┼──────────────────┼──────────────────┼───────────────┼───────────┤
│ persistent.ex │ exchange │ persistent.qu │ queue │ persistent.rk │ │
└───────────────┴─────────────┴──────────────────┴──────────────────┴───────────────┴───────────┘
[root@nullnull-os rabbitmq]# rabbitmqctl list_queues --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌───────────────┬──────────┐
│ name │ messages │
├───────────────┼──────────┤
│ persistent.qu │ 1 │
├───────────────┼──────────┤
│ transient.qu │ 1 │
└───────────────┴──────────┘
[root@nullnull-os rabbitmq]#
这里存在两个队列,一个是持久化的队列, 一个是非持久化的消息队列。重启下RabbitMQ看下情况。
[root@nullnull-os rabbitmq]# rabbitmqctl stop
Stopping and halting node rabbit@nullnull-os ...
[root@nullnull-os rabbitmq]# ps -ef | grep rabbit
rabbitmq 3448 1 0 Aug18 ? 00:00:03 /usr/lib64/erlang/erts-11.0.2/bin/epmd -daemon
root 29042 28327 0 09:45 pts/0 00:00:00 grep --color=auto rabbit
[root@nullnull-os rabbitmq]# kill 3448
[root@nullnull-os rabbitmq]# systemctl start rabbitmq-server
[root@nullnull-os rabbitmq]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name │ type │
├────────────────────┼─────────┤
│ amq.fanout │ fanout │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic │
├────────────────────┼─────────┤
│ amq.headers │ headers │
├────────────────────┼─────────┤
│ amq.topic │ topic │
├────────────────────┼─────────┤
│ amq.direct │ direct │
├────────────────────┼─────────┤
│ persistent.ex │ direct │
├────────────────────┼─────────┤
│ │ direct │
├────────────────────┼─────────┤
│ amq.match │ headers │
└────────────────────┴─────────┘
[root@nullnull-os rabbitmq]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
┌───────────────┬─────────────┬──────────────────┬──────────────────┬───────────────┬───────────┐
│ source_name │ source_kind │ destination_name │ destination_kind │ routing_key │ arguments │
├───────────────┼─────────────┼──────────────────┼──────────────────┼───────────────┼───────────┤
│ │ exchange │ persistent.qu │ queue │ persistent.qu │ │
├───────────────┼─────────────┼──────────────────┼──────────────────┼───────────────┼───────────┤
│ persistent.ex │ exchange │ persistent.qu │ queue │ persistent.rk │ │
└───────────────┴─────────────┴──────────────────┴──────────────────┴───────────────┴───────────┘
[root@nullnull-os rabbitmq]# rabbitmqctl list_queues --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌───────────────┬──────────┐
│ name │ messages │
├───────────────┼──────────┤
│ persistent.qu │ 1 │
└───────────────┴──────────┘
[root@nullnull-os rabbitmq]#
在重启之后,非持久化的消息队列已经没有了,而定义为持久化的消息交换器、队列和消息都还是存在的。那再来看看此时消息存储在磁盘是上一个什么样子的结构。
[root@nullnull-os 628WB79CIFDYO9LJI6DKMI09L]# pwd
/var/lib/rabbitmq/mnesia/rabbit@nullnull-os/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L
[root@nullnull-os 628WB79CIFDYO9LJI6DKMI09L]# tree .
.
|-- msg_store_persistent (保存着持久化的消息数据)
| `-- 0.rdq
|-- msg_store_transient (保存着非持久化相关的数据)
| `-- 0.rdq
|-- queues (保存诂rabbit_queue_index相关数据,即队列索引)
| `-- 1QKEY2HOMIM54YMCVC16QQ8U5
| |-- 0.idx (索引文件)
| `-- journal.jif
`-- recovery.dets
4 directories, 5 files
[root@nullnull-os 628WB79CIFDYO9LJI6DKMI09L]# ll -R .
.:
total 20
drwxr-x--- 2 rabbitmq rabbitmq 4096 Aug 22 10:18 msg_store_persistent
drwxr-x--- 2 rabbitmq rabbitmq 4096 Aug 22 10:18 msg_store_transient
drwxr-x--- 3 rabbitmq rabbitmq 4096 Aug 22 10:18 queues
-rw-r----- 1 rabbitmq rabbitmq 5464 Aug 22 10:18 recovery.dets
./msg_store_persistent:
total 0
-rw-r----- 1 rabbitmq rabbitmq 0 Aug 22 10:18 0.rdq
./msg_store_transient:
total 0
-rw-r----- 1 rabbitmq rabbitmq 0 Aug 22 10:18 0.rdq
./queues:
total 4
drwxr-x--- 2 rabbitmq rabbitmq 4096 Aug 22 10:18 1QKEY2HOMIM54YMCVC16QQ8U5
./queues/1QKEY2HOMIM54YMCVC16QQ8U5:
total 4
-rw-r----- 1 rabbitmq rabbitmq 244 Aug 22 10:18 0.idx
-rw-r----- 1 rabbitmq rabbitmq 0 Aug 22 10:18 journal.jif
[root@nullnull-os 628WB79CIFDYO9LJI6DKMI09L]#
RabbitMQ通过配制queue_index_embed_msgs_below可以根据消息大小决定存储位置,默认queue_index_embed_msgs_below是4096字节(包含消息体、属性及headers),小于该值的消息都存在rabbit_queue_index中。
vi 0.idx
à^@êù′jμp<9d>!oBàbdO_ê^@^@^@^@^@^@^@^@^@^@^@^L^@^@^@ò<83>h^Fd^@^Mbasic_messageh^Dd^@^Hresourcem^@^@^@^A/d^@^Hexchangem^@^@^@^Mpersistent.exl^@^@^@^Am^@^@^@^Mpersistent.rkjh^Fd^@^Gcontenta<d^@^Dnonem^@^@^@^N<90>^@
text/plain^Bd^@^Yrabbit_framing_amqp_0_9_1l^@^@^@^Am^@^@^@^Lhello world:jm^@^@^@^Pêù′jμp<9d>!oBàbdO_êd^@^Dtrue
在文件内容中,还有检查到发送的数据内容: hello world
到此,发送端确认机制验证完成。