目录
RabbitMQ 中的交换机(Exchange)是消息的分发中心,负责将消息发送到一个或多个队列。它接收生产者发送的消息并将这些消息路由到消息队列中
在RabbitMQ中生产者发送消息不会直接将消息投递到队列中,而是先将消息投递到交换机中,在由交换机转发到具体的队列,队列再将消息以推送或者拉取的方式给消费者
在交换机接收到生产者发送的消息时,交换机会根据消息的路由键和交换机类型,决定将消息路由到哪个队列,而不同类型的交换机也有不同的路由规则
1.Direct Exchange(直接交换机):
路由规则:消息的路由键与队列绑定时指定的路由键完全匹配。
行为:消息将被路由到与其路由键完全匹配的队列。
2.Topic Exchange(主题交换机):
路由规则:使用通配符进行模糊匹配。
通配符:
*
?匹配一个单词。
#
?匹配零个或多个单词。行为:消息将被路由到与路由键匹配的队列。
对比直连交换机:
我们知道,直连交换机的路由方案非常简单,但是当我们在希望将一条消息发送给多个队列时,那么这个交换机就需要绑定非常多的路由规则(routing_key),交换机绑定的路由规则(routing_key)越多,管理就越复杂,相对于直连交换机,主题交换机可以通过通配符进行模糊匹配多条消息进行筛选进入队列
拓展:当一个队列的绑定键是"#",它将会接收所有的消息,而不再考虑所接收消息的路由键,当一个队列的绑定键没有用到"#"和"*"时,它又像direct交换一样工作
?
3.Fanout Exchange(扇出交换机):
路由规则:路由键被忽略。
行为:消息将广播到所有与交换机绑定的队列。
4.Headers Exchange(头部交换机):
路由规则:使用消息的头部信息进行匹配。
行为:消息头部信息与队列绑定规则匹配时,消息将被路由到相应队列。
5.Dead Letter Exchange(死信交换机?):
RabbitMQ作为一个高级消息中间件,提出了死信交换器的概念这种交换器专门处理死了的信息(被拒绝可以重新投递的信息不能算死的)。消息变成死信一般是以下三种情况:
消息被拒绝,并且设置requeue 参数为 false
消息过期(默认情况下 Rabbit 中的消息不过期,但是可以设置队列的过期时间和消息的过期时间以达到消息过期的效果)队列达到最大长度(一般当设置了最大队列长度或大小并达到最大值时)当满足上面三种情况时,消息会变成死信消息,并通过死信交换机投递到相应的队列中我们只需要监听相应队列,就可以对死信消息进行最后的处理。
先在RabbitMQ配置类中创建一个直连交换机和两个队列做测试用
package com.yu.publisher;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@SuppressWarnings("all")
public class RabbitConfig {
@Bean
public Queue queue1() {
return new Queue("queue1");
}
@Bean
public Queue queue2() {
return new Queue("queue2");
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange("directExchange");
}
}
将队列与交换机进行绑定并配置路由键(BindingKey)
@Bean
public Binding binding1(){
return BindingBuilder.bind(queue1()).to(directExchange()).with("K1");
}
@Bean
public Binding binding2(){
return BindingBuilder.bind(queue2()).to(directExchange()).with("K2");
}
在控制层做测试通过交换机及路由键做信息发送
@RequestMapping("send3")
public String send3(){
//向消息队列发送消息
amqpTemplate.convertAndSend("directExchange","Q1","Hello");
return "🚀";
}
?编写消费者进行接收信息,消费者通过接收消息队列queue1
package com.yu.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "queue1")
public class ReceiverQ1 {
@RabbitHandler
public void process(String msg) {
log.warn("Q1接收到:" + msg);
}
}
在这个同时我们可以编写多个消费者,进行测试
package com.yu.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "queue2")
public class ReceiverQ2 {
@RabbitHandler
public void process(String msg) {
log.warn("Q2接收到:" + msg);
}
}
这时我们访问send3向交换机发送消息并由消费者接收
因为我们所绑定的键为K1所以会进入到对应的queue1队列并由消费者Q1接收
?当我们键绑定键改为K2,这时就会进入到对应的queue2队列并由消费者Q2接收
同时我们还可以在RabbitMQ控制界面看到我们刚刚创建的交换机
?
创建主题交换机并与队列、规则进行绑定
/***
* 主题交换机
*/
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("topicExchange");
}
@Bean
public Binding bnding3(){
return BindingBuilder.bind(queue1()).to(topicExchange()).with("*.*.aa");
}
@Bean
public Binding bnding4(){
return BindingBuilder.bind(queue2()).to(topicExchange()).with("*.*.bb");
}
@Bean
public Binding bnding5(){
return BindingBuilder.bind(queue1()).to(topicExchange()).with("mq.#");
}
@Bean
public Binding bnding6(){
return BindingBuilder.bind(queue2()).to(topicExchange()).with("mq.#");
}
?将规则以参数的形式传递到发送消息的方法中
@RequestMapping("send4")
public String send4(String rex){
//向消息队列发送消息
amqpTemplate.convertAndSend("topExchange",rex,"Hello");
return "🚀";
}
输入规则进行访问
输出结果
同时制定了可以同时访问到的规则
?注意点:制定规则时不能使用数字
?相对于主题交换机而言,扇形交换机更多用于群发,所以可以直接绑定不需要用到规则
/**
* 扇形交换机
*
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("fanoutExchange");
}
@Bean
public Binding bnding7(){
return BindingBuilder.bind(queue1()).to(fanoutExchange());
}
@Bean
public Binding bnding8(){
return BindingBuilder.bind(queue2()).to(fanoutExchange());
}
编写测试发送方法
@RequestMapping("send5")
public String send5(){
//向消息队列发送消息
amqpTemplate.convertAndSend("fanoutExchange",null,"Hello");
return "🚀";
}
当我们进行访问send5时可以直接群发消息接收
交换机是消息队列系统中的重要组件,通过合理配置交换机和队列的绑定关系,可以实现灵活、可靠的消息路由和分发,适应不同应用场景的需求?,根据不同的实际应用场景使用不同的交换机,能够更好得帮助我们的程序提升效率