一般MQ用于系统解耦、削峰使用,常见于微服务、业务活动等场景。
RabbitMQ整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。
Queue:队列,是RabbitMQ的内部对象,用于存储消息
Exchange:交换器。生产者将消息发送到Exchange(交换器,通常也可以用大写的"X"来表示),由交换器将消息路由到一个或者多个队列中。如果路由不到,或许会返回给生产者,或许直接丢弃。
RoutingKey:路由键。生产者将消息发给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则,而这个RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。
Binding:绑定。RabbitMQ中通过绑定将交换器与队列联合起来,在绑定的时候一般会指定一个绑定键(BindingKey),这样RabbitMQ就知道如何正确地将消息路由到队列了。
AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个提供统一消息服务的应用层协议开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可以互相传递消息,并不受客户端/中间件产品不同、开发语言不同等条件的限制。Erlang中的实现有RabbitMQ等。
https://github.com/erlang/otp/releases/download/OTP-25.2/otp_win64_25.2.exe
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.11.5/rabbitmq-server-3.11.5.exe
配置环境变量
cd D:\Program Files\RabbitMQ Server\rabbitmq_server-3.11.5\sbin
使用rabbitmq-plugins命令启用管理插件(rabbitmq_management)
rabbitmq-plugins enable rabbitmq_management
打开地址
http://127.0.0.1:15672/
输入用户名/密码:guest/guest
<!-- Spring Boot starter that brings in Spring AMQP / the RabbitMQ client. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
# RabbitMQ connection settings. The keys must be nested under spring.rabbitmq;
# with flat (unindented) keys they would be parsed as unrelated top-level entries.
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
package com.xxxx.user.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
/******************direct**********************/
/**
* 创建direct队列
* @return
*/
@Bean
public Queue directQueue(){
return new Queue("directQueue");
}
/**
* 创建direct交换机
* @return
*/
@Bean
public DirectExchange directExchange(){
return new DirectExchange("directExchange");
}
/**
* 把队列和交换机绑定在一起
* @param queue
* @param directExchange
* @return
*/
@Bean
public Binding bindingDirect(@Qualifier("directQueue") Queue queue, DirectExchange directExchange){
return BindingBuilder.bind(queue).to(directExchange).with("routingKey");
}
/******************topic**********************/
@Bean
public Queue topicQuerue1(){
return new Queue("topicQuerue1");
}
@Bean
public Queue topicQuerue2(){
return new Queue("topicQuerue2");
}
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("topicExchange");
}
@Bean
public Binding bindingTopic1(@Qualifier("topicQuerue1") Queue queue,@Qualifier("topicExchange") TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).with("topic.key1");
}
/**
* 通配符:* 表示一个词,# 表示零个或多个词
* @param queue
* @param topicExchange
* @return
*/
@Bean
public Binding bindingTopic2(@Qualifier("topicQuerue2") Queue queue,@Qualifier("topicExchange") TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).with("topic.#");
}
/******************fanout**********************/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanoutQueue1");
}
@Bean
public Queue fanoutQueue2(){
return new Queue("fanoutQueue2");
}
@Bean
public Queue fanoutQueue3(){
return new Queue("fanoutQueue3");
}
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("fanoutExchange");
}
@Bean
public Binding bindingFanout1(@Qualifier("fanoutQueue1") Queue queue,@Qualifier("fanoutExchange") FanoutExchange fanoutExchange){
return BindingBuilder.bind(queue).to(fanoutExchange);
}
@Bean
public Binding bindingFanout2(@Qualifier("fanoutQueue2") Queue queue,@Qualifier("fanoutExchange") FanoutExchange fanoutExchange){
return BindingBuilder.bind(queue).to(fanoutExchange);
}
@Bean
public Binding bindingFanout3(@Qualifier("fanoutQueue3") Queue queue,@Qualifier("fanoutExchange") FanoutExchange fanoutExchange){
return BindingBuilder.bind(queue).to(fanoutExchange);
}
}
package com.xxxx.user.consumer;
import com.xxxx.springCloud.common.entity.UserInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumer for the {@code directQueue} queue. Each {@code @RabbitHandler}
 * overload handles one payload type and logs what it received.
 *
 * <p>NOTE(review): this file imports {@code UserInfo} from
 * {@code com.xxxx.springCloud.common.entity}, while the sender test imports
 * {@code com.xxxx.common.entity.UserInfo}. Confirm both refer to the same
 * class; otherwise the {@code UserInfo} handler will not match sent messages.
 */
