消息队列(Messeage Queue,MQ)是在分布式系统架构中常用的一种中间件技术,从字面表述看,是一个存储消息的队列,所以它一般用于给 MQ 中间的两个组件提供通信服务。
我们引入一个削峰填谷实际场景来介绍 MQ ,削峰填谷是指处理短时间内爆发的请求任务,将巨量请求任务“削峰”,平摊在平常请求任务较低的时间段,也就是“填谷”。 比如组件1 发布请求任务,组件2接受请求任务并处理。如果没有 MQ , 组件2 就会在大量的请求任务下会出现假死的情况:
而如果使用 MQ 后可以将这些请求先暂存到队列中,排队执行,就不会出现组件2 假死的情况了。我们一般把发送消息的组件称为生产者,接受消息的组件称为消费者,如下图展示一个消息队列的模型:
消息队列需要满足消息有序性、能处理重复的消息以及消息可靠性,这样才能保证存取消息的一致性。
主要的应用有:异步处理、流量削峰、系统解耦
秒杀活动中,会短时间出现爆发式的用户请求,如果没有消息队列,会导致服务器响应不过来。轻则会导致服务假死;重则会让服务器直接宕机。
这时可以加上消息队列,服务器接收到用户的请求后,先把这些请求全部写入消息队列中再排队处理,这样就不会导致同时处理多个请求的情况;若消息队列长度超过承载的最大数量,可以抛弃后续的消息,给用户返回“页面出错,请重新刷新”提示,这样降低服务器的负载,而且也能给用户很好的交互体验。
此外,我们可以利用消息队列来把系统的业务功能模块化,实现系统功能的解耦。如下图:
如果有两个功能服务,而且关系不是很紧密,比如订单系统和优惠券,虽然都和用户有关联,但是如果都放在用户模块,面临功能删减时会很麻烦。所以采用把两个服务独立出来,而将两个服务的消息发送以约定的方式通过消息队列发送过去,让其对应的消费者分别处理即可达到系统解耦的目的。
RabbitMQ 是一个老牌的开源消息中间件,它实现了标准的 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)消息中间件,使用 Erlang 语言开发,支持集群部署。支持 java、python、Go、.NET 等等主流开发语言。
其主要的运行流程如下图:
我们发现在 Rabbit 服务器中,它在生产者和队列间加入了交换器(ExChange)模块,它的作用和交换机很相似,它会根据配置的路由规则将生产者发出的消息分发到不同的队列中。路由规则很灵活,可以自己来进行设计。
因为中间中的交换器模块,所以RabbitMQ 有不同的消息类型,主要分为以下几种:
但是 Rabbit 也存在以下的问题:
Kafka 是 LinkedIn 公司开发的基于 ZooKeeper 的多分区、多副本的分布式消息系统,它于 2010 年贡献给了 Apache 基金会,并且成为了 Apache 的顶级开源项目。其中 ZooKeeper 的作用是用来为 Kafka 提供集群元数据管理以及节点的选举和发现等功能。
与 RabbitMQ 不同中间的 Kafka 集群部分是由 Broker 代理和 ZooKeeper 集群组成:
同时 Kafka 也有缺点:
1.3.3.1 RocketMQ 介绍
RocketMQ 是阿里巴巴开源的分布式消息中间件,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进,后来捐赠给 Apache 软件基金会。支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。它里面有几个区别于标准消息中件间的概念,如Group、Topic、Queue等。系统组成则由Producer、Consumer、Broker、NameServer等
RocketMQ 要求生产者和消费者必须是一个集群。集群级别的高可用,是RocketMQ 和其他 MQ 的区别。
List 的先进先出其实就符合消息队列对消息有序性的需求。具体实现如下图:
但是,在生产者往 List 中写入数据时,List 消息集合并不会主动地通知消费者有新消息写入。所以 Redis 提供了 brpop
命令, brpop
命令也称为阻塞式读取,客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。此外,消息队列通过给每一个消息提供全局唯一的 ID 号来解决分辨重复消息的需求。而消息的最后一个需求,消息可靠性如何解决呢?为了留存消息,List 类型提供brpoplpush
命令来让消费者从一个 List 中读取消息,同时, Redis 会把这个消息再插入到另一个 List 中留存。这样如果消费者处理时发生宕机,再次重启时,也可以从备份 List 中重新读取消息并进行处理。如下图:
Redis 主要有两种发布/订阅模式:基于频道(channel)和基于模式(pattern)的发布/订阅。
在 Redis 2.0 之后 Redis 就新增了专门的发布和订阅的类型,Publisher(发布者)和 Subscriber(订阅者)来实现消息队列了,它们对应的执行命令如下:
# 发布消息
publish channel "message"
# 订阅消息
subscribe channel
# 取消订阅
unsubscribe channel
除了订阅频道外,客户端还可以通过 psubscribe
命令订阅一个或者多个模式,从而成为这些模式的订阅者,它还会被发送给所有与这个频道相匹配的模式的订阅者,命令如下:
# 订阅模式
psubscribe pattern
# 退订模式
punsubscribe pattern
那么我们如何用发布/订阅来实现消息队列?我们可以使用模式订阅的功能,利用一个消费者"queue_"来订阅所有以"queue__"开头的消息队列。如下图:
但是发布订阅模式也存在以下缺点:
然而在 Redis 5.0 之后新增了 Stream 类型,它提供了丰富的消息队列操作命令:
XADD:插入消息,保证 MQ 有序,可以自动生成全局唯一 ID
# mqstream 为消息队列,消息的键是 repo 值为5
# * 表示自动生成一个全局唯一ID
XADD mqstream * repo 5
XREAD:用于读取消息,可以按 ID 读取数据,保证MQ对重复消息的处理;
# 从 1599203861727-0 起读取后续的所有消息
XREAD BLOCK 100 STREAMS mqstream 1599203861727-0
XREAD
后的block 配置项,类似于 brpop
命令的阻塞读取操作,后面的 100 的单位是毫秒,表示如果没有消息到来,XREAD
将阻塞 100 毫秒。
XREADGROUP:按消费组形式读取消息;
# 创建名为 group1 的消费组,其消费队列是 mqstream
XGROUP create mqstream group1 0
# 让 group1 消费组里的消费者 consumer1 从 mqstream 中读取所有消息
# 命令最后的参数 ">" 表示从第一条尚未被消费的消息开始读取
XREADGROUP group group1 consumer1 streams mqstream >
使用消费组的目的是让组内的多个消费者共同分担读取消息,通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。
XPENDING 和 XACK:XPENDING 命令可以用来查询每个消费组内所有消费者已读取但尚未确认的消息(保证消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息),而 XACK 命令用于向消息队列确认消息处理已完成。
List 和 Streams 实现消息队列的特点和区别:
关于 Redis 是否适合做消息队列,引用一下蒋德钧老师的看法:
Redis 是一个非常轻量级的键值数据库,部署一个 Redis 实例就是启动一个进程,部署 Redis 集群,也就是部署多个 Redis 实例。而 Kafka、RabbitMQ 部署时,涉及额外的组件,例如 Kafka 的运行就需要再部署 ZooKeeper。相比 Redis 来说,Kafka 和 RabbitMQ 一般被认为是重量级的消息队列。所以,关于是否用 Redis 做消息队列的问题,不能一概而论,我们需要考虑业务层面的数据体量,以及对性能、可靠性、可扩展性的需求。如果分布式系统中的组件消息通信量不大,那么,Redis 只需要使用有限的内存空间就能满足消息存储的需求,而且,Redis 的高性能特性能支持快速的消息读写,不失为消息队列的一个好的解决方案。
https://zhuanlan.zhihu.com/p/86812691
https://kaiwu.lagou.com/course/courseInfo.htm?courseId=59#/detail/pc?id=1775
https://time.geekbang.org/column/article/284291
https://www.cnblogs.com/weifeng1463/p/12889300.html
https://pdai.tech/md/db/nosql-redis/db-redis-x-pub-sub.html
《Redis 设计与实现》
《Redis 开发与运维》