消息中间件

发布时间:2024年01月09日

一、 消息中间件的介绍

消息中间件属于分布式系统中的子系统,关注数据的发送和接收,利用高效、可靠的异步消息传递机制,对分布式系统中的各个子系统进行集成。

1.1 为什么使用消息中间件

异步
解耦
缓冲能力
伸缩性
提高系统扩展性

1.2 消息中间件的使用场景

异步处理
注册后发送短信等。

应用解耦
场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。
传统模式的缺点:
1)假如库存系统无法访问,则订单减库存将失败,从而导致订单失败;
2)订单系统与库存系统耦合;

引入消息队列,在下单时库存系统不能正常使用,也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦。

日志处理
将消息队列用在日志处理中,比如 Kafka 的应用,解决大量日志传输的问题。

消息通讯
消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。例如实现点对点消息通讯或者聊天室等。

流量削峰
秒杀、团购等活动。

1.3 消息队列的发展史

在这里插入图片描述

1.4 消息队列的选择

用户访问量在ActiveMQ 的可承受范围内,而且确实主要是基于解耦和异步来用的,可以考虑ActiveMQ,也比较贴近 Java 工程师的使用习惯,但是ActiveMQ 现在停止维护了,同时ActiveMQ并发不高,所以业务量一定的情况下可以考虑使用。

RabbitMQ 作为一个纯正血统的消息中间件,有着高级消息协议 AMQP的完美结合,在消息中间件中地位无可取代,但是 erlang 语言阻止了我们去深入研究和掌控,对公司而言,底层技术无法控制,但是确实是开源的,有比较稳定的支持,活跃度也高。
对自己公司技术实力有绝对自信的,可以用 RocketMQ,但是 RocketMQ 诞生比较晚,并且更新迭代很快,这个意味着在使用过程中有可能会遇到很多坑,所以如果公司 Java 技术不是很强,不推荐使用。
如果是大数据领域的实时计算、日志采集等场景,用 Kafka是业内的标准。

二、 RabbitMQ

2.1 AMQP

是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端中间件不同产品、不同的开发语言等条件的限制。目标是实现一种在全行业广泛使用的标准消息中间件技术,以便降低企业和系统集成的开销,并且向大众提供工业级的集成服务,主要实现有RabbitMQ。

三、 Kafka

学习 Kafka 入门知识看这一篇就够了!

四、RocketMQ

4.1 介绍

阿里巴巴开发的高可用的分布式集群技术、正式商用的专业消息中间件,既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性,是阿里巴巴双11使用的核心产品。2017年提交到Apache基金会成为Apache基金会的顶级开源项目。
RocketMQ的设计基于主题的发布与订阅模式,其核心功能包括消息发送、消息存储(Broker)、消息消费。

RocketMQ的特点:

  1. NameServer设计及其简单。
    RocketMQ抛弃了业界常用的Zookeeper充当消息管理的“注册中心”,而是使用自主研发的NameServer来实现各种元数据的管理(Topic 路由信息等)。
  2. 高效的I/O存储
    RocketMQ追求消息发送的高吞吐量,RocketMQ 的消息存储设计成文件组的概念,组内单个文件固定大小,引入了内存映射机制,所有主题的消息存储基于顺序读写,极大提高消息写性能,同时为了兼顾消息消费与消息查找,引入消息消费队列文件与索引文件
  3. 容忍存在设计缺陷
    适当将某些工作下放给 RocketMQ 的使用者,比如消息只消费一次,这样极大的简化了消息中间件的内核,使得RocketMQ的实现发送变得非常简单与高效。

核心概念

  1. NameServer
    RocketMQ 的服务注册中心(相当于大脑)。 Broker在启动时向NameServer注册(主要是服务器地址等),生产者在发送消息之前先从 NameServer获取Broker服务器地址列表(消费者一样),然后根据负载均衡算法从列表中选择一台服务器进行消息发送。NameServer与每台Broker服务保持长连接,并间隔30S检查Broke是否存活,如果检测到Broker宕机,则从路由注册表中将其移除。这样就可以实现RocketMQ 的高可用。
  2. 主题(Topic)
    一级消息分类。
  3. 生产者
    消息发送方,负责生产消息,并将消息发送到Topic。
  4. 消费者
    消息接收方,负责从Topic中接收并消费消息。

