RocketMQ的顺序消息
概述
顺序消息是rocketMQ的一种高级消息类型,支持消费者按照发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理,相比于其他消息类型,顺序消息在发送,存储和投递的处理过程中,更多强调多条消息的先后顺序关系,当我们操作订单的时候,通过MessageSelector,将orderId相同的消息,都转发到同?个MessageQueue中。
RocketMq使用局部顺序一致性机制,来保证单个队列中消息有序,其实来说消息是否有序都是通过消费组来判断和识别的,发送顺序消息时需要为每条消息设置归属的消费组,相同的消息组的多条消息之间遵循先进先出的顺序原则,但是不同消费者组之间就不能够保证了。
总的来说需要保证消息有序的话就必须将一系列顺序消息放到同一队列当中,然后由消费者逐一消费。
消息的有序包括:全局有序和局部有序(全局有序就是说全局都使用一个queue而局部有序是指多个queue进行并行消费)
全局有序
全局有序就是将所有的消息都放到同一的queue当中,这样肯定是消费有序的。
全局有序就是将所有的消息都放到同一的queue当中,这样肯定是消费有序的。
// 生产者
public class Producer {
public static void main(String[] args) throws MQClientException {
// 创建一个消息生产者
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("message_sequence_product_group");
// 指定nameserver地址
defaultMQProducer.setNamesrvAddr("8.140.182.14:9876");
// 启动生产者
defaultMQProducer.start();
for (int i = 0; i < 10; i++) {
try {
// 构建消息体(指定主题Topic、Tag和消息体)
Message message = new Message("TopicSequence", "TagASequence", ("this" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
/**
* 发送消息获得返回值
* 第一个参数为消息对象
* 第二个参数为消息队列选择器MessageQueueSelector
* 第三个参数为进入队列的下标
*/
SendResult send = defaultMQProducer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
return list.get((Integer) o);
}
}, 1);
// 打印发送结果
System.out.printf("%s%n", send);
} catch (Exception e) {
e.printStackTrace();
}
}
// 消息发送完成,停止消息生产服务
defaultMQProducer.shutdown();
}
}
// 消费者
public class Consumer {
public static void main(String[] args) throws MQClientException {
// 构建消息消费者
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("message_sequence_consumer_group");
// 设置nameserver地址
defaultMQPushConsumer.setNamesrvAddr("8.140.182.14:9876");
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 订阅主题,主题需要和生产者的topic一致
defaultMQPushConsumer.subscribe("TopicSequence", "*");
/**
* 顺序消费同一队列的消息 实现MessageListenerOrderly中的consumeMessage方法
*/
defaultMQPushConsumer.registerMessageListener((MessageListenerOrderly) (list, consumeConcurrentlyContext) -> {
list.forEach(messageExt -> {
try {
System.out.println("收到消息: queueId:" + messageExt.getQueueId() + " 消息内容:" + new String(messageExt.getBody(),
RemotingHelper.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
});
// 标记该消息被成功消费
return ConsumeOrderlyStatus.SUCCESS;
});
// 启动消费者服务
defaultMQPushConsumer.start();
}
}
局部有序
局部有序是将某一系列的消息存放在同一个queue,我们只需保证在同一个queue里面的消息消费是有序的,比如存在一个订单业务,这样我们就可以使用订单的业务id来进行相关的操作。
// 生产端
public class AllProducer {
public static void main(String[] args) throws MQClientException {
// 创建一个消息生产者
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("message_sequence_product_group");
// 指定nameserver地址
defaultMQProducer.setNamesrvAddr("8.140.182.14:9876");
// 启动生产者
defaultMQProducer.start();
// 模拟订单
List<OrderDemo> orderList = getOrderList();
for (OrderDemo orderDemo : orderList) {
try {
// 构建消息体(指定主题Topic、Tag和消息体)
Message message = new Message("TopicSequence", "TagASequence", ("this" + orderDemo.getOrderId()).getBytes(RemotingHelper.DEFAULT_CHARSET));
/**
* 发送消息获得返回值
* 第一个参数为消息对象
* 第二个参数为消息队列选择器MessageQueueSelector
* 第三个参数为进入队列的下标
*/
SendResult send = defaultMQProducer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
long i1 = Long.parseLong((String) o) % list.size();
return list.get((int) i1);
}
// 将这里的第三个参数会传递到select中的第三个参数中
}, orderDemo.getOrderId());
// 打印发送结果
System.out.printf("%s%n", send);
} catch (Exception e) {
e.printStackTrace();
}
}
// 消息发送完成,停止消息生产服务
defaultMQProducer.shutdown();
}
}
// 消费端与前面类似
注意事项:
01
生产者只有将一批有顺序要求的信息,放到同一个Queue中,Broker才能保证这一批消息有序。
02
局部有序对于业务更加契合,如果全局有序那就只能保留一个Queue,这样性能显然非常低。
03
消费者端只能用同步的方式处理消息,不能使用异步更不能使用批量处理。
04
消费者端应该限制有限次数的重试,如果某条消息一直重试失败,消费者端就会跳过然后导致消息混乱。
05
生产者尽可能将有序的消息打散到不同的Queue,避免过于集中导致数据热点竞争。
06
消费者端如果确实处理逻辑中出现问题,不建议抛出异常,可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT作为替代。
源码分析
顺序消息,保证两个点一个为Producer端发送消息顺序,另一个保证Consumer端消费消息顺序。
Producer端顺序发送
需要保证Producer端消息的有序性唯一要求就是把消息发送到指定的路由分区里面,比如:在商城系统中,我们需要保证订单id相同的消息有序地放到指定的分区即可。
(具体代码看上面案例)
Consumer端消费顺序
启动消费者消息:
在Consumer启动时会去判断监听类型(主要类型有两种一个并发消费,一种顺序消费)
//Consumer中自行指定的回调函数。
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
// MessageListenerOrderly服务
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
// MessageListenerConcurrently服务
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
// 启动
this.consumeMessageService.start();
当消息服务为顺序消息时只对集群消息进行加锁操作,广播消息不管,而并发消息时指定定时去清除过期消息即可,不需要加锁。
ConsPullCallBack的回调:
在Consumer端发送拉取消息请求后,Broker进行处理,当Broker处理成功时就会去执行PullCallBack回调的onSuccess方法,异常则调用onException方法。
// 未拉取到消息
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
// 将拉取请求放入队列里面然后再重试
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
// 提交拉取消息的请求
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
在提交消息的时候submitConsumeRequest方法,顺序消息的实现类为
ConsumeMessageOrderlyService。
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume) {
if (dispathToConsume) {
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
}
}
顺序消息的消费也是通过线程池进行异步消费,那这样该如何去保证消息的顺序消费呢?
consumeExecutor线程池执行submit的方法而ConsumeRequest为一个线程类,当执行方法时就会去执行ConsumeRequest的run方法。
public void run() {
// 队列被删除,直接返回
if (this.processQueue.isDropped()) {
log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
// 顺序消费锁定队列。
// 获取消息队列对象锁
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
// 对象消息队列加锁,防止其他线程并发消费该队列
synchronized (objLock) {
// 如果是广播模式或者消息队列加锁成功并且锁未过期
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
final long beginTime = System.currentTimeMillis();
// 消费消息
for (boolean continueConsume = true; continueConsume; ) {
// processQueue被删除,跳出循环
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
break;
}
// 集群消息并且未加锁,延迟加锁跳出循环
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& !this.processQueue.isLocked()) {
log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
// 集群消息并且锁过期,延迟加锁跳出循环
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& this.processQueue.isLockExpired()) {
log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
// 执行时间大于最大消费时间,延迟重新消费
long interval = System.currentTimeMillis() - beginTime;
if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
break;
}
// 获取最大批量数量
final int consumeBatchSize =
ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
// 获取消息
List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
if (!msgs.isEmpty()) {
final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
ConsumeOrderlyStatus status = null;
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext
.setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
// init the consume context type
consumeMessageContext.setProps(new HashMap<String, String>());
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}
long beginTimestamp = System.currentTimeMillis();
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
boolean hasException = false;
try {
// 添加消费锁
this.processQueue.getConsumeLock().lock();
if (this.processQueue.isDropped()) {
log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
this.messageQueue);
break;
}
// 消费消息
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue), e);
hasException = true;
} finally {
// 释放锁
this.processQueue.getConsumeLock().unlock();
}
}
}
} else {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}
}
}
获取消费队列的锁对象,通过ConcurrentMap来实现锁的复用,然后对其synchronized(加锁操作),保证一个队列在消费同时防止其它队列来操作。
如果是广播模式或者消息队列加锁成功并且锁未过期,开始进行消息的消费,判断消息为集群消息并且processQueue未加锁或者已过期,延迟加锁跳出循环。
如果消费时间大于最大消费时间则延迟重新消费,接下来获取批量最大值数量的消息,添加消费锁开始消费消息。
整个生产过程只需要保证同业务的消息存放在同一队列即可,而整个消费过程中由于使用多线程来操作,所以需要使用三把锁来保证消息的顺序消费。
第一把锁的是消息队列,这样能保证在集群模式下在某一时刻一个消息队列只能被一个消费者进行消费。
第二把锁的是要处理的消息对象,由于消费者在处理拉取消息时是开启多线程进行处理的,这时通过获取ConcurrentMap中锁参数来获取消息队列对应的锁,能够有效的处理多线程之间的竞争。
第三把锁的是消费锁,由于重分配的情况存在,不能把正在被消费的对象进行重分配,这样有效防止重复消费。