RocketMQ实践:确保消息不丢失与顺序性的高效策略

发布时间:2023年12月23日

一、使用RocketMQ如何保证消息不丢失?

这个是在面试时,关于MQ,面试官最喜欢问的问题。这个问题是所有MQ都需要面对的一个共性问

题。大致的解决思路都是一致的,但是针对不同的MQ产品又有不同的解决方案。分析这个问题要从以?下几个角度入手:

1、哪些环节会有丢消息的可能?

我们考虑一个通用的MQ场景:

其中,1,2,4三个场景都是跨网络的,而跨网络就肯定会有丢消息的可能。

然后关于3这个环节,通常MQ 存盘时都会先写入操作系统的缓存page?cache中,然后再由操作系统异?步的将消息写入硬盘。这个中间有个时间差,就可能会造成消息丢失。如果服务挂了,缓存中还没有来?得及写入硬盘的消息就会丢失。

这个是MQ场景都会面对的通用的丢消息问题。那我们看看用Rocket?时要如何解决这个问题

2、RocketMQ??消息零丢失方案

1》生产者使用事务消息机制保证消息零丢失

这个结论比较容易理解,因为RocketMQ?的事务消息机制就是为了保证零丢失来设计的,并且经过阿里?的验证,肯定是非常靠谱的。

但是如果深入一点的话,我们还是要理解下这个事务消息到底是不是靠谱。我们以最常见的电商订单场?景为例,来简单分析下事务消息机制如何保证消息不丢失。我们看下下面这个流程图:

1、???????half??????????

这个half消息是在订单系统进行下单操作前发送,并且对下游服务的消费者是不可见的。那这个消息的?作用更多的体现在确认RocketMQ的服务是否正常。相当于嗅探下RocketMQ服务是否正常,并且通知 RocketMQ,我马上就要发一个很重要的消息了,你做好准备。

2.half消息如果写入失败了怎么办?

如果没有half消息这个流程,那我们通常是会在订单系统中先完成下单,再发送消息给MQ。这时候写 ??入消息到MQ如果失败就会非常尴尬了。而half消息如果写入失败,我们就可以认为MQ的服务是有问题?的,这时,就不能通知下游服务了。我们可以在下单时给订单一个状态标记,然后等待MQ服务正常后 ?再进行补偿操作,等MQ服务正常后重新下单通知下游服务。

3.订单系统写数据库失败了怎么办?

这个问题我们同样比较下没有使用事务消息机制时会怎么办?如果没有使用事务消息,我们只能判断下?单失败,抛出了异常,那就不往MQ发消息了,这样至少保证不会对下游服务进行错误的通知。但是这??样的话,如果过一段时间数据库恢复过来了,这个消息就无法再次发送了。当然,也可以设计另外的补?偿机制,例如将订单数据缓存起来,再启动一个线程定时尝试往数据库写。而如果使用事务消息机制,??就可以有一种更优雅的方案。

如果下单时,写数据库失败(可能是数据库崩了,需要等一段时间才能恢复)。那我们可以另外找个地方 ??把订单消息先缓存起来(Redis、文本或者其他方式),然后给RocketMQ返回一个UNKNOWN状态。这样?RocketMQ就会过一段时间来回查事务状态。我们就可以在回查事务状态时再尝试把订单数据写入数据 ?库,如果数据库这时候已经恢复了,那就能完整正常的下单,再继续后面的业务。这样这个订单的消息??就不会因为数据库临时崩了而丢失。

4.half消息写入成功后RocketMQ挂了怎么办?

我们需要注意下,在事务消息的处理机制中,未知状态的事务状态回查是由RocketMQBroker主动发?起的。也就是说如果出现了这种情况,那RocketMQ就不会回调到事务消息中回查事务状态的服务。这?时,我们就可以将订单一直标记为"新下单"的状态。而等RocketMQ恢复后,只要存储的消息没有丢

失,?RocketMQ就会再次继续状态回查的流程。

5.下单成功后如何优雅的等待支付成功?

在订单场景下,通常会要求下单完成后,客户在一定时间内,例如10分钟,内完成订单支付,支付完成

后才会通知下游服务进行进一步的营销补偿。

如果不用事务消息,那通常会怎么办?

