RabbitMQ 是一个开源的消息中间件,实现了高级消息队列协议(AMQP),用于在分布式系统中进行消息传递。它能够在应用之间传递消息,解耦应用组件,提高系统的可伸缩性和可维护性。RabbitMQ 使用高级消息队列协议(AMQP),这是一种开放的、标准化的协议,定义了消息格式、交换方式、队列管理等规范。拥有强大的社区支持,提供了广泛的文档和示例。它还支持插件机制,可以根据实际需求进行扩展。下面就对rabbitMQ进行实战。
官网:RabbitMQ: easy to use, flexible messaging and streaming — RabbitMQ
?
消息中间件经常被用来处理异步、削峰填谷,和多个组件之间进行解耦的作用。
这里演示使用docker-compose方式安装,创建一个docker-compose.yml文件并写入以下内容
version: '3'
services:
rabbitmq:
image: "rabbitmq:management"
container_name: "rabbitmq-container"
ports:
- "5672:5672" # RabbitMQ 默认端口
- "15672:15672" # RabbitMQ 管理界面端口
volumes:
- "./rabbitmq-data:/var/lib/rabbitmq" # 数据文件挂载
environment:
RABBITMQ_DEFAULT_USER: "rabbit"
RABBITMQ_DEFAULT_PASS: "rabbit1qz"
执行命令拉取并启动容器
docker-compose up -d
?执行命令查看docker容器是否正常
docker ps | grep rabbit
?然后地址栏http://localhost:15672/访问rabbitMQ管理界面
输入用户名密码可以看到如下界面,在这个页面上可以创建Exchanges和Queue,这里就不赘述了,大家想了解的可以参考官方文档。
?
上面安装完MQ组件之后,就可以用java代码进行连接测试了。使用Maven添加RabbitMQ的Java客户端库到项目里:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.0</version> <!-- 替换为最新版本 -->
</dependency>
?
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Channel;
public class MessageSender {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost"); // RabbitMQ服务器地址
// 创建连接
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 发送消息
String message = "Hello, RabbitMQ!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
}
?
import com.rabbitmq.client.*;
public class MessageReceiver {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost"); // RabbitMQ服务器地址
// 创建连接
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定义消息处理器
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
// 监听队列,接收消息
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
// 持续监听队列,不会退出
System.out.println(" [*] Waiting for messages. To exit press Ctrl+C");
Thread.sleep(Long.MAX_VALUE);
}
}
}
?
以上示例中,MessageSender
类用于发送消息到名为"hello"的队列,而MessageReceiver
类用于监听该队列并接收消息。
RabbitMQ 提供了许多高级特性,包括持久化、消息确认、事务、死信队列等。下面将结合 Java 完整代码进行说明这些高级特性。
持久化确保在 RabbitMQ 服务器重启时,队列和消息不会丢失。
代码示例
// 发布者代码
public class DurableProducer {
// ...初始化 RabbitMQ 连接等代码...
public void publishPersistentMessage(String message) {
channel.basicPublish("", "durable_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
// 订阅者代码
public class DurableSubscriber {
// ...初始化 RabbitMQ 连接等代码...
public void subscribeToPersistentMessages() {
channel.queueDeclare("durable_queue", true, false, false, null);
channel.basicConsume("durable_queue", true, (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
// 处理消息的逻辑...
}, consumerTag -> {});
}
}
?
消息确认确保消息已经被消费者成功处理。
代码示例
// 发布者代码
public class AckProducer {
// ...初始化 RabbitMQ 连接等代码...
public void publishAckMessage(String message) {
channel.basicPublish("", "ack_queue", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
// 订阅者代码
public class AckSubscriber {
// ...初始化 RabbitMQ 连接等代码...
public void subscribeToAckMessages() {
channel.queueDeclare("ack_queue", false, false, false, null);
channel.basicConsume("ack_queue", false, (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
// 处理消息的逻辑...
// 手动发送消息确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}, consumerTag -> {});
}
}
?
RabbitMQ 支持事务,但由于性能问题,通常建议使用消息确认代替。
代码示例
// 发布者代码
public class TransactionalProducer {
// ...初始化 RabbitMQ 连接等代码...
public void publishTransactionalMessage(String message) throws IOException {
try {
channel.txSelect(); // 开启事务
channel.basicPublish("", "transactional_queue", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.txCommit(); // 提交事务
} catch (IOException e) {
channel.txRollback(); // 回滚事务
e.printStackTrace();
}
}
}
// 订阅者代码
public class TransactionalSubscriber {
// ...初始化 RabbitMQ 连接等代码...
public void subscribeToTransactionalMessages() {
try {
channel.queueDeclare("transactional_queue", false, false, false, null);
while (true) {
channel.txSelect(); // 开启事务
GetResponse response = channel.basicGet("transactional_queue", true);
if (response != null) {
String message = new String(response.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
// 处理消息的逻辑...
channel.txCommit(); // 提交事务
} else {
channel.txRollback(); // 回滚事务
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
?
死信队列用于处理无法被消费者成功处理的消息。
代码示例
// 发布者代码
public class DeadLetterProducer {
// ...初始化 RabbitMQ 连接等代码...
public void publishDeadLetterMessage(String message) {
Map<String, Object> headers = new HashMap<>();
headers.put("x-dead-letter-exchange", "dead_letter_exchange");
headers.put("x-dead-letter-routing-key", "dl_queue");
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.headers(headers)
.build();
channel.basicPublish("", "original_queue", properties, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
// 订阅者代码
public class DeadLetterSubscriber {
// ...初始化 RabbitMQ 连接等代码...
public void subscribeToDeadLetterMessages() {
channel.exchangeDeclare("dead_letter_exchange", BuiltinExchangeType.DIRECT);
channel.queueDeclare("dl_queue", false, false, false, null);
channel.queueBind("dl_queue", "dead_letter_exchange", "");
channel.queueDeclare("original_queue", false, false, false, null);
channel.queueBind("original_queue", "", "original_queue");
channel.basicConsume("original_queue", false, (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
// 处理消息的逻辑...
// 模拟处理失败,将消息发送到死信队列
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
}, consumerTag -> {});
}
}
?
这些是 RabbitMQ 的一些高级特性的简单示例。在实际项目中,具体的实现可能会更加复杂,并需要根据场景进行适当的调整。
RabbitMQ作为一款强大的消息中间件,在异步任务处理、事件驱动架构以及日志收集等场景中都有广泛的应用。通过简单的代码实例,我们了解了RabbitMQ的基本概念以及如何在Java中使用RabbitMQ进行消息的发送和接收。
希望本文能够帮助大家入门RabbitMQ,并在实际项目中灵活应用消息队列的机制。
?