个人做了一个Spring Boot整合RocketMQ的笔记
生产者:同步、异步、单向、批量、延时、顺序、过滤、事务消息示例发送
消费者:集群、广播模式示例,顺序、过滤消息示例
完全可以在工作中直接cv使用。
整合RocketMQ的笔记
优点:实时性强
缺点:客户端消费不过来,会压力过大
一次性拿到很多消息,只有消费者成功消费了一条消息
,在mq里面 消费者位点 才移动。
好处: 压力可控
缺点: 实时性不强。
底层都是 pull,push 只是客户端进行了封装了api。
一般情况:上游消息生产的量小
或者均速的时候,选择push模式。特殊情况下,电商大促, 选择pull模式。
发送的顺序 和消费的顺序 需要保持一致。
局部有序
、 全局有序
MessageQueueSelector 队列选择器
消费者 顺序模式。
重试
并发模式下 重试 16次
。 16次后 还失败,放入死信队列。
顺序消息 失败 会 无限重试,Integer.MAV_VALUE
消息带 tag
消费者 过滤不同的tag 处理消息。
不能在 一个组下
生产者
@Test
public void testTagProducer() throws Exception {
// 创建默认的生产者
DefaultMQProducer producer = new DefaultMQProducer("test-group");
// 设置nameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动实例
producer.start();
Message msg = new Message("TopicTest","tagA", "我是一个带标记的消息".getBytes());
SendResult send = producer.send(msg);
System.out.println(send);
// 关闭实例
producer.shutdown();
}
c1 消费者
c2 c3 消费者
消息类型
是否一致 不一致用topic业务是否有关联
而同样是天猫交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用 Tag 进行区分。
消息优先级是否一致
优先级 不同
用topic 区分. 消息量级 是否 相当
。有些实时性高
不同topic 区分默认给消息 配置messageid做消息的唯一标识。我们也可以给消息携带一个key
用来作为消息的唯一标识
自己封装消息的key
重试 时间 和延迟级别一致 10s 30s 1m…
默认重试16次
// 业务报错了, 返回null, reconsume_later 都会重试
return ComsumeConcurrentlyStatus.RECONSUME_LATER;
如果(并发模式)重试16次都失败了,顺序模式下(Integer.MAX_VALUE)?
是一个死信 消息,放入死信队列。
死信队列 topic 名称 :%DLQ%消费者组名称
自定义重试次数?
consumer.setMaxReconsumeTimes()// 消息最大重试次数
消费消费失败的时候? 该如何正确处理?
1.类上添加注解@Component和@RocketMQMessageListener
@RocketMQMessageListener(topic = “powernode”, consumerGroup = “powernode-group”) topic指定消费的主题,consumerGroup指定消费组,一个主题可以有多个消费者组,一个消息可以被多个不同的组的消费者都消费
2.实现RocketMQListener接口,注意泛型的使用,可以为具体的类型
,如果想拿到消息 的其他参数可以写成MessageExt
topicName:tags
一个队列 理论上 30w 消息 长度
差值 大于 10w 需要考虑堆积
mq 存储
, 高可用 磁盘存储,顺序读写、零拷贝技术消息的丢失
,多主多从,同步发送 机制
发布确认机制同步刷盘、异步刷盘
flushDiskType
配置为SYNC_FLUSH
刷盘临时写一个程序,将堆积消息写入到另一个主题中,10个队列。
临时用10倍的机器部署consumer,消费消息。
#rocketmq 高级
同步刷盘还是异步刷盘,都是通过Broker配置文件里的flushDiskType 参数设置的,这个参数被配置成SYNC_FLUSH、ASYNC_FLUSH中的 一个。
sync_flush
async_flush
nameser 集群
broker 高可用性 通过主节点 从节点
消息消费者: 可以从borker 主读 ,也可以从broker 从读
master slave 数据均写入成功后,才返回success 给生产者。
问题:数据量大,存在写入延迟,降低了系统的吞吐量
只要写给master 成功,立即响应给生产者。 master 、slave 之间有通过异步方式
吞吐量高
问题: master 出现了故障,可能出现数据的丢失。
brokerRole=SYNC_MASTER
主节点配置:
SYNC_MASTER 同步
ASYNC_MASTER 异步复制
从节点配置 brokerRole=SLAVE
通常情况下: 应该把Master和SLAVE 的刷盘策略配置成 ASYNC_FLUSH
, 保证吞吐量
主从之间配置成SYNC_MASTER 同步的复制方式。 保证消息不丢失
针对于 集群模式有效
最大16次,广播模式 无效
集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):