? ? ? ? 如果生产者大量发送消息,消费者消费能力不够,会造成broker消息堆积,需要根据实际场景制定解决方案。
MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?
A系统处理完业务,通过MQ给B、C、D三个系统发消息数据,如果B系统、C系统处理成功,D系统处理失败。如何保证消息数据处理的一致性?
1)全局有序:可以为Topic设置一个消息队列,使用一个生产者单线程发送数据,消费者端也使用单线程进行消费,从而保证消息的全局有序,但是这种方式效率低,多个消息队列同时消费是无法绝对保证消息的有序性的。
2)局部有序:可以定义一个特定的字段通过运算取模进行投递特定的队列中,消费者端使用MessageListenerOrderly
处理有序消息。
两种方案:1)一种是在broker端按照Consumer的去重逻辑进行过滤,这样做的好处是避免了无用的消息传输到Consumer端,缺点是加重了Broker的负担;2)在comsumer端进行过滤,无用的消息不进行处理
幂等性原则:就是用户对于同一种操作发起的多次请求的结果是一样的,不会因为操作了多次就产生不一样的结果
解决方案:1)在数据库中做好唯一约束,2)通过redis保存每条消息唯一的key,消费重复消息时先去redis判断key是否已存在
1)消费者向borker投递消息时,不是Commit/
Rollback
状态所以还是半消息,
2)当borker接收到半消息时(此时的消息消费者不可见)也并不会提供给消费者;
3)当生产者知道向broker投送消息成功执行本地事务
4)生产者本地事务完成后向borker发送Commit/
Rollback
,borker确认状态后根据状态进行投递或删除
Commit/Rollback
的事务状态消息,从服务端发起一次“回查”Commit
/Rollback
public enum LocalTransactionState {
//本地事务执行成功,给broker发送一个commit的标识
COMMIT_MESSAGE,
ROLLBACK_MESSAGE,
//这个状态将会引起回查
UNKNOW,
}
半消息是只生产者向broker发送消息时,还没经过两次确认的消息,此时的状态标记为“不可消费”状态,所以暂时对消费者不可见
需要从Producer,Consumer和Broker三个方面来回答:
消息只要持久化到CommitLog(日志文件)中,即使Broker宕机,未消费的消息也能重新恢复再消费
Consumer自身维护了个持久化的offset(对应Message Queue里的min offset),用来标记已经成功消费且已经成功发回Broker的消息下标。如果Consumer消费失败,它会向Broker发回消费失败的状态,发回成功才会更新自己的offset。如果发回给broker时broker挂掉了,Consumer会定时重试,如果Consumer和Broker一起挂掉了,消息还在Broker端存储着,Consumer端的offset也是持久化的,重启之后继续拉取offset之前的消息进行消费。?
RocketMQ的存储与读写是基于JDK NIO的内存映射机制,消息存储时首先将消息追加到内存中。通过配置刷盘策略将内存消息数据保存到磁盘文件中。
RocketMQ提供了两种刷盘策略:同步刷盘和异步刷盘
在消息达到Broker的内存之后,必须刷到commitLog日志文件中才算成功,然后返回Producer数据已经发送成功。
异步刷盘是指消息达到Broker内存后就返回Producer数据已经发送成功,会唤醒一个线程去将数据持久化到CommitLog日志文件中。
优缺点分析
同步刷盘能保证消息抵达broker后不丢失,如果是异步刷盘当producer发来消息后响应成功,另外开启一个线程刷盘时,此时刚好borker出现问题无法正常执行。这是消息可能就无法刷到commitLog中。
异步刷盘吞吐量比同步刷盘高,响应速度快?
在给borker发送消息时,会根据从NameServer获取到的broker集群实例列表,通过轮训的策略进行投递。
集群模式:
首先理解Broker是如何与Consumer进行消息交互的,是Consumer主动向Borker的Consumer queue拉取消息的;而一个queue只能对应一个Consumer实例,而Consumer实例可以同时消费多个queue,处于多对一的关系。那么如果在系统运行过程中我又启动了多个Consumer,这时会有一个重平衡机制,让新启动的Consumer分配到queue。
问题:如果只有3个queue和3个Consumer实例,刚好是一 一对应的,这时新增consumer实例,queue数量不够的话,新增的实例就分配不到queue
广播模式:
广播模式会让一条消息让所有订阅的消费者组的实例都进行消费,会让所有consumer都分到所有的queue
当一条消息无法正常消费的时会被投递到死信队列,比如一条消息初次消费失败或超时,消息队列会自动进行消费重试累计默认16次;达到最大重试次数后,消息队列不会立刻将消息丢弃而是暂时存放到死信队列中。
PULL:拉取模式为消费者主动从broker中拉取消息消费,只要拉取到消息,就会启动消费过程
PUSH:推模式为消费者就是要注册消息的监听器,监听器是要用户自行实现的。当消息达到broker服务器后,消费者端会触发监听器拉取消息进行消费。但是从实际上看还是从broker中拉取消息
不会,每条消息都会持久化到CommitLog中,每个Consumer连接到Broker后会维持消费进度信息(offset),当有消息消费后只是当前Consumer的消费进度(offset)更新了。
消息会通过messageStore消息存储管理器将消息储存在broker的commitLog中,而CommitLog结构包含
MessageStoreConfig:消息存储配置对象?
文件目录:storePathRootDir\store下的commitLog、Consume Queue、index等?
?
另外borker持久化文件中还会储存一些其他数据,比如offset、topic等
文件目录:storePathRootDir\store\config下的json文件数据(consumerFilter.json,consumerOffset.json,subscriptionGroup.json,topics.json)
?