消息队列(MQ):全称为Message Queue。“消息队列”是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。
从目前互联网应用中使用消息队列的场景来看,主要有以下三个:
异步处理数据
系统应用解耦
业务流量削峰
下面对上述每一种场景进行简单描述。
第一个例子我们以现实生活中送快递来类比,在该例子中我们把暂存快递的快递柜比作暂存数据的消息队列。我们来看一下在现实生活中,没有快递柜时,快递员把快递送到目的地后,一般需要把联系收货人来签收快递,如果收货人此时有空,那一切都很顺利。但如果收货人此时不方便(开会、正在吃午饭、外出出差)。那对于快递员而言,就很尴尬,需要一直等待(开会 or 吃午饭)或者将快递拿回去(外出出差),导致白跑一趟。这对于快递员而言简直太不友好。
从这儿可以看出,当快递员送货时,是一个同步状态,即需要等待收货人签收后才能去送下一趟单子,对快递员而言效率太低。上述例子虽然有点牵强,大家凑合理解,意思能大概理解到位就 ok。
接着我们再来看一下,当有了快递柜后,对于快递员而言,每次需要送快递时,只需要将快递投掷到快递柜,然后再通过短信或者电话通知收货人具体的快递信息即可。他就可以继续去派送下一单。而对于收获人而言,也可以根据具体方便的时间来取件。这样一来,二者完全异步了,不用相互等待了。
在这个例子中,如果把快递员比作生产者,收货人比作是消费者,则快递柜就类似于消息队列。我们可以通过采用消息队列来实现异步数据的处理。
案例二我们以目前最主流的推荐系统中内容的流转来举例。在推荐系统中当创作者发布了一条内容后,该内容会首先经过安全部分的相关审核,通过审核后的内容,通常需要进行内容入库存储、送入模型进行特征的计算和生成。
假如后期我们想提升推荐的效果,需要单独构建一份冷启动的推荐池,此时也需要用到这部分内容,那问题来了,在没有使用消息队列时,对于上游服务而言,需要通过扩展新的逻辑来实现该功能。同时在该场景里,会存在依赖三个下游服务,如果其中一个下游服务失败后,该如何处理,是重试还是返回失败等这些细节的处理。如果后期这部分数据还想在其他渠道分发,那又该如何对接。明显这种场景下面临着系统紧耦合的问题。
我们再来看一下,如果我们一开始就引入了消息队列,那问题又会变成怎样的呢?当内容审核通过后,就直接将数据生产出来丢到消息队列中,下游的多个服务再从消息队列消费数据。当后续这一份数据需要扩展供其他系统使用时,也只要通过新的消费者来接入到消息队列消费就 ok。上游生产消息的模块不要做任何的改动。这样我们就通过消息队列进行了系统应用之间的解耦。这是消息队列的第二个用途。
消息对应的第三个使用场景便是削峰。在现如今的互联网世界中,电商场景中每年的 618 秒杀活动、双 11 抢购便是最典型的案例。这种场景中系统的峰值流量往往集中于一小段时间内,平常的流量比较可控,所以为了防止系统在短时间内的峰值流量冲垮,往往采用消息队列来削弱峰值流量,高峰值期间产生的订单消息等数据首先送入到消息队列中暂存,然后供下游系统根据自己的消费能力来逐步处理。同时这类消息往往对时延的要求不是很高,比较适合采用消息队列暂存。
MQ是消息通信的模型,并发具体实现。现在实现MQ的有两种主流方式:AMQP、JMS。
两者间的区别和联系:
目前市场上的主流MQ主要有以下几个:pulsar、kafka、rabbitmq、rocketmq、activemq,对于不同Mq的选型,我们主要做一些分析。
ActiveMQ:?(基于JMS)ActiveMQ 由 Apache 软件基金会基于 Java 语言开发的一个开源的消息代理。能够支持多个客户机或服务器。计算机集群等属性支持 ActiveMQ 来管理通信系统。
RabbitMQ:?(AMQP)RabbitMQ 是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ 服务器是用 Erlang 语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。RabbitMQ 支持多种消息传递协议、传递确认等特性。
Kafka: Apache Kafka 是由 Apache 软件基金会开发的一个开源消息系统项目,由 Scala 写成。Kafka 最初是由 LinkedIn 开发,并于 2011 年初开源。2012 年 10 月从 Apache Incubator 毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。Kafka 是一个分布式的、分区的、多复本的日志提交服务。它通过一种独一无二的设计提供了一个消息系统的功能。
RocketMQ:?(基于JMS)Apache RocketMQ 是一个分布式消息和流媒体平台,具有低延迟、强一致、高性能和可靠性、万亿级容量和灵活的可扩展性。它有借鉴 Kafka 的设计思想,但不是 kafka 的拷贝。
Pulsar: Apache Pulsar 是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性,被看作是云原生时代实时消息流传输、存储和计算最佳解决方案。
上图是几乎所有消息队列设计的一个核心模型。对于一个消息队列而言,从数据流向的维度,可以拆解为三大部分:生产者、消息队列集群、消费者,数据是从生产者流向消息队列集群,最终再从消息队列集群流向消费者,下面对这几个概念进行一一阐述。
生产数据的服务,通常也称为数据的输入提供方,这里的数据通常指我们的业务数据,例如推荐场景中用户对内容的点击数据、内容曝光数据、电商中的订单数据等等。
生产者通常是作为客户端的方式存在,但在支持事务消息的消息队列中,生产者也被设计为服务端,实现事务消息这一特性。其次生产者通常会有多个,消息队列集群内部也会有多个分区队列,所以在生产者发送数据时,通常会存在负载均衡的一些策略,常见的有按 key hash、轮询、随机等方式。其本质是一条数据,被消息队列封装后也被称为一条消息,该条消息只能发送到其消息队列集群内部的一个分区队列中。因此只需按照一定的策略从多个队列中选择一个队列即可。
消息队列集群是消息队列这种组件实现中的核心中的核心,它的主要功能是存储消息、过滤消息、分发消息。
其中存储消息主要指生产者生产的数据需要存储到消息队列内部;存储消息可以说是消息队列的核心,一个消息队列吞吐量的高低、性能优劣都和它的存储模型脱不开关系。
过滤消息只指消息队列可以通过一定的规则或者策略进行消息的过滤,该项能力通常也被称为消息路由;过滤消息属于高阶的特性功能,AMQP 协议对这些能力抽象的比较完备,部分消息队列可以选择性的实现该协议来达到该功能,关于 AMQP 协议内容读者可以自行搜索资料阅读,此处不再展开。
分发消息是指消息队列通常需要将消息分发给处理同一逻辑的多个消费者处理或者处理不同逻辑的不同消费者处理。分发消息可以说和消费者模型想挂钩,这块会涉及到不同的数据获取方式,也会涉及到消费者消费消息的模型。
此外绝大部分的消息队列也都支持对消息进行分类,分类的标签称为topic(主题),一个 topic 中存放的是同一类消息。
最终消息队列存储的消息会被消费者消费使用,消费者也可以看做消息队列中数据的输出方。消费者通常有两种方式从消息队列中获取数据:推送(push)数据、拉取(pull)数据,3.3 节中会对这两种方案进行详细对比说明。其次消费者也经常是作为客户端的角色出现在在消息队列这种组件中。
在这一节中,我们详细看看消息队列存储消息这个环节的一些权衡考量,通常数据的存储无外乎就是两种,一种是存储在非易失性存储中,例如磁盘这种介质;另一种是选择存储在易失性存储中,典型的就是内存。
通常在大部分组件设计时,往往会选择一种主要介质来存储、另一种介质作为辅助使用。就拿 redis 来说,它主要采用内存存储数据,磁盘用来做辅助的持久化。拿 RabbitMQ 举例,它也是主要采用内存存储消息,但也支持将消息持久化到磁盘。而 RocketMQ、Kafka、Pulsar 这种,则是数据主要存储在磁盘,通过内存来主力提升系统的性能。关系型数据库例如 mysql 这种组件也是主要采用磁盘组织数据,合理利用内存提升性能。
针对采用内存存储数据的方案而言,难点一方面在于如何在不降低访问效率的情况下,充分利用有限的内存空间来存储尽可能多的数据,这个过程中少不了对数据结构的选型、优化;另一方面在于如何保证数据尽可能少的丢失,我们可以看到针对此问题的解决方案通常是快照+广泛意义的 wal 文件来解决。此类典型的代表就是 redis 啦。
针对采用磁盘存储数据的方案而言,难点一方面在于如何根据系统所要解决的特点场景进行合理的对磁盘布局。读多写少情况下采用 b+树方式存储数据;写多读少情况下采用 lsm tree 这类方案处理。另一方面在于如何尽可能减少对磁盘的频繁访问,一些做法是采用 mmap 进行内存映射,提升读性能;还有一些则是采用缓存机制缓存频繁访问的数据。还有一些则是采用巧妙的数据结构布局,充分利用磁盘预读特性保证系统性能。
总的来说,针对写磁盘的优化,要不采用顺序写提升性能、要不采用异步写磁盘提升性能(异步写磁盘时需要结合 wal 保证数据的持久化,事实上 wal 也主要采用顺序写的特性);针对读磁盘的优化,一方面是缓存、另一方面是 mmap 内存映射加速读。
上述这些存储方案上权衡的选择在 kafka、RocketMQ、Pulsar 中都可以看到。其实抛开消息队列而言,这些存储方案的选择上无论是关系型数据库还是 kv 型组件都是通用的。
下图列举了几种磁盘上的数据组织方式,仅供大家参考。
消费者在从消息队列中获取数据时,主要有两种方案: 1. 等待推送数据 2. 主动拉取数据
在此处,个人想抛开消息队列谈一点关于这两种方案的理解,其实推拉模型不仅仅只用于消息队列这种组件中,更一般意义上,它解决的其实是数据传送双方的一个问题。本质是数据需要从一方流向另一方。顺着这个思路来看,下面这三个例子都是遵循这个原则。
网络中传输的数据: 在 IO 多路复用中,以 epoll 为例,当内核检测到监听的描述符有数据到来时,epoll_wait()阻塞会返回,上层用户态程序就知道有数据就绪了,然后可以通过 read()系统调用函数读取数据。这个过程就绪通知,类似于推送,但推送的不是数据,而是数据就绪的信号。具体的数据获取方式还是需要通过拉取的方式来主动读。
feeds 流系统用户时间线后台实现方案(读扩散、写扩散): 读扩散和写扩散更是这样一个 case。对于读扩散而言,主要采用拉取的方式获取数据。而对于写扩散而言,它是典型的数据推送的方式。当然在系统实现中,更复杂的场景往往会选择读写结合的思路来实现。
生活中的点外卖例子: 当下单点外卖时,通常也会有两种方式可以选择,外卖派送、到店自取。不过通常外卖派送比较实时,我们通常就选择这种方式了而已。可以看出外卖派送其实就是一种推的方式,而到店自取,则是拉的方式。
本节我们来介绍最后一部分内容,消息队列中消费者的消费模型。下图中上半部分展示了最简单的一种消费模型。一个生产者、一个消费者。但往往我们的一份数据通常会被不同场景所使用。那这个时候,首先就会存在每种场景需要使用全量的数据、而且不同场景之间不会相关影响,彼此独立。方便理解起见,我们假设有 N 个场景需要使用这同一份数据,每个场景需要消费全量的数据。而在 N 个场景中的一种场景里,又会有多个消费者一起分摊消费这些数据。我们假设一个场景里有 M 个消费者。由于每个场景中包含 M 个消费者,我们也将其采用消费者组来描述。通过上面的介绍,我们可以用下面一句话总结消息队列中的消费模型:
消费者消费者模型其实是一个 1:N:M 的关系,一份数据被 N 个消费者组独立使用,每个消费者组中有 M 个消费者进行分摊消费
其实这种模型也称为发布订阅模型,对于一条消息而言,组间广播、组内单播。一条消息只能被一个消费者组中的一个消费者使用。在消费者组内部也存在一些负载均衡的策略。常用的有:轮询、随机、hash、一致性 hash等方案。