@Component
@Slf4j
@RabbitListener(queues = "directQueue")
public class DataDirectReceiver {

    /**
     * Handles text payloads.
     *
     * @param data the received string message
     */
    @RabbitHandler
    public void process(String data) {
        // Parameterized logging: the argument is only formatted if INFO is enabled.
        log.info("收到directQueue队列信息:{}", data);
    }

    /**
     * Handles {@code UserInfo} payloads.
     *
     * @param data the received user object
     */
    @RabbitHandler
    public void process(UserInfo data) {
        log.info("收到directQueue队列信息:{}", data);
    }
}
package com.xxxx.user.consumer;
import com.xxxx.common.entity.UserInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumer for the {@code topicQuerue1} and {@code topicQuerue2} queues.
 *
 * <p>NOTE(review): despite the class name, this listener is attached to the
 * topic queues, not the fanout queues. The name is kept unchanged here so
 * existing references keep working; consider renaming in a dedicated change.
 */
@Component
@Slf4j
@RabbitListener(queues = {"topicQuerue1","topicQuerue2"})
public class DataFanoutReceiver {

    /**
     * Handles text payloads.
     *
     * @param data the received string message
     */
    @RabbitHandler
    public void process(String data) {
        // Parameterized logging: the argument is only formatted if INFO is enabled.
        log.info("收到topicQuerue队列信息:{}", data);
    }

    /**
     * Handles {@code UserInfo} payloads.
     *
     * @param data the received user object
     */
    @RabbitHandler
    public void process(UserInfo data) {
        log.info("收到topicQuerue队列信息:{}", data);
    }
}
package com.xxxx.user.consumer;
import com.xxxx.common.entity.UserInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumer for the {@code fanoutQueue1}, {@code fanoutQueue2} and
 * {@code fanoutQueue3} queues.
 *
 * <p>NOTE(review): despite the class name, this listener is attached to the
 * fanout queues, not the topic queues. The name is kept unchanged here so
 * existing references keep working; consider renaming in a dedicated change.
 */
@Component
@Slf4j
@RabbitListener(queues = {"fanoutQueue1","fanoutQueue2","fanoutQueue3"})
public class DataTopicReceiver {

    /**
     * Handles text payloads.
     *
     * @param data the received string message
     */
    @RabbitHandler
    public void process(String data) {
        // The log text names the fanout queues this listener actually consumes
        // (it previously said "topicQuerue", copied from the topic receiver).
        log.info("收到fanoutQueue队列信息:{}", data);
    }

    /**
     * Handles {@code UserInfo} payloads.
     *
     * @param data the received user object
     */
    @RabbitHandler
    public void process(UserInfo data) {
        log.info("收到fanoutQueue队列信息:{}", data);
    }
}
package com.xxxx.user;
import com.xxxx.common.entity.UserInfo;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
 * Spring Boot test that publishes one sample message to each of the direct,
 * topic and fanout exchanges through {@link RabbitTemplate}.
 */
@SpringBootTest
public class DataSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends a {@code UserInfo} object to {@code directExchange} with routing
     * key {@code routingKey}.
     */
    @Test
    public void sendDirect() {
        UserInfo userInfo = new UserInfo();
        userInfo.setUserAccount("tiger");
        userInfo.setPassword("12345");
        rabbitTemplate.convertAndSend("directExchange", "routingKey", userInfo);
    }

    /**
     * Sends a text message to {@code topicExchange} with routing key
     * {@code topic.key2}.
     */
    @Test
    public void sendTopic() {
        rabbitTemplate.convertAndSend("topicExchange", "topic.key2", "Hello world topic");
    }

    /**
     * Sends a text message to {@code fanoutExchange} with an empty routing key.
     */
    @Test
    public void sendFanout() {
        // Payload labels the message as fanout (it previously said "topic",
        // copied from sendTopic), so consumer logs are not misleading.
        rabbitTemplate.convertAndSend("fanoutExchange", "", "Hello world fanout");
    }
}