整体运转流程
在这里插入图片描述

  1. NameServer先启动
  2. Broker启动时向NameServer注册
  3. 生产者在发送某个主题的消息之前先从NamerServer获取 Broker服务器地址列表(有可能是集群),然后根据负载均衡算法从列表中选择一台Broker 进行消息发送。
  4. NameServer与每台Broker服务器保持长连接,并间隔 30S 检测Broker是否存活,如果检测到Broker宕机(超过120S),则从服务器地址中将其移除。
  5. 消费者在订阅某个主题的消息之前从NamerServer获取 Broker服务器地址列表(有可能是集群),消费者选择从Broker中订阅消息,订阅规则由Broker配置决定。

4.2 设计理念和目标

设计理念

  1. 整体设计思想借鉴Kafka。
  2. 整体设计追求简单与性能。
    基于主题的发布和订阅,核心功能包括消息发送、消息存储和消息消费。
  3. NameServer性能对比Zookeeper有极大的提升。
  4. 高效的IO存储机制。
    基于文件顺序读写,内存映射机制。
  5. 容忍设计缺陷。
    RocketMQ自身不保证消息只消费一次,从而简化RocketMQ的内核,使得RocketMQ简单与高效。需要消费者自己实现幂等。

设计目标

  1. 简单的架构模式
    发布订阅模式,主要组件:消息发送者、消息服务器(消息存储)、消息消费者。
  2. 支持顺序消息
    RocketMQ 可以严格保证消息有序。
  3. 支持消息过滤
    消费者可以对同一主题下的消息按照规则只消费自己感兴趣的消息。
  4. 消息存储高性能
    引入内存映射机制,所有的主题消息顺序存储在同一个文件中。同时为了防止无限堆积,引入消息文件过期机制和文件存储空间报警机制。
  5. 消息高可用
    同步刷盘机制,不丢失消息,异步刷盘丢失少量消息, 如果引入双写机制,基本可以满足消息可靠性要求极高的场景。
  6. 消息消费低延迟
    在不发生消息堆积时,以长轮询模式实现实时的消息推送模式。
  7. 确保消息至少被消费一次
    消息确认机制(ACK)来确保消息至少被消费一次。
  8. 消息回溯
    已经消费完的消息,可以根据业务要求重新消费消息。
  9. 消息堆积能力强
    采用磁盘文件存储,堆积能力比较强,同时提供文件过期删除机制。
  10. 支持延时消费
    apache版本目前支持指定级别时间后消费,阿里云版本支持任意时间消费。
  11. 支持消息重试机制

4.3 消息的生产

4.3.1 消息发送方式

单向发送
发送方只负责发送消息,不等待服务器响应、没有回调函数。发送耗时短(微妙级)。
在这里插入图片描述

可靠同步发送
在这里插入图片描述

可靠异步发送
在这里插入图片描述

4.3.2 批量发送

在这里插入图片描述

4.3.3 消息发送流程

消息发送的主要流程

  1. 验证消息
    主要是要求主题名称、消息体不能为空、消息长度不能等于 0,且不能超过消息的最大的长度 4M(生产者对象中配置maxMessageSize=1024 * 1024 * 4)
  2. 查找路由
    客户端(生产者)会缓存topic路由信息(如果是第一次发送消息,本地没有缓存,查询 NameServer 尝试获取),路由信息主要包含了消息队列(queue)相关信息。
  3. 消息发送
    选择消息队列,发送消息,发送成功则返回。

4.3.4 消息重试机制

生产者在消息发送失败后,会重新尝试发送消息,默认重试2次。
可以通过参数进行配置:
在这里插入图片描述

失败重试规避:
RocketMQ 发现消息发送失败后,就会将失败的Broker排除在选择范围之外,下次发送消息时就不会发送到该Broker,这样做的目的就是为了提高发送消息的成功率。

