RabbitMQ与SpringAMQP

发布时间:2024年01月20日

MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。(经纪人!)

1.RabbitMQ介绍

微服务间通讯有同步和异步两种方式

同步(通信):A——>C 异步(代理):A——>B——>C

知识点

同步调用

同步调用的优点:

  • 时效性较强,可以立即得到结果

同步调用的问题:

  • 耦合度高

  • 性能和吞吐能力下降

  • 有额外的资源消耗

  • 有级联失败问题

异步代理

好处:

  • 吞吐量提升:无需等待订阅者处理完成,响应更快速

  • 故障隔离:服务没有直接调用,不存在级联失败问题

  • 调用间没有阻塞,不会造成无效的资源占用

  • 耦合度极低,每个服务都可以灵活插拔,可替换

  • 流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件

缺点:

  • 架构复杂了,业务没有明显的流程线,不好管理

  • 需要依赖于Broker的可靠、安全、性能

特性

队列:先进先出

栈:先进后出(递归)

RabbitMQ中的一些角色:

  • publisher:生产者

  • consumer:消费者

  • exchange个:交换机,负责消息路由

  • queue:队列,存储消息

  • virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离

2.RabbitMQ安装

1.单机部署

我们在Centos7虚拟机中使用Docker来安装。

1.1.下载镜像

方式一:在线拉取

docker pull rabbitmq:3-management

方式二:从本地加载

下载镜像包:

上传到虚拟机中后,使用命令加载镜像即可:

docker load -i mq.tar

1.2.安装MQ

执行下面的命令来运行MQ容器:

docker run \
 -e RABBITMQ_DEFAULT_USER=mymq \
 -e RABBITMQ_DEFAULT_PASS=123456 \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3-management

3.SpringAMQP

SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。

SpringAmqp的官方地址:Spring AMQP

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系

  • 基于注解的监听器模式,异步接收消息

  • 封装了RabbitTemplate工具,用于发送消息

3.1.Basic Queue 简单队列模型

在父工程中引入依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
 ? ?<groupId>org.springframework.boot</groupId>
 ? ?<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3.1.1.消息发送

首先配置MQ地址,在publisher服务的application.yml中添加配置:

spring:
  rabbitmq:
 ?  host: 192.168.150.101 # 主机名
 ?  port: 5672 # 端口
 ?  virtual-host: / # 虚拟主机
 ?  username: itcast # 用户名
 ?  password: 123321 # 密码

然后在publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:

?
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
?
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
?
 ? ?@Autowired
 ? ?private RabbitTemplate rabbitTemplate;
?
 ? ?@Test
 ? ?public void testSimpleQueue() {
 ? ? ? ?// 队列名称
 ? ? ? ?String queueName = "simple.queue";
 ? ? ? ?// 消息
 ? ? ? ?String message = "hello, spring amqp!";
 ? ? ? ?// 发送消息
 ? ? ? ?rabbitTemplate.convertAndSend(queueName, message);
 ?  }
}

3.1.2.消息接收

首先配置MQ地址,在服务的application.yml中添加配置:

spring:
  rabbitmq:
 ?  host: 192.168.150.101 # 主机名
 ?  port: 5672 # 端口
 ?  virtual-host: / # 虚拟主机
 ?  username: itcast # 用户名
 ?  password: 123321 # 密码

然后在服务的包中新建一个类SpringRabbitListener,代码如下:

?
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
?
@Component
public class SpringRabbitListener {
    
 ? ?//监听队列
 ? ?@RabbitListener(queues = "simple.queue")
 ? ?public void listenSimpleQueueMessage(String msg) throws InterruptedException {
 ? ? ? ?System.out.println("spring 消费者接收到消息:【" + msg + "】");
 ?  }
}

3.1.3.测试

启动服务,然后在publisher服务中运行测试代码,发送MQ消息

总结步骤

文章来源:https://blog.csdn.net/qq_63535554/article/details/135721298
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。