RabbitMQ学习笔记

发布时间:2024年01月14日

介绍

名词解释

Broker:接受和分发消息的应用,例如RabbitMQ Server

Virtual host:出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等

Connection:生产者/消费者与broker之间的TCP连接

Channel:如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销

Exchange:交换机,按一定的规则将消息路由转发到某个队列。

Queue:消息队列

image-20230404204732046

安装

参考:https://www.rabbitmq.com/install-debian.html#apt-cloudsmith

最后安装server时不要带-y --fix-missing

用户权限配置参考:https://blog.csdn.net/theRengar/article/details/118933418

RabbitMQ命令参考:https://computingforgeeks.com/how-to-install-latest-rabbitmq-server-on-ubuntu-linux

工作模式

simple

一个消息的发布者和一个消费者

(P) -> [|||] -> (C)

<!--        AMQP依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
<!--        测试依赖-->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>

可以将基本的RabbitMQ配置打包模块使用

字段配置类

@Configuration
public class RabbitMQConfig {
    public static final String RABBITMQ_DEMO_TOPIC = "rabbitmq_demo_topic";
}

统一MQ配置

spring:
  rabbitmq:
    host: 
    port: 
    username: 
    password: 

新模块使用前面的模块做依赖,统一配置

RabbitMQ对象配置

@Configuration
public class RabbitMQConfiguration {
    /**
     * rabbitmq演示直接队列
     * @return {@code Queue}
     * 1、name:    队列名称
     * 2、durable: 是否持久化
     * 3、exclusive: 是否独享、排外的。如果设置为true,定义为排他队列。则只有创建者可以使用此队列。也就是private私有的。
     * 4、autoDelete: 是否自动删除。也就是临时队列。当最后一个消费者断开连接后,会自动删除。
     *
     */
    @Bean
    public Queue rabbitmqDemoDirectQueue() {

        return new Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC, true, false, true);
    }
}

接收者

@Component
@RabbitListener(queues = RabbitMQConfig.RABBITMQ_DEMO_TOPIC)
public class Receiver {

    @RabbitHandler
    public void receive(String msg) {
        System.out.println("接收到消息:" + msg);
    }
}

使用@RabbitListener注解指定某方法作为接收器;或加在类上,并在相应方法上添加@RabbitHandler进行标记,可以根据接受的参数类型进入具体的方法中。

参考:https://blog.csdn.net/sliver1836/article/details/119734239

eg:

@Component
@RabbitListener(queues = "consumer_queue")
public class Receiver {
 
    @RabbitHandler
    public void processMessage1(String message) {
        System.out.println(message);
    }
 
    @RabbitHandler
    public void processMessage2(byte[] message) {
        System.out.println(new String(message));
    }
    
}

发送者

也可以直接用spring的aqmpTemplate接口,但是少了一些方法

@Component
public class Sender {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void send(String msg) {
        rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_TOPIC, msg);
    }
}

Work Queues

多个消费端共同消费同一个队列的消息,消费者之间是竞争关系

Producer -> Queue -> Consuming:Work Queue 用于将耗时任务分配给多个worker。

对绑定一个队列的消费者创建多个对象即可

Publish/Subscribe

消息发布者发布到交换机,交换机根据相应规则转发到对应队列

交换器:生产者只能向交换器发送消息。 交换的一侧接收来自生产者的消息,另一侧将它们推送到队列。

交换机类型:direct, topic, headers,fanout

  1. Direct Exchange:直连交换机,根据Routing Key进行投递到不同队列。

    • 单个绑定,一个路由键对应一个队列。

      img

    • 多个绑定,一个路由键对应多个队列,则消息会分别投递到两个队列中

      img

  2. Fanout Exchange:扇形交换机,采用广播模式,根据绑定的交换机,路由到与之对应的所有队列。此模式下Routing Key会被忽略

    img

  3. Topic Exchange:主题交换机,对路由键进行模式匹配后进行投递,符号#表示零个或多个词,*表示一个词。

    eg:“abc.#”能够匹配到“abc.def.ghi”,但是“abc.*” 只会匹配到“abc.def”

    img

  4. Header Exchange:头交换机,不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue与Exchange时指定一组键值对;当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers属性是一个键值对,可以是Hashtable,键值对的值可以是任何类型。而fanout,direct,topic 的路由键都需要要字符串形式的。

    匹配规则x-match有下列两种类型:

    x-match = all :表示所有的键值对都匹配才能接受到消息

    x-match = any :表示只要有键值对匹配就能接受到消息

    img

