提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加
RabbitMQ扩展
提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档
提示:这里可以添加本文要记录的大概内容:
在当今的分布式系统和微服务架构中,消息队列扮演着至关重要的角色。而 RabbitMQ 作为一款强大而灵活的消息队列中间件,以其高级特性、死信队列和延迟队列等功能,成为了许多企业和开发人员的首选。
在这篇博客中,我们将深入探讨 RabbitMQ 的一些高级特性,如消息优先级、持久性、队列和交换器的绑定、消息确认等。这些特性使得 RabbitMQ 在处理高并发、高可靠性的应用场景时表现卓越。
另外,我们还将详细介绍 RabbitMQ 的死信队列和延迟队列。死信队列用于处理无法被正常消费的消息,确保消息不会丢失,而延迟队列则允许我们在指定的未来时间或满足特定条件时再处理消息,这对于定时任务、异步处理等场景非常有用。
通过深入了解和利用 RabbitMQ 的高级特性、死信队列和延迟队列,我们将能够构建更可靠、高效、灵活的应用系统。无论是处理高并发请求、确保消息的可靠传输,还是实现复杂的异步工作流,RabbitMQ 都为我们提供了强大的支持。
希望这篇博客能够为你提供有价值的信息,帮助你更好地理解和应用 RabbitMQ 的强大功能。让我们一起探索 RabbitMQ 的世界,释放其潜能,构建更出色的应用系统!
提示:以下是本篇文章正文内容,下面案例可供参考
我们之前说过RabbitMQ可以进行削峰填谷,就是通过消费端限流的方式限制消费者的拉取速度,达到保护消费端的目的。
1.消费端配置限流机制
spring:
rabbitmq:
host: 192.168.0.162
port: 5672
username: zhangsan
password: zhangsan
virtual-host: /
listener:
simple:
# 限流机制必须开启手动签收
acknowledge-mode: manual
# 消费端最多拉取5条消息消费,签收后不满5条才会继续拉取消息。
prefetch: 5
2.消费者监听队列
@Component
public class QosConsumer{
@RabbitListener(queues = "my_queue")
public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException {
// 1.获取消息
System.out.println(new String(message.getBody()));
// 2.模拟业务处理
Thread.sleep(3000);
// 3.签收消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}
}
在RabbitMQ中,多个消费者监听同一条队列,则队列默认采用的轮询分发。但是在某种场景下这种策略并不是很好,例如消费者1处理任务的速度非常快,而其他消费者处理速度却很慢。此时如果采用公平分发,则消费者1有很大一部分时间处于空闲状态。此时可以采用不公平分发,即谁处理的快,谁处理的消息多。
1.消费端配置不公平分发
spring:
rabbitmq:
host: 192.168.0.162
port: 5672
username: zhangsan
password: zhangsan
virtual-host: /
listener:
simple:
# 限流机制必须开启手动签收
acknowledge-mode: manual
# 消费端最多拉取1条消息消费,这样谁处理的快谁拉取下一条消息,实现了不公平分发
prefetch: 1
2.编写两个消费者
@Component
public class UnfairConsumer {
// 消费者1
@RabbitListener(queues = "my_queue")
public void listenMessage1(Message message, Channel channel) throws Exception {
//1.获取消息
System.out.println("消费者1:"+new String(message.getBody(),"UTF-8"));
//2. 处理业务逻辑
Thread.sleep(500); // 消费者1处理快
//3. 手动签收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
// 消费者2
@RabbitListener(queues = "my_queue")
public void listenMessage2(Message message, Channel channel) throws Exception {
//1.获取消息
System.out.println("消费者2:"+new String(message.getBody(),"UTF-8"));
//2. 处理业务逻辑
Thread.sleep(3000);// 消费者2处理慢
//3. 手动签收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
}
RabbitMQ可以设置消息的存活时间(Time To Live,简称TTL),当消息到达存活时间后还没有被消费,会被移出队列。RabbitMQ可以对队列的所有消息设置存活时间,也可以对某条消息设置存活时间。
1.在创建队列时设置其存活时间:
@Configuration
public class RabbitConfig2 {
private final String EXCHANGE_NAME="my_topic_exchange2";
private final String QUEUE_NAME="my_queue2";
// 1.创建交换机
@Bean("bootExchange2")
public Exchange getExchange2(){
return ExchangeBuilder
.topicExchange(EXCHANGE_NAME)
.durable(true).
build();
}
// 2.创建队列
@Bean("bootQueue2")
public Queue getMessageQueue2(){
return QueueBuilder
.durable(QUEUE_NAME)
.ttl(10000) //队列的每条消息存活10s
.build();
}
// 3.将队列绑定到交换机
@Bean
public Binding bindMessageQueue2(@Qualifier("bootExchange2") Exchange exchange, @Qualifier("bootQueue2") Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();
}
}
@Test
public void testSendMessage() {
//设置消息属性
MessageProperties messageProperties = new MessageProperties();
//设置存活时间
messageProperties.setExpiration("10000");
// 创建消息对象
Message message = new Message("send message...".getBytes(StandardCharsets.UTF_8), messageProperties);
// 发送消息
rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", message);
}
注意:
优先级队列是在RabbitMQ 3.5.0之后的版本才支持的。具有高优先级的队列具有高的优先权,优先级高的消息具备优先被消费的特权。队列的优先级可以通过x-max-priority参数设置。
建立一个priority-exchange交换机,类型为direct。建立一个priority-queue队列,并与priority-exchange绑定。设置x-max-priority参数的值为100,表示最大优先级为100。注意:x-max-priority参数的值应该介于1到255。建议使用1到10之间的队列。如果设置的优先级更大,将使用更多的Erlang进程,消耗更多的CPU资源,并且运行时调度也会受到影响。
优先级队列适用于需要处理高优先级消息的场景,例如订单处理、实时监控等。
1.创建队列和交换机
@Configuration
public class RabbitConfig3 {
private final String EXCHANGE_NAME="priority_exchange";
private final String QUEUE_NAME="priority_queue";
// 1.创建交换机
@Bean(EXCHANGE_NAME)
public Exchange priorityExchange(){
return ExchangeBuilder
.topicExchange(EXCHANGE_NAME)
.durable(true).
build();
}
// 2.创建队列
@Bean(QUEUE_NAME)
public Queue priorityQueue(){
return QueueBuilder
.durable(QUEUE_NAME)
//设置队列的最大优先级,最大可以设置到255,官网推荐不要超过10,,如果设置太高比较浪费资源
.maxPriority(10)
.build();
}
// 3.将队列绑定到交换机
@Bean
public Binding bindPriority(@Qualifier(EXCHANGE_NAME) Exchange exchange, @Qualifier(QUEUE_NAME) Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();
}
}
2.编写生产者
@Test
public void testPriority() {
for (int i = 0; i < 10; i++) {
if (i == 5) {
// i为5时消息的优先级较高
MessageProperties messageProperties = new MessageProperties();
messageProperties.setPriority(9);
Message message = new Message(("send message..." + i).getBytes(StandardCharsets.UTF_8), messageProperties);
rabbitTemplate.convertAndSend("priority_exchange", "my_routing", message);
} else {
rabbitTemplate.convertAndSend("priority_exchange", "my_routing", "send message..." + i);
}
}
}
3.编写消费者
@Component
public class PriorityConsumer {
@RabbitListener(queues = "priority_queue")
public void listenMessage(Message message, Channel channel) throws Exception {
//获取消息
System.out.println(new String(message.getBody()));
//手动签收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
}
死信队列( Dead Letter Queue)是RabbitMQ 中的一种特殊队列,用于处理无法被正常消费的消息。当消费者在处理消息时出现异常或崩溃,RabbitMQ 服务器会将该消息发送到死信交换机中,然后再由死信交换机发给死信队列,而不是直接删除它。
通过配置死信队列,可以确保消息不会丢失,并为后续的处理提供了一种机制。死信队列通常用于处理那些无法处理的异常消息,例如错误的格式、无法解析的数据等。
在使用死信队列时,需要在消费者端配置一个回调函数,用于处理从死信队列中接收到的消息。这个回调函数可以在消费者重新启动后被调用,或者通过其他方式进行处理。
1.创建死信队列
@Configuration
public class RabbitConfig4 {
private final String DEAD_EXCHANGE = "dead_exchange";
private final String DEAD_QUEUE = "dead_queue";
private final String NORMAL_EXCHANGE = "normal_exchange";
private final String NORMAL_QUEUE = "normal_queue";
// 死信交换机
@Bean(DEAD_EXCHANGE)
public Exchange deadExchange(){
return ExchangeBuilder
.topicExchange(DEAD_EXCHANGE)
.durable(true)
.build();
}
// 死信队列
@Bean(DEAD_QUEUE)
public Queue deadQueue(){
return QueueBuilder
.durable(DEAD_QUEUE)
.build();
}
// 死信交换机绑定死信队列
@Bean
public Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange exchange,@Qualifier(DEAD_QUEUE)Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with("dead_routing")
.noargs();
}
// 普通交换机
@Bean(NORMAL_EXCHANGE)
public Exchange normalExchange(){
return ExchangeBuilder
.topicExchange(NORMAL_EXCHANGE)
.durable(true)
.build();
}
// 普通队列
@Bean(NORMAL_QUEUE)
public Queue normalQueue(){
return QueueBuilder
.durable(NORMAL_QUEUE)
.deadLetterExchange(DEAD_EXCHANGE) // 绑定死信交换机
.deadLetterRoutingKey("dead_routing") // 死信队列路由关键字
.ttl(10000) // 消息存活10s
.maxLength(10) // 队列最大长度为10
.build();
}
// 普通交换机绑定普通队列
@Bean
public Binding bindNormalQueue(@Qualifier(NORMAL_EXCHANGE) Exchange exchange,@Qualifier(NORMAL_QUEUE)Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with("my_routing")
.noargs();
}
}
2.生产者发送消息
@Test
public void testDlx(){
// 存活时间过期后变成死信
// rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
// 超过队列长度后变成死信
// for (int i = 0; i < 20; i++) {
// rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
// }
// 消息拒签但不返回原队列后变成死信
rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
}
3.消费者拒收消息
@Component
public class DlxConsumer {
@RabbitListener(queues = "normal_queue")
public void listenMessage(Message message, Channel channel) throws IOException {
// 拒签消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);
}
}
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。通过使用延迟队列,可以将消息暂时存储在队列中,并设置一个延迟时间或条件。当延迟时间到期或条件满足时,消息才会被传递给消费者进行处理。这样可以实现一些定时任务、定时通知、异步处理等需求。
1.创建SpringBoot订单模块,添加SpringMVC、RabbitMQ、lombok依赖。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
2.编写配置文件
spring:
rabbitmq:
host: 192.168.0.162
port: 5672
username: zhangsan
password: zhangsan
virtual-host: /
# 日志格式
logging:
pattern:
console: '%d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n'
3.创建队列和交换机
@Configuration
public class RabbitConfig {
// 订单交换机和队列
private final String ORDER_EXCHANGE = "order_exchange";
private final String ORDER_QUEUE = "order_queue";
// 过期订单交换机和队列
private final String EXPIRE_EXCHANGE = "expire_exchange";
private final String EXPIRE_QUEUE = "expire_queue";
// 过期订单交换机
@Bean(EXPIRE_EXCHANGE)
public Exchange deadExchange(){
return ExchangeBuilder
.topicExchange(EXPIRE_EXCHANGE)
.durable(true)
.build();
}
// 过期订单队列
@Bean(EXPIRE_QUEUE)
public Queue deadQueue(){
return QueueBuilder
.durable(EXPIRE_QUEUE)
.build();
}
// 将过期订单队列绑定到交换机
@Bean
public Binding bindDeadQueue(@Qualifier(EXPIRE_EXCHANGE) Exchange exchange,@Qualifier(EXPIRE_QUEUE) Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with("expire_routing")
.noargs();
}
// 订单交换机
@Bean(ORDER_EXCHANGE)
public Exchange normalExchange(){
return ExchangeBuilder
.topicExchange(ORDER_EXCHANGE)
.durable(true)
.build();
}
// 订单队列
@Bean(ORDER_QUEUE)
public Queue normalQueue(){
return QueueBuilder
.durable(ORDER_QUEUE)
.ttl(10000) // 存活时间为10s,模拟30min
.deadLetterExchange(EXPIRE_EXCHANGE) // 绑定死信交换机
.deadLetterRoutingKey("expire_routing") // 死信交换机的路由关键字
.build();
}
// 将订单队列绑定到交换机
@Bean
public Binding bindNormalQueue(@Qualifier(ORDER_EXCHANGE) Exchange exchange,@Qualifier(ORDER_QUEUE) Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with("order_routing")
.noargs();
}
}
4.编写下单的控制器方法,下单后向订单交换机发送消息
@RestController
public class OrderController {
@Autowired
private RabbitTemplate rabbitTemplate;
//下单
@GetMapping("/place/{orderId}")
public String placeOrder(@PathVariable String orderId){
System.out.println("处理订单数据...");
// 将订单id发送到订单队列
rabbitTemplate.convertAndSend("order_exchange", "order_routing", orderId);
return "下单成功,修改库存";
}
}
5.编写监听死信队列的消费者
// 过期订单消费者
@Component
public class ExpireOrderConsumer {
// 监听队列
@RabbitListener(queues = "expire_queue")
public void listenMessage(String orderId){
System.out.println("查询"+orderId+"号订单的状态,如果已支付则无需处理,如果未支付则需要回退库存");
}
}
在使用死信队列实现延迟队列时,会遇到一个问题:RabbitMQ只会移除队列顶端的过期消息,如果第一个消息的存活时长较长,而第二个消息的存活时长较短,则第二个消息并不会及时执行。
1.在window中下载RabbitMQ Delayed Message Plugin插件(大家自己去网上找),使用rz将插件上传至虚拟机。
2.安装插件
# 将插件放入RabbitMQ插件目录中
mv rabbitmq_delayed_message_exchange-3.9.0.ez /usr/local/rabbitmq/plugins/
# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
3.重启RabbitMQ服务
#停止rabbitmq
rabbitmqctl stop
#启动rabbitmq
rabbitmq-server restart -detached
1.创建延迟交换机和延迟队列
@Configuration
public class RabbitConfig2 {
public final String DELAYED_EXCHANGE = "delayed_exchange";
public final String DELAYED_QUEUE = "delayed_queue";
//1.延迟交换机
@Bean(DELAYED_EXCHANGE)
public Exchange delayedExchange() {
// 创建自定义交换机
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "topic"); // topic类型的延迟交换机
return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, args);
}
//2.延迟队列
@Bean(DELAYED_QUEUE)
public Queue delayedQueue() {
return QueueBuilder
.durable(DELAYED_QUEUE)
.build();
}
// 3.绑定
@Bean
public Binding bindingDelayedQueue(@Qualifier(DELAYED_QUEUE) Queue queue, @Qualifier(DELAYED_EXCHANGE) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("order_routing").noargs();
}
}
2.编写下单的控制器方法
@GetMapping("/place2/{orderId}")
public String placeOrder2(@PathVariable String orderId) {
System.out.println("处理订单数据...");
// 设置消息延迟时间为10秒
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(10000);
return message;
}
};
// 将订单id发送到订单队列
rabbitTemplate.convertAndSend("delayed_exchange", "order_routing", orderId, messagePostProcessor);
return "下单成功,修改库存";
}
3.编写延迟队列的消费者
@RabbitListener(queues = "delayed_queue")
public void listenMessage(String orderId){
System.out.println("查询"+orderId+"号订单的状态,如果已支付则无需处理,如果未支付则需要回退库存");
}
提示:这里对文章进行总结:
通过这篇博客,读者可以更深入地了解 RabbitMQ 的强大功能,并能够在实际应用中更好地利用这些特性来构建高效、可靠的消息传递系统。如果你还有其他问题或需要进一步的信息,请随时提问。