RabbitMQ

发布时间:2023年12月18日

RabbitMQ

1 介绍

RabbitMQ 是一个开源的消息代理软件,它实现了高级消息队列协议(AMQP,Advanced Message Queuing Protocol)。它设计用于在分布式系统中传递消息,提供了一种可靠的、异步的通信方式,帮助不同的应用程序或组件之间进行解耦。

以下是 RabbitMQ 的一些主要特点和概念:

  1. 消息代理(Message Broker): RabbitMQ 充当消息代理,负责接收、存储和转发消息。
  2. 消息队列(Message Queue): RabbitMQ 使用消息队列来存储消息。生产者将消息发送到队列,然后消费者从队列中接收和处理消息。队列采用先进先出(FIFO)的原则,即先发送的消息会先被消费。
  3. 生产者(Producer): 生产者是消息的发送方,负责将消息发送到 RabbitMQ 的队列。
  4. 消费者(Consumer): 消费者是消息的接收方,负责从队列中获取消息并进行处理。
  5. 交换器(机)(Exchange): 交换机用于将消息路由到一个或多个队列。生产者将消息发送到交换机,而交换机根据规则将消息路由到相应的队列。
  6. 路由键(Routing Key): 路由键是用于指定消息路由规则的关键字。在发送消息时,生产者通过指定路由键将消息发送到交换器。在某些交换器类型中,路由键用于匹配与之绑定的队列,决定消息将被发送到哪个队列。
  7. 绑定(Binding): 绑定是交换机和队列之间的关联规则,它定义了消息应该如何从交换机路由到队列。
  8. 虚拟主机(Virtual Host): 虚拟主机提供了一种逻辑隔离机制,允许在同一物理服务器上运行多个独立的消息代理。
  9. 持久化(Durable): RabbitMQ 允许将队列和消息标记为持久的,确保在代理重启时消息不会丢失。

使用 RabbitMQ 可以有效地处理系统之间的异步通信,提高系统的可伸缩性和可维护性。它在分布式系统、微服务架构和异步任务处理等场景中广泛应用。

工作原理图:

在这里插入图片描述

1.1 为什么使用 RabbitMQ

RabbitMQ 提供了以下优势:

  • 解耦与可靠性: 通过消息队列,系统的不同部分可以独立工作,提高可维护性和可扩展性。消息的可靠传递确保消息不会丢失,即使某个组件不可用。

  • 异步通信: 消息队列支持异步通信,生产者将消息发送到队列,而消费者从队列中接收并处理消息,实现了松耦合和高效通信。

  • 处理负载峰值: RabbitMQ 能够缓冲和调整消息流,有助于处理系统中的负载峰值,防止系统过载。

  • 消息路由与灵活性: 不同类型的交换器使得消息能够以灵活的方式进行路由,满足多样化的应用场景。

1.2 RabbitMQ 的关键特性

  • 多种交换器类型: 包括直连、扇出、主题和头交换器,支持不同的消息路由策略。

  • 消息持久化: RabbitMQ 允许将消息和队列标记为持久的,确保消息不会在代理重启时丢失。

  • 灵活的消息路由: 使用 Routing Key 和交换器,可以根据需求定义复杂的消息路由规则。

  • 可扩展性与集群支持: RabbitMQ 提供了水平扩展的能力,支持构建高可用性的集群。

  • 安全性: 支持虚拟主机,提供权限控制和加密传输,确保消息的安全性。

2 RabbitMQ 安装与配置

2.1 先安装Docker

安装gcc

yum -y install gcc gcc-c++

安装软件包

yum install -y yum-utils device-mapper-persistent-data lvm2

设置镜像仓库

yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo

更新yum

yum makecache fast

安装免费版本的docker-ce

yum -y install docker-ce

启动docker

systemctl start docker

入门hello-world

docker run hello-world

在这里插入图片描述

证明docker安装成功!

2.2 配置RabbitMQ

先执行

docker search rabbitmq:management

在这里插入图片描述

拉取镜像

docker pull macintoshplus/rabbitmq-management

查看镜像

docker images

在这里插入图片描述

创建并运行一个RabbitMQ容器:

设置容器的主机名为kdxing,设置容器指定名称为 rabbitmq,设置RabbitMQ的默认用户名和密码,

将容器的15672端口映射到主机的15672端口,15672端口是RabbitMQ的Web管理界面端口。

将容器的5672端口映射到主机的5672端口,5672端口是RabbitMQ的AMQP协议端口。

设置Docker镜像的名称或ID为c20

docker run -d --hostname kdxing --name rabbitmq -e rabbitmq_default_user=guest -e rabbitmq_user_pass=guest -p 15672:15672 -p 5672:5672 c20

查看容器

docker ps -a

在这里插入图片描述

然后打开浏览器输入http://192.168.64.128:15672,界面如下:

在这里插入图片描述

输入用户名和密码guest进入RabbitMQ的Web管理界面:

在这里插入图片描述

此时,RabbitMQ配置成功!

3 Spring AMQP入门案例

3.1 添加依赖

在 Maven 配置中添加 Spring AMQP 的依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3.2 配置 RabbitMQ 连接

在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:

spring.application.name=mq-demo01

spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

# 自定义一个属性设置队列
mq.queue.name=hello-queue01

3.3 创建生产者

创建一个生产者类,用于发送消息到 RabbitMQ:

package com.kdx.provider;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class Sender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Value("${mq.queue.name}")
    private String queueName;

    //发送消息
    public void send(String msg){
        amqpTemplate.convertAndSend(queueName,msg);
    }
}

3.4 创建消费者

创建一个消费者类,用于接收并处理从 RabbitMQ 收到的消息:

package com.kdx.consumer;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

//接收Consumer消息的消费者
@Component
public class Receiver {

    @RabbitListener(queues = {"${mq.queue.name}"})
    public void process(String msg){
        System.out.println("Receiver:" + msg);
    }
}

3.5 创建配置类

package com.kdx.config;

import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {

    @Value("${mq.queue.name}")
    private String queueName;

    @Bean
    public Queue createQueue(){
        return new Queue(queueName);
    }
}

3.6 测试

package com.kdx;

import com.kdx.provider.Sender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class MqDemo01ApplicationTests {

    @Autowired
    private Sender sender;

    @Test
    void testSend() {
        sender.send("RabbitMQ入门案例");
    }
}

启动Application,测试testSend()方法,查看控制台:

在这里插入图片描述

发送消息和接收消息成功!

查看RabbitMQ的Web管理界面,点击队列,发现hello-queue01:

在这里插入图片描述

4 交换器(Exchange)类型

交换器是消息的分发中心,负责将消息路由到一个或多个队列。生产者将消息发送到交换器,而交换器根据规则将消息发送到与之绑定的队列。不同类型的交换器定义了不同的路由策略,包括直连交换器、扇出交换器、主题交换器和头交换器。

在 RabbitMQ 中,有几种不同类型的交换器(Exchange Types),每种类型都定义了不同的消息路由规则。以下是 RabbitMQ 支持的主要交换器类型:

4.1 Direct Exchange(直连交换器)

  • 根据消息的路由键(Routing Key)将消息直接发送到指定队列。
  • 在发送消息时,指定的 Routing Key 必须与队列绑定时指定的 Routing Key 相匹配。

4.2 Fanout Exchange(扇出交换器)

  • 将消息广播到绑定到该交换器的所有队列,无论消息的 Routing Key 是什么。
  • 不关心消息的 Routing Key,消息会被发送到所有与交换器绑定的队列。

4.3 Topic Exchange(主题交换器)

  • 使用通配符的方式进行消息的路由。
  • 在发送消息时,可以使用通配符模式匹配 Routing Key,将消息发送到与模式匹配的队列。
  • 通配符有两种:* 匹配一个单词,# 匹配零个或多个单词。

4.4 Headers Exchange(头交换器)

  • 根据消息的头部属性来进行路由。
  • 在发送消息时,可以通过设置消息的头部属性,交换器会根据头部属性匹配规则将消息发送到对应的队列。

4.5 System Exchange(默认交换器)

  • 默认交换器是一个特殊的直连交换器,无需指定交换器的名称。
  • 当消息的 Routing Key 与队列的名称匹配时,消息会被发送到该队列。