最简单的方式是启动一个定时任务,每隔一段时间扫描订单表,比对未支付的订单的下单时间,将超过?时间的订单回收。这种方式显然是有很大问题的,需要定时扫描很庞大的一个订单信息,这对系统是个?不小的压力。

那更进一步的方案是什么呢?是不是就可以使用RocketMQ供的延迟消息机制。往MQ发一个延迟1?钟的消息,消费到这个消息后去检查订单的支付状态,如果订单已经支付,就往下游发送下单的通知。??而如果没有支付,就再发一个延迟1分钟的消息。最终在第十个消息时把订单回收。这个方案就不用对 ?全部的订单表进行扫描,而只需要每次处理一个单独的订单消息。

那如果使用上了事务消息呢?我们就可以用事务消息的状态回查机制来替代定时的任务。在下单时,给?Broker返回一个UNKNOWN的未知状态。而在状态回查的方法中去查询订单的支付状态。这样整个业???务逻辑就会简单很多。我们只需要配置RocketMQ中的事务消息回查次数(默认15)和事务回查间隔时 ?间(messageDelayLevel),就可以更优雅的完成这个支付状态检查的需求。

6、事务消息机制的作用

整体来说,在订单这个场景下,消息不丢失的问题实际上就还是转化成了下单这个业务与下游服务的业?务的分布式事务一致性问题。而事务一致性问题一直以来都是一个非常复杂的问题。而RocketMQ的事??务消息机制,实际上只保证了整个事务消息的一半,他保证的是订单系统下单和发消息这两个事件的事?务一致性,而对下游服务的事务并没有保证。但是即便如此,也是分布式事务的一个很好的降级方案。

目前来看,也是业内最好的降级方案。

2》RocketMQ ?配置同步刷盘+Dledger?主从架构保证MQ?自身不会丢消息

1、同步刷盘

这个从我们之前的分析,就很好理解了。我们可以简单的把RocketMQ?的刷盘方式flushDiskType配置?成同步刷盘就可以保证消息在刷盘过程中不会丢失了。

2Dledger的文件同步

在使用Dledger技术搭建的RocketMQ集群中,Dledger会通过两阶段提交的方式保证文件在主从之间成?功同步。

简单来说,数据同步会通过两个阶段,?一个是uncommitted 阶段,?一个是commited?阶段。

Leader Broker上的Dledger?收到一条数据后,会标记为uncommitted ?状态,然后他通过自己的?DledgerServer组件把这个uncommitted 数据发给Follower BrokerDledgerServer组件。

接着Follower BrokerDledgerServer收到uncommitted??消息之后,必须返回一个ack给 ?Leader BrokerDledger然后如果Leader?Broker收到超过半数的Follower Broker返回的ack?之后,就会把消息标记为committed 状态。

再接下来, Leader ?Broker上的DledgerServer就会发送committed ?消息给Follower ?Broker ???上的DledgerServer, ?让他们把消息也标记为committed?状态。这样,就基于Raft?协议完成了两阶?段的数据同步。

3》消费者端不要使用异步消费机制

正常情况下,消费者端都是需要先处理本地事务,然后再给MQ?一个ACK?响应,这时MQ?就会修改

Offset,将消息标记为已消费,从而不再往其他消费者推送消息。所以在Broker的这种重新推送机制?下,消息是不会在传输过程中丢失的。但是也会有下面这种情况会造成服务端消息丢失:

       DefaultMQPushconsumer consumer =new
DefaultMQPushConsumer("please_rename_unique_group_name_4");
                  consumer.registerMessageListener(new            MessageListenerConcurrently)
{
                        @override
                        public       Consumeconcurrentlystatus
consumeMessage(List<MessageExt>msgs,
ConsumeConcurrentlyContext context) {
                   new Thread(){
                       public void run(){
                           //处理业务逻辑
                          System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
                       }
                    };
                   return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
              }
          });

这种异步消费的方式,就有可能造成消息状态返回后消费者本地业务逻辑处理失败造成消息丢失的可?能。

4RocketMQ特有的问题,??NameServer挂了如何保证消息不丢 失?