4.4 消息的消费

4.4.1 消费模式

集群消费
使用相同Group ID的订阅者属于同一个集群。同一个集群下的订阅者订阅关系一致(topic、tag都一致)。

广播消费
当使用广播消费模式时,RocketMQ会将每条消息推送给集群内所有注册过的客户端,保证消息至少被每台机器消费一次。
广播消费模式下不支持顺序消息。
广播消费模式下不支持重置消费位点。
广播消费模式下客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过。

4.4.2 消费方式

推送
系统收到消息后自动调用处理函数来处理消息,自动保存Offset,并且加入新的消费者后会自动做负载均衡。
底层实现上,推送模式使用pull来实现。
在这里插入图片描述

拉取
DefaultMQPullConsumer
通过“长轮询”的方式拉取消息。
与推送方式比较,需要额外处理:

  1. 获取MessageQueues并遍历(一个Topic包括多个队列)。特殊情况下也可以选择指定的MessageQueue来读取消息
  2. 维护 Offsetstore,从一个MessageQueue里拉消息时,要传入 Offset 参数,随着不断的读取消息,Offset会不断增长。这个时候就需要用户把 Offset存储起来,根据实际的情况存入内存、写入磁盘或者数据库中。
  3. 根据不同的消息状态做不同的处理。

在这里插入图片描述

4.4.3 长轮询

RocketMQ 使用“长轮询”的方式获取消息。核心思想是,客户端还是拉取消息,Broker端HOLD住客户端发过来的请求一小段时间,在这个时间内(5s)有新消息达到,就利用现有的连接立刻返回消息给 Consunmer。
“长轮询”的主动权掌握在 Consumer 手中,Broker即使有大量消息积压,也不会主动推送给 Consumer。因为长轮询方式的有局限性,在HOLD住Comsumer请求的时候需要占用资源,所以它适合在消息队列这种客户端连接数可控的场景中。
https://segmentfault.com/a/1190000018411470

4.4.4 消息队列负载与重新分布机制

如果有 8 个消息队列(q1-q8),有 3 个消费者(c1,c2,c3)
RocketMQ 默认提供 5 中分配算法:

  1. 平均分配(AllocateMessageQueueAveragely)
    c1:q1,q2,q3
    c2:q4,q5,q6
    c3:q7,q8

  2. 平均轮询分配(AllocateMessageQueueAveragelyByCircle)
    c1:q1,q4,q7
    c2:q2,q5,q8
    c3:q3,q6

  3. 一致性Hash(AllocateMessageQueueConsistentHash)
    不推荐使用,因为消息队列负载均衡信息不容易跟踪

  4. 根据配置(AllocateMessageQueueByConfig)
    为每一个消费者配置固定的消费队列

  5. 根据Broker部署机房名(AllocateMessageQueueByMachineRoom)
    对每一个消费者负载不同 Broker上的队列

一般尽量使用“平均分配”“平均轮询分配”,因为分配算法比较直观。无论哪种算法,遵循的原则是一个消费者可以分配多个消息队列,同一个消息队列只会分配一个消费者,所以如果消费者个数量大于消息队列数量,则有些消费者无法消费消息。

4.4.5 消息确认

为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会重新投递。
ConsumeConcurrentlyStatus.CONSUME_SUCCESS 消费成功。
ConsumeConcurrentlyStatus.RECONSUME_LATER 消费失败,会放到重试队列,这个重试TOPIC的名字是%RETRY%+consumergroup。

为了保证消息是肯定被至少消费成功一次,RocketMQ 会把这批消息重发回Broker(topic 不是原 topic 而是这个消费者的 RETRY topic),在延迟的某个时间点后,再次投递到这个ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到 DLQ 死信队列。应用可以监控死信队列来做人工干预。

4.5 顺序消息

顺序消息是消息队列RocketMQ版提供的一种对消息发送和消费顺序有严格要求的消息。对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。

