如何使用RebbitMQ延迟交换机插件解决死信交换机队头阻塞问题

发布时间:2023年12月28日

首先,在使用这个插件之前,我们先来了解一下什么叫队头阻塞?

我们如果同时发两次带有过期时间的消息时,使用TTL就会有队头阻塞问题,比如说,第一个发送的消息过期以后(10s),第二个消息不会被打印出来(5s)。

一、首先,安装插件

? ? ? ? 1.首先将插件保存到对应目录的数据券

docker volume inspect mq-plugins

? ? ? ? 2.将插件上传到这个目录?

? ? ? ? 3.执行下面命令,安装插件

docker exec -it mq bash

? ? ? ? 4.执行下面命令,开启插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

二、DelayExchange工作原理

? ? ? ? 1.首先,安装完插件后,我们在控制台可以看到这个地方多了一个选项????????

? ? ? ? ?2.这个交换机,不仅做路由转发,还做消息存储(这就是延迟交换机和其他交换机最本质的区别)

? ? ? ? 3.声明交换机和队列

@Slf4j
@Configuration
public class DelayConfig {
    @Bean
    public DirectExchange delayExchange(){
        return ExchangeBuilder
                .directExchange("delay.exchange")
                // 延迟交换机
                .delayed()
                .build();
    }

    @Bean
    public Queue delayQueue(){
        return new Queue("delay.queue");
    }

    @Bean
    public Binding delayBinding(){
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("delay");
    }
}

? ? ? ? 4.声明一个消费者

@RabbitListener(queues = "delay.queue")
public void onDelayMessage(String msg){
    log.info("收到delay.queue中的消息:{}", msg);
}

? ? ? ? 5.消息发送进行测试

@Test
public void testSendDelayMsg(){
    String exchange = "delay.exchange";
    String routingKey = "delay";
    Message message1 = MessageBuilder
            .withBody("delay11111111".getBytes())
            .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
            .setHeader("x-delay",10000)
            .build();
    rabbitTemplate.convertAndSend(exchange,routingKey,message1);
    
    Message message2 = MessageBuilder
            .withBody("delay222222222".getBytes())
            .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
            .setHeader("x-delay",20000)
            .build();
    rabbitTemplate.convertAndSend(exchange,routingKey,message2);
}

文章来源:https://blog.csdn.net/m0_63278070/article/details/135254291
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。