选择交换器类型取决于消息路由需求。例如,如果希望将消息直接发送到指定队列,可以选择 Direct Exchange;如果希望消息广播到所有队列,可以选择 Fanout Exchange;如果需要根据复杂的条件进行消息路由,可以选择 Topic Exchange 或 Headers Exchange。

面试题

RabbitMQ为什么需要信道? 为什么不是TCP直接通道 ?

  1. Tcp创建和销毁开销特别大。
  2. 如果不用信道,大量的请求过来,会造成性能的瓶颈。
  3. 信道的原理是一条线程一条信道,多条线程多条通道同用一条TCP连接。
  4. 一条TCP连接可能容纳无限的信道,处理高并发的请求不会造成性能瓶颈。

5 Direct Exchange案例

1.在consumer服务中,编写两个消费者方法,分别监听log.info和log.error

2.在publisher中编写测试方法,向log. direct发送消息

在这里插入图片描述

5.1 消费者

5.1.1 添加依赖

在 Maven 配置中添加 Spring AMQP 的依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

5.1.2 配置 RabbitMQ 连接

在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:

server.port=8081

spring.application.name=mq-demo02-consumer

spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

#设置交换器
mq.config.exchange=log.direct

#设置队列info
mq.config.queue.info=log.info
#设置队列info的路由键
mq.config.info.routing.key=log.info.routing.key

#设置队列error
mq.config.queue.error=log.error
#设置队列error的路由键
mq.config.error.routing.key=log.error.routing.key

5.1.3 ErrorReceiver

package com.kdx.consumer;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "${mq.config.queue.error}",autoDelete = "false"),
        exchange = @Exchange(value = "${mq.config.exchange}",type = ExchangeTypes.DIRECT),
        key = "${mq.config.error.routing.key}"
))
public class ErrorReceiver {

    @RabbitHandler
    public void process(String msg){
        System.out.println("errorReceiver:" + msg);
    }
}

5.1.4 InfoReceiver

package com.kdx.consumer;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "${mq.config.queue.info}",autoDelete = "false"),
        exchange = @Exchange(value = "${mq.config.exchange}",type = ExchangeTypes.DIRECT),
        key = "${mq.config.info.routing.key}"
))
public class InfoReceiver {

    @RabbitHandler
    public void process(String msg){
        System.out.println("infoReceiver:" + msg);
    }
}

5.2 生产者

5.2.1 添加依赖

在 Maven 配置中添加 Spring AMQP 的依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

5.2.2 配置 RabbitMQ 连接

在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:

server.port=8082

spring.application.name=mq-demo03-provider

spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

#设置交换器
mq.config.exchange=log.direct

#设置队列info的路由键
mq.config.info.routing.key=log.info.routing.key

#设置队列error的路由键
mq.config.error.routing.key=log.error.routing.key

5.2.3 Sender

package com.kdx.provider;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class Sender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Value("${mq.config.exchange}")
    private String exchange;

    @Value("${mq.config.info.routing.key}")
    private String routingKey1;

    @Value("${mq.config.error.routing.key}")
    private String routingKey2;

    public void send1(String msg){
        amqpTemplate.convertAndSend(exchange,routingKey1,msg);
    }

    public void send2(String msg){
        amqpTemplate.convertAndSend(exchange,routingKey2,msg);
    }
}

5.3 测试

package com.kdx;

import com.kdx.provider.Sender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class MqDemo03ProviderApplicationTests {

    @Autowired
    private Sender sender;

    @Test
    void testSend1() {
        sender.send1("hello mq 1");
    }

    @Test
    void testSend2() {
        sender.send2("hello mq 2");
    }
}

启动两个Application,执行testSend1和testSend2:

在这里插入图片描述

结果看到Direct交换器根据RoutingKey判断路由给哪个队列

6 Topic Exchange案例

1.在consumer服务中,编写三个消费者方法,分别监听log.info、log.error和log.all

2.在publisher中编写测试方法,向 topic发送消息

在这里插入图片描述

6.1 消费者

6.1.1 添加依赖

