死信队列解释:
RabbitMQ的死信队列(DEAD Letter Queue,简称DLQ),是一种用于消息处理失败或者无法路由的机制。它允许将无法正常消费的消息路由到另一个队列,以便于后续处理、排查。
出现死信队列的情况:
1、消息处理失败:消息没有正常被消费,消费代码出现异常无法正常处理一条消息时,该条消息可以标记为死信。
2、消息过期:RabbitMQ中消息可以设置过期时间,如果在规定时间内没有被消费,它可以被认为是死信并被发送到死信队列。
3、消息被拒绝:当消费者明确拒绝一条消息时,它可以被标记为死信并发送到死信队列。
4、消息无法路由:当消息不能被路由到任何队列时,例:没有绑定关系或者是路由键,消息可以被发送到死信队列。
当消息变成“死信”之后,如果配置了死信队列,那么它将会被发送到死信交换机,死信交换机将死信发送到一个队列上,这个队列就是死信队列。如果没有配置死信队列,那么这个消息将会被丢弃。
配置死信队列步骤:
1、创建死信队列:用来存储死信消息的队列
2、创建死信交换机:定义一个死信交换机
3、死信队列和死信交换机进行绑定:将死信队列和死信交换机进行绑定,以便消息发送到死信队列上
4、在主队列上配置死信属性:通过设置"x-dead-letter-exchange","x-dead-letter-routing-key"属性指定死信消息应该被发送到那个交换机和路由键
代码如下:
import org.springframework.amqp.core.*;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.*;
@Configuration
public class RabbitmqConfig {
//配置死信队列和死信交换机
@Bean
public DirectExchange directExchange(){
return new DirectExchange("dead-letter-change");
}
//配置死信队列
@Bean
public Queue deadLetterQueue(){
return new Queue("dead-letter-queue");
}
//死信队列绑定死信交换机
@Bean
public Binding deadLetterBinding(){
return BindingBuilder.bind(deadLetterQueue()).to(directExchange()).with("dead-letter-routing-key");
}
//主队列的交换机
@Bean
public DirectExchange mainExchange(){
return new DirectExchange("main-exchange");
}
//主队列
@Bean
public Queue mainQueue(){
Map<String,Object> args = new HashMap<>();
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange","dead-letter-change");
//声明当前队列绑定的死信路由key
args.put("x-dead-letter-routing-key","dead-letter-routing-key");
return QueueBuilder.durable("main-queue").withArguments(args).build();
}
//绑定主队列到主交换机
@Bean
public Binding binding(){
return BindingBuilder.bind(mainQueue()).to(mainExchange()).with("main-routing-key");
}
}
消费者分别监听主队列和死信队列(代码如下):
//消费者监听死信队列
@Component
public class DeadLetterMsgReceiver {
@RabbitListener(queues = "dead-letter-queue")
public void receiverA(Message message, Channel channel) throws IOException {
System.out.println("死信消息:"+new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
//消费者监听主队列
@Component
public class MainMsgReceiver {
@RabbitListener(queues = "main-queue")
public void receiverA(Message message, Channel channel) throws IOException {
System.out.println("普通消息:"+new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}