本文梳理笔者 MQ 知识,从消息中间件的基础知识讲起,在有了基础知识后,对市面上各主流的消息中间件进行详细的解析,包括 RabbitMQ、RocketMQ、Kafka、Pulsar,最后再横向对比这几款主流的消息中间件。本篇是系列文章第二篇。
Tag(标签)可以看作子主题,它是消息的第二级类型,用于为用户提供额外的灵活性。使用标签,同一业务模块不同目的的消息就可以用相同 Topic 而不同的 Tag 来标识。比如交易消息又可以分为:交易创建消息、交易完成消息等,一条消息可以没有 Tag 。标签有助于保持你的代码干净和连贯,并且还可以为 RocketMQ 提供的查询系统提供帮助。
RocketMQ 中,订阅者的概念是通过消费组(Consumer Group)来体现的。每个消费组都消费主题中一份完整的消息,不同消费组之间消费进度彼此不受影响,也就是说,一条消息被 Consumer Group1 消费过,也会再给 Consumer Group2 消费。消费组中包含多个消费者,同一个组内的消费者是竞争消费的关系,每个消费者负责消费组内的一部分消息。默认情况,如果一条消息被消费者 Consumer1 消费了,那同组的其他消费者就不会再收到这条消息。
在 Topic 的消费过程中,由于消息需要被不同的组进行多次消费,所以消费完的消息并不会立即被删除,这就需要? RocketMQ 为每个消费组在每个队列上维护一个消费位置(Consumer Offset),这个位置之前的消息都被消费过,之后的消息都没有被消费过,每成功消费一条消息,消费位置就加一。也可以这么说,Queue 是一个长度无限的数组,Offset 就是下标。
RabbitMQ 类似有生产阶段、存储阶段、消费阶段,相较 RabbitMQ 的架构,增加了 NameServer 集群,横向拓展能力较好。参考的 Kafka 做的设计,故也同样拥有 NIO、PageCache、顺序读写、零拷贝的技能,单机的吞吐量在十万级,横向拓展能力较强,官方声明集群下能承载万亿级吞吐。
存储阶段,可以通过配置可靠性优先的 Broker 参数来避免因为宕机丢消息,简单说就是可靠性优先的场景都应该使用同步。
1、消息只要持久化到 CommitLog(日志文件)中,即使 Broker 宕机,未消费的消息也能重新恢复再消费。
2、Broker 的刷盘机制:同步刷盘和异步刷盘,不管哪种刷盘都可以保证消息一定存储在 Pagecache 中(内存中),但是同步刷盘更可靠,它是 Producer 发送消息后等数据持久化到磁盘之后再返回响应给 Producer。
Broker 通过主从模式来保证高可用,Broker 支持 Master 和 Slave 同步复制、Master 和 Slave 异步复制模式,生产者的消息都是发送给 Master,但是消费既可以从 Master 消费,也可以从 Slave 消费。同步复制模式可以保证即使 Master 宕机,消息肯定在 Slave 中有备份,保证了消息不会丢失。
Consumer 的配置文件中,并不需要设置是从 Master 读还是从 Slave 读,当 Master 不可用或者繁忙的时候, Consumer 的读请求会被自动切换到从 Slave。有了自动切换 Consumer 这种机制,当一个 Master 角色的机器出现故障后,Consumer 仍然可以从 Slave 读取消息,不影响 Consumer 读取消息,这就实现了读的高可用。
如何达到发送端写的高可用性呢?在创建 Topic 的时候,把 Topic 的多个 Message Queue 创建在多个 Broker 组上(相同 Broker 名称,不同 BrokerId 机器组成 Broker 组),这样当 Broker 组的 Master 不可用后,其他组Master 仍然可用, Producer 仍然可以发送消息。
此架构下的 RocketMQ 不支持把 Slave 自动转成 Master ,如果机器资源不足,需要把 Slave 转成 Master ,则要手动停止 Slave 色的 Broker ,更改配置文件,用新的配置文件启动 Broker。由此,在高可用场景下此问题变得棘手,故需要引入分布式算法的实现,追求 CAP,但实践情况是不能同事满足 CA的,在互联网场景下较多是在时间 BASE 理论,优先满足 AP,尽可能去满足 C。RocketMQ 引入的是实现 Raft 算法的 Dledger,拥有了选举能力,主从切换,架构拓扑图是这样的:
分布式算法中比较常常听到的是 Paxos 算法,但是由于 Paxos 算法难于理解,且实现比较困难,所以不太受业界欢迎。然后出现新的分布式算法 Raft,其比 Paxos 更容易懂与实现,到如今在实际中运用的也已经很成熟,不同的语言都有对其的实现。Dledger 就是其中一个 Java 语言的实现,其将算法方面的内容全部抽象掉,这样开发人员只需要关系业务即可,大大降低使用难度。
生产者将消息发送至 Apache RocketMQ 服务端。
Apache RocketMQ 服务端将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。
生产者开始执行本地事务逻辑。
生产者根据本地事务执行结果向服务端提交二次确认结果(Commit 或是 Rollback),服务端收到确认结果后处理逻辑如下:
二次确认结果为 Commit:服务端将半事务消息标记为可投递,并投递给消费者。
二次确认结果为 Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为 Unknown 未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。说明 服务端回查的间隔时间和最大回查次数,请参见[参数限制](https://rocketmq.apache.org/zh/docs/introduction/03limits/)。
生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。
事务消息生命周期
初始化:半事务消息被生产者构建并完成初始化,待发送到服务端的状态。
事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。
消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。
提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费。
消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ 会对消息进行重试处理。具体信息,请参见消费重试。
消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。Apache RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。
消息删除:Apache RocketMQ 按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。更多信息,请参见消息存储和[清理机制](https://rocketmq.apache.org/zh/docs/featureBehavior/11messagestorepolicy/)。
在过去“分”往往是技术实现的妥协,而现在“合”才是用户的真正需求。RocketMQ 5.0 基于统一 Commitlog 扩展多元化索引,包括时间索引、百万队列索引、事务索引、KV索引、批量索引、逻辑队列等技术。在场景上同时支撑了 RabbitMQ、Kafka、MQTT、边缘轻量计算等产品能力,努力实现“消息、事件、流”的扩展支持,云原生是主流。
更多信息可查看官网 [Apache RocketMQ](https://rocketmq.apache.org/zh/)。
Kafka 是一个分布式系统,由通过高性能 TCP 网络协议进行通信的服务器和客户端组成。它可以部署在本地和云环境中的裸机硬件、虚拟机和容器上。
服务器:Kafka 作为一个或多个服务器集群运行,可以跨越多个数据中心或云区域。其中一些服务器形成存储层,称为代理。其他服务器运行 Kafka Connect 以事件流的形式持续导入和导出数据,以将 Kafka 与您现有的系统(例如关系数据库以及其他 Kafka 集群)集成。为了让您实现关键任务用例,Kafka 集群具有高度可扩展性和容错性:如果其中任何一台服务器发生故障,其他服务器将接管它们的工作以确保连续运行而不会丢失任何数据。
客户端:它们允许您编写分布式应用程序和微服务,即使在出现网络问题或机器故障的情况下,也能以容错的方式并行、大规模地读取、写入和处理事件流。Kafka 附带了一些这样的客户端,这些客户端由 Kafka 社区提供的 数十个客户端进行了扩充:客户端可用于 Java 和 Scala,包括更高级别的 Kafka Streams 库,用于 Go、Python、C/C++ 和许多其他编程语言以及 REST API。
与前面两个 MQ 类似有生产阶段、存储阶段、消费阶段,相比 RocketMQ 这里的注册中心是用的 Zookeeper,Kafka 的诸多事件都依赖于 ZK,元数据管理、各个角色的注册、心跳、选举、状态维护,这里的角色包括 Boker、 Topic、 Partition、 消费者组等。
所以这里也会带来 ZK Watch 事件压力过大的问题,大量的 ZK 节点事件阻塞在队列中, 导致自旋锁, 导致 CPU 上升, 由于大量数量事件对象导致占用了大量的内存。
图中的 Controller 是 Kakfa 服务端 Broker 的概念,Broker 集群有多台,但只有一台 Broker 可以扮演控制器的角色;某台 Broker 一旦成为 Controller,它用于以下权力:完成对集群成员管理、主题维护和分区的管理,如集群 Broker 信息、Topic 维护、Partition 维护、分区选举 ISR、同步元信息给其他 Broker 等。
Topic 是逻辑上的概念,而 Partition 是物理上的概念,即一个 Topic 划分为多个 Partition,每个 Partition 对应一个Log文件。
.log文件:存储消息数据的文件。
.index文件:索引文件,记录一条消息在log文件中的位置。
.snapshot文件:记载着生产者最新的offset。
.timeindex时间索引文件:当前日志分段文件中建立索引的消息的时间戳,是在 0.10.0 版本后增加的,用于根据时间戳快速查找特定消息的位移值,优化 Kafka 读取历史消息缓慢的问题。为了保证时间戳的单调递增,可以将log.message.timestamp.type 设置成 logApendTime,而 CreateTime 不能保证是消息写入时间。
上图是三个 Broker、两个 Topic、两个 Partition 的 Broker ?的存储情况,可以延伸想象一下百万级 Topic 的存储情况会很复杂。
为了解决强依赖 Zookeeper 进行 Rebalance 带来的问题,Kafka 引入了 Coordinator 机制。
首先,触发 Rebalance (再均衡)操作的场景目前分为以下几种:消费者组内消费者数量发生变化,包括:
有新消费者加入
有消费者宕机下线,包括真正宕机,或者长时间 GC、网络延迟导致消费者未在超时时间内向 GroupCoordinator 发送心跳,也会被认为下线。
有消费者主动退出消费者组(发送 LeaveGroupRequest 请求) 比如客户端调用了 unsubscrible() 方法取消对某些主题的订阅
消费者组对应的 GroupCoordinator 节点发生了变化。
消费者组订阅的主题发生变化(增减)或者主题分区数量发生了变化。
节点扩容
更多信息可查看 Kafka 官网 [Apache Kafka](https://kafka.apache.org/)
在最高层,一个 Pulsar 实例由一个或多个 Pulsar 集群组成。一个实例中的集群可以在它们之间复制数据。
在 Pulsar 集群中:
一个或多个 Broker 处理和负载平衡来自生产者的传入消息,将消息分派给消费者,与 Pulsar 配置存储通信以处理各种协调任务,将消息存储在 BookKeeper 实例(又名 bookies)中,依赖于特定集群的 ZooKeeper 集群用于某些任务等等。
由一个或多个 Bookie 组成的 BookKeeper 集群处理消息的持久存储。
特定于该集群的 ZooKeeper 集群处理 Pulsar 集群之间的协调任务。
下图展示了一个 Pulsar 集群:
Pulsar 用 Apache BookKeeper 作为持久化存储,Broker 持有 BookKeeper client,把未确认的消息发送到 BookKeeper 进行保存。
BookKeeper 是一个分布式的 WAL(Write Ahead Log)系统,Pulsar 使用 BookKeeper 有下面几个便利:
可以为 Topic 创建多个 Ledgers:Ledger 是一个只追加的数据结构,并且只有一个 Writer,这个 Writer 负责多个 BookKeeper 存储节点(就是 Bookies)的写入。Ledger 的条目会被复制到多个 Bookies;
Broker 可以创建、关闭和删除 Ledger,也可以追加内容到 Ledger;
Ledger 被关闭后,只能以只读状态打开,除非要明确地写数据或者是因为 Writer 挂掉导致的关闭;
Ledger 只能有 Writer 这一个进程写入,这样写入不会有冲突,所以写入效率很高。如果 Writer 挂了,Ledger 会启动恢复进程来确定 Ledger 最终状态和最后提交的日志,保证之后所有 Ledger 进程读取到相同的内容;
除了保存消息数据外,还会保存 Cursors,也就是消费端订阅消费的位置。这样所有 Cursors 消费完一个 Ledger 的消息后这个 Ledger 就可以被删除,这样可以实现 Ledgers 的定期翻滚从头写。
从架构图可以看出,Broker 节点不保存数据,所有 Broker 节点都是对等的。如果一个 Broker 宕机了,不会丢失任何数据,只需要把它服务的 Topic 迁移到一个新的 Broker 上就行。
Broker 的 Topic 拥有多个逻辑分区,同时每个分区又有多个 Segment。
Writer 写数据时,首先会选择 Bookies,比如图中的 Segment1。选择了 Bookie1、Bookie2、Bookie4,然后并发地写下去。这样这 3 个节点并没有主从关系,协调完全依赖于 Writer,因此它们也是对等的。
在遇到双十一等大流量的场景时,必须增加 Consumer。
这时因为 Broker 不存储任何数据,可以方便的增加 Broker。Broker 集群会有一个或多个 Broker 做消息负载均衡。当新的 Broker 加入后,流量会自动从压力大的 Broker 上迁移过来。
对于 BookKeeper,如果对存储要求变高,比如之前存储 2 个副本现在需要存储 4 个副本,这时可以单独扩展 Bookies 而不用考虑 Broker。因为节点对等,之前节点的 Segment 又堆放整齐,加入新节点并不用搬移数据。Writer 会感知新的节点并优先选择使用。
对于 Broker,因为不保存任何数据,如果节点宕机了就相当于客户端断开,重新连接其他的 Broker 就可以了。
对于 BookKeeper,保存了多份副本并且这些副本都是对等的。因为没有主从关系,所以当一个节点宕机后,不用立即恢复。后台有一个线程会检查宕机节点的数据备份进行恢复。
在遇到双十一等大流量的场景时,必须增加 Consumer。
这时因为 Broker 不存储任何数据,可以方便的增加 Broker。Broker 集群会有一个或多个 Broker 做消息负载均衡。当新的 Broker 加入后,流量会自动从压力大的 Broker 上迁移过来。
对于 BookKeeper,如果对存储要求变高,比如之前存储 2 个副本现在需要存储 4 个副本,这时可以单独扩展 Bookies 而不用考虑 Broker。因为节点对等,之前节点的 Segment 又堆放整齐,加入新节点并不用搬移数据。Writer 会感知新的节点并优先选择使用。
Pulsar 可以使用多租户来管理大集群。Pulsar 的租户可以跨集群分布,每个租户都可以有单独的认证和授权机制。租户也是存储配额、消息 TTL 和隔离策略的管理单元。
在和其他组件或者生态对接方面,Pulsar 可以支持很多种消息协议,对于存量系统的MQ首次接入、切换MQ都很方便。
更多信息可查看 Pulsar 官网?[Apache Pulsar](https://pulsar.apache.org/)
此图摘抄自《面渣逆袭:RocketMQ二十三问》
这个图没有 Pulsar 的信息,从网上看到的压测报告来看,Pulsar 吞吐量大概是 Kafka 的两倍左右,延迟表现比 Kafka 低不少,Pulsar 的 I/O 隔离显著优于 Kafka。比较详实的 Pulsar 和 Kafka 的比对可以查阅 StreamNative 的文章《Pulsar和Kafka基准测试:Pulsar 性能精准解析(完整版)》,StreamNative 作为 Apache Pulsar 的商业化公司,数据和结果还是比较可靠的。
常言道,最好的学习方法是带着问题去寻找答案,在路上捡拾更多果实,增加经验值,快速升级。很多人推荐费曼学习法,以教代学,按可以教别人的标准来学习,最终产出教学内容为目的来学习一个知识,能让自己高效学习。在我看来这很像绩效考核用的 OKR 工具,为项目设定关键成果,实现成功应该做什么?怎么做?而我写这篇文章是在实践费曼学习法。
所以,在这里我给出几个问题,读者可以根据自己的兴趣爱好带着问题去寻找答案吧。
如何保证消息的可用性/可靠性/不丢失呢?
如何处理消息重复的问题呢?
顺序消息如何实现?
怎么处理消息积压?
怎么实现分布式消息事务的?半消息?
如何实现消息过滤?
如果自己平时想到的问题太多,不知道先看哪一个,那么自己想清楚为什么要学这些知识点,哪个问题对于当前的自己收益最大。
参考资料:
官网文档地址:??
RabbitMQ 官网文档:https://www.rabbitmq.com/documentation.html
Apache RocketMQ 官网文档:https://rocketmq.apache.org/zh/docs/
Apache Kafka 官网文档:https://Kafka.apache.org/documentation/
Apache Pulsar 官网文档:https://pulsar.apache.org/docs/