目录
- 消息可靠性问题:如何确保发送的消息至少被消费一次
- 延迟消息问题:如何实现的延迟投递
- 消息堆积问题:解决数百万的消息堆积无法及时消费的问题
- 高可用问题:如何避免单点的MQ故障而导致的不可用问题
消息从生产者发送到exchange,再到queue,再到消费者,这个过程中有可能会导致消息丢失:
- 发送时丢失:生产者发送的消息未送达exchange,消息到达exchange后未到达queue
- MQ宕机,queue将消息丢失
- consumer接收到消息后未消费就宕机
宕机指的是计算机系统或设备因为各种原因(如硬件故障、软件错误、网络问题等)而无法正常运行或停止工作的状态。
RabbitMQ提供了publisher confirm机制来避免消息在发送到MQ过程中丢失,消息发送到MQ后会返回一个结果给发送者来表示消息是否处理成功,结果有如下的两种请求:
- publisher-confirm:发送者确认,消息成功投递到交换机返回ack,消息未投递到交换机,返回nack
- publisher-return:返回者回执,消息投递到交换机,但是没有路由到队列就会返回ACK以及路由失败的原因
注意:在确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同的消息,避免ack冲突
MQ默认是内存存储消息,开启持久化就可以确保MQ的消息不会丢失。
SpringAMQP中的消息默认是持久的,也可以通过以下方式实现消息持久化:
交换机持久化:
@Bean public DirectExchange simpleDirect(){ // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除 return new DirectExchange("simple.direct",true,false); }
队列持久化:
@Bean public Queue simpleQueue(){ // 使用QueueBuilder构建队列,durable就是持久化的 return QueueBuilder.durable("simple.queue").build(); }
消息持久化:SpringAMQP中的消息默认是持久的,可以通过MessageProperties中的DeliveryMode来指定
@Test public void testDurableMessage() { // 1.准备消息 Message message = MessageBuilder.withBody("hello, spring".getBytes(StandardCharsets.UTF_8)) .setDeliveryMode(MessageDeliveryMode.PERSISTENT) .build(); // 2.发送消息 rabbitTemplate.convertAndSend("simple.queue", message); }
RabbitMQ支持消费者确认机制的,消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除消息,在SpringAMQP中允许配置三种确认模式:
- manual:手动ack,需要在业务代码结束后,调用api发送ack。
- auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
- none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
需要在配置文件中进行修改:
spring: rabbitmq: listener: simple: prefetch: 1 acknowledge-mode: none # none,关ack; manual,手动ack; auto: 自动ack
当消费者出现异常后,消息会不断重新入队列,再重新发送给消费者,然后再次异常,再次重新入队,会出现无限循环导致mq的消息处理飙升,带来不必要的压力。
那么我们就可以使用Spring的retry机制,在消费者出现异常时先利用本地重试,而不是无限制的重新入队列。
在配置文件中进行如下修改:
spring: rabbitmq: listener: simple: retry: enabled: true # 开启消费者失败重试机制 initial-interval: 1000 # 初始的失败等待时长是1s multiplier: 3 ## 下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval max-attempts: 3 # 最大重试次数 stateless:true # true无状态;false有状态。如果业务中包合事务,这里改为false
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(推荐使用)
RepublishMessageRecoverer的具体代码实现:
定义交换机和队列,并进行绑定:
@Bean public DirectExchange errorMessageExchange(){ return new DirectExchange("error.direct"); } @Bean public Queue errorQueue(){ return new Queue("error.queue"); } @Bean public Binding errorMessageBinding(){ return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error"); }
定义RepublishMessageRecoverer:
@Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){ return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); }
综上所述, 可以从如下方面实现RabbitMQ消息的可靠性:
- 开启生产者确认机制,确保生产者的消息能够到达队列
- 开启持久化功能,确保队列中的消息在未消费前不会丢失
- 开启消费者确认机制为auto,又spring确认消息处理成功之后返回ack
- 开启消费者失败重试机制,设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,由人工进行处理
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
- 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
- 消息是一个过期消息,超时无人消费
- 要投递的队列消息堆积满了,最早的消息可能成为死信
如果该队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。
TTL,也就是Time-To-Live(存活时间)。如果一个队列中的消息TTL结束仍未消费,则会变为死信,ttl超时分为两种情况:
- 消息所在的队列设置了存活时间
- 消息本身设置了存活时间
当队列和消息本身都设置了存活时间时,以时间短的ttl为准
利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列模式。
延迟队列的使用场景包括:
- 延迟发送短信
- 用户下单,如果用户在15 分钟内未支付,则自动取消
- 预约工作会议,20分钟后自动通知所有参会人员
RabbitMQ的官方也推出了rabbitmq_delayed_message_exchange插件,原生支持延迟队列效果?,RabbitMQ的官方插件社区进行下载安装到云服务器上。
在Java代码中就可以使用SpringAMQP的延迟队列插件DelayExchange,其本质还是官方的三种交换机,只是添加了延迟功能。因此使用时只需要声明一个交换机,交换机的类型可以是任意类型,然后设定delayed属性为true即可。然后我们向这个delay为true的交换机中发送消息,一定要给消息添加一个header:x-delay,值为延迟的时间,单位为毫秒
当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。最早接收到的消息,可能就会成为死信,会被丢弃,这就是消息堆积问题。
解决消息堆积有三种种思路:
- 增加更多消费者,提高消费速度
- 在消费者内开启线程池加快消息处理速度
- 扩大队列容积,提高堆积上限
惰性队列具备如下的特征:
- 接收消息后直接存入的是磁盘而不是内存
- 消费者消费消息是从磁盘中进行读取再加载到内存中的
- 支持数百万条的消息存储
设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。
用SpringAMQP声明惰性队列分两种方式:
基于Bean的方式:
@Bean public Queue lazyQueue() { return QueueBuilder.durable("lazy.queue") .lazy() # 开启x—queue-mode为lazy .build(); }
基于注解的方式:
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "delay.queue", durable = "true"), exchange = @Exchange(name = "delay.direct", delayed = "true"), key = "delay" )) public void listenDelayExchange(String msg) { log.info("消费者接收到了delay.queue的延迟消息"); }