NameServerRocketMQ中,是扮演的一个路由中心的角色,提供到Broker的路由功能。但是其实路 由中心这样的功能,在所有的MQ中都是需要的。kafka是用zookeeper和一个作为ControllerBroker?一起来提供路由服务,整个功能是相当复杂纠结的。而RabbitMQ是由每一个Broker来提供路由服务。??而只有RocketMQ把这个路由中心单独抽取了出来,并独立部署。

这个NameServer之前都了解过,集群中任意多的节点挂掉,都不会影响他提供的路由功能。那如果集?群中所有的NameServer节点都挂了呢?

有很多人就会认为在生产者和消费者中都会有全部路由信息的缓存副本,那整个服务可以正常工作一段?时间。其实这个问题大家可以做一下实验,当NameServer部挂了后,生产者和消费者是立即就无法??工作了的。至于为什么,可以回顾一下我们之前的源码课程去源码中找找答案。

那再回到我们的消息不丢失的问题,在这种情况下,??RocketMQ相当于整个服务都不可用了,那他本身?肯定无法给我们保证消息不丢失了。我们只能自己设计一个降级方案来处理这个问题了。例如在订单系?统中,如果多次尝试发送RocketMQ不成功,那就只能另外找给地方(Redis、文件或者内存等)把订单消 息缓存下来,然后起一个线程定时的扫描这些失败的订单消息,尝试往RocketMQ发送。这样等

RocketMQ的服务恢复过来后,就能第一时间把这些消息重新发送出去。整个这套降级的机制,在大型?互联网项目中,都是必须要有的。

5?RocketMQ消息零丢失方案总结

完整分析过后,整个RocketMQ消息零丢失的方案其实挺简单

  • 生产者使用事务消息机制。
  • Broker配置同步刷盘+Dledger主从架构
  • 消费者不要使用异步消费。
  • 整个MQ挂了之后准备降级方案

那这套方案是不是就很完美呢?其实很明显,这整套的消息零丢失方案,在各个环节都大量的降低了系?统的处理性能以及吞吐量。在很多场景下,这套方案带来的性能损失的代价可能远远大于部分消息丢失?的代价。所以,我们在设计RocketMQ使用方案时,要根据实际的业务情况来考虑。例如,如果针对所??有服务器都在同一个机房的场景,完全可以把Broker配置成异步刷盘来提升吞吐量。而在有些对消息可 靠性要求没有那么高的场景,在生产者端就可以采用其他一些更简单的方案来提升吞吐,而采用定时对?账、补偿的机制来提高消息的可靠性。而如果消费者不需要进行消息存盘,那使用异步消费的机制带来?的性能提升也是非常显著的。

总之,这套消息零丢失方案的总结是为了在设计RocketMQ使用方案时的一个很好的参考。

二、使用RocketMQ如何保证消息顺

1、为什么要保证消息有序???


这个也是面试时最常见的问题,需要对MQ场景有一定的深入理解。例如如果我们有个大数据系统,需 要对业务系统的日志进行收集分析,这时候为了减少对业务系统的影响,通常都会通过MQ来做消息中 转。而这时候,对消息的顺序就有一定的要求了。例如我们考虑下面这一系列的操作。
1. 用户的积分默认是0分,而新注册用户设置为默认的10分。
2. 用户有奖励行为,积分+2分。
3. 用户有不正当行为,积分-3分。
这样一组操作,正常用户积分要变成9分。但是如果顺序乱了,这个结果就全部对不了。这时,就需要 对这一组操作,保证消息都是有序的。


2、如何保证消息有序?

1. 消息队列顺序类型
  • 全局有序:在整个消息队列(MQ)系统中,所有消息都严格遵循队列的先入先出(FIFO)顺序进行消费。
  • 局部有序:只针对部分关键消息保证消费顺序。
2. 业务场景分析
  • 在大多数MQ业务场景中,局部有序通常是足够的。例如,在聊天应用中,只需保证每个聊天窗口内的消息有序;同样,在电商订单处理中,只需保证单个订单的消息顺序。
  • 全局有序的需求较少,且在很多情况下,可以通过局部有序的方法来解决。例如,传统的聊天室需要全局有序,但实际上可以通过单个聊天窗口(局部有序)的方式来实现。
