????????消息队列是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的数据,包含发生的时间,输入设备的种类,以及特定的输入参数。也就是说:消息的发送者和接收者不需要同时与消息队列交互。消息会保存在队列中,直到接收者取回它。
????????服务之间最常见的通信方式是直接调用彼此来通信,消息从一端发出后立即就可以达到另一端,称为即时消息通讯(同步通信) 消息从某一端发出后,首先进入一个容器进行临时存储,当达到某种条件后,再由这个容器发送给另一端,称为延迟消息通讯(异步通信)
????????一个提供统一消息服务的应用层标准高级消息队列协议,是一个通用的应用层协议 消息发送与接受的双方遵守这个协议可以实现异步通讯.这个协议约定了消息的格式和工作方式.
技术选型
????????RabbitMQ是一个开源的消息队列系统,使用Erlang语言开发,基于AMQP(高级消息队列协议)实现。它最初起源于金融系统,用于在分布式系统中存储和转发消息。RabbitMQ的主要特性包括易用性、扩展性、高可用性以及可靠性。消息队列(MQ)是一种应用程序对应用程序的通信方法,应用程序通过读写出入队列的消息来通信,而无需专用连接来链接它们。在事件驱动(发布-订阅)架构中,RabbitMQ扮演着Broker的角色。
【注意】获取镜像的时候要获取management版本的,不要获取last版本的,management版本的才带有管理界面
docker pull rabbitmq:management
--hostname:主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名)
-e:指定环境变量:
???????RABBITMQ_DEFAULT_VHOST:默认虚拟机名
????????RABBITMQ_DEFAULT_USER:默认的用户名
????????RABBITMQ_DEFAULT_PASS:默认用户名的密码docker run -d \ --name 容器名\ -p 5672:5672 -p 15672:15672 \ -v /home/rabbitmq:/var/lib/rabbitmq \ --hostname my-rabbitmq-host \ -e RABBITMQ_DEFAULT_VHOST=my_vhost \ -e RABBITMQ_DEFAULT_USER=admin \ -e RABBITMQ_DEFAULT_PASS=admin \ --restart=always \ rabbitmq:management
容器启动后,查看容器日志
docker logs 容器名
开启防火墙
systemctl start firewalld
开放端口
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --zone=public --add-port=5672/tcp --permanent
更新防火墙规则
firewall-cmd --reload
进入管理后台
密码:admin
账号:admin
创建用户
点进用户进行分配
?新建一个空项目,里面新建两个spring boot模块,并且导入依赖
接收者(publisher)消费者(consumer)
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
spring:
rabbitmq:
host: 0.0.0.0 #虚拟机开启的IP地址
username: spring #创建的用户
password: 123456 #用户的密码
port: 5672
virtual-host: my_vhost
接收一个string类型
生产者
import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration @SuppressWarnings("all") public class RabbitConfig { @Bean public Queue firstQueue() { // 创建一个名为firstQueue的队列 return new Queue("firstQueue"); } }
使用Controller?
@RestController @SuppressWarnings("all") public class SenderController { // 自动装配rabbitTemplate @Autowired private AmqpTemplate rabbitTemplate; // 发送消息到firstQueue队列 @RequestMapping("/sendFirst") public String sendFirst() { // 将消息转换并发送到firstQueue队列 rabbitTemplate.convertAndSend("firstQueue", "Hello World"); return "🐉"; } }
访问http://localhost:8888/sendFirst
可以看到消息队列中有了一个
消费者
@Component @SuppressWarnings("all") @Slf4j @RabbitListener(queues = "firstQueue") public class Receiver { @RabbitHandler public void process(String msg) { log.warn("接收到:" + msg); } }
运行之后可以看到我们发送的消息
我们接受一个实体类
生产者(publisher)
新建一个实体类User 实现接口Serializable
@SuppressWarnings("all") @Data @AllArgsConstructor @NoArgsConstructor public class User implements Serializable { private String username; private String userpwd; }
在RabbitConfig 类里面添加一个队列
@Bean public Queue secondQueue() { // 创建一个名为 secondQueue 的队列 return new Queue("secondQueue"); }
完整代码?RabbitConfig?
@Configuration @SuppressWarnings("all") public class RabbitConfig { @Bean public Queue firstQueue() { // 创建一个名为firstQueue的队列 return new Queue("firstQueue"); } @Bean public Queue secondQueue() { // 创建一个名为 secondQueue 的队列 return new Queue("secondQueue"); } }
在Controller 层添加方法
// 自动装配ObjectMapper @Autowired private ObjectMapper objectMapper; // 发送消息到secondQueue队列 @RequestMapping("/send2") public String send2() throws JsonProcessingException { // 创建一个User对象 User wfzldr = new User("wfzldr", "1234567890"); // 将User对象转换为json字符串 String json = objectMapper.writeValueAsString(wfzldr); // 将消息转换并发送到firstQueue队列 // 将消息转换并发送到secondQueue队列 rabbitTemplate.convertAndSend("secondQueue", json); return "🐉"; }
完整代码SenderController
@RestController @SuppressWarnings("all") public class SenderController { // 自动装配rabbitTemplate @Autowired private AmqpTemplate rabbitTemplate; // 自动装配ObjectMapper @Autowired private ObjectMapper objectMapper; // 发送消息到firstQueue队列 @RequestMapping("/sendFirst") public String sendFirst() { // 将消息转换并发送到firstQueue队列 rabbitTemplate.convertAndSend("firstQueue", "Hello World"); return "🐉"; } // 发送消息到secondQueue队列 @RequestMapping("/send2") public String send2() throws JsonProcessingException { // 创建一个User对象 User wfzldr = new User("wfzldr", "1234567890"); // 将User对象转换为json字符串 String json = objectMapper.writeValueAsString(wfzldr); // 将消息转换并发送到firstQueue队列 // 将消息转换并发送到secondQueue队列 rabbitTemplate.convertAndSend("secondQueue", json); return "🐉"; } }
消费者(consumer)
新建一个接受实体的Receiver类
@Component @SuppressWarnings("all") @Slf4j @RabbitListener(queues = "secondQueue") public class EntityReceiver { @Autowired private ObjectMapper objectMapper; @RabbitHandler public void process(String json) throws JsonProcessingException { User user = objectMapper.readValue(json, User.class); log.warn("接收到:" + user); } }
我破门也需要在消费者里添加实体
@SuppressWarnings("all") @Data @AllArgsConstructor @NoArgsConstructor public class User implements Serializable { private String username; private String userpwd; }
运行两个方法
?
在项目里面新建一个publi的公共模块,在里面写公共的实体类
在公共模块的pom.xml文件里面把打包方式war改成 jar?
在对应的消费者或者生产者里面引入对应的模块即可
<!-- 引入公共模块--> <dependency> <groupId>org.example</groupId> <artifactId>public</artifactId> <version>1.0-SNAPSHOT</version> </dependency>
我的分享就到这里,欢迎大家在评论区留言!