全局顺序消息
对于指定的一个Topic,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。适用于性能要求不高,所有的消息严格按照FIFO原则来发布和消费的场景。

分区顺序消息
对于指定的一个Topic,所有消息根据Sharding Key进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。
适用于性能要求高,以Sharding Key作为分区字段,在同一个区块中严格地按照先进先出(FIFO)原则进行消息发布和消费的场景。

Sharding Key
顺序消息中用来区分Topic中不同分区的关键字段,消息队列RocketMQ版会将设置了相同Sharding Key的消息路由到同一个分区下,同一个分区内的消息将按照消息发布顺序进行消费。和普通消息的Key是完全不同的概念。
分区
即Topic Partition,每个Topic包含一个或多个分区,Topic中的消息会分布在这些不同的分区中。本文中的逻辑分区指的就是Topic的分区。
物理分区:区别于逻辑分区,消息实际存储的单元,每个物理分区都会分配到某一台机器指定节点上。

4.6 延时消息

Producer将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到 Consumer 进行消费,该消息即延时消息。

适用场景
消息生产和消费有时间窗口要求,比如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单。如已完成支付则忽略。

使用方式
Apache RocketMQ 目前只支持固定精度的定时消息,因为如果要支持任意的时间精度,在Broker层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销(阿里云 RocketMQ 提供了任意时刻的定时消息功能)。
发送延时消息时需要设定一个延时时间长度,消息将从当前发送时间点开始延迟固定时间之后才开始投递。
延迟消息是根据延迟队列的 level 来的,延迟队列默认是
msg.setDelayTimeLevel(5) 代表延迟一分钟
“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”
生产消息跟普通的生产消息类似,只需要在消息上设置延迟队列的 level 即可。消费消息跟普通的消费消息一致。

4.7 死信队列

死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,并且重试达到最大次数后,则表明 Consumer在正常情况下无法正确地消费该消息。此时,消息队列 MQ 不会立刻将消息丢弃,而是将这条消息发送到该GroupID对应的特殊队列中。
消息队列MQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-LetterQueue)。

4.8 消费幂等

为了防止消息重复消费导致业务处理异常,消息队列MQ的消费者在接收到消息后,有必要根据业务上的唯一Key对消息做幂等处理。

4.9 消息过滤

RocketMQ 分布式消息队列的消息过滤方式有别于其它 MQ 中间件,是可以实现服务端的过滤。

Tag过滤
Consumer端在订阅消息时除了指定Topic,还可以指定TAG(多个 TAG,可以用||分隔)。其中,Consumer端会将这个订阅请求构建成一个SubscriptionData,发送一个Pull消息的请求给 Broker端。Broker端从RocketMQ的文件存储层—Store读取数据之前,会用这些数据先构建一个MessageFilter,然后传 Store。Store从 ConsumeQueue读取到一条记录后,会用它记录的消息tag的hash值去做过滤,由于在服务端只是根据hashcode进行判断,无法精确对 tag原始字符串进行过滤,故在消息消费端拉取到消息后,还需要对消息的原始tag字符串进行比对,如果不同,则丢弃该消息,不进行消息消费。

SQL92过滤
这种方式的大致做法和Tag过滤方式一样,只是具体过滤过程不太一样,真正的 SQL expression 的构建和执行由rocketmq-filter 模块负责的。
如果开启SQL过滤的话,Broker需要开启参数enablePropertyFilter=true,然后服务器重启生效。

4.10 事务消息

RocketMQ事务消息,是指发送消息事件和其他事件需要同时成功或同失败。

RocketMQ 采用两阶段提交的方式实现事务消息,处理流程为:
在这里插入图片描述

  1. 发送方向RocketMQ发送“待确认”(Prepare)消息。
  2. RocketMQ将收到的“待确认”(写入一个HalfTopic主题)消息持化成功后,向发送方回复已经发送成功,此时第一阶段消息发送完成。
  3. 发送方开始执行本地事件逻辑。
  4. 发送方根据本地事件执行结果向RocketMQ发送二次确认(Commit或Rollback)消息。RocketMQ收到Commit则将第一阶段消息标记为可投递(这些消息才会进入生产时发送实际的主题 RealTopic),订阅方将能够收到该消息;收到Rollback则删除第一阶段的消息。
  5. 如果出现异常情况,步骤4提交的二次确认未到达RocketMQ,服务器在经过固定时间段后将对“待确认”消息发起事务回查请求。
  6. 在收到事务回查请求后,发送方通过检查对应消息的本地事件执行结果返回Commit或Roolback状态。

