rocketmq

发布时间:2024年01月16日

rocketmq 代码

个人做了一个Spring Boot整合RocketMQ的笔记
生产者:同步、异步、单向、批量、延时、顺序、过滤、事务消息示例发送
消费者:集群、广播模式示例,顺序、过滤消息示例
完全可以在工作中直接cv使用。
整合RocketMQ的笔记

消费者订阅组

3

消费者模式

push consumer

优点:实时性强
缺点:客户端消费不过来,会压力过大

pull consumer

一次性拿到很多消息,只有消费者成功消费了一条消息,在mq里面 消费者位点 才移动。

好处: 压力可控
缺点: 实时性不强。

底层都是 pull,push 只是客户端进行了封装了api。

一般情况:上游消息生产的量小或者均速的时候,选择push模式。特殊情况下,电商大促, 选择pull模式。

顺序消息

发送的顺序 和消费的顺序 需要保持一致。

队列的 局部有序全局有序

3
MessageQueueSelector 队列选择器
3

消费者 顺序模式

消费者 顺序模式。
3

消息报错 返回失败 重试

重试
并发模式下 重试 16次。 16次后 还失败,放入死信队列。
顺序消息 失败 会 无限重试,Integer.MAV_VALUE

tag

消息带 tag

消息过滤 tag key

消费者 过滤不同的tag 处理消息。
不能在 一个组下
3
生产者

@Test
public void testTagProducer() throws Exception {
    // 创建默认的生产者
    DefaultMQProducer producer = new DefaultMQProducer("test-group");
    // 设置nameServer地址
    producer.setNamesrvAddr("localhost:9876");
    // 启动实例
    producer.start();
    Message msg = new Message("TopicTest","tagA", "我是一个带标记的消息".getBytes());
    SendResult send = producer.send(msg);
    System.out.println(send);
    // 关闭实例
    producer.shutdown();
}

c1 消费者
3

c2 c3 消费者
3

什么时候tag 什么时候topic

  1. ·消息类型是否一致 不一致用topic
  2. 业务是否有关联 而同样是天猫交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用 Tag 进行区分。
  3. 消息优先级是否一致 优先级 不同用topic 区分
    4. 消息量级 是否 相当 有些实时性高 不同topic 区分

key

默认给消息 配置messageid做消息的唯一标识。我们也可以给消息携带一个key
用来作为消息的唯一标识

自己封装消息的key
3

重试

生产者重试

3

消费者

重试 时间 和延迟级别一致 10s 30s 1m…
默认重试16次

// 业务报错了, 返回null, reconsume_later  都会重试
return ComsumeConcurrentlyStatus.RECONSUME_LATER;  

死信队列

  1. 如果(并发模式)重试16次都失败了,顺序模式下(Integer.MAX_VALUE)?
    是一个死信 消息,放入死信队列。
    死信队列 topic 名称 :%DLQ%消费者组名称

  2. 自定义重试次数?
    consumer.setMaxReconsumeTimes()// 消息最大重试次数

  3. 消费消费失败的时候? 该如何正确处理?

3

springboot

消费者

1.类上添加注解@Component和@RocketMQMessageListener
@RocketMQMessageListener(topic = “powernode”, consumerGroup = “powernode-group”) topic指定消费的主题,consumerGroup指定消费组,一个主题可以有多个消费者组,一个消息可以被多个不同的组的消费者都消费

2.实现RocketMQListener接口,注意泛型的使用,可以为具体的类型,如果想拿到消息 的其他参数可以写成MessageExt

顺序消费 消费者 单线程模式

3

tag key

tag :

topicName:tags
3

消费者 select type

3

key

3

mq的消费者消息模式

负载均衡

广播

3

消息不丢失

消息持久化

同步刷盘

异步刷盘

消息堆积问题

一个队列 理论上 30w 消息 长度
差值 大于 10w 需要考虑堆积

什么情况下出现堆积

mq 架构

如何设计mq

  1. mq 存储, 高可用 磁盘存储,顺序读写、零拷贝技术
  2. 消息的丢失 ,多主多从,

消息丢失问题

  1. 生产者 同步发送 机制 发布确认机制
  2. borker 不丢失消息 同步刷盘、异步刷盘 flushDiskType 配置为SYNC_FLUSH 刷盘
  3. 如果broker 是配置的集群,至少将消息 发送到2个以上的节点,再给客户端发送响应

消息重复消费

消息堆积问题

  • 如果是突发问题, 需要临时扩容,增加消费者的数量。通过扩容和降级承担流量,应急问题的处理。
  • 其次排查解决异常问题。

临时写一个程序,将堆积消息写入到另一个主题中,10个队列。
临时用10倍的机器部署consumer,消费消息。

顺序消费

#rocketmq 高级

消息存储

刷盘机制

同步刷盘还是异步刷盘,都是通过Broker配置文件里的flushDiskType 参数设置的,这个参数被配置成SYNC_FLUSH、ASYNC_FLUSH中的 一个。
sync_flush
async_flush

高可用性

nameser 集群
broker 高可用性 通过主节点 从节点
消息消费者: 可以从borker 主读 ,也可以从broker 从读

消息的主从复制

同步复制

master slave 数据均写入成功后,才返回success 给生产者。
问题:数据量大,存在写入延迟,降低了系统的吞吐量

异步复制

只要写给master 成功,立即响应给生产者。 master 、slave 之间有通过异步方式
吞吐量高
问题: master 出现了故障,可能出现数据的丢失。

配置

brokerRole=SYNC_MASTER
主节点配置:
SYNC_MASTER 同步
ASYNC_MASTER 异步复制

从节点配置 brokerRole=SLAVE

刷盘 和配置

通常情况下: 应该把Master和SLAVE 的刷盘策略配置成 ASYNC_FLUSH, 保证吞吐量
主从之间配置成SYNC_MASTER 同步的复制方式。 保证消息不丢失

rocketmq 消息重试

无序消息重试

针对于 集群模式有效最大16次,广播模式 无效

如何重试

集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):

  • 返回 Action.ReconsumeLater (推荐)
  • 返回 Null
  • 抛出异常
文章来源:https://blog.csdn.net/weixin_48155735/article/details/132942090
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。