首先,在使用这个插件之前,我们先来了解一下什么叫队头阻塞?
我们如果同时发两次带有过期时间的消息时,使用TTL就会有队头阻塞问题,比如说,第一个发送的消息过期以后(10s),第二个消息不会被打印出来(5s)。
一、首先,安装插件
? ? ? ? 1.首先将插件保存到对应目录的数据券
docker volume inspect mq-plugins
? ? ? ? 2.将插件上传到这个目录?
? ? ? ? 3.执行下面命令,安装插件
docker exec -it mq bash
? ? ? ? 4.执行下面命令,开启插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
二、DelayExchange工作原理
? ? ? ? 1.首先,安装完插件后,我们在控制台可以看到这个地方多了一个选项????????
? ? ? ? ?2.这个交换机,不仅做路由转发,还做消息存储(这就是延迟交换机和其他交换机最本质的区别)
? ? ? ? 3.声明交换机和队列
@Slf4j
@Configuration
public class DelayConfig {
@Bean
public DirectExchange delayExchange(){
return ExchangeBuilder
.directExchange("delay.exchange")
// 延迟交换机
.delayed()
.build();
}
@Bean
public Queue delayQueue(){
return new Queue("delay.queue");
}
@Bean
public Binding delayBinding(){
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("delay");
}
}
? ? ? ? 4.声明一个消费者
@RabbitListener(queues = "delay.queue")
public void onDelayMessage(String msg){
log.info("收到delay.queue中的消息:{}", msg);
}
? ? ? ? 5.消息发送进行测试
@Test
public void testSendDelayMsg(){
String exchange = "delay.exchange";
String routingKey = "delay";
Message message1 = MessageBuilder
.withBody("delay11111111".getBytes())
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.setHeader("x-delay",10000)
.build();
rabbitTemplate.convertAndSend(exchange,routingKey,message1);
Message message2 = MessageBuilder
.withBody("delay222222222".getBytes())
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.setHeader("x-delay",20000)
.build();
rabbitTemplate.convertAndSend(exchange,routingKey,message2);
}