声明交换机和队列的Bean,之后进行绑定。

@Configuration
public class RabbitMQConfiguration {
    /**
     * rabbitmq演示直接队列
     * @return {@code Queue}
     * 1、name:    队列名称
     * 2、durable: 是否持久化
     * 3、exclusive: 是否独享、排外的。如果设置为true,定义为排他队列。则只有创建者可以使用此队列。也就是private私有的。
     * 4、autoDelete: 是否自动删除。也就是临时队列。当最后一个消费者断开连接后,会自动删除。
     *
     */
    @Bean
    public Queue rabbitmqDemoQueue() {

        return new Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC, true, false, false);
    }

    @Bean
    public Queue rabbitmqDemoQueue1() {

        return new Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC+"1", true, false, false);
    }

    @Bean
    public DirectExchange rabbitmqDemoDirectExchange() {
        //Direct交换机
        return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true, true);
    }

    @Bean
    public FanoutExchange rabbitmqDemoFanoutExchange() {
        //Fanout交换机
        return new FanoutExchange(RabbitMQConfig.RABBITMQ_DEMO_FANOUT_EXCHANGE, true, true);
    }

    @Bean
    public Binding bindDirect() {
        //链式写法,绑定交换机和队列,并设置匹配键
        return BindingBuilder
                //绑定队列
                .bind(rabbitmqDemoQueue())
                //到交换机
                .to(rabbitmqDemoFanoutExchange());
                //并设置匹配键
//                .with(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING);
    }

    @Bean
    public Binding bindDirect1() {
        //链式写法,绑定交换机和队列,并设置匹配键
        return BindingBuilder
                //绑定队列
                .bind(rabbitmqDemoQueue1())
                //到交换机
                .to(rabbitmqDemoFanoutExchange());
        //并设置匹配键
//                .with(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING);
    }

    @Bean
    public Receiver receiver1() {
        return new Receiver(1);
    }

    @Bean
    public Receiver receiver2() {
        return new Receiver(2);
    }

    @Bean
    public Receiver receiver3() {
        return new Receiver(3);
    }

    @Bean
    public Receiver receiver4() {
        return new Receiver(4);
    }
}

统一配置中也可添加部分键值等

@Configuration
public class RabbitMQConfig {
    public static final String RABBITMQ_DEMO_TOPIC = "rabbitmq_demo_topic";

    public static final String RABBITMQ_DEMO_DIRECT_EXCHANGE = "rabbitmq_demo_direct_exchange";

    public static final String RABBITMQ_DEMO_DIRECT_ROUTING = "rabbitmq_demo_direct_routing";

    public static final String RABBITMQ_DEMO_FANOUT_EXCHANGE = "rabbitmq_demo_fanout_exchange";
}

Routing

使用direct交换机,用RoutingKey进行绑定配置

Topics

使用topic类型交换机,在RoutingKey中使用特殊字符

RPC

Publisher Confirms

配置详解

基础信息

spring.rabbitmq.host: 默认localhost
spring.rabbitmq.port: 默认5672
spring.rabbitmq.username: 用户名
spring.rabbitmq.password: 密码
spring.rabbitmq.virtual-host: 连接到代理时用的虚拟主机
spring.rabbitmq.addresses: 连接到server的地址列表(以逗号分隔),先addresses后host 
spring.rabbitmq.requested-heartbeat: 请求心跳超时时间,0为不指定,如果不指定时间单位默认为妙
spring.rabbitmq.publisher-confirms: 是否启用【发布确认】,默认false
spring.rabbitmq.publisher-returns: 是否启用【发布返回】,默认false
spring.rabbitmq.connection-timeout: 连接超时时间,单位毫秒,0表示永不超时 

