在当今互联网时代,消息队列系统扮演着至关重要的角色,它们被广泛应用于分布式系统、微服务架构以及异步通信等领域。RabbitMQ作为最流行的开源消息队列之一,具有高可用性、可靠性和灵活性,本篇博客将深入探讨RabbitMQ的高级应用,包括简介介绍、安装配置以及实际案例分析。
RabbitMQ是一个开源的消息代理软件,实现了高级消息队列协议(AMQP),并提供了可靠的消息传递、灵活的路由、消息确认机制等特性。它是一个强大的工具,用于构建分布式系统中的异步通信和解耦,同时也适用于构建大规模的数据处理系统。RabbitMQ的核心概念包括交换机、队列、绑定等,通过这些概念,可以实现灵活的消息路由和处理。在分布式系统中,RabbitMQ能够提供高可用性、可靠性和灵活性,使得系统能够更好地应对高并发和大规模数据处理的挑战。
RabbitMQ是一个实现了高级消息队列协议(AMQP)的消息代理软件。AMQP是一种面向消息的中间件标准协议,旨在提供统一的消息传递模型,以便不同的消息中间件系统可以进行互操作。、
RabbitMQ完全实现了AMQP 0-9-1版本,这意味着它遵循了AMQP协议定义的消息格式、交换机、队列、绑定等核心概念,同时也支持AMQP定义的高级特性,如事务、确认机制等。
通过遵循AMQP协议,RabbitMQ能够与其他支持AMQP的消息中间件系统进行无缝集成,实现跨平台、跨语言的消息传递。这使得开发人员可以更加灵活地选择适合自己业务需求的消息中间件系统,同时也为系统的可扩展性和互操作性提供了保障。
拉取Rabbitmq镜像
docker pull rabbitmq:managemen
开启两个端口,5672,15672
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --zone=public --add-port=15672/tcp --permanent
更新防火墙
firewall-cmd --reload
查看开放端口
firewall-cmd --zone=public --list-ports
然后创建容器
docker run -d \
--name my-rabbitmq \
-p 5672:5672 -p 15672:15672 \
--hostname my-rabbitmq-host \
-e RABBITMQ_DEFAULT_VHOST=my_vhost \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
--restart=always \
rabbitmq:management
创建好了就可以访问了
创建用户
给他分配一个my_vhost权限
现在RabbitMQ安装设置好了
创建一个空项目
并创建两个maven项目,分别对应着消费者和生产者
分别写好yml文件
消费者8999,生产者8888(端口号)
server:
port: 8888
spring:
rabbitmq:
host: 192.168.238.129
password: 123456
port: 5672
username: spring
virtual-host: my_vhost
在生产者项目中放一个消息队列,并取名,firstQueue是队列名字
@Configuration
@SuppressWarnings("all")
public class RabbitConfig {
@Bean
public Queue firstQueue() {
return new Queue("firstQueue");
}
public Queue secondQueue() {
return new Queue("secondQueue");
}
}
导这个包
然后准备一个发消息的Controller
package com.example.publiuse;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author 叶秋
* @site
* @company 卓京公司
* @create 2024-01-19 20:10
*/
@RestController
public class TestController {
@Autowired
private AmqpTemplate amqpTemplate;
@RequestMapping("/send1")
public String send1(){
//向消息队列发送消息
amqpTemplate.convertAndSend("firstQueue","hello");
return "哈哈;";
}
}
访问一手
消息队列显示
消费者监听
@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "firstQueue")
public class Receiver {
@RabbitHandler
public void process(String msg) {
log.warn("接收到:" + msg);
}
}
http://localhost:8888/send1 访问几次
然后被监听
这样就大大解除了代码的耦合性
@SuppressWarnings("all")
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable{
private String username;
private String userpwd;
}
生产者事件
@RequestMapping("/send2")
public String send2() throws Exception{
//向消息队列发送消息
User user = new User("yh", "123");
String string = objectMapper.writeValueAsString(user);
amqpTemplate.convertAndSend("secondQueue",string);
return "哈哈;";
}
消费者监听
package com.example.consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "secondQueue")
public class Receiver2 {
@Autowired
private ObjectMapper objectMapper;
@RabbitHandler
public void process(String string) throws Exception{
User user = objectMapper.readValue(string, User.class);
log.warn("接收到:" + user);
}
}
通过本篇博客的学习,读者将深入了解RabbitMQ的核心概念、高级特性以及实际应用场景,掌握如何搭建高可用的消息队列系统,为构建稳定、可靠的分布式系统提供有力支持。