RabbitMQ
是流行的开源消息队列中间件,使用 erlang 语言开发,由于其社区活跃度高,维护更新较快,深得很多企业的喜爱。
死信队列
(Dead Letter Queue,简称 DLX)是 RabbitMQ 中一种特殊的队列,用于处理无法正常被消费者消费的消息。当消息在原始队列中因为 达到最大重试次数、过期、或者 满足特定条件 时,可以 将这些消息重新路由到一个预定义的死信队列中 进行进一步处理或记录。
当发生以下情况,业务队列中的消息会进入死信队列:
消息被否定确认
:使用 channel.basicNack
或 channel.basicReject
,并且此时 requeue
属性被设置为 false
。消息过期
:消息在队列的存活时间超过设置的 TTL 时间。消息溢出
:队列中的消息数量已经超过最大队列长度。当发生以上三种情况后,该消息将成为 死信
。死信消息会被 RabbitMQ 进行特殊处理:
那么 死信
被丢到死信队列后,会发生什么变化呢?
x-dead-letter-routing-key
的话,“死信” 的路由键会被替换成该参数对应的值。举个例子:
原有队列的路由键是 RoutingKey1
,有以下两种情况:
x-dead-letter-routing-key
参数值为 RoutingKey2
,则该消息成为 “死信” 后,会将路由键更改为 RoutingKey2
,从而进入死信交换机中的死信队列。x-dead-letter-routing-key
参数,则该消息成为 “死信” 后,路由键不会更改,也不会进入死信队列。当配置了 x-dead-letter-routing-key
参数后,消息成为 “死信” 后,会在消息的 Header
中添加很多奇奇怪怪的字段,我们可以在死信队列的消费端通过以下方式进行打印:
log.info("死信消息properties: {}", message.getMessageProperties());
日志内容如下:
2024-01-07 21:16:19.745 INFO 11776 --- [ntContainer#3-1] c.d.receiver.DeadLetterMessageReceiver :消息properties: MessageProperties [headers={x-first-death-exchange=demo.simple.business.exchange, x-death=[{reason=rejected, count=1, exchange=demo.simple.business.exchange, time=Sun Jan 07 21:16:19 CST 2024, routing-keys=[], queue=demo.simple.business.queuea}], x-first-death-reason=rejected, x-first-death-queue=demo.simple.business.queuea}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=demo.simple.deadletter.exchange, receivedRoutingKey=demo.simple.deadletter.queuea.routingkey, deliveryTag=1, consumerTag=amq.ctag-RPfmKjM8Lau9X7Fl0CtbEA, consumerQueue=demo.simple.deadletter.queuea]
格式化后:
Header 中看起来有很多信息,实际上并不多,只是值比较长而已。下面就简单说明一下 Header 中的值:
字段名 | 含义 |
---|---|
x-first-death-exchange | 第一次成为死信时的交换机名称。 |
x-first-death-reason | 第一次成为死信的原因:rejected :消息在进入队列时被队列拒绝。expired :消息过期。maxlen :队列内消息数量超过队列最大容量。 |
x-first-death-queue | 第一次成为死信时的队列名称。 |
x-death | 历史被投入死信交换机的信息列表,同一个消息每进入一次死信交换机,这个数组的信息就会被更新。 |
通过上面的信息,我们已经知道如何使用死信队列了,那么死信队列一般在什么场景下使用呢?
死信队列 一般用在较为重要的业务队列中,确保未被正确消费的消息不被丢弃,一般发生消费异常可能原因主要是消息信息本身存在错误导致处理异常,处理过程中参数校验异常,或者因网络波动导致的查询异常等等。当发生异常时,当然 不能每次通过日志来获取原消息,然后让运维帮忙重新投递消息 (没错,以前很多人这么干的 = =)。通过配置死信队列,可以让未正确处理的消息暂存到另一个队列中,待后续排查清楚问题后,编写相应的处理代码来处理死信消息,这样比手工恢复数据要好得多。
死信消息的生命周期如下:
Nack
或 Reject
操作;Nack
或 Reject
的消息由 RabbitMQ 投递到死信交换机中;死信队列的配置可以分为以下三步:
注意:
并不是直接声明一个公共的死信队列,然后所有死信消息就会自己进入死信队列中了。而是为每个需要使用死信的业务队列配置一个死信交换机,这里同一个项目的死信交换机可以共用一个,然后每个业务队列分配一个单独的路由键。
有了死信交换机和路由键后,接下来就像配置业务队列一样,配置死信队列,并绑定在死信交换机上。看到这里,大家应该可以明白:
所以死信交换机可以为任何类型【Direct、Fanout、Topic】。一般来说,因为开发过程中会为每个业务队列分配一个独有的路由 key,并对应的配置一个死信队列进行监听。
有了前面的这些描述后,我们接下来实战操作一下。
配置类中声明了两个交换机:
业务交换机(广播)
,绑定了两个业务队列:
死信交换机(直连)
,绑定了两个死信队列,并配置了相应的路由键:
RabbitMQConfig.java
package com.demo.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* <p> @Title RabbitMQOrderConfig
* <p> @Description RabbitMQ配置
*
* @author ACGkaka
* @date 2023/12/22 14:05
*/
@Configuration
public class RabbitMQConfig {
/** 业务队列 */
public static final String BUSINESS_EXCHANGE_NAME = "demo.simple.business.exchange";
public static final String BUSINESS_QUEUEA_NAME = "demo.simple.business.queuea";
public static final String BUSINESS_QUEUEB_NAME = "demo.simple.business.queueb";
/** 死信队列 */
public static final String DEAD_LETTER_EXCHANGE = "demo.simple.deadletter.exchange";
public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "demo.simple.deadletter.queuea.routingkey";
public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "demo.simple.deadletter.queueb.routingkey";
public static final String DEAD_LETTER_QUEUEA_NAME = "demo.simple.deadletter.queuea";
public static final String DEAD_LETTER_QUEUEB_NAME = "demo.simple.deadletter.queueb";
// 声明业务交换机(广播)
@Bean
public FanoutExchange businessExchange() {
return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
}
// 声明死信交换机(直连)
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
// 声明业务队列A
@Bean
public Queue businessQueueA() {
Map<String, Object> args = new HashMap<>(2);
// 声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// 声明当前队列绑定的死信路由键
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build();
}
// 声明业务队列B
@Bean
public Queue businessQueueB() {
Map<String, Object> args = new HashMap<>(2);
// 声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// 声明当前队列绑定的死信路由键
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);
return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(args).build();
}
// 声明死信队列A
@Bean
public Queue deadLetterQueueA() {
return new Queue(DEAD_LETTER_QUEUEA_NAME);
}
// 声明死信队列B
@Bean
public Queue deadLetterQueueB() {
return new Queue(DEAD_LETTER_QUEUEB_NAME);
}
// 声明业务队列A绑定关系
@Bean
public Binding businessBindingA(Queue businessQueueA, FanoutExchange businessExchange) {
return BindingBuilder.bind(businessQueueA).to(businessExchange);
}
// 声明业务队列B绑定关系
@Bean
public Binding businessBindingB(Queue businessQueueB, FanoutExchange businessExchange) {
return BindingBuilder.bind(businessQueueB).to(businessExchange);
}
// 声明死信队列A绑定关系
@Bean
public Binding deadLetterBindingA(Queue deadLetterQueueA, DirectExchange deadLetterExchange) {
return BindingBuilder.bind(deadLetterQueueA).to(deadLetterExchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
}
// 声明死信队列B绑定关系
@Bean
public Binding deadLetterBindingB(Queue deadLetterQueueB, DirectExchange deadLetterExchange) {
return BindingBuilder.bind(deadLetterQueueB).to(deadLetterExchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
}
}
application.yml
server:
port: 8081
spring:
application:
name: springboot-rabbitmq-dead-letter
rabbitmq:
# 此处不建议单独配置host和port,单独配置不支持连接RabbitMQ集群
addresses: 127.0.0.1:5672
username: guest
password: guest
# 虚拟host 可以不设置,使用server默认host
virtual-host: /
# 是否开启发送端消息抵达队列的确认
publisher-returns: true
# 发送方确认机制,默认为NONE,即不进行确认;SIMPLE:同步等待消息确认;CORRELATED:异步确认
publisher-confirm-type: correlated
# 消费者监听相关配置
listener:
simple:
acknowledge-mode: manual # 确认模式,默认auto,自动确认;manual:手动确认
default-requeue-rejected: false # 消费端抛出异常后消息是否返回队列,默认值为true
prefetch: 1 # 限制每次发送一条数据
concurrency: 1 # 同一个队列启动几个消费者
max-concurrency: 1 # 启动消费者最大数量
# 重试机制
retry:
# 开启消费者(程序出现异常)重试机制,默认开启并一直重试
enabled: true
# 最大重试次数
max-attempts: 3
# 重试间隔时间(毫秒)
initial-interval: 3000
为了方便测试,写一个简单的消息生产者,通过controller层来生产消息。
SendMessageController.java
import com.demo.config.RabbitMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* <p> @Title SendMessageController
* <p> @Description 推送消息接口
*
* @author ACGkaka
* @date 2023/1/12 15:23
*/
@Slf4j
@RestController
public class SendMessageController {
/**
* 使用 RabbitTemplate,这提供了接收/发送等方法。
*/
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMessage")
public String sendMessage(String message) {
rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE_NAME, "", message);
return "OK";
}
}
接下来是业务队列的消费端代码
BusinessMessageReceiver.java
import com.demo.config.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* <p> @Title BusinessMessageReceiver
* <p> @Description RabbitMQ业务队列消费端
*
* @author ACGkaka
* @date 2024/1/7 17:43
*/
@Slf4j
@Component
public class BusinessMessageReceiver {
@RabbitListener(queues = RabbitMQConfig.BUSINESS_QUEUEA_NAME)
public void receiveA(String body, Message message, Channel channel) throws IOException {
log.info("业务队列A收到消息: {}", body);
try {
if (body.contains("deadletter")) {
throw new RuntimeException("dead letter exception");
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.error("业务队列A消息消费发生异常,error msg: {}", e.getMessage(), e);
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
}
@RabbitListener(queues = RabbitMQConfig.BUSINESS_QUEUEB_NAME)
public void receiveB(String body, Message message, Channel channel) throws IOException {
log.info("业务队列B收到消息: {}", body);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
接下来是死信队列的消费端代码
DeadLetterMessageReceiver.java
import com.demo.config.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* <p> @Title DeadLetterMessageReceiver
* <p> @Description RabbitMQ死信队列消费端
*
* @author ACGkaka
* @date 2024/1/7 18:14
*/
@Slf4j
@Component
public class DeadLetterMessageReceiver {
@RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUEA_NAME)
public void receiveA(String body, Message message, Channel channel) throws IOException {
log.info("死信队列A收到消息: {}", body);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
@RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUEB_NAME)
public void receiveB(String body, Message message, Channel channel) throws IOException {
log.info("死信队列B收到消息: {}", body);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
消费正常消息,请求结果:
请求地址:http://localhost:8081/sendMessage?message=Hello
从日志可以看到:两个业务队列成功消费。
消费错误消息,请求结果:
请求地址:http://localhost:8081/sendMessage?message=deadletter
从日志可以看到:业务队列A和B都收到了消息,但是 业务队列A消费发生异常,然后消息就被 转到了死信队列,死信队列消费端成功消费。
死信队列其实并没有什么神秘的地方,不过是绑定在死信交换机上的普通队列,而死信交换机也只是一个普通的交换机,不过是用来专门处理死信的交换机。
死信消息
时 RabbitMQ 为我们做的一层保障,其实我们 也可以不使用死信队列,而是 在消息消费异常的时候,将消息主动投递到另一个交换机中,当你明白了这些之后,这些 Exchange 和 Queue 想怎样配合就可以怎样配合。比如:
整理完毕,完结撒花~ 🌻
参考地址:
1.【RabbitMQ】一文带你搞定RabbitMQ死信队列,https://cloud.tencent.com/developer/article/1463065