SSL

spring.rabbitmq.ssl.enabled: 是否支持ssl,默认false
spring.rabbitmq.ssl.key-store: 持有SSL certificate的key store的路径
spring.rabbitmq.ssl.key-store-password: 访问key store的密码
spring.rabbitmq.ssl.trust-store: 持有SSL certificates的Trust store
spring.rabbitmq.ssl.trust-store-password: 访问trust store的密码
spring.rabbitmq.ssl.trust-store-type=JKS:Trust store 类型.
spring.rabbitmq.ssl.algorithm: ssl使用的算法,默认由rabiitClient配置
spring.rabbitmq.ssl.validate-server-certificate=true:是否启用服务端证书验证
spring.rabbitmq.ssl.verify-hostname=true 是否启用主机验证

Cache

spring.rabbitmq.cache.channel.size: 缓存中保持的channel数量
spring.rabbitmq.cache.channel.checkout-timeout: 当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel
spring.rabbitmq.cache.connection.size: 缓存的channel数,只有是CONNECTION模式时生效
spring.rabbitmq.cache.connection.mode=channel: 连接工厂缓存模式:channel 和 connection

Listener

simple为前两种工作模式

direct为后四种

spring.rabbitmq.listener.type=simple: 容器类型.simple或direct
 
spring.rabbitmq.listener.simple.auto-startup=true: 是否启动时自动启动容器
spring.rabbitmq.listener.simple.acknowledge-mode: 表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
spring.rabbitmq.listener.simple.concurrency: 最小的消费者数量
spring.rabbitmq.listener.simple.max-concurrency: 最大的消费者数量
spring.rabbitmq.listener.simple.prefetch: 一个消费者最多可处理的nack消息数量,如果有事务的话,必须大于等于transaction数量.
spring.rabbitmq.listener.simple.transaction-size: 当ack模式为auto时,一个事务(ack间)处理的消息数量,最好是小于等于prefetch的数量.若大于prefetch, 则prefetch将增加到这个值
spring.rabbitmq.listener.simple.default-requeue-rejected: 决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
spring.rabbitmq.listener.simple.missing-queues-fatal=true 若容器声明的队列在代理上不可用,是否失败; 或者运行时一个多多个队列被删除,是否停止容器
spring.rabbitmq.listener.simple.idle-event-interval: 发布空闲容器的时间间隔,单位毫秒
spring.rabbitmq.listener.simple.retry.enabled=false: 监听重试是否可用
spring.rabbitmq.listener.simple.retry.max-attempts=3: 最大重试次数
spring.rabbitmq.listener.simple.retry.max-interval=10000ms: 最大重试时间间隔
spring.rabbitmq.listener.simple.retry.initial-interval=1000ms:第一次和第二次尝试传递消息的时间间隔
spring.rabbitmq.listener.simple.retry.multiplier=1: 应用于上一重试间隔的乘数
spring.rabbitmq.listener.simple.retry.stateless=true: 重试时是否无状态
 
spring.rabbitmq.listener.direct.acknowledge-mode= ack模式
spring.rabbitmq.listener.direct.auto-startup=true 是否在启动时自动启动容器
spring.rabbitmq.listener.direct.consumers-per-queue= 每个队列消费者数量.
spring.rabbitmq.listener.direct.default-requeue-rejected= 默认是否将拒绝传送的消息重新入队.
spring.rabbitmq.listener.direct.idle-event-interval= 空闲容器事件发布时间间隔.
spring.rabbitmq.listener.direct.missing-queues-fatal=false若容器声明的队列在代理上不可用,是否失败.
spring.rabbitmq.listener.direct.prefetch= 每个消费者可最大处理的nack消息数量.
spring.rabbitmq.listener.direct.retry.enabled=false  是否启用发布重试机制.
spring.rabbitmq.listener.direct.retry.initial-interval=1000ms # 第一次和第二次尝试传递消息的时间间隔
spring.rabbitmq.listener.direct.retry.max-attempts=3 # 发送消息的最大尝试次数
spring.rabbitmq.listener.direct.retry.max-interval=10000ms # 最大重试时间间隔
spring.rabbitmq.listener.direct.retry.multiplier=1 # 应用于上一重试间隔的乘数
spring.rabbitmq.listener.direct.retry.stateless=true # 重试是否无状态

