专栏导航
目录
二、使用Spring AMQP实现对RabbitMQ的消息收发
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了AMQP、Spring AMQP和使用SpringAMQP实现对RabbitMQ的消息收发等内容。
全称为Advanced Message Queuing Protocol,是一种用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。通过AMQP,不同的应用程序可以在不改变各自实现方式的情况下进行跨平台、跨语言的消息通信。
AMQP协议定义了消息的传输方式和消息的元数据,例如消息的发送者、接收者、消息体、消息类型等。这些元数据可以帮助应用程序对消息进行正确的处理。
在Spring框架中,有一个Spring AMQP的项目,它基于AMQP协议定义了一套API规范,提供了模板来发送和接收消息。这个项目包含两部分,其中spring-amqp是基础抽象,而spring-rabbit是底层的默认实现。
Spring AMQP通过提供模板和抽象层,简化了应用程序与RabbitMQ的交互。它提供了一组易于使用的API,用于发送和接收消息。这些API可以帮助开发人员更专注于业务逻辑,而不是消息的发送和接收细节。
spring-rabbit是Spring AMQP的一部分,它基于RabbitMQ实现了AMQP协议。spring-rabbit提供了对RabbitMQ的封装,使开发人员可以通过简单的配置和API调用与RabbitMQ进行交互。
Spring AMQP 主要功能:
官方文档:
Spring AMQPhttps://spring.io/projects/spring-amqp
项目结构如下:
项目结构介绍:
mq-demo:父工程,管理项目依赖
publisher:消息的发送者
consumer:消息的消费者
在父工程引入spring-amqp依赖:
<!--AMQP依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
项目完整依赖如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.rye.demo</groupId>
<artifactId>mq-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<modules>
<module>publisher</module>
<module>consumer</module>
</modules>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.15</version>
<relativePath/>
</parent>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--AMQP依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--单元测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<!--Jackson-->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
</dependency>
</dependencies>
</project>
在application.yml中配置RabbitMQ服务端信息(每个微服务都需要配置):
spring:
rabbitmq:
host: 10.0.0.100
port: 5672
virtual-host: /demo
username: user
password: 123456
案例模型:
在RabbitMQ管理控制台新建队列:
查看新建结果:
在publisher服务中编写测试类,并利用RabbitTemplate实现消息发送:
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void testSendMessage2Queue() {
// 队列名称
String queueName = "demo.queue";
// 消息
String msg = "First demo";
// 发送消息
rabbitTemplate.convertAndSend(queueName, msg);
}
运行测试用例,查看结果:
在consumer服务中新建一个类实现消息接收 :
@Component
public class MqListener {
@RabbitListener(queues = "demo.queue")
public void listenSimpleQueue(String msg) {
System.out.println("消息:" + msg);
}
}
启动consumer服务,查看消息(一旦监听的队列中有了消息,就会推送给当前服务):
让多个消费者绑定一个队列,共同消费队列中的消息。
案例模型:
在RabbitMQ管理控制台新建队列:
查看新建结果:
在publisher服务中的测试类添加一个测试方法(通过循环发送,模拟大量消息堆积现象 ):
@Test
void testWorkQueue() throws InterruptedException {
String queueName = "work.queue";
for (int i = 1; i <= 50; i++) {
String msg = "Work Queues " + i;
rabbitTemplate.convertAndSend(queueName, msg);
Thread.sleep(20);
}
}
在consumer服务的类中添加2个新的方法,模拟多个消费者绑定同一个队列 :
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收work.queue消息:" + msg);
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者2接收work.queue消息:" + msg);
}
运行结果:
修改consumer服务类中的方法:
消费者1 sleep了20毫秒,相当于每秒钟处理50个消息
消费者2 sleep了200毫秒,相当于每秒处理5个消息
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收work.queue消息:" + msg);
Thread.sleep(20);
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者2接收work.queue消息:" + msg);
Thread.sleep(200);
}
重启后查看运行结果:
以上结果表明:默认情况下,RabbitMQ会将消息依次轮询投递给绑定在队列上的每一个消费者,并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。
修改consumer服务的application.yml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息(每次只能获取一条消息,处理完成才能获取下一个消息):
spring:
rabbitmq:
host: 10.0.0.100
port: 5672
virtual-host: /demo
username: user
password: 123456
listener:
simple:
prefetch: 1
重启后查看运行结果:
RabbitMQ是一个开源的消息队列软件,旨在提供可靠的消息传递和消息队列功能。本文主要介绍了AMQP、Spring AMQP和使用Spring AMQP实现对RabbitMQ的消息收发等内容,希望对大家有所帮助。