在 Maven 配置中添加 Spring AMQP 的依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

6.1.2 配置 RabbitMQ 连接

在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:

server.port=8083

spring.application.name=mq-demo05-consumer

spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

#设置交换器
mq.config.exchange=log.topic

#设置队列info
mq.config.queue.info=log.info

#设置队列error
mq.config.queue.error=log.error

#设置队列logs
mq.config.queue.logs=log.all

6.1.3 ErrorReceiver

package com.kdx.consumer;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "${mq.config.queue.error}",autoDelete = "false"),
        exchange = @Exchange(value = "${mq.config.exchange}",type = ExchangeTypes.TOPIC),
        key = "*.log.error"
))
public class ErrorReceiver {

    @RabbitHandler
    public void process(String msg){
        System.out.println("errorReceiver:" + msg);
    }
}

6.1.4 InfoReceiver

package com.kdx.consumer;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "${mq.config.queue.info}",autoDelete = "false"),
        exchange = @Exchange(value = "${mq.config.exchange}",type = ExchangeTypes.TOPIC),
        key = "*.log.info"
))
public class InfoReceiver {

    @RabbitHandler
    public void process(String msg){
        System.out.println("infoReceiver:" + msg);
    }
}

6.1.5 LogsReceiver

package com.kdx.consumer;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "${mq.config.queue.logs}",autoDelete = "false"),
        exchange = @Exchange(value = "${mq.config.exchange}",type = ExchangeTypes.TOPIC),
        key = "*.log.*"
))
public class LogsReceiver {

    @RabbitHandler
    public void process(String msg){
        System.out.println("logsReceiver:" + msg);
    }
}

6.2 生产者

6.2.1 添加依赖

在 Maven 配置中添加 Spring AMQP 的依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

6.2.2 配置 RabbitMQ 连接

在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:

server.port=8084

spring.application.name=mq-demo04-provider

spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

#设置交换器
mq.config.exchange=log.topic

6.2.3 GoodServer

package com.kdx.provider;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class GoodServer {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Value("${mq.config.exchange}")
    private String exchange;

    public void send(String msg){
        amqpTemplate.convertAndSend(exchange,"good.log.debug","good.log.debug:" + msg);
        amqpTemplate.convertAndSend(exchange,"good.log.info","good.log.info:" + msg);
        amqpTemplate.convertAndSend(exchange,"good.log.warn","good.log.warn:" + msg);
        amqpTemplate.convertAndSend(exchange,"good.log.error","good.log.error:" + msg);
    }
}

6.3 测试

package com.kdx;

import com.kdx.provider.GoodServer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class MqDemo04ProviderApplicationTests {

    @Autowired
    private GoodServer goodServer;

    @Test
    void test1() {
        goodServer.send("hello mq");
    }

}

启动消费者和生产者服务,执行test1():

在这里插入图片描述

结果看到使用通配符模式匹配 Routing Key,并将消息发送到与模式匹配的队列。

Topic交换器与队列绑定时的bindingKey可以指定通配符,而且Topic交换器接收的消息RoutingKey必须是多个单词,以.分割

7 Fanout Exchange案例

1.在consumer服务中,利用代码声明队列、交换机,并将两者绑定

2.在consumer服务中,编写两个消费者方法,分别监听order.sms和order.push

3.在publisher中编写测试方法,向log.fanout发送消息

在这里插入图片描述

7.1 消费者

7.1.1 添加依赖

在 Maven 配置中添加 Spring AMQP 的依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

7.1.2 配置 RabbitMQ 连接

在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:

server.port=8086

spring.application.name=mq-demo07-consumer

spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

#设置交换器
mq.config.exchange=log.fanout

#设置队列Q1
mq.config.queue.sms=order.sms

#设置队列Q2
mq.config.queue.push=order.push

7.1.3 SmsReceiver

package com.kdx.consumer;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "${mq.config.queue.sms}",autoDelete = "false"),
        exchange = @Exchange(value = "${mq.config.exchange}",type = ExchangeTypes.FANOUT)
))
public class SmsReceiver {

    @RabbitHandler
    public void process(String msg){
        System.out.println("sms:" + msg);
    }
}