4.11 RocketMQ存储设计

4.11.1 消息存储结构

commitLog
commitLog以物理文件的方式存放,每台Broker上的commitLog 被本机器所有consumeQueue共享。在commitLog中,一个消息的存储长度是不固定的,RocketMQ 采取一些机制,尽量向CommitLog中顺序写,但是随机读。commitlog 文件默认大小为lG,可通过在broker配置文件中设置mapedFileSizeCommitLog属性来改变默认大小。

consumeQueue
consumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个Topic下的每个messageQueue都有一个对应的consumeQueue文件。
其构建机制是,当消息到达commitlog文件后,由专门的线程产生消息转发任务,从而构建消息消费队列文件(consumeQueue )与索引文件。

indexFile
index文件夹下存的是索引文件(使用Hash索引机制),用来加快消息查询的速度。RocketMQ专门为消息订阅构建索引文件,来提高根据主题检索消息的速度。

config
config 文件夹中存储着Topic和Consumer等相关信息。topics.json:topic配置属性。
subscriptionGroup.json:消费者组配置信息。
delayOffset.json:延时消息队列拉取进度。
consumerOffset.json:集群消费模式消息消进度。

其他
abort:如果存在abort文件说明Broker非正常闭,该文件默认启动时创建,正常退出之前删除。
checkpoint:文件检测点,存储commitlog、consumeQueue、index 文件最后一次刷盘时间戳。

4.11.2 内存映射

内存映射文件,是由一个文件到一块内存的映射。文件的数据就是这块区域内存中对应的数据,读写文件中的数据,直接对这块区域的地址操作就可以,减少了内存复制的环节。所以说,内存映射文件比起文件 I/O 操作,效率要高,而且文件越大,体现出来的差距越大。

4.11.3 文件刷盘机制

RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复,又可以让存储的消息超出内存的限制。RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写息在通过Producer写入RocketMQ的时候,有两种写入磁盘方式:同步刷盘和异步刷盘。

同步刷盘
在消息追加到内存后,将同步调用 MappedByteBuffer.force()方法进行刷盘,刷盘操作成功后返回消息给发送端。
由于频繁的触发磁盘写动作,会明显降低性能。

异步刷盘
在消息追加到内存(PAGECACHE)后立刻返回消息给发送端。RocketMQ使用个单独的线程按照某个设定的频执行刷盘操作。

通过broker配置文件中配置flushDiskType来设定刷盘方式,可选值为ASYNC_FLUSH (异步刷盘)、SYNC_FLUSH 同步刷盘), 默认为异步。

4.11.4 过期文件删除

由于RocketMQ操作commitLog、consumeQueue文件是基于内存映射机制,并在启动的时候会加载commitlog、consumeQueue目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久存储在消息服务器上,所以需要引入一种机制来删除己过期的文件。
删除过程分别执行清理消息存储文件(commitlog)和消息消费队列文件(consumeQueue),两者共用一套文件期机制。