Template

spring.rabbitmq.template.mandatory: 启用强制信息;默认false
spring.rabbitmq.template.receive-timeout: receive() 操作的超时时间
spring.rabbitmq.template.reply-timeout: sendAndReceive() 操作的超时时间
spring.rabbitmq.template.retry.enabled=false: 发送重试是否可用 
spring.rabbitmq.template.retry.max-attempts=3: 最大重试次数
spring.rabbitmq.template.retry.initial-interva=1000msl: 第一次和第二次尝试发布或传递消息之间的间隔
spring.rabbitmq.template.retry.multiplier=1: 应用于上一重试间隔的乘数
spring.rabbitmq.template.retry.max-interval=10000: 最大重试时间间隔

消息转换器

RabbitMQ默认使用SimpleMessageConverter,基于]DK的ObjectOutputStream序列化转换消息,有速度和安全性的缺陷。

Json序列化

建议在common包做统一配置

导包

		<dependency>
            <groupId>com.fasterxml.jackson.dataformat</groupId>
            <artifactId>jackson-dataformat-xml</artifactId>
        </dependency>

注入bean

	@Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

注解式用法

不需要配置队列,交换机和绑定的Bean,改在消费者的@RabbitListener上进行配置。如果交换机和队列已存在,并和此处声明的配置不同会报错。

@RabbitListener(bindings = {
        @QueueBinding(
                value = @Queue(name = RabbitMQConfig.RABBITMQ_DEMO_TOPIC, durable = "true",autoDelete = "false",exclusive = "false"),
                exchange = @Exchange(name = RabbitMQConfig.RABBITMQ_DEMO_FANOUT_EXCHANGE, type = ExchangeTypes.FANOUT),
                key = "s"),
        @QueueBinding(
                value = @Queue(name = RabbitMQConfig.RABBITMQ_DEMO_TOPIC + "1", durable = "true",autoDelete = "false",exclusive = "false"),
                exchange = @Exchange(name = RabbitMQConfig.RABBITMQ_DEMO_FANOUT_EXCHANGE, type = ExchangeTypes.FANOUT),
                key = "s")

})

由于未注入Bean,发送方此时需要显式使用交换机名/队列名等进行发送。

@Component
public class Sender {

    @Resource
    private RabbitTemplate amqpTemplate;


    public void send(String msg) {
        amqpTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_FANOUT_EXCHANGE,"", msg);
    }

}

QueueBuilder继承了AbstractBuilder,包括一个封装的map,存储了@Queue构建时的参数。包括ttl,死信交换机的配置等。在做绑定时可以用参数arguments和注解@Arguments做参数配置。具体可用参数可从QueueBuilder中查看。设定ttl要注意指定参数类型,不然会报错。

同样的,@QueueBinding也可以使用参数arguments和注解@Arguments做参数配置,但似乎没有可用参数。

同样的,@Exchange也可以使用参数arguments和注解@Arguments做参数配置。可用参数只有alternate-exchange,如果发送消息的时候根据routingkey并没有把消息路由到队列中去,这就会将此消息路由到Alternate Exchange属性指定的Exchange上。参考文章

@RabbitListener(bindings = {
        @QueueBinding(
                value = @Queue(name = RabbitMQConfig.RABBITMQ_DEMO_TOPIC, autoDelete = "true",exclusive = "false"
                        ,arguments = {@Argument(name = "x-message-ttl", value = "10000",type = "java.lang.Integer"),
                                @Argument(name = "x-dead-letter-exchange", value = "dead.exchange"),
                                @Argument(name = "x-dead-letter-routing-key", value = "dead")
                                }),
                exchange = @Exchange(name = RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, type = ExchangeTypes.DIRECT,autoDelete = "true"
                        ,arguments = {@Argument(name = "alternate-exchange", value = "alternate.exchange")}),
                key = "s")
})

常见场景

消息可靠性

