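Note: this article only puts partial (per-key) ordering into practice.
A common approach is to route on the order id so that messages with the same order id always go to the same queue of topicB. Suppose topicB has 2 queues: we can simply take the id modulo 2, sending even ids (remainder 0) to queue0 and odd ids (remainder 1) to queue1. Because a consumer reads each queue in order, the messages in queue0 are consumed in order and so are the messages in queue1; no ordering is guaranteed across the two queues.
The partial-ordering test program follows the design shown in the figure below.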
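The routing rule described above can be sketched in plain Java with no RocketMQ dependency. The class QueueRoutingSketch and its methods are hypothetical names, and in-memory lists stand in for the queues of topicB; treat it as an illustration of the idea under those assumptions, not as how RocketMQ implements it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: in-memory lists stand in for the queues of topicB.
final class QueueRoutingSketch {

    // Map an order id to a queue index; floorMod keeps the result non-negative.
    static int queueIndex(long orderId, int queueCount) {
        return (int) Math.floorMod(orderId, (long) queueCount);
    }

    // Append each event ("<orderId>:<step>") to the queue chosen by its order id.
    static List<List<String>> route(long[] orderIds, String[] steps, int queueCount) {
        List<List<String>> queues = new ArrayList<>();
        for (int q = 0; q < queueCount; q++) {
            queues.add(new ArrayList<>());
        }
        for (int i = 0; i < orderIds.length; i++) {
            int index = queueIndex(orderIds[i], queueCount);
            queues.get(index).add(orderIds[i] + ":" + steps[i]);
        }
        return queues;
    }

    public static void main(String[] args) {
        // Two orders whose events are interleaved on the producer side.
        long[] ids = {1001, 1002, 1001, 1002, 1001};
        String[] steps = {"create", "create", "pay", "pay", "ship"};
        List<List<String>> queues = route(ids, steps, 2);
        for (int q = 0; q < queues.size(); q++) {
            System.out.println("queue" + q + " -> " + queues.get(q));
        }
    }
}
```

Running main prints `queue0 -> [1002:create, 1002:pay]` and `queue1 -> [1001:create, 1001:pay, 1001:ship]`: the events of each order stay in send order inside their queue, while nothing is implied about the relative order of the two queues.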
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t orderly
[root@hadoop02 rocketmq-all-5.1.4-bin-release]# sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t orderly
create topic to 10.32.36.143:10911 success.
TopicConfig [topicName=orderly, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={}]
[root@hadoop02 rocketmq-all-5.1.4-bin-release]#
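Note: ordered consumption must be enabled with spring.cloud.stream.rocketmq.bindings.<bindingName>.consumer.orderly=true; in the configuration below the binding name is consumer-in-0.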
spring:
  cloud:
    stream:
      function:
        definition: consumer
      rocketmq:
        binder:
          name-server: 10.32.36.143:9876
        bindings:
          producer-out-0:
            producer:
              group: output_1
              # bean name of the MessageQueueSelector to use
              messageQueueSelector: orderlyMessageQueueSelector
          consumer-in-0:
            consumer:
              # tag: {@code tag1||tag2||tag3 }; sql: {@code 'color'='blue' AND 'price'>100 } .
              # subscription: 'TagA || TagC || TagD'
              orderly: true
      bindings:
        producer-out-0:
          destination: orderly
        consumer-in-0:
          destination: orderly
          group: orderly-consumer
logging:
  level:
    org.springframework.context.support: debug
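Some additional notes on the configuration above:
Difference between spring.cloud.stream.bindings and spring.cloud.stream.rocketmq.bindings: the former holds the binder-independent settings of each binding (here destination and group), while the latter holds the RocketMQ-specific producer and consumer settings (here messageQueueSelector and orderly).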
public class SimpleMsg {

    private String msg;

    public SimpleMsg(String msg) {
        this.msg = msg;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }
}
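import java.util.List;

import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;

@Component
public class OrderlyMessageQueueSelector implements MessageQueueSelector {

    private static final Logger log = LoggerFactory
            .getLogger(OrderlyMessageQueueSelector.class);

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        // arg carries the Spring message headers set by the producer.
        Integer id = (Integer) ((MessageHeaders) arg).get(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID);
        // The tag is read here but does not take part in the routing decision.
        String tag = (String) ((MessageHeaders) arg).get(MessageConst.PROPERTY_TAGS);
        // The producer sets the id to i % 2, so the index resolves to 0 or 1.
        int index = id % MqApplication.tags.length % mqs.size();
        return mqs.get(index);
    }
}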
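The selector above casts the header to Integer, so it only handles numeric ids. When the ordering key is a string, such as a textual order number, one option is to hash it before taking the remainder. The following is a minimal sketch under that assumption: KeyHashSketch and indexFor are hypothetical names, and the method only computes the index that a MessageQueueSelector could pass to mqs.get(...).

```java
// Hypothetical helper: computes a queue index for a string ordering key.
final class KeyHashSketch {

    // floorMod avoids the negative index that hashCode() % queueCount could produce.
    static int indexFor(String orderingKey, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        return Math.floorMod(orderingKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 8; // the topic created earlier reports writeQueueNums=8
        for (String key : new String[] {"ORDER-1001", "ORDER-1002", "ORDER-1001"}) {
            System.out.println(key + " -> queue " + indexFor(key, queueCount));
        }
    }
}
```

The same key always yields the same index for a given queue count, which is the property partial ordering relies on. If the number of queues changes, keys may map to different queues than before.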
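import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQMessageConverterSupport;
import org.apache.rocketmq.common.message.MessageConst;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;

@EnableDiscoveryClient
@SpringBootApplication
public class MqApplication {

    private static final Logger log = LoggerFactory
            .getLogger(MqApplication.class);

    public static final String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};

    public static void main(String[] args) {
        SpringApplication.run(MqApplication.class, args);
    }

    @Autowired
    private StreamBridge streamBridge;

    @Bean
    public ApplicationRunner producer() throws InterruptedException {
        // This sleep runs while the bean is being created, so it delays startup by 6 seconds.
        Thread.sleep(6000);
        log.info("Starting...");
        return args -> {
            for (int i = 0; i < 50; i++) {
                String key = "KEY" + i;
                Map<String, Object> headers = new HashMap<>();
                headers.put(MessageConst.PROPERTY_KEYS, key);
                headers.put(MessageConst.PROPERTY_TAGS, tags[i % tags.length]);
                // The id is 0 or 1; OrderlyMessageQueueSelector uses it to pick the queue.
                headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i % 2);
                Message<SimpleMsg> msg = new GenericMessage<>(new SimpleMsg("Hello RocketMQ " + i % 2), headers);
                streamBridge.send("producer-out-0", msg);
            }
        };
    }

    @Bean
    public Consumer<Message<SimpleMsg>> consumer() {
        return msg -> {
            // The binder exposes RocketMQ properties under a prefixed header key.
            String tagHeaderKey = RocketMQMessageConverterSupport.toRocketHeaderKey(
                    MessageConst.PROPERTY_TAGS).toString();
            log.info(Thread.currentThread().getName() + " Receive New Messages: " + msg.getPayload().getMsg() + " TAG:" +
                    msg.getHeaders().get(tagHeaderKey).toString());
            try {
                // Simulate processing time for each message.
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the caller can still observe it.
                Thread.currentThread().interrupt();
            }
        };
    }
}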
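The consumer above only logs what it receives, so ordering has to be judged by reading the log. One way to check it programmatically is sketched below, assuming each message carried a per-key sequence number. The sample payload in this article does not carry one, so the sequence number, the class OrderCheckerSketch and its accept method are all hypothetical additions for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical checker: remembers the highest sequence number seen for each ordering key.
final class OrderCheckerSketch {

    // ConcurrentHashMap because different queues may be consumed on different threads.
    private final Map<String, Long> lastSeen = new ConcurrentHashMap<>();

    // Returns true if seq is greater than every sequence seen so far for this key.
    boolean accept(String key, long seq) {
        boolean[] inOrder = {true};
        // merge only calls the function when the key already has a value.
        lastSeen.merge(key, seq, (prev, next) -> {
            inOrder[0] = next > prev;
            return Math.max(prev, next);
        });
        return inOrder[0];
    }

    public static void main(String[] args) {
        OrderCheckerSketch checker = new OrderCheckerSketch();
        System.out.println(checker.accept("0", 1)); // first message for key "0"
        System.out.println(checker.accept("1", 1)); // keys are tracked independently
        System.out.println(checker.accept("0", 2)); // next message for key "0"
        System.out.println(checker.accept("0", 1)); // arrives after sequence 2
    }
}
```

Running main prints true, true, true, false: only the last call reports a message that arrived behind a later one for the same key. In a consumer, such a check could be called once per message with the ordering id as the key.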
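That concludes this hands-on walkthrough of sending and receiving ordered messages with RocketMQ. If you have any questions, feel free to leave a comment below.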