RabbitMQ 是一个开源的消息代理软件,它实现了高级消息队列协议(AMQP,Advanced Message Queuing Protocol)。它设计用于在分布式系统中传递消息,提供了一种可靠的、异步的通信方式,帮助不同的应用程序或组件之间进行解耦。
以下是 RabbitMQ 的一些主要特点和概念:
使用 RabbitMQ 可以有效地处理系统之间的异步通信,提高系统的可伸缩性和可维护性。它在分布式系统、微服务架构和异步任务处理等场景中广泛应用。
工作原理图:
RabbitMQ 提供了以下优势:
解耦与可靠性: 通过消息队列,系统的不同部分可以独立工作,提高可维护性和可扩展性。消息的可靠传递确保消息不会丢失,即使某个组件不可用。
异步通信: 消息队列支持异步通信,生产者将消息发送到队列,而消费者从队列中接收并处理消息,实现了松耦合和高效通信。
处理负载峰值: RabbitMQ 能够缓冲和调整消息流,有助于处理系统中的负载峰值,防止系统过载。
消息路由与灵活性: 不同类型的交换器使得消息能够以灵活的方式进行路由,满足多样化的应用场景。
多种交换器类型: 包括直连、扇出、主题和头交换器,支持不同的消息路由策略。
消息持久化: RabbitMQ 允许将消息和队列标记为持久的,确保消息不会在代理重启时丢失。
灵活的消息路由: 使用 Routing Key 和交换器,可以根据需求定义复杂的消息路由规则。
可扩展性与集群支持: RabbitMQ 提供了水平扩展的能力,支持构建高可用性的集群。
安全性: 支持虚拟主机,提供权限控制和加密传输,确保消息的安全性。
安装gcc
yum -y install gcc gcc-c++
安装软件包
yum install -y yum-utils device-mapper-persistent-data lvm2
设置镜像仓库
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
更新yum
yum makecache fast
安装免费版本的docker-ce
yum -y install docker-ce
启动docker
systemctl start docker
入门hello-world
docker run hello-world
证明docker安装成功!
先执行
docker search rabbitmq:management
拉取镜像
docker pull macintoshplus/rabbitmq-management
查看镜像
docker images
创建并运行一个RabbitMQ容器:
设置容器的主机名为kdxing,设置容器指定名称为 rabbitmq,设置RabbitMQ的默认用户名和密码,
将容器的15672端口映射到主机的15672端口,15672端口是RabbitMQ的Web管理界面端口。
将容器的5672端口映射到主机的5672端口,5672端口是RabbitMQ的AMQP协议端口。
设置Docker镜像的名称或ID为c20
docker run -d --hostname kdxing --name rabbitmq -e rabbitmq_default_user=guest -e rabbitmq_user_pass=guest -p 15672:15672 -p 5672:5672 c20
查看容器
docker ps -a
然后打开浏览器输入http://192.168.64.128:15672,界面如下:
输入用户名和密码guest进入RabbitMQ的Web管理界面:
此时,RabbitMQ配置成功!
在 Maven 配置中添加 Spring AMQP 的依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:
spring.application.name=mq-demo01
spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 自定义一个属性设置队列
mq.queue.name=hello-queue01
创建一个生产者类,用于发送消息到 RabbitMQ:
package com.kdx.provider;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class Sender {
@Autowired
private AmqpTemplate amqpTemplate;
@Value("${mq.queue.name}")
private String queueName;
//发送消息
public void send(String msg){
amqpTemplate.convertAndSend(queueName,msg);
}
}
创建一个消费者类,用于接收并处理从 RabbitMQ 收到的消息:
package com.kdx.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
//接收Consumer消息的消费者
@Component
public class Receiver {
@RabbitListener(queues = {"${mq.queue.name}"})
public void process(String msg){
System.out.println("Receiver:" + msg);
}
}
package com.kdx.config;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Value("${mq.queue.name}")
private String queueName;
@Bean
public Queue createQueue(){
return new Queue(queueName);
}
}
package com.kdx;
import com.kdx.provider.Sender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class MqDemo01ApplicationTests {
@Autowired
private Sender sender;
@Test
void testSend() {
sender.send("RabbitMQ入门案例");
}
}
启动Application,测试testSend()方法,查看控制台:
发送消息和接收消息成功!
查看RabbitMQ的Web管理界面,点击队列,发现hello-queue01:
交换器是消息的分发中心,负责将消息路由到一个或多个队列。生产者将消息发送到交换器,而交换器根据规则将消息发送到与之绑定的队列。不同类型的交换器定义了不同的路由策略,包括直连交换器、扇出交换器、主题交换器和头交换器。
在 RabbitMQ 中,有几种不同类型的交换器(Exchange Types),每种类型都定义了不同的消息路由规则。以下是 RabbitMQ 支持的主要交换器类型:
*
匹配一个单词,#
匹配零个或多个单词。选择交换器类型取决于消息路由需求。例如,如果希望将消息直接发送到指定队列,可以选择 Direct Exchange;如果希望消息广播到所有队列,可以选择 Fanout Exchange;如果需要根据复杂的条件进行消息路由,可以选择 Topic Exchange 或 Headers Exchange。
RabbitMQ为什么需要信道? 为什么不是TCP直接通道 ?
- Tcp创建和销毁开销特别大。
- 如果不用信道,大量的请求过来,会造成性能的瓶颈。
- 信道的原理是一条线程一条信道,多条线程多条通道同用一条TCP连接。
- 一条TCP连接可能容纳无限的信道,处理高并发的请求不会造成性能瓶颈。
1.在consumer服务中,编写两个消费者方法,分别监听log.info和log.error
2.在publisher中编写测试方法,向log. direct发送消息
在 Maven 配置中添加 Spring AMQP 的依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:
server.port=8081
spring.application.name=mq-demo02-consumer
spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#设置交换器
mq.config.exchange=log.direct
#设置队列info
mq.config.queue.info=log.info
#设置队列info的路由键
mq.config.info.routing.key=log.info.routing.key
#设置队列error
mq.config.queue.error=log.error
#设置队列error的路由键
mq.config.error.routing.key=log.error.routing.key
package com.kdx.consumer;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.error}",autoDelete = "false"),
exchange = @Exchange(value = "${mq.config.exchange}",type = ExchangeTypes.DIRECT),
key = "${mq.config.error.routing.key}"
))
public class ErrorReceiver {
@RabbitHandler
public void process(String msg){
System.out.println("errorReceiver:" + msg);
}
}
package com.kdx.consumer;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.info}",autoDelete = "false"),
exchange = @Exchange(value = "${mq.config.exchange}",type = ExchangeTypes.DIRECT),
key = "${mq.config.info.routing.key}"
))
public class InfoReceiver {
@RabbitHandler
public void process(String msg){
System.out.println("infoReceiver:" + msg);
}
}
在 Maven 配置中添加 Spring AMQP 的依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:
server.port=8082
spring.application.name=mq-demo03-provider
spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#设置交换器
mq.config.exchange=log.direct
#设置队列info的路由键
mq.config.info.routing.key=log.info.routing.key
#设置队列error的路由键
mq.config.error.routing.key=log.error.routing.key
package com.kdx.provider;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class Sender {
@Autowired
private AmqpTemplate amqpTemplate;
@Value("${mq.config.exchange}")
private String exchange;
@Value("${mq.config.info.routing.key}")
private String routingKey1;
@Value("${mq.config.error.routing.key}")
private String routingKey2;
public void send1(String msg){
amqpTemplate.convertAndSend(exchange,routingKey1,msg);
}
public void send2(String msg){
amqpTemplate.convertAndSend(exchange,routingKey2,msg);
}
}
package com.kdx;
import com.kdx.provider.Sender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class MqDemo03ProviderApplicationTests {
@Autowired
private Sender sender;
@Test
void testSend1() {
sender.send1("hello mq 1");
}
@Test
void testSend2() {
sender.send2("hello mq 2");
}
}
启动两个Application,执行testSend1和testSend2:
结果看到Direct交换器根据RoutingKey判断路由给哪个队列
1.在consumer服务中,编写三个消费者方法,分别监听log.info、log.error和log.all
2.在publisher中编写测试方法,向 topic发送消息
在 Maven 配置中添加 Spring AMQP 的依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:
server.port=8083
spring.application.name=mq-demo05-consumer
spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#设置交换器
mq.config.exchange=log.topic
#设置队列info
mq.config.queue.info=log.info
#设置队列error
mq.config.queue.error=log.error
#设置队列logs
mq.config.queue.logs=log.all
package com.kdx.consumer;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.error}",autoDelete = "false"),
exchange = @Exchange(value = "${mq.config.exchange}",type = ExchangeTypes.TOPIC),
key = "*.log.error"
))
public class ErrorReceiver {
@RabbitHandler
public void process(String msg){
System.out.println("errorReceiver:" + msg);
}
}
package com.kdx.consumer;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.info}",autoDelete = "false"),
exchange = @Exchange(value = "${mq.config.exchange}",type = ExchangeTypes.TOPIC),
key = "*.log.info"
))
public class InfoReceiver {
@RabbitHandler
public void process(String msg){
System.out.println("infoReceiver:" + msg);
}
}
package com.kdx.consumer;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.logs}",autoDelete = "false"),
exchange = @Exchange(value = "${mq.config.exchange}",type = ExchangeTypes.TOPIC),
key = "*.log.*"
))
public class LogsReceiver {
@RabbitHandler
public void process(String msg){
System.out.println("logsReceiver:" + msg);
}
}
在 Maven 配置中添加 Spring AMQP 的依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:
server.port=8084
spring.application.name=mq-demo04-provider
spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#设置交换器
mq.config.exchange=log.topic
package com.kdx.provider;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class GoodServer {
@Autowired
private AmqpTemplate amqpTemplate;
@Value("${mq.config.exchange}")
private String exchange;
public void send(String msg){
amqpTemplate.convertAndSend(exchange,"good.log.debug","good.log.debug:" + msg);
amqpTemplate.convertAndSend(exchange,"good.log.info","good.log.info:" + msg);
amqpTemplate.convertAndSend(exchange,"good.log.warn","good.log.warn:" + msg);
amqpTemplate.convertAndSend(exchange,"good.log.error","good.log.error:" + msg);
}
}
package com.kdx;
import com.kdx.provider.GoodServer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class MqDemo04ProviderApplicationTests {
@Autowired
private GoodServer goodServer;
@Test
void test1() {
goodServer.send("hello mq");
}
}
启动消费者和生产者服务,执行test1():
结果看到使用通配符模式匹配 Routing Key,并将消息发送到与模式匹配的队列。
Topic交换器与队列绑定时的bindingKey可以指定通配符,而且Topic交换器接收的消息RoutingKey必须是多个单词,以
.
分割
1.在consumer服务中,利用代码声明队列、交换机,并将两者绑定
2.在consumer服务中,编写两个消费者方法,分别监听order.sms和order.push
3.在publisher中编写测试方法,向log.fanout发送消息
在 Maven 配置中添加 Spring AMQP 的依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:
server.port=8086
spring.application.name=mq-demo07-consumer
spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#设置交换器
mq.config.exchange=log.fanout
#设置队列Q1
mq.config.queue.sms=order.sms
#设置队列Q2
mq.config.queue.push=order.push
package com.kdx.consumer;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.sms}",autoDelete = "false"),
exchange = @Exchange(value = "${mq.config.exchange}",type = ExchangeTypes.FANOUT)
))
public class SmsReceiver {
@RabbitHandler
public void process(String msg){
System.out.println("sms:" + msg);
}
}
package com.kdx.consumer;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.push}",autoDelete = "false"),
exchange = @Exchange(value = "${mq.config.exchange}",type = ExchangeTypes.FANOUT)
))
public class PushReceiver {
@RabbitHandler
public void process(String msg){
System.out.println("push:" + msg);
}
}
在 Maven 配置中添加 Spring AMQP 的依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:
server.port=8085
spring.application.name=mq-demo06-provider
spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#设置交换器
mq.config.exchange=log.fanout
package com.kdx.provider;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class Sender {
@Autowired
private AmqpTemplate amqpTemplate;
@Value("${mq.config.exchange}")
private String exchange;
public void send(String msg){
amqpTemplate.convertAndSend(exchange,"",msg); //routingKey不写
}
}
package com.kdx;
import com.kdx.provider.Sender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class MqDemo06ProviderApplicationTests {
@Autowired
private Sender sender;
@Test
void testSend() {
sender.send("fanout广播");
}
}
启动消费者和生产者服务,测试testSend():
结果看到SmsReceiver和PushReceiver都接收到了交换器广播消息。
RabbitMQ 提供了消息的持久化机制,确保即使在 RabbitMQ 服务器重启后,消息仍然能够被恢复。这主要涉及到队列和消息的持久化。
在 5 Direct Echange的案例基础上修改测试方法
package com.kdx;
import com.kdx.provider.Sender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class MqDemo03ProviderApplicationTests {
@Autowired
private Sender sender;
@Test
void testSend1() {
for (int i = 1; i < 1000; i++) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
sender.send1("hello mq 1 ..." + i);
}
}
}
必须确保在
autoDelete = "false"
:
出现在Queue中:当所有的消费者客户连接断开后,是否自动删除队列。
出现在Exchange中:当所有的绑定队列都不再使用时,是否自动删除交换器。
启动消费者和生产者服务,执行testSend1()方法:
现在停止消费者服务,结束在infoReceiver:hello mq 1 ...7
然后再重启消费者服务,控制台:
发现没有重新从infoReceiver:hello mq 1 ...1
开始输出,而是接着7从infoReceiver:hello mq 1 ...8
开始,这就是消息的持久化。
RabbitMQ 的 Acknowledgment(简称 ack)机制是确保消息在消费者正确处理后才被确认的一种机制。它有助于提高消息传递的可靠性。在 RabbitMQ 中,有三种 Acknowledgment 模式:自动确认、手动确认(单条消息)、手动批量确认(foreach遍历)。
在自动确认模式下,消息一旦被消费者接收,RabbitMQ 就会立即确认消息的接收。这种模式下,消费者无法明确知道消息是否被正确处理。
// 默认是自动确认模式
@RabbitListener(queues = "myQueue")
public void handleMessage(String message) {
// 处理消息的业务逻辑...
}
在手动确认模式下,消费者需要显式地告诉 RabbitMQ 是否成功处理了消息。如果消费者成功处理消息,则调用 channel.basicAck
进行确认;如果处理失败,则可以调用 channel.basicNack
或 channel.basicReject
进行拒绝。
@RabbitListener(queues = "myQueue")
public void handleMessage(Message message, Channel channel) throws IOException {
try {
// 处理消息的业务逻辑...
// 手动确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 处理异常,可以选择拒绝消息或者进行其他处理
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
}
在手动确认模式下,消费者需要谨慎处理异常情况,以确保消息在处理失败时能够得到适当的处理。手动确认模式提供了更精细的控制,确保消息在被消费者正确处理后才被确认。