顺序消费分为两种:
全局有序:适用于并发度不大,并且对消息要求严格一致性的场景下
通过创建一个 topic,并且该 topic 下只有一个队列,那么生产者向着一个队列中发消息,消费者也在这一个队列中消费消息,来保证消息的有序性
局部有序:适用于对性能要求比较高的场景,在设计层面将需要保证有序的消息放在 Topic 下的同一个队列即可保证有序
要保证全局有序的话,我们先通过上边启动的 Dashboard 项目,创建一个只有一个队列的 Topic
将 写队列和读队列
都设置为 1 个,perm 设置为6(perm,2:只写; 4-只读; 6-读写;)
全局有序流程图如下:
首先消费者主启动类如下:
@SpringBootApplication
@EnableBinding({CustomSink.class })
public class OrderlyConsumerApplication {
@Value("${server.port}")
private int port;
public static void main(String[] args) {
SpringApplication.run(OrderlyConsumerApplication.class, args);
System.out.println("【【【【【 OrderlyConsumerApplication 启动成功!!! 】】】】】");
}
// 定义两个通道,input 接收全局有序消息,input2 接收局部有序消息
@StreamListener("input")
public void receiveInput(String receiveMsg) {
System.out.println(port + " port, input receive: " + receiveMsg);
}
@StreamListener("input2")
public void receiveInput2(String receiveMsg) {
System.out.println(port + " port, input2 receive: " + receiveMsg);
}
}
自定义 CustomSink 如下:
public interface CustomSink extends Sink {
/**
* Input channel name.
*/
String INPUT2 = "input2";
/**
* @return input channel.
*/
@Input(CustomSink.INPUT2)
SubscribableChannel input2();
}
配置类 application.properties
如下:
spring.application.name=mq_orderly_consumer
server.port=9530
# configure the nameserver of rocketmq
spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876
spring.cloud.stream.rocketmq.binder.group=mq_producer_group
# configure the input binding named input
spring.cloud.stream.bindings.input.destination=Global-Orderly-Topic
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.group=Global-Orderly-Topic-group
spring.cloud.stream.rocketmq.bindings.input.consumer.orderly=true
# configure the input binding named input
spring.cloud.stream.bindings.input2.destination=Partly-Orderly-Topic
spring.cloud.stream.bindings.input2.content-type=application/json
spring.cloud.stream.bindings.input2.group=Partly-Orderly-Topic-group
spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=true
全局有序生产者代码如下:
public class GlobalProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(
"producer_group",
true);
producer.setNamesrvAddr("218.95.37.160:9876");
producer.start();
for (int i = 0; i < 12; i++) {
Message msg = new Message(
"Global-Orderly-Topic",
"Global_Orderly_Tag",
("( " + i + " )message from GlobalProducer").getBytes());
msg.setKeys("Global_Orderly_Tag");
producer.send(msg);
}
System.out.println("Send Finished.");
}
}
先启动消费者,再启动生产者,即可看到在消费者端,消息被有序消费
局部有序的话,我们将需要保证有序的消息放在同一个 Topic 下的队列即可保证有序,这里设计的让 OrderId 相同的消息放在同一个队列中发送,流程图如下:
在局部有序中,消费者依然使用全局有序中的消费者,局部生产者代码如下:
public class PartlyProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(
"producer_group",
true);
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
/**
* orderId = 1 的消息,需要按照 step 的顺序进行消费
* orderId = 2 的消息,需要按照 step 的顺序进行消费
*/
List<Order> list = new ArrayList<>();
for (int i = 1; i <= 3; i ++) {
Order order = new Order();
order.orderId = 1;
order.step = i;
list.add(order);
}
for (int i = 5; i <= 8; i ++) {
Order order = new Order();
order.orderId = 2;
order.step = i;
list.add(order);
}
System.out.println(list);
int size = list.size();
for (int i = 0; i < size; i++) {
Order order = list.get(i);
Message msg = new Message(
"Partly-Orderly-Topic",
"Partly_Orderly_Tag",
(order.toString()).getBytes());
msg.setKeys("Partly_Orderly_Tag");
/**
* 这里发送消息的时候,根据 orderId 来选择对应发送的队列
*/
producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int orderId = (int)arg;
int idx = orderId % mqs.size();
return mqs.get(idx);
}
}, order.orderId);
}
System.out.println("Send Finished.");
}
public static class Order {
int orderId;
int step;
@Override
public String toString() {
return "Order{" +
"orderId=" + orderId +
", step=" + step +
'}';
}
}
}