🎉🎉欢迎来到我的CSDN主页!🎉🎉
🏅我是Java方文山,一个在CSDN分享笔记的博主。📚📚
🌟推荐给大家我的专栏《RabbitMQ实战》。🎯🎯
👉点击这里,就可以查看我的主页啦!👇👇
🎁如果感觉还不错的话请给我点赞吧!🎁🎁
💖期待你的加入,一起学习,一起进步!💖💖
目录
在 RabbitMQ 消息队列系统中,交换机(Exchange)是用于接收生产者发送的消息并将其路由到相应的队列的组件。它是消息路由的核心组件之一。使用交换机将生产者的消息分发到队列中可以处理更为复杂的代码或者说可以更精准的发送到队列中。
有了交换机我们的消息不是直接发给队列的而是发送给交换机再通过特殊的条件找到符合条件的队列再由队列发送给消费者,但这中间还有些概念需要我们了解一下分别是路由键和绑定键
- 路由键(RoutingKey)
生产者将消息发送给交换机的时候会指定RoutingKey指定路由规则
- 绑定键(BindingKey)
通过绑定键将交换机与队列关联起来,这样RabbitMQ就知道如何正确将消息路由到队列
总结:生产者将消息发给那个Exchanges是路由键决定的,而Exchanges与那个队列绑定是通过邦定键决定的。
刚刚提到使用交换机将消息发送到队列中比直接发送到队列中好,那么具体好在那里我这里举个例
路由控制:通过交换机,可以根据消息的路由键将消息路由到与之匹配的队列。这样,可以根据消息的属性或标签来定向分发消息,实现精确的消息路由控制。
消息过滤:交换机可以根据消息的路由键、消息头部属性等信息对消息进行过滤和筛选,将符合特定条件的消息发送到相应的队列。这样可以实现消息的订阅和过滤机制,灵活地处理不同类型的消息。
广播和多播:通过使用扇形交换机(Fanout Exchange),可以将消息广播到所有与之绑定的队列,实现消息的广播和多播机制,方便实现发布-订阅模式。
解耦和灵活性:通过将消息发送到交换机而不是直接发送到队列,生产者和消费者之间实现了解耦。生产者只需要将消息发送到指定的交换机,而不需要知道具体的队列。这样,可以灵活地增加、删除或修改队列,而不会对生产者产生影响。
可扩展性:使用交换机可以实现消息的分发和负载均衡机制。通过将消息发送到多个队列,可以实现横向扩展和并发处理,提高系统的吞吐量和性能。
如果你还不是很理解的话不妨看看这个案例:
假设我们正在构建一个电子商务网站,该网站有一个订单处理系统。在用户提交订单后,需要执行一系列异步任务,例如生成发货单、发送通知邮件、更新库存等。这些任务可能会消耗较长的时间,并且需要并行处理。
在这种情况下,我们可以使用消息队列系统来实现任务的异步处理。具体来说,我们可以创建一个交换机和多个队列,并将每个异步任务绑定到不同的队列上。以下是具体的实现步骤:
- 创建一个交换机,例如名为 "order_exchange" 的主题交换机。
- 创建多个队列,例如 "shipping_queue"、"notification_queue" 和 "inventory_queue"。
- 将 "shipping_queue" 绑定到交换机,并指定路由键为 "shipping"。
- 将 "notification_queue" 绑定到交换机,并指定路由键为 "notification"。
- 将 "inventory_queue" 绑定到交换机,并指定路由键为 "inventory"。
- 当用户提交订单时,订单处理系统将生成一条消息并发送到交换机。
- 根据任务类型,将消息的路由键设置为相应的值,例如 "shipping"、"notification" 或 "inventory"。
- 消息队列系统将根据消息的路由键将消息路由到相应的队列中。
- 每个队列对应一个消费者,负责处理特定的任务。例如,"shipping_queue" 的消费者可以从队列中获取消息,并生成发货单;"notification_queue" 的消费者可以发送通知邮件;"inventory_queue" 的消费者可以更新库存。
通过以上步骤,我们实现了任务的解耦和灵活性。生产者只需要将消息发送到交换机,而不需要关心具体的任务队列。每个任务都有独立的队列和消费者,可以并行处理,提高系统的吞吐量和性能。同时,我们可以根据需要增加或删除队列,扩展系统的处理能力
?
了解完交换机的概念,我们来认识一下交换机的类型
直连交换机是最简单的交换机类型,它将消息的路由键与绑定键进行精确匹配,当我们的路由键和绑定键一致的时候,将消息发送到与之完全匹配的队列。
像上图所描述的我们的路由键是orange就会进入对应的绑定键的orange队列中
?
?如果是两个相同的绑定键则都会进入,同时推送到Q1和Q2队列中
?注意:直连交换机只能根据绑定键进行路由,无法实现更复杂的路由逻辑。这意味着在需要进行高级路由或消息过滤的情况下,直连交换机可能无法满足需求。如果我们需要一个消息发送到多个队列中需要在交换机上绑定多个路由键,也是非常的麻烦
主题交换机基于模式匹配的方式将消息路由到队列。它使用通配符来进行匹配,支持通配符符号 "*" 和 "#"。其中 "*" 表示匹配一个单词,"#" 表示匹配一个或多个单词。
为了方便大家理解我写几个案例:
- ?RoutingKey:aa.orange.bb ====>Q1
- ?RoutingKey:aa.orange.rabbit====>Q1,Q2
- ?RoutingKey:aa.bb.rabbit====>Q2
- ?RoutingKey:lazy.aa====>Q2
- ?RoutingKey:lazy.aa.rabbit====>Q2
- ?RoutingKey:lazy.orange.aa====>Q1,Q2
- ?RoutingKey:lazy.orange.rabbit====>Q1,Q2
主题交换机就像是升级版的直连交换机一样,通过绑定键进行访问,但是不同的地方是主题交换机有两个特殊字符一个是*号另一个是#,通过这两个符合可以设定不同的条件,只有符合条件才会发送到对应的队列中。
知识拓展:
- 当一个队列的绑定键是#,它将会接收所有的消息,而不再考虑所接收消息的路由键
- 当一个队列的绑定键没有用到#和*时,它就像直连交换机一样工作
扇形交换机将消息广播到所有与之绑定的队列。无论消息的路由键是什么,扇形交换机都会将消息发送到所有绑定的队列中。这种类型的交换机常用于实现发布-订阅模式,将消息广播给多个消费者。
首部交换机和扇形交换机一样都不要路由键,首交换机根据消息Headers的属性进行匹配和路由。在消息发送时,可以指定一组键值对作为消息的头部属性,交换机会根据这些属性进行匹配。首部交换机提供了更灵活的匹配方式,但相对复杂度较高,通常使用较少。
注意:Hash结构中要求携带一个键“x-match”,这个键的Value可以是any或者all,这代表消息携带的Hash是仅匹配一个(any)还是需要全部匹配(all)
- all:在发布消息时携带的所有Entry必须和绑定在队列上的所有Entry完全匹配
- any:只要发布消息时携带的有一对键值对headers满足队列定义的多个参数arguments的其中一个就能匹配上,注意这里是键值对的完全匹配,只要匹配到键,值却是不一样的
默认交换机是一个预定义的无名交换机,它会自动将消息发送到与之路由键名称相同的队列中。当生产者没有显式地指定交换机时,消息会被发送到默认交换机中。
注意:默认交换机有一个特殊属性是默认交换机会自动将新建队列绑定到自己身上,并且绑定的路由键名称与队列名称一致。 也就是说当创建一个新的队列时,如果未显式地指定要绑定的交换机,那么该队列将自动绑定到默认交换机上。
死信交换机用于处理无法被消费者正确处理的消息。当消息在队列中变成死信(例如超过重试次数或队列已满),它将被发送到死信交换机,并根据死信交换机的绑定规则路由到指定的死信队列中进行进一步处理。
通常是以下三种情况:
- 消息被拒绝,并且设置 requeue 参数为 false
- 消息过期(默认情況下 Rabbit 中的消息不过期,但是可以设置队列的过期时间和消息的过期时间以达到消息过期的效果)
- 队列达到最大长度(一般当设置了最大队列长度或大小并达到最大值时)
满足以上的任意一种就会变成死信消息被我们的死信交换机接收到并发送给队列
案例讲解?
生产者生产一条1分钟后超时的订单消息到正常交换机exchange-a中,消息匹配到队列queue-a,但一分钟后仍未消费。 消息会被投递到死信交换机dlxy-exchange中,并发送到死信队列中, 死信队列dlx-queue的消费者拿到消息后,根据消息去查询订单的状态,如果仍然是未支付状态,将订单状态更新为超时状态。
?
编写代码之前先了解一下我们交换机的参数属性
?Name:交换机名称
Type:交换机类型,direct、 topic、fanout、 headers
Durability:是杏需要持久化,如果持久性,则RabbitMQ重启后,交换机还存在
Auto Delete:当最后一个绑定到Exchange上的队列删除后,自动删除该ExchangeInternal:
Internal:当前Exchange是否用于RabbitMQ内部使用,默认为FalseArguments:
Arguments:扩展参数,用于扩展AMQP协议定制化使用
?
🧑?🍳生产者
package org.example.produce.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@SuppressWarnings("all")
public class RabbitConfig {
/**
* 定义队列
* @return
*/
@Bean
public Queue directQueue(){
return new Queue("direct-queue");
}
/**
* 自定义直连交换机
* @return
*/
@Bean
public DirectExchange directExchange(){
return new DirectExchange("direct-exchange",true,false);
}
/**
* 将队列与交换机进行绑定,并设置路由键
* @return
*/
@Bean
public Binding binding(){
return BindingBuilder.bind(directQueue())
.to(directExchange())
.with("direct_routing_key");
}
}
- directQueue() 方法:创建一个名为 "direct-queue" 的队列,并将其返回。
- directExchange() 方法:创建一个名为 "direct-exchange" 的直连交换机,并将其返回。这个交换机是持久化的(durable = true),不自动删除(autoDelete = false)。
- binding() 方法:将 directQueue() 方法返回的队列与 directExchange() 方法返回的交换机进行绑定,并设置路由键为 "direct_routing_key"。这样,当消息的路由键与 "direct_routing_key" 完全匹配时,消息将会被路由到这个队列中。
🧑?💼消费者
package org.example.produce.controller;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
@RabbitListener(queues = {"direct-queue"})
public class DirectReceiver {
@RabbitHandler
public void handler(Map<String,Object> json){
System.out.println(json);
}
}
👇👇Controller
package org.example.produce.controller;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
@RestController
public class Sender {
@Autowired
private AmqpTemplate rabbitTemplate;
@RequestMapping("/sendData")
public String sendData() {
Map<String,Object> data=new HashMap<>();
data.put("msg","hello 我是直连交换机");
rabbitTemplate.convertAndSend("direct-exchange","direct_routing_key", data);
return "😎";
}
}
🌷🌷效果展示:
?
?
🧑?🍳生产者
package org.example.produce.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@SuppressWarnings("all")
public class RabbitConfig {
// 定义路由规则
public static String A_KEY = "*.orange.*" ;
public static String B_KEY = "*.* rabbit";
public static String C_KEY = "lazy.#";
/**
* 定义队列
* @return
*/
@Bean
public Queue Topicqueue(){
return new Queue("topic-queue",true);
}
/**
* 定义主题交换机
* @return
*/
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("topic-exchange",true,false);
}
/**
* 将队列与交换机进行绑定,并设置路由键
* @return
*/
@Bean
public Binding bindingA(){
return BindingBuilder.bind(Topicqueue())
.to(topicExchange())
.with(A_KEY);
}
}
- 路由规则的定义:
A_KEY?定义了一个路由键为 ".orange." 的规则,其中 "*" 表示匹配任意一个单词,这个规则可以匹配以 "orange" 为中间单词的路由键。
B_KEY?定义了一个路由键为 ",?rabbit" 的规则,其中 "," 表示按照单词划分,它可以匹配两个单词的路由键,且第二个单词为 "rabbit"。
C_KEY?定义了一个路由键为 "lazy.#" 的规则,其中 "#" 表示匹配任意数量的单词,这个规则可以匹配以 "lazy" 开头并且后面可以是任意数量的单词的路由键。
- Topicqueue() 方法: 创建了一个名为 "topic-queue" 的队列,并将其返回。此队列是持久化的。
- topicExchange() 方法: 创建了一个名为 "topic-exchange" 的主题交换机,并将其返回。该交换机是持久化的,不自动删除。
- binding() 方法: 将 Topicqueue() 方法返回的队列与 topicExchange() 方法返回的主题交换机进行绑定,并使用路由键 A_KEY 进行绑定。这样,当消息的路由键与 A_KEY 匹配时,消息将会被路由到这个队列中。
🧑?💼消费者
package org.example.produce.controller;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
@RabbitListener(queues = {"topic-queue"})
public class TopicReceiver {
@RabbitHandler
public void handler(Map<String,Object> json){
System.out.println(json);
}
}
👇👇Controller
package org.example.produce.controller;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
@RestController
public class Sender {
@Autowired
private AmqpTemplate rabbitTemplate;
@RequestMapping("/sendTopic")
public String sendTopic() {
Map<String,Object> data=new HashMap<>();
data.put("msg","你好!! 我是主题交换机");
rabbitTemplate.convertAndSend("topic-exchange","aa.orange.bb", data);
return "😎";
}
}
🌷🌷效果展示:
?
?
🧑?🍳生产者
package org.example.produce.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@SuppressWarnings("all")
public class RabbitConfig {
@Bean
public Queue queueX(){
return new Queue("queue-x");
}
@Bean
public Queue queueY(){
return new Queue("queue-y");
}
@Bean
public Queue queueZ(){
return new Queue("queue-z");
}
/**
* 定义扇形交换机,与路由键无关
* @return
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("fanout-exchange",true,false);
}
@Bean
public Binding bindingX(){
return BindingBuilder.bind(queueX())
.to(fanoutExchange());
}
@Bean
public Binding bindingY(){
return BindingBuilder.bind(queueY())
.to(fanoutExchange());
}
@Bean
public Binding bindingZ(){
return BindingBuilder.bind(queueZ())
.to(fanoutExchange());
}
}
队列的定义:
queueX()?方法创建了一个名为 "queue-x" 的队列。
queueY()?方法创建了一个名为 "queue-y" 的队列。
queueZ()?方法创建了一个名为 "queue-z" 的队列。
- fanoutExchange() 方法: 创建了一个名为 "fanout-exchange" 的扇形交换机,并将其返回。该交换机是持久化的,不自动删除。扇形交换机会忽略消息的路由键,而是将消息广播到绑定到它上面的所有队列中。
- bindingX()、bindingY()、bindingZ() 方法: 分别将 queueX()、queueY()、queueZ() 返回的队列与 fanoutExchange() 返回的扇形交换机进行绑定。这样,当消息发送到扇形交换机时,它会被广播到所有绑定的队列中。
🧑?💼消费者
package org.example.produce.controller;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
public class FanoutReceiver {
@RabbitListener(queues = {"queue-x"})
@RabbitHandler
public void handlerY(Map<String,Object> json){
System.out.println("已接受到队列queue-x传递过来的消息:"+json);
}
@RabbitListener(queues = {"queue-y"})
@RabbitHandler
public void handlerX(Map<String,Object> json){
System.out.println("已接受到队列queue-y传递过来的消息:"+json);
}
@RabbitListener(queues = {"queue-z"})
@RabbitHandler
public void handlerZ(Map<String,Object> json){
System.out.println("已接受到队列queue-z传递过来的消息:"+json);
}
}
👇👇Controller
package org.example.produce.controller;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
@RestController
public class Sender {
@Autowired
private AmqpTemplate rabbitTemplate;
@RequestMapping("/sendFanout")
public String sendFanout() {
Map<String,Object> data=new HashMap<>();
data.put("msg","我是扇形交换机,广播通知");
rabbitTemplate.convertAndSend("fanout-exchange",null, data);
return "😎";
}
}
🌷🌷效果展示:
?
?
到这里我的分享就结束了,欢迎到评论区探讨交流!!
💖如果觉得有用的话还请点个赞吧 💖