????????MQ的意义是消息队列(Message Queue)的缩写,是一种用来在应用程序之间传递消息的技术。MQ的主要作用是解耦应用程序之间的通信,提高系统的可伸缩性和可靠性。
解耦:MQ将消息的发送者和接收者解耦,使得它们可以独立演进,互不影响。发送者只需要将消息发送到MQ中,不需要关心谁来消费消息。接收者只需从MQ中订阅感兴趣的消息,不需要关心消息是从哪里发送来的。
异步:MQ可以实现异步处理消息,发送者在发送消息后不需要等待接收者的处理结果,而是可以继续处理其他任务。这样可以提高系统的响应速度和吞吐量。
流量控制:MQ可以对消息进行缓冲和流量控制,可以控制消息的发送速率,避免发送者发送过多的消息导致接收者无法处理。
可靠性:MQ可以保证消息的可靠传输。当消息发送失败时,MQ会自动重试发送,直到发送成功。同时,MQ还可以持久化消息,即使在MQ宕机后消息也能够恢复。
扩展性:由于MQ可以实现解耦,因此可以很容易地扩展系统的各个组件。只需要添加新的消息队列即可,无需修改已有的代码。
????????RabbitMQ 是一个开源的高性能、可扩展、消息中间件(Message Broker),实现了 Advanced Message Queuing Protocol(AMQP)协议,可以帮助不同应用程序之间进行通信和数据交换。
RabbitMQ 是由 Erlang 开发的,支持多种编程语言,包括 Java、Python、Ruby、PHP、C# 等。它的核心思想是将发送者(producer)与接收者(consumer)完全解耦,实现异步处理和低耦合度的系统架构。
???????
基本消息队列(BasicQueue)工作消息队列(WorkQueue)
???????
???????发布订阅(Publish、Subscribe),又根据交换机类型不同分为三种:
Fanout Exchange:广播 Direct Exchange:路由 Topic Exchange:主题
1 拉取镜像
docker pull rabbitmq:3-management
2 运行容器
docker run \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASSF=123321 \
--name mq \
--hostname mql \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
(1)导入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
(2) application.yml添加配置
spring:
rabbitmq:
host: 192.168.150.101 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: itcast # 用户名
password: 123321 # 密码
(3) 在publisher服务中新建一个测试类,编写测试方法:
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息 :【" + msg + "】");
}
}
(4)在consumer服务中新建一个类,编写消费逻辑:
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息 :【" + msg + "】");
}
}
案例?模拟WorkQueue,实现一个队列绑定多个消费者
基本思路如下: 在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue 在consumer服务中定义两个消息监听者,都监听simple.queue队列 消费者1每秒处理50条消息,消费者2每秒处理10条消息
注:多个消费者绑定到一个队列,同一条消息只会被一个消费者处理 通过设置prefetch来控制消费者预取的消息数量
(1) yml配置
spring:
rabbitmq:
host: 192.168.150.101 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: itcast # 用户名
password: 123321 # 密码
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
(2)生产者
@Test
public void testWorkQueue() throws InterruptedException {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, message__";
for (int i = 0; i < 50; i++) {
// 发送消息
rabbitTemplate.convertAndSend(queueName, message + i);
// 避免发送太快
Thread.sleep(20);
}
}
(3)消费者
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者1接收到消息:【" + msg + "】");
Thread.sleep(25);
}
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage2(String msg) throws InterruptedException {
System.err.println("spring 消费者2接收到消息:【" + msg + "】");
Thread.sleep(100);
}
发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)。 常见exchange类型包括: Fanout:广播 Direct:路由 Topic:话题
说明:Fanout Exchange 会将接收到的消息广播到每一个跟其绑定的queue
????????在consumer服务常见一个类,添加@Configuration注解,并声明FanoutExchange、Queue和绑定关系对象Binding,代码如下:
@Configuration
public class FanoutConfig {
// 声明FanoutExchange交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("itcast.fanout");
}
// 声明第1个队列
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
//绑 定队列1和交换机
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
// 声明第2个队列
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
//绑 定队列2和交换机
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
@Test
public void testFanoutExchange() {
// 队列名称
String exchangeName = "itcast.fanout";
// 消息
String message = "hello, everyone!";
// 发送消息,参数分别是:交互机名称、RoutingKey(暂时为空)、消息
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
接收publisher发送的消息 将消息按照规则路由到与之绑定的队列 不能缓存消息,路由失败,消息丢失 FanoutExchange的会将消息路由到每个绑定的队列 声明队列、交换机、绑定关系的Bean是什么? Queue FanoutExchange Binding
Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)。 每一个Queue都与Exchange设置一个BindingKey 发布者发送消息时,指定消息的RoutingKey Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
步骤1:在consumer服务声明Exchange、Queue
1 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2,
2 并利用@RabbitListener声明Exchange、Queue、RoutingKey
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者1接收到Direct消息:【"+msg+"】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者2接收到Direct消息:【"+msg+"】 ");
}
步骤2:在publisher服务发送消息到DirectExchange
@Test
public void testDirectExchange() {
// 队列名称
String exchangeName = "itcast.direct";
// 消息
String message = "红色测试内容!";
// 发送消息,参数依次为:交换机名称,RoutingKey,消息
rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
Fanout交换机将消息路由给每一个与之绑定的队列 Direct交换机根据RoutingKey判断路由给哪个队列 如果多个队列具有相同的RoutingKey,则与Fanout功能类似 基于@RabbitListener注解声明队列和交换机有哪些常见注解? @Queue @Exchange
TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以 . 分割。 Queue与Exchange指定BindingKey时可以使用通配符: #:代指0个或多个单词 *:代指一个单词
实现思路如下:
1.并利用@RabbitListener声明Exchange、Queue、RoutingKey
2.在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
3.在publisher中编写测试方法,向itcast. topic发送消息
?
步骤1:在consumer服务声明Exchange、Queue
在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2,
2并利用@RabbitListener声明Exchange、Queue、RoutingKey
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消费者1接收到Topic消息:【"+msg+"】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.out.println("消费者2接收到Topic消息:【"+msg+"】");
}
步骤2:在publisher服务发送消息到TopicExchange
@Test
public void testTopicExchange() {
// 队列名称
String exchangeName = "itcast.topic";
// 消息
String message = "喜报!孙悟空大战哥斯拉,胜!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
Topic交换机接收的消息RoutingKey必须是多个单词,以 . 分割 Topic交换机与队列绑定时的bindingKey可以指定通配符 #:代表0个或多个词 *:代表1个词
Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。 如果要修改只需要定义一个MessageConverter 类型的Bean即可。推荐用JSON方式序列化,步骤如下:
(1) 在publisher服务引入依赖
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
(2) 在publisher服务的启动类声明MessageConverter:
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
(3) 我们在consumer服务引入Jackson依赖:
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
(4) 在consumer服务的启动类定义MessageConverter:
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
(5) 然后定义一个消费者,监听object.queue队列并消费消息:?
@RabbitListener(queues = "object.queue")
public void listenObjectQueue(Map<String, Object> msg) {
System.out.println("收到消息:【" + msg + "】");
}