消息中间件属于分布式系统中的子系统,关注数据的发送和接收,利用高效、可靠的异步消息传递机制,对分布式系统中的各个子系统进行集成。
异步
解耦
缓冲能力
伸缩性
提高系统扩展性
异步处理
注册后发送短信等。
应用解耦
场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。
传统模式的缺点:
1)假如库存系统无法访问,则订单减库存将失败,从而导致订单失败;
2)订单系统与库存系统耦合;
引入消息队列,在下单时库存系统不能正常使用,也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦。
日志处理
将消息队列用在日志处理中,比如 Kafka 的应用,解决大量日志传输的问题。
消息通讯
消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。例如实现点对点消息通讯或者聊天室等。
流量削峰
秒杀、团购等活动。
用户访问量在ActiveMQ 的可承受范围内,而且确实主要是基于解耦和异步来用的,可以考虑ActiveMQ,也比较贴近 Java 工程师的使用习惯,但是ActiveMQ 现在停止维护了,同时ActiveMQ并发不高,所以业务量一定的情况下可以考虑使用。
RabbitMQ 作为一个纯正血统的消息中间件,有着高级消息协议 AMQP的完美结合,在消息中间件中地位无可取代,但是 erlang 语言阻止了我们去深入研究和掌控,对公司而言,底层技术无法控制,但是确实是开源的,有比较稳定的支持,活跃度也高。
对自己公司技术实力有绝对自信的,可以用 RocketMQ,但是 RocketMQ 诞生比较晚,并且更新迭代很快,这个意味着在使用过程中有可能会遇到很多坑,所以如果公司 Java 技术不是很强,不推荐使用。
如果是大数据领域的实时计算、日志采集等场景,用 Kafka是业内的标准。
是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端中间件不同产品、不同的开发语言等条件的限制。目标是实现一种在全行业广泛使用的标准消息中间件技术,以便降低企业和系统集成的开销,并且向大众提供工业级的集成服务,主要实现有RabbitMQ。
阿里巴巴开发的高可用的分布式集群技术、正式商用的专业消息中间件,既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性,是阿里巴巴双11使用的核心产品。2017年提交到Apache基金会成为Apache基金会的顶级开源项目。
RocketMQ的设计基于主题的发布与订阅模式,其核心功能包括消息发送、消息存储(Broker)、消息消费。
RocketMQ的特点:
核心概念
整体运转流程
设计理念
设计目标
单向发送
发送方只负责发送消息,不等待服务器响应、没有回调函数。发送耗时短(微妙级)。
可靠同步发送
可靠异步发送
消息发送的主要流程
生产者在消息发送失败后,会重新尝试发送消息,默认重试2次。
可以通过参数进行配置:
失败重试规避:
RocketMQ 发现消息发送失败后,就会将失败的Broker排除在选择范围之外,下次发送消息时就不会发送到该Broker,这样做的目的就是为了提高发送消息的成功率。
集群消费
使用相同Group ID的订阅者属于同一个集群。同一个集群下的订阅者订阅关系一致(topic、tag都一致)。
广播消费
当使用广播消费模式时,RocketMQ会将每条消息推送给集群内所有注册过的客户端,保证消息至少被每台机器消费一次。
广播消费模式下不支持顺序消息。
广播消费模式下不支持重置消费位点。
广播消费模式下客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过。
推送
系统收到消息后自动调用处理函数来处理消息,自动保存Offset,并且加入新的消费者后会自动做负载均衡。
底层实现上,推送模式使用pull来实现。
拉取
DefaultMQPullConsumer
通过“长轮询”的方式拉取消息。
与推送方式比较,需要额外处理:
RocketMQ 使用“长轮询”的方式获取消息。核心思想是,客户端还是拉取消息,Broker端HOLD住客户端发过来的请求一小段时间,在这个时间内(5s)有新消息达到,就利用现有的连接立刻返回消息给 Consunmer。
“长轮询”的主动权掌握在 Consumer 手中,Broker即使有大量消息积压,也不会主动推送给 Consumer。因为长轮询方式的有局限性,在HOLD住Comsumer请求的时候需要占用资源,所以它适合在消息队列这种客户端连接数可控的场景中。
https://segmentfault.com/a/1190000018411470
如果有 8 个消息队列(q1-q8),有 3 个消费者(c1,c2,c3)
RocketMQ 默认提供 5 中分配算法:
平均分配(AllocateMessageQueueAveragely)
c1:q1,q2,q3
c2:q4,q5,q6
c3:q7,q8
平均轮询分配(AllocateMessageQueueAveragelyByCircle)
c1:q1,q4,q7
c2:q2,q5,q8
c3:q3,q6
一致性Hash(AllocateMessageQueueConsistentHash)
不推荐使用,因为消息队列负载均衡信息不容易跟踪
根据配置(AllocateMessageQueueByConfig)
为每一个消费者配置固定的消费队列
根据Broker部署机房名(AllocateMessageQueueByMachineRoom)
对每一个消费者负载不同 Broker上的队列
一般尽量使用“平均分配”“平均轮询分配”,因为分配算法比较直观。无论哪种算法,遵循的原则是一个消费者可以分配多个消息队列,同一个消息队列只会分配一个消费者,所以如果消费者个数量大于消息队列数量,则有些消费者无法消费消息。
为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会重新投递。
ConsumeConcurrentlyStatus.CONSUME_SUCCESS 消费成功。
ConsumeConcurrentlyStatus.RECONSUME_LATER 消费失败,会放到重试队列,这个重试TOPIC的名字是%RETRY%+consumergroup。
为了保证消息是肯定被至少消费成功一次,RocketMQ 会把这批消息重发回Broker(topic 不是原 topic 而是这个消费者的 RETRY topic),在延迟的某个时间点后,再次投递到这个ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到 DLQ 死信队列。应用可以监控死信队列来做人工干预。
顺序消息是消息队列RocketMQ版提供的一种对消息发送和消费顺序有严格要求的消息。对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。
全局顺序消息
对于指定的一个Topic,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。适用于性能要求不高,所有的消息严格按照FIFO原则来发布和消费的场景。
分区顺序消息
对于指定的一个Topic,所有消息根据Sharding Key进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。
适用于性能要求高,以Sharding Key作为分区字段,在同一个区块中严格地按照先进先出(FIFO)原则进行消息发布和消费的场景。
Sharding Key
顺序消息中用来区分Topic中不同分区的关键字段,消息队列RocketMQ版会将设置了相同Sharding Key的消息路由到同一个分区下,同一个分区内的消息将按照消息发布顺序进行消费。和普通消息的Key是完全不同的概念。
分区
即Topic Partition,每个Topic包含一个或多个分区,Topic中的消息会分布在这些不同的分区中。本文中的逻辑分区指的就是Topic的分区。
物理分区:区别于逻辑分区,消息实际存储的单元,每个物理分区都会分配到某一台机器指定节点上。
Producer将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到 Consumer 进行消费,该消息即延时消息。
适用场景
消息生产和消费有时间窗口要求,比如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单。如已完成支付则忽略。
使用方式
Apache RocketMQ 目前只支持固定精度的定时消息,因为如果要支持任意的时间精度,在Broker层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销(阿里云 RocketMQ 提供了任意时刻的定时消息功能)。
发送延时消息时需要设定一个延时时间长度,消息将从当前发送时间点开始延迟固定时间之后才开始投递。
延迟消息是根据延迟队列的 level 来的,延迟队列默认是
msg.setDelayTimeLevel(5) 代表延迟一分钟
“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”
生产消息跟普通的生产消息类似,只需要在消息上设置延迟队列的 level 即可。消费消息跟普通的消费消息一致。
死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,并且重试达到最大次数后,则表明 Consumer在正常情况下无法正确地消费该消息。此时,消息队列 MQ 不会立刻将消息丢弃,而是将这条消息发送到该GroupID对应的特殊队列中。
消息队列MQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-LetterQueue)。
为了防止消息重复消费导致业务处理异常,消息队列MQ的消费者在接收到消息后,有必要根据业务上的唯一Key对消息做幂等处理。
RocketMQ 分布式消息队列的消息过滤方式有别于其它 MQ 中间件,是可以实现服务端的过滤。
Tag过滤
Consumer端在订阅消息时除了指定Topic,还可以指定TAG(多个 TAG,可以用||分隔)。其中,Consumer端会将这个订阅请求构建成一个SubscriptionData,发送一个Pull消息的请求给 Broker端。Broker端从RocketMQ的文件存储层—Store读取数据之前,会用这些数据先构建一个MessageFilter,然后传 Store。Store从 ConsumeQueue读取到一条记录后,会用它记录的消息tag的hash值去做过滤,由于在服务端只是根据hashcode进行判断,无法精确对 tag原始字符串进行过滤,故在消息消费端拉取到消息后,还需要对消息的原始tag字符串进行比对,如果不同,则丢弃该消息,不进行消息消费。
SQL92过滤
这种方式的大致做法和Tag过滤方式一样,只是具体过滤过程不太一样,真正的 SQL expression 的构建和执行由rocketmq-filter 模块负责的。
如果开启SQL过滤的话,Broker需要开启参数enablePropertyFilter=true,然后服务器重启生效。
RocketMQ事务消息,是指发送消息事件和其他事件需要同时成功或同失败。
RocketMQ 采用两阶段提交的方式实现事务消息,处理流程为:
commitLog
commitLog以物理文件的方式存放,每台Broker上的commitLog 被本机器所有consumeQueue共享。在commitLog中,一个消息的存储长度是不固定的,RocketMQ 采取一些机制,尽量向CommitLog中顺序写,但是随机读。commitlog 文件默认大小为lG,可通过在broker配置文件中设置mapedFileSizeCommitLog属性来改变默认大小。
consumeQueue
consumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个Topic下的每个messageQueue都有一个对应的consumeQueue文件。
其构建机制是,当消息到达commitlog文件后,由专门的线程产生消息转发任务,从而构建消息消费队列文件(consumeQueue )与索引文件。
indexFile
index文件夹下存的是索引文件(使用Hash索引机制),用来加快消息查询的速度。RocketMQ专门为消息订阅构建索引文件,来提高根据主题检索消息的速度。
config
config 文件夹中存储着Topic和Consumer等相关信息。topics.json:topic配置属性。
subscriptionGroup.json:消费者组配置信息。
delayOffset.json:延时消息队列拉取进度。
consumerOffset.json:集群消费模式消息消进度。
其他
abort:如果存在abort文件说明Broker非正常闭,该文件默认启动时创建,正常退出之前删除。
checkpoint:文件检测点,存储commitlog、consumeQueue、index 文件最后一次刷盘时间戳。
内存映射文件,是由一个文件到一块内存的映射。文件的数据就是这块区域内存中对应的数据,读写文件中的数据,直接对这块区域的地址操作就可以,减少了内存复制的环节。所以说,内存映射文件比起文件 I/O 操作,效率要高,而且文件越大,体现出来的差距越大。
RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复,又可以让存储的消息超出内存的限制。RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写息在通过Producer写入RocketMQ的时候,有两种写入磁盘方式:同步刷盘和异步刷盘。
同步刷盘
在消息追加到内存后,将同步调用 MappedByteBuffer.force()方法进行刷盘,刷盘操作成功后返回消息给发送端。
由于频繁的触发磁盘写动作,会明显降低性能。
异步刷盘
在消息追加到内存(PAGECACHE)后立刻返回消息给发送端。RocketMQ使用个单独的线程按照某个设定的频执行刷盘操作。
通过broker配置文件中配置flushDiskType来设定刷盘方式,可选值为ASYNC_FLUSH (异步刷盘)、SYNC_FLUSH 同步刷盘), 默认为异步。
由于RocketMQ操作commitLog、consumeQueue文件是基于内存映射机制,并在启动的时候会加载commitlog、consumeQueue目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久存储在消息服务器上,所以需要引入一种机制来删除己过期的文件。
删除过程分别执行清理消息存储文件(commitlog)和消息消费队列文件(consumeQueue),两者共用一套文件期机制。
RocketMQ清除过期文件的方法:
如果非当前写文件在一定时间间隔内没有再次被更新,则认为是过期文件,可以被删除,RocketMQ不会关注这个文件上的消息是否全部被消费。默认每个文件的过期时间为 42 小时(不同版本的默认值不同),通过在 Broker 配置文件中设置fileReservedTime(文件保留时间)来改变过期时间(单位为小时)。
触发文件清除操作的是一个定时任务,默认每10s执行一次。
过期判断
文件过期主要由fileReservedTime来控制。
另外还有其他两个配置参数:
删除条件
另外还有RocketMQ的磁盘配置参数:
RocketMQ的Broker分为Master(主)和 Slave(从)两个角色。为了保证高可用性(HA),Master收到消息后,要把内容同步到Slave上,这样一旦Master宕机Slave仍然可以提供服务。
RocketMQ集群部署
集群部署的方式
多主
多个主节点组成集群。
优点:所有模式中性能最高。
缺点:单个Master宕机期间,未被消费的消息在节点恢复之前不可用,消息的实时性就受到影响。
使用同步刷盘可以保证消息不丢失,同时Topic相对应的 queue应该分布在集群中各个节点,而不是只在某各节点上,否则,该节点宕机会对订阅该topic的应用造成影响。
多主多从,异步复制模式
在多Master的基础上,每个Master都有至少一个对应的Slave。Master可读可写,Slave只能读,类似于mysql的主备模式。
优点:一般情况下都是Master消费,在Master宕机或超过负载时,消费者可以从Slave读取消息,消息的实时性不受影响,性能几乎和多Master一样。
缺点:使用异步复制的数据同步方式有可能会有消息丢失的问题。
多主多从,同步双写模式
优点:同步双写的同步模式能保证数据不丢失。
缺点:发送单个消息响应时间会略长,性能相比异步复制低 10%左右。
对数据要求较高的场景,建议采用同步复制方式、异步刷盘方式,来保存数据热备份和高吞吐量。
多主模式与数据重复
多主模式,生产者发送一条消息,只会写入到一台broker的一个queue中,所以有几台master跟消息会不会重复没有直接关系。
主从复制原理
从服务器在启动的时候主动向主服务器建立TCP长连接,然后获取服务器的commitlog最大偏移,以此偏移向主服务器主动拉取消息,主服务器根据偏移量,与自身commitlog文件的最大偏移进行比较,如果大于从服务器 commitlog偏移,主服务器将向从服务器返回一定数量的消息,该过程循环进行,达到主从服务器数据同步。
读写分离机制
消息消费者在向Broker发送消息拉取请求时,会根据筛选出来的消息队列,判定是从Master,还是从Slave拉取消息,默认是Master。
Broker 接收到消息消费者拉取请求,在获取本地堆积的消息量后,会计算服务器的消息堆积量是否大于物理内存的一定值,如果是,则标记下次从Slave服务器拉取,计算Slave服务器的Broker Id,并响应给消费者。
消费者在接收到Broker的响应后,会把消息队列与建议下一次拉取节点的 Broker Id 关联起来,并缓存在内存中,以便下次拉取消息时,确定从哪个节点发送请求。