Rabbitmq 之过期消息、死信队列、延迟队列

发布时间:2024年01月10日
1、过期时间设置方式

Rabbitmq 中消息过期有两种设置方式:

1)、单条消息设置过期时间;

2)、队列属性设置消息过期时间。

1.1、设置单条消息过期时间

????????????????在发送消息的时候指定消息的属性。设置消息的过期时间。

@Autowired
private RabbitTemplate template;
public void sendMsg(){
    // 自定义消息属性
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setExpiration("1500");// 单位是ms,消息到过期时间之后自动删除
    Message message = MessageBuilder.withBody("hello long".getBytes()).andProperties(messageProperties).build();
    template.convertAndSend("exchange.direct", "info", message);
}
1.2、设置队列的消息过期时间

????????队列的过期时间决定了在没有任何消费者的情况下,队列中的消息可以存在多久。

// 设置队列属性,添加过期时间
@Bean
public Queue queue(){
    Map<String ,Object> arguments = new HashMap<>();
    arguments.put("x-message-ttl", 10000);
    return  QueueBuilder.durable(queueName).withArguments(arguments).build();
}

注:如果消息和对列都设置过期时间,则消息的TTL以两者之间较小的那个数值为准。

2、死信队列?
2.1、死信队列的实现

????????DLX: Dead-Letter-Exchange,如果队列设置过期时间或者队列中的消息设置了过期时间,并且队列设置了死信交换机死信交换机路由key,这样过期的消息就会通过死信交换机发送到死信队列。 ?

?两条线路按照上一篇博客进行配置,队列增加死信队列的配置如下:

private static String EXCHANGE_DLX = "dlx-exchange";
private static String BINDING_DLX_KEY = "dlx-route-key";
@Bean
public Queue queue(){
    Map<String, Object> arguments =new HashMap<>();
    // 指定死信交换机,通过x-dead-letter-exchange 来设置
    arguments.put("x-dead-letter-exchange",EXCHANGE_DLX);
    // 设置死信路由key,value 为死信交换机和死信队列绑定的key,要一模一样,因为死信交换机是直连交换机
    arguments.put("x-dead-letter-routing-key",BINDING_DLX_KEY);
    // 队列的过期时间
    arguments.put("x-message-ttl",10000);
    return  QueueBuilder.durable(queueName).withArguments(arguments).build();
}

进入死信队列条件:

1)、队列消息过期后,会发送到死信队列中;

2)、队列达到最大长度(先入队的消息会被发送到DLX);

3)、从正常的队列接收消息,但是对消息不进行确认,并且不对消息进行重新投递,此时消息就进入死信队列;

4)、开启手动确认模式,并拒绝消息,不重新投递,则进入死信队列。

2.2、消息手动确认

application.yml 启动手动确认

????????我们从正常的队列接收消息,但是对消息不进行确认,并且不对消息进行重新投递,此时消息就进入死信队列。

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual

消费者未确认消息不进行重新投递,进入死信队列?代码如下:

@RabbitListener(queues = {"quque.dlx.a"})
public void process(Message message, Channel channel) {
    System.out.println("接收到的消息:" + message);
// 对消息不确认, ack单词是确认的意思;
// void basicNack(long deliveryTag, boolean multiple, boolean requeue)
// deliveryTag:消息的一个数字标签
// multiple:如果是true表示对小于deliveryTag标签下的消息都进行Nack不确认,false表示只对当前deliveryTag标签的消息Nack
// requeue:如果是true表示消息被Nack后,重新发送到队列,如果是false,消息被Nack后,不会重新发送到队列
    try {
        System.out.println("deliveryTag = " +message.getMessageProperties().getDeliveryTag());
        //要开启rabbitmq消息消费的手动确认模式,然后才能这么写代码;
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

消费者拒绝消息不重新投递,进入死信队列。

@RabbitListener(queues = {RabbitConfig.QUEUE})
public void process(Message message, Channel channel) {
    System.out.println("接收到的消息:" + message);
    try {
        System.out.println("deliveryTag = " + message.getMessageProperties().getDeliveryTag());
        //要开启rabbitmq消息拒绝模式,然后才能这么写代码;
        channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
3、延迟队列

????????RabbitMQ本身不支持延迟队列,可以使用 TTL 结合 DLX 的方式来实现消息的延迟投递,即把DLX跟某个队列绑定,到了指定时间,消息过期后,就会从 DLX 路由到这个队列,消费者可以从这个队列取走消息。

3.1、Rabbitmq 延迟插件下载安装

????????选择对应的版本下载 rabbitmq-delayed-message-exchange 插件,下载地址:Community Plugins — RabbitMQ

# 1、把插件拷贝到 RabbitMQ 的 plugins目录下
# 2、解压
unzip rabbitmq_delayed_message_exchange-3.12.0.ez
# 3、启动插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

????????消息发送后不会直接投递到队列,而是存储到 Mnesia(嵌入式数据库),检查 x-delay 时间(消息头部);延迟插件在 RabbitMQ 3.5.7 及以上的版本才支持,依赖 Erlang/OPT 18.0 及以上运行环境;Mnesia 是一个小型数据库,不适合于大量延迟消息的实现解决了消息过期时间不一致出现的问题。

延迟队列实现代码

组件绑定:

@Component
@Slf4j
public class RabbitConfig {
    public static final String EXCHANGE = "exchange:plugin";
    public static final String QUEUE = "queue.plugin";
    public static final String KEY = "plugin";  
    // 使用延迟队列插件,需要使用自定义交换机的类,不能使用建造者模式
    @Bean
    public CustomExchange customExchange() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type", "direct");
// CustomExchange(String name, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments)
        return new CustomExchange(EXCHANGE, "x-delayed-message", true, false, arguments);
    }
    @Bean
    public Queue queue() {
        return QueueBuilder.durable(QUEUE).build();
    }
    @Bean
    public Binding binding(CustomExchange customExchange, Queue queue) {
        return BindingBuilder.bind(queue).to(customExchange).with(KEY).noargs();
    }
}

生产者:

MessageProperties messageProperties=new MessageProperties();
messageProperties.setHeader("x-delay",16000);  // 消息头部得设置延迟属性,才能识别是延迟消息。
String msg = "hello world";
Message message=new Message(msg.getBytes(),messageProperties);
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE, "plugin", message);
log.info("发送完毕,发送时间为:{}",new Date());
4、总结

? ? ? ?本文介绍 Rabbitmq 消息过期设置,以及消息过期后如何处理;详细介绍死信交换机、死信队列、延迟队列的插件安装和使用,帮助大家进一步熟悉对 Rabbitmq 的使用。

????????本人是一个从小白自学计算机技术,对运维、后端、各种中间件技术、大数据等有一定的学习心得,想获取自学总结资料(pdf版本)或者希望共同学习,关注微信公众号:it自学社团。后台回复相应技术名称/技术点即可获得。(本人学习宗旨:学会了就要免费分享)

文章来源:https://blog.csdn.net/zwl2220943286/article/details/135492657
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。