7.1.4 PushReceiver

package com.kdx.consumer;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "${mq.config.queue.push}",autoDelete = "false"),
        exchange = @Exchange(value = "${mq.config.exchange}",type = ExchangeTypes.FANOUT)
))
public class PushReceiver {

    @RabbitHandler
    public void process(String msg){
        System.out.println("push:" + msg);
    }
}

7.2 生产者

7.2.1 添加依赖

在 Maven 配置中添加 Spring AMQP 的依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

7.2.2 配置 RabbitMQ 连接

在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:

server.port=8085

spring.application.name=mq-demo06-provider

spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

#设置交换器
mq.config.exchange=log.fanout

7.2.3 Sender

package com.kdx.provider;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class Sender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Value("${mq.config.exchange}")
    private String exchange;

    public void send(String msg){
        amqpTemplate.convertAndSend(exchange,"",msg);   //routingKey不写
    }
}

7.3 测试

package com.kdx;

import com.kdx.provider.Sender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class MqDemo06ProviderApplicationTests {

    @Autowired
    private Sender sender;

    @Test
    void testSend() {
        sender.send("fanout广播");
    }
}

启动消费者和生产者服务,测试testSend():

在这里插入图片描述

结果看到SmsReceiver和PushReceiver都接收到了交换器广播消息。

8 RabbitMQ 持久化

RabbitMQ 提供了消息的持久化机制,确保即使在 RabbitMQ 服务器重启后,消息仍然能够被恢复。这主要涉及到队列和消息的持久化。

在 5 Direct Echange的案例基础上修改测试方法

package com.kdx;

import com.kdx.provider.Sender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class MqDemo03ProviderApplicationTests {

    @Autowired
    private Sender sender;

    @Test
    void testSend1() {
        for (int i = 1; i < 1000; i++) {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            sender.send1("hello mq 1 ..." + i);
        }
    }
}

必须确保在autoDelete = "false"
出现在Queue中:当所有的消费者客户连接断开后,是否自动删除队列。
出现在Exchange中:当所有的绑定队列都不再使用时,是否自动删除交换器。

启动消费者和生产者服务,执行testSend1()方法:

在这里插入图片描述

现在停止消费者服务,结束在infoReceiver:hello mq 1 ...7

然后再重启消费者服务,控制台:

在这里插入图片描述

发现没有重新从infoReceiver:hello mq 1 ...1开始输出,而是接着7从infoReceiver:hello mq 1 ...8开始,这就是消息的持久化。

9 RabbitMQ ACK确认机制

RabbitMQ 的 Acknowledgment(简称 ack)机制是确保消息在消费者正确处理后才被确认的一种机制。它有助于提高消息传递的可靠性。在 RabbitMQ 中,有三种 Acknowledgment 模式:自动确认、手动确认(单条消息)、手动批量确认(foreach遍历)。

1. 自动确认模式(Automatic Acknowledgment)

在自动确认模式下,消息一旦被消费者接收,RabbitMQ 就会立即确认消息的接收。这种模式下,消费者无法明确知道消息是否被正确处理。

// 默认是自动确认模式
@RabbitListener(queues = "myQueue")
public void handleMessage(String message) {
    // 处理消息的业务逻辑...
}

2. 手动确认模式(Manual Acknowledgment)

在手动确认模式下,消费者需要显式地告诉 RabbitMQ 是否成功处理了消息。如果消费者成功处理消息,则调用 channel.basicAck 进行确认;如果处理失败,则可以调用 channel.basicNackchannel.basicReject 进行拒绝。

@RabbitListener(queues = "myQueue")
public void handleMessage(Message message, Channel channel) throws IOException {
    try {
        // 处理消息的业务逻辑...

        // 手动确认消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
        // 处理异常,可以选择拒绝消息或者进行其他处理
        channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
    }
}

在手动确认模式下,消费者需要谨慎处理异常情况,以确保消息在处理失败时能够得到适当的处理。手动确认模式提供了更精细的控制,确保消息在被消费者正确处理后才被确认。

文章来源:https://blog.csdn.net/k010908/article/details/134753453
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。