RocketMQ清除过期文件的方法:
如果非当前写文件在一定时间间隔内没有再次被更新,则认为是过期文件,可以被删除,RocketMQ不会关注这个文件上的消息是否全部被消费。默认每个文件的过期时间为 42 小时(不同版本的默认值不同),通过在 Broker 配置文件中设置fileReservedTime(文件保留时间)来改变过期时间(单位为小时)。
触发文件清除操作的是一个定时任务,默认每10s执行一次。
过期判断
文件过期主要由fileReservedTime来控制。
另外还有其他两个配置参数:

  1. deletePhysicFilesInterval
    删除物理文件的时间间隔(默认100MS),在一次定时任务触发时,可能会有多个物理文件超过过期时间要被删除,因此删除一个文件后需要间隔deletePhysicFilesInterval再删除下一个文件。
    由于删除文件是一个非常耗费 IO 的操作,会引起消息插入消费的延迟(相比于正常情况下),所以不建议直接删除所有过期文件。
  2. destroyMapedFileIntervalForcibly
    表示文件在第一次删除拒绝后(文件被线程引用),文件保存的最大时间。在此时间内一直会被拒绝删除,当超过这个时间时,会将引用每次减少 1000,直到引用小于等于 0 为止,即可删除该文件。

删除条件

  1. 指定删除文件的时间点
    RocketMQ通过deleteWhen设置一天的固定时间执行一次删除操作,默认为凌晨4点。
  2. 磁盘空间不足
    如果磁盘空间占用超过DiskSpaceCleanForciblyRatio(默认85),会触发过期文件删除操作。

另外还有RocketMQ的磁盘配置参数:

  1. 物理使用率大于 diskSpaceWarningLevelRatio(默认90),则会阻止新消息的插入。
  2. 物理磁盘使用率小于diskMaxUsedSpaceRatio(默认75) 表示磁盘使用正常。

4.12 RocketMQ高可用机制

RocketMQ的Broker分为Master(主)和 Slave(从)两个角色。为了保证高可用性(HA),Master收到消息后,要把内容同步到Slave上,这样一旦Master宕机Slave仍然可以提供服务。

RocketMQ集群部署
在这里插入图片描述

集群部署的方式

  1. 多主
    多个主节点组成集群。
    优点:所有模式中性能最高。
    缺点:单个Master宕机期间,未被消费的消息在节点恢复之前不可用,消息的实时性就受到影响。
    使用同步刷盘可以保证消息不丢失,同时Topic相对应的 queue应该分布在集群中各个节点,而不是只在某各节点上,否则,该节点宕机会对订阅该topic的应用造成影响。

  2. 多主多从,异步复制模式
    在多Master的基础上,每个Master都有至少一个对应的Slave。Master可读可写,Slave只能读,类似于mysql的主备模式。
    优点:一般情况下都是Master消费,在Master宕机或超过负载时,消费者可以从Slave读取消息,消息的实时性不受影响,性能几乎和多Master一样。
    缺点:使用异步复制的数据同步方式有可能会有消息丢失的问题。

  3. 多主多从,同步双写模式
    优点:同步双写的同步模式能保证数据不丢失。
    缺点:发送单个消息响应时间会略长,性能相比异步复制低 10%左右。

对数据要求较高的场景,建议采用同步复制方式、异步刷盘方式,来保存数据热备份和高吞吐量。

多主模式与数据重复
多主模式,生产者发送一条消息,只会写入到一台broker的一个queue中,所以有几台master跟消息会不会重复没有直接关系。

主从复制原理
从服务器在启动的时候主动向主服务器建立TCP长连接,然后获取服务器的commitlog最大偏移,以此偏移向主服务器主动拉取消息,主服务器根据偏移量,与自身commitlog文件的最大偏移进行比较,如果大于从服务器 commitlog偏移,主服务器将向从服务器返回一定数量的消息,该过程循环进行,达到主从服务器数据同步。

读写分离机制
消息消费者在向Broker发送消息拉取请求时,会根据筛选出来的消息队列,判定是从Master,还是从Slave拉取消息,默认是Master。
Broker 接收到消息消费者拉取请求,在获取本地堆积的消息量后,会计算服务器的消息堆积量是否大于物理内存的一定值,如果是,则标记下次从Slave服务器拉取,计算Slave服务器的Broker Id,并响应给消费者。
消费者在接收到Broker的响应后,会把消息队列与建议下一次拉取节点的 Broker Id 关联起来,并缓存在内存中,以便下次拉取消息时,确定从哪个节点发送请求。

文章来源:https://blog.csdn.net/aaxzz/article/details/135476617
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。