生产者消息确认

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求:

  • publisher-confirm,发送者确认

    • 消息成功投递到交换机,返回ack

    • 消息未投递到交换机,返回nack

  • publisher-return,发送者回执

    • 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。

配置:

spring:
  rabbitmq:
    publisher-confirm-type: correlated  #验证消息有没有顺利到达MQ simple:同步等待confirm结果直到超时 correlated:异步回调,定义回调类,返回结果时会回调这个类
    publisher-returns: true #验证消息有没有正确路由到相应的队列的功能
    template:
      mandatory: true  # 定义消息路由失败时的策略 true:调用ReturnCallBack false:丢弃消息

配置confirm

  • 给单条发送的消息配置
public void send(String msg) {
        //设置消息唯一id
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        //设置消息投递到交换机的回调
        correlationData.getFuture().addCallback(result -> {
            //success callback
            if (result != null && result.isAck()) {
                log.debug("消息投递到交换机成功:{}" , correlationData.getId());
            }else{
                log.error("消息投递到交换机失败:{}" , correlationData.getId());
            }
            //failure callback
        }, ex -> {
            log.error("消息发送失败" + ex.getMessage());
        });
        amqpTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE,"es", msg,correlationData);
    }
  • 全局配置

    在bean配置中给RabbitTemplate做配置

    @Slf4j
    @Configuration
    public class CommonConfig implements ApplicationContextAware {
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
            rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                if (ack){
                    log.debug("消息投递到交换机成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
                }else {
                    log.error("消息投递到交换机失败:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
                }
            });
        }
    }
    
    

全局bean配置ReturnCallsBack 发送者回执

//setReturnCallback()从springboot2.3已弃用,将多个参数封装到ReturnedMessage中
        rabbitTemplate.setReturnsCallback(returned -> {
            // 日志
            log.error("消息发送失败,消息内容为:{},错误原因为:{},错误编码为:{},交换机为:{},路由键为:{}",
                    returned.getMessage().toString(),
                    returned.getReplyText(),
                    returned.getReplyCode(),
                    returned.getExchange(),
                    returned.getRoutingKey());
        });

消息持久化

默认交换机,队列,消息都是持久化

消费者消息确认

RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。

而SpringAMQP则允许配置三种确认模式:

  • manual:手动ack,需要在业务代码结束后,调用api发送ack。
  • auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
  • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每个消费者未确认的消息的最大数量。默认值为3。建议设置为1,以便在消费者处理消息时不会将其分配给其他消费者。
        acknowledge-mode: auto #none:关闭ack; manual:手动ack; auto:自动ack

消费失败重试

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 是否开启重试
          initial-interval: 1000 #第一次重试间隔时间
          multiplier: 3 # 重试间隔时间递增倍数
          max-attempts: 3 # 最大重试次数
          stateless: true # 是否是无状态的重试 如果业务包含事务则改为false

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

    image-20230604113224266

eg:

@Bean
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error");
    }

延迟消息

死信交换机

当一个队列中的消息满足下列情况之一时,可以成为死信:

  • 消费者使用basic.reject或basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息堆积满了,最早的消息可能成为死信

如果该队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机。

image-20230604135801989

延迟消息

给消息或队列设置ttl,通过过期转给(死信)交换机,实现延迟消息。若两方法共存则取短

Message message = MessageBuilder
                .withBody(msg.getBytes(StandardCharsets.UTF_8))
                .setExpiration("10000")
                .build();

也可以给MQ安装官方DelayExchange插件实现

https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq

消息堆积

当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。最早接收到的消息,可能就会成为死信,会被丢弃。

解决方案:

  • 队列上绑定多个消费者,提高消费速度
  • 给消费者开启线程池,提高消费速度
  • 使用惰性队列,可以保存更多消息

惰性队列

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储

注解声明:

在声明队列时加

@Argument(name = "x-queue-mode" ,value = "lazy")

Bean声明:

用QueueBuilder构造队列,添加QueueBuilder.lazy()

高可用

搭设集群

文章来源:https://blog.csdn.net/Loli_Wolf/article/details/135571876
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。