3. RocketMQ的应用
  • 发送消息时的分配:发送者通常通过轮询的方式将消息均匀分布到多个MessageQueue,而消费者从这些MessageQueue中消费消息。由于MessageQueue是RocketMQ存储消息的最小单元,且相互隔离,因此无法保证全局有序。
  • 实现局部有序:为了保证局部有序,可以将一组有序的消息存储在同一个MessageQueue中。RocketMQ允许发送者在发送消息时使用MessageSelector对象来决定消息存储在哪个MessageQueue,从而实现局部有序。
  • 实现全局有序的策略:一种常见的全局有序实现方式是将一个Topic配置为只有一个MessageQueue(默认为四个)。这种方法类似于将聊天室场景简化为单个聊天窗口。然而,这种方法会大幅影响整个Topic的消息吞吐能力,可能导致使用MQ变得不必要。
4. 总结
  • 局部有序通常满足大多数业务需求,而全局有序更为复杂,对系统性能有较大影响。
  • RocketMQ通过灵活的MessageQueue管理,提供了高效的局部有序解决方案,同时也支持全局有序,但后者可能会对性能造成负面影响。


三、使用RocketMQ如何快速处理积压消息?

1、如何确定RocketMQ有大量的消息积压??? ?

1. 消息生产与消费平衡
  • 在正常情况下,消息队列(MQ)的目标是保持消息的生产速度和消费速度整体平衡。
2. 消息积压的原因
  • 系统故障:例如数据库故障或网络波动可能导致部分消费者系统出现故障,从而引发消息积累。
  • 隐蔽性:这类问题在实际工作中可能不易察觉,直至积压严重。
3. 消息积压的影响
  • 在大型互联网项目中,消息积压的速度可能非常快。
  • 对MQ系统的影响
    • RocketMQ和Kafka:消息积压对系统性能的影响较小。
    • RabbitMQ:大量消息积压可能导致性能急剧下降。
4. 监测消息积压
  • RocketMQ监测方法
    • Web控制台:可以直接查看消息积压情况。在主题页面通过Consumer管理按钮,可以实时监控消息积压。
    • mqadmin指令:用于在后台检查各个Topic的消息延迟情况。
    • 配置文件检查:RocketMQ会在${storePathRootDir}/config目录下生成一系列json文件,这些文件也可用于追踪消息积压状况。
5. 总结
  • 消息队列的高效管理需要时刻关注消息的生产和消费平衡,特别是在面对系统故障时。
  • 不同的MQ系统对消息积压的敏感度不同,RocketMQ提供了多种方式来有效监控和管理消息积压,以确保系统性能不受严重影响。

2、如何处理大量积压的消息?

1. 基于RocketMQ负载均衡的解决方案
  • MessageQueue配置:当一个Topic下的MessageQueue数量足够多时,每个Consumer会分配到多个MessageQueue进行消费。
  • 增加Consumer节点:通过增加Consumer的服务节点数量可以加快消息的消费速度。在极限情况下,Consumer节点的数量可以与MessageQueue的数量相同。
2. 处理不足的MessageQueue配置
  • 当Topic下的MessageQueue数量不足时,增加Consumer节点的方法不再有效。
  • 创建新的Topic:可以创建一个新的Topic,配置足够多的MessageQueue。
  • 转移和消费旧消息:将所有消费者节点的目标Topic转向新的Topic,并上线一组新的消费者专门负责消费旧Topic中的消息,并转储到新的Topic中。然后在新的Topic上通过增加消费者数量来提高消费速度。
3. 特殊情况处理:主从架构切换
  • 普通架构与Dledger高可用集群切换
    • 当从普通主从架构切换到Dledger集群时,需要注意历史消息的处理。
    • 消费者消息对齐:在切换主从架构之前,消费者需要消费掉所有消息,以防历史消息丢失。
    • CommitLog日志接管:Dledger集群会接管RocketMQ原有的CommitLog日志。如果在切换时仍有未消费的消息,这些消息将无法继续消费,因为它们存储在旧的CommitLog中。
4. 总结
  • RocketMQ提供了灵活的负载均衡和消息处理机制,以应对不同的消息积压情况。
  • 通过调整Consumer节点数量和MessageQueue配置,可以有效地处理消息积压。
  • 在特殊情况下,如架构切换,需特别注意消息的完整消费以避免数据丢失。

文章来源:https://blog.csdn.net/oWuChenHua/article/details/135175441
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。