🌈🌈🌈🌈🌈🌈🌈🌈
【11来了】文章导读地址:点击查看文章导读!
🍁🍁🍁🍁🍁🍁🍁🍁
topic 是有一堆的 queue,而且分布在不同的 broker 上
并且在消费时,将多个 queue 分配给多个 consumer,每一个 consumer 会分配到一部分的 queue 进行消费
每个 consumer 会获取到 Topic 下包含的 queue 的信息
以及 每个 consumer group 下包含多少的 consumer
,那么 consumer 都使用相同的算法去做一次分配
Consumer 分配队列:
Consumer 端队列的分配是通过 RebalanceService
这个组件实现的,拉取 Topic 的 queue 信息,拉取 consumer group 信息,根据算法分配 queue,确认自己需要拉取哪些 queue 的消息
RebalanceService
这个组件是在 Broker 中的,主要负责实现消息队列的动态负载均衡和自动分配,确保消息队列在消费者组内均匀分配,并在消费者组发生变化时进行动态调整,通过动态负载均衡和自动分配消息队列,保证了消费者组在消费消息时的 高效性和可靠性
那么分配好队列之后,Consumer 就知道自己分配了哪些 queue 了,Consumer 就可以去 Broker 中对应的 queue 进行数据的拉取,这里 Consumer 消息的拉取在 RocketMQ 中有两种实现(DefaultMQPushConsumer、DefaultMQPullConsumer, 但是在底层全部都是通过 pull 拉取消息进行消费的):
RocketMQ 的长轮询:
RocketMQ 中使用了 长轮询
的方式,兼顾了 push 和 pull 两种模式的优点
长轮询:
长轮询本质上也是轮询,服务端在没有数据的时候并不是马上返回数据,而是会先将请求挂起,此时有一个长轮询后台线程每隔 5s 会去检查 queue 中是否有新的消息,如果有则去唤醒客户端请求,否则如果超过 15s 就会判断客户端请求超时
Consumer 去 Broker 中拉取消息的线程只有一个,拉取到消息之后会将消息存放在 ProcessQueue 中,每一个 ConsumeQueue 都会对应一个 ProcessQueue
消息被拉取到会放在 ProcessQueue 中,等待线程池进行 并发消息
,线程池处理消息时,就会调用到我们在创建生产者时注册的监听器中的 consumeMessage
方法,在这里会执行我们自己定义的业务逻辑,之后会返回状态码:SUCCESS 或 RECONSUME_LATER 等等,如果消费成功,线程会去 ProcessQueue 中删除对应的消息,并且会记录 consumer group 对于 queue 的消费进度
,以实通过异步提交到 broker 中去,流程图如下:
Consumer 处理失败时的延迟消费机制:
在 consumer 消费消息失败的时候,线程池会将消费失败的消息发送到 Broker 中,在 Broker 中,对失败的消息进行一个 Topic 的改写为:RETRY_Topic_%
,会根据之前的 Topic 名称进行改写,改写后呢,作为一个 延迟消息
重新写入 Commitlog 和 ConsumeQueue 中,再通过专门处理延迟消息的后台线程监听延迟消息是否到达延迟时间,当时间到达之后,会将改写后的 Topic 再重新改写为原来的 Topic 名称并写入 Commitlog,之后等待被消费者再次消费即可