消息中间件之RocketMQ(二)

发布时间:2024年01月24日

RocketMQ支持的消息类型

了解之前,首先要熟悉RocketMQ中的组件架构设计
在这里插入图片描述

1.顺序消息

  • 将同一个订单(即具有相同的orderId)的消息按状态先后顺序消费的,所以消息生产者调用send方法发送时需要传入MessageQueueSelector接口的,实现类,将orderId相同的消息放入同一个MessageQueue中,比如对orderId进行取余,消费端还需要实现MessageListenerOrderly接口用于消费,有序的消息,MessageListenerConcurrently接口消费的消息是无序的
  • 顺序消费的原理是确保将消息投递到同一个队列中,在队列内部RocketMQ保证先进先出,而同一个队列会被投递到同一个消费者实例,再由消费者拉取数据进行消费。在消费者内部会维护本地队列锁,以保证当前只有一个线程能够进行消费,所拉到的消息先被存入消息,处理队列中,然后再从消息处理队列中顺序获取消息用MessageListenerOrderly进行消费(这也是在顺序消费时监听消息要实现MessageListener接口)的原因
  • 消费者端的顺序消费,需要有个前提,那就是保证Producer、Broker要保证有序,缺一不可

1.1 Producer

在默认情况下,消息发送者会采取Round Robin轮询方式把消息发送到不同的MessageQueue(分区队列),而消费者消费的时候也从多个MessageQueue上拉取消息,这种情况下消息是不能保证顺序的.
而只有当一组有序的消息发送到同一个MessageQueue上时,才能利用MesageQueue先进先出的特性
保证这一组消息有序

1.2 Broker

Broker中一个队列内的消息是可以保证有序的

1.3 Consumer

仍然是乱序的,消费者端要保证消息是有序的,就需要按队列一个一个地来取消息,即取完一个队列的消息后,再去取下一个队列的消息。而给Consumer注入的MessageListenerOrderly对象,在RocketMQ内部就会通过,锁队列的方式保证消息是一个一个队列来取的,MessageListenerConcurrently这个消息监听器不会锁队列,每次都是从多个Message中取一批数据(默认不超过32条),因此也无法保证消息有序

2. 延时消息

延时消息实现的效果就是再调用producer.send方法后,消息并不会立即发送出去,而是会等一段时间再发送出去,这是RocketMQ特有的一个功能,延迟时间的设置就是在Message消息对象上设置一个延迟级别message.setDelayTimeLevel(3);开源版本的RocketMQ中,对延迟消息并不支持任意时间的延迟设定(商业版本中支持),而是只支持18个固定的延迟级别,1到18分别对应messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m…2h,而这18个延迟级别也支持自行定义,不过一般情况下最好不要自定义修改

3.批量消息

批量消息是指将多条消息合并成一个批量消息,一次发送出去,这样的好处是可以减少网络IO,提升吞吐量,官网注释,如果批量消息大于1MB就不要使用一个批次发送,而要拆分成多个批次消息发送,也就是说,一个批次消息的大小不要超过1MB,实际使用时,这个1MB的限制可以稍微扩大点,实际的最大的限制是大约4MB.但是使用批量消息时,这个消息长度确实是必须考虑的一个问题,而且批量消息的使用是有一定限制的,这些消息应该有相同的Topic,相同的waitStoreMsgOK,而且不能是延迟消息、事务消息等

4.过滤消息

大多数情况下,可以使用Message的Tag属性来简单快速地过滤信息,主要时看消息消费者
tag是RocketMQ中特有的一个消息属性,RocketMQ的最佳实践中就建议,使用RocketMQ时,
一个应用可以就用一个Topic,而应用中的不同业务就用tag区分,但是,这种方式有一个很大的限制,
就是一个消息只能有一个tag,在一些比较复杂的场景就有点不足了,可以使用SQL表达式来对消息进行过滤,SQL92语法

5.事务消息

请见消息中间件之RocketMQ事务消息流程,这里不再重复赘述

系统参数调优相关

1.JVM参数

1.配置RocketMQ的JVM内存大小
runserver.sh需要定制nameserver的内存大小,
runbroker.sh中需要定制broker的内存大小
这些默认的配置可以认为都是经过检验的最优化配置,但是在实际情况中还需要根据服务器的实际情况进行调整
以runbroker.sh中对G1GC的配置举例,在runbroker.sh的关键配置
-XX:+UseG1GC(使用G1垃圾回收器)
-XX:G1HeapRegion(将G1的region块大小设为16M)
-XX:G1ReservePercent(在G1的老年代中预留25%空闲内存,这个默认值是10%)
-XX:InitiaatingHeapOccupancyPercent=30(当堆内存的使用率达到30%之后就会启动G1垃圾回收器尝试回收垃圾,
默认值是45%,RocketMQ把这个参数调小了,也就是提高了GC的频率,但是避免了垃圾对象过多,
一次垃圾回收时间太长的问题)

2.Linux内核参数定制。

在部署RocketMQ的时候,还需要对Linux内核参数进行一定的定制

  • ulimit,需要进行大量的网络通信和磁盘IO
  • vm.extra_freee_kbytes,告诉VM在后台回收(kswapd)启动的阈值与直接回收(通过分配进程)的阈值之间保留额外的可用内存,RocketMQ使用此参数来避免内存分配中的长延迟(与具体内核版本相关)
  • vm.min_free_kbytes,如果将其设置为低于1024KB,将会巧妙地将系统破坏,并且系统在高负载下容易出现死锁
  • vm.max_map_count,限制一个进程可能具有的最大内存映射数,RocketMQ将使用mmap加载CommitLog和ConsumeQueue,因此建议将此参数设置较大的值
  • vm.swappiness,定义内核交换内存页面的积极程度,较高的值会增加攻击性,较低的值会减少交换量,建议将值设置为10来避免交换延迟
  • vm.descriptor limits,RocketMQ需要为文件(CommitLog和ConsumeQueue)和网络连接打开文件描述符,建议设置文件描述符的值尽可能地调大 如:655350,CentOS7中的配置文件都在/proc/sys/vm目录下,RocketMQ的bin目录下有个os.sh里面设置了RocketMQ建议的系统内核参数,可以根据情况进行调整
    在这里插入图片描述
文章来源:https://blog.csdn.net/Cover_sky/article/details/135832496
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。