一般情况下,只会使用到普通顺序消息,不必保证严格有序。
官方文档中说明目前已知只有一种情况会要求严格顺序,数据库 binlog 同步。
public static void main(String[] args) {
try {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 100; i++) {
int orderId = i % 10;
String body = "Hello RocketMQ " + i;
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
上面例子中,创建了一个Producer,并且发送消息时,创建了一个选择器,选择其中使用id % mqs.size() 进行消息队列的选择。传递了orderId作为参数,那么相同的orderId能够分配到同一个消息队列中。保证了消息的顺序。
看一下 MessageQueueSelector 接口:
public interface MessageQueueSelector {
/**
* 选择消息队列
*
* @param mqs 消息队列
* @param msg 消息
* @param arg 参数
* @return 消息队列
*/
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
发送消息最终会调用到DefaultMQProducerImpl#sendSelectImpl:
private SendResult sendSelectImpl(
Message msg,
MessageQueueSelector selector,
Object arg,
final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
try {
List<MessageQueue> messageQueueList =
mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
Message userMessage = MessageAccessor.cloneMessage(msg);
String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
userMessage.setTopic(userTopic);
// 选择消息
mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
} catch (Throwable e) {
throw new MQClientException("select message queue throwed exception.", e);
}
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout < costTime) {
throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
}
if (mq != null) {
// 发送消息
return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
} else {
throw new MQClientException("select message queue return null.", null);
}
}
throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}
可以看到代码中选择消息和发送消息的位置。
Consumer能够实现严格顺序消息。
通过三把锁来实现严格顺序消费。
集群模式下,Consumer 更新属于自己的消息队列时,会向 Broker 锁定该消息队列(广播模式下不需要)。如果锁定失败,则更新失败,即该消息队列不属于自己,不能进行消费。
相关源代码如下RebalanceImpl#updateProcessQueueTableInRebalance:
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
if (mq.getTopic().equals(topic)) {
if (!mqSet.contains(mq)) {
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
} else if (pq.isPullExpired()) {
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
consumerGroup, mq);
}
break;
default:
break;
}
}
}
}
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
// 顺序消息锁定消息队列
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
this.dispatchPullRequest(pullRequestList);
return changed;
}
代码第43行,可以看到,当是顺序消息时,需要锁定消息队列。
锁定操作调用方法为RebalanceImpl#lock:
public boolean lock(final MessageQueue mq) {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
if (findBrokerResult != null) {
LockBatchRequestBody requestBody = new LockBatchRequestBody();
requestBody.setConsumerGroup(this.consumerGroup);
requestBody.setClientId(this.mQClientFactory.getClientId());
requestBody.getMqSet().add(mq);
try {
// 请求Broker获得指定消息队列的分布式锁
Set<MessageQueue> lockedMq =
this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
// 设置消息处理队列锁定成功。锁定消息队列成功,可能本地没有消息处理队列,设置锁定成功会在lockAll()方法。
for (MessageQueue mmqq : lockedMq) {
ProcessQueue processQueue = this.processQueueTable.get(mmqq);
if (processQueue != null) {
processQueue.setLocked(true);
processQueue.setLastLockTimestamp(System.currentTimeMillis());
}
}
boolean lockOK = lockedMq.contains(mq);
log.info("the message queue lock {}, {} {}",
lockOK ? "OK" : "Failed",
this.consumerGroup,
mq);
return lockOK;
} catch (Exception e) {
log.error("lockBatchMQ exception, " + mq, e);
}
}
return false;
}
可以看到,严格顺序消息,rebalance更新队列时,会获取队列锁,如果锁定失败,新增消息处理队列失败。
Broker 消息队列锁会过期,默认配置 30s。因此,Consumer 需要不断向 Broker 刷新该锁过期时间,默认配置 20s 刷新一次。源码ConsumeMessageOrderlyService#start:
public void start() {
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
ConsumeMessageOrderlyService.this.lockMQPeriodically();
}
}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}
}
集群模式下,Consumer 移除自己的消息队列时,会向 Broker 解锁该消息队列(广播模式下不需要)。
源码RebalancePushImpl#removeUnnecessaryMessageQueue:
该方法的作用是移除不需要的队列相关的信息。
@Override
public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
// 持久化队列的消费进度
this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
// 移除
this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
// 集群模式下,顺序消费移除时,解锁对队列的锁定
if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
&& MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
try {
if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
try {
// 延迟解锁
return this.unlockDelay(mq, pq);
} finally {
pq.getLockConsume().unlock();
}
} else {
log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}",
mq,
pq.getTryUnlockTimes());
pq.incTryUnlockTimes();
}
} catch (Exception e) {
log.error("removeUnnecessaryMessageQueue Exception", e);
}
return false;
}
return true;
}
获取消息队列消费锁,避免和消息队列消费冲突。如果获取锁失败,则移除消息队列失败,等待下次重新分配消费队列时,再进行移除。如果未获得锁而进行移除,则可能出现另外的 Consumer 和当前 Consumer 同时消费该消息队列,导致消息无法严格顺序消费。
会调用延迟解锁方法RebalancePushImpl#unlockDelay:
private boolean unlockDelay(final MessageQueue mq, final ProcessQueue pq) {
if (pq.hasTempMessage()) {
log.info("[{}]unlockDelay, begin {} ", mq.hashCode(), mq);
this.defaultMQPushConsumerImpl.getmQClientFactory().getScheduledExecutorService().schedule(new Runnable() {
@Override
public void run() {
log.info("[{}]unlockDelay, execute at once {}", mq.hashCode(), mq);
RebalancePushImpl.this.unlock(mq, true);
}
}, UNLOCK_DELAY_TIME_MILLS, TimeUnit.MILLISECONDS);
} else {
// 消息处理队列不存在,直接解锁
this.unlock(mq, true);
}
return true;
}
解锁 Broker 消息队列锁。如果消息处理队列存在剩余消息,则延迟解锁 Broker 消息队列锁。
源码DefaultMQPushConsumerImpl类中300行,创建了一个PullCallback,作为拉取消息成功的回调,325行调用ConsumeMessageOrderlyService(严格顺序消息处理类)的submitConsumeRequest方法
@Override
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume) {
if (dispathToConsume) {
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
}
}
方法创建了一个ConsumeRequest提交给了线程池。这个ConsumeRequest在下面进行分析。
消费消息的流程如下图(来源https://www.iocoder.cn/RocketMQ/message-send-and-consume-orderly/?github&1601):
接着看ConsumeRequest的分析。
ConsumeRequest是ConsumeMessageOrderlyService的内部类:
类源码如下:
class ConsumeRequest implements Runnable {
// 消息处理队列
private final ProcessQueue processQueue;
// 消息队列
private final MessageQueue messageQueue;
public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
this.processQueue = processQueue;
this.messageQueue = messageQueue;
}
public ProcessQueue getProcessQueue() {
return processQueue;
}
public MessageQueue getMessageQueue() {
return messageQueue;
}
@Override
public void run() {
if (this.processQueue.isDropped()) {
log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
// 获得 Consumer 消息队列锁
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
// (广播模式) 或者 (集群模式 && Broker消息队列锁有效)
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
final long beginTime = System.currentTimeMillis();
for (boolean continueConsume = true; continueConsume; ) {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
break;
}
// 消息队列分布式锁未锁定,提交延迟获得锁并消费请求
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& !this.processQueue.isLocked()) {
log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
// 消息队列分布式锁已经过期,提交延迟获得锁并消费请求
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& this.processQueue.isLockExpired()) {
log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
// 当前周期消费时间超过连续时长,默认:60s,提交延迟消费请求。默认情况下,每消费1分钟休息10ms。
long interval = System.currentTimeMillis() - beginTime;
if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
break;
}
// 获取消费消息。此处和并发消息请求不同,并发消息请求已经带了消费哪些消息。
final int consumeBatchSize =
ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
if (!msgs.isEmpty()) {
final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
ConsumeOrderlyStatus status = null;
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext
.setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
// init the consume context type
consumeMessageContext.setProps(new HashMap<String, String>());
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}
// 执行消费
long beginTimestamp = System.currentTimeMillis();
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
boolean hasException = false;
try {
// 获取消息处理队列锁
this.processQueue.getLockConsume().lock();
if (this.processQueue.isDropped()) {
log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
this.messageQueue);
break;
}
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
} finally {
// 释放消息处理队列锁
this.processQueue.getLockConsume().unlock();
}
if (null == status
|| ConsumeOrderlyStatus.ROLLBACK == status
|| ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue);
}
long consumeRT = System.currentTimeMillis() - beginTimestamp;
if (null == status) {
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;
} else {
returnType = ConsumeReturnType.RETURNNULL;
}
} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
returnType = ConsumeReturnType.TIME_OUT;
} else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
returnType = ConsumeReturnType.FAILED;
} else if (ConsumeOrderlyStatus.SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS;
}
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
}
if (null == status) {
status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext
.setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}
ConsumeMessageOrderlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
// 处理消费结果
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
continueConsume = false;
}
}
} else {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}
}
}
}
源码分析详细见上面代码片段中的注释。主要有以下流程:
上一节源码第150行,执行处理消费结果的逻辑,调用的方法为ConsumeMessageOrderlyService#processConsumeResult方法:
// 处理消费结果,并返回是否继续消费
public boolean processConsumeResult(
final List<MessageExt> msgs,
final ConsumeOrderlyStatus status,
final ConsumeOrderlyContext context,
final ConsumeRequest consumeRequest
) {
boolean continueConsume = true;
long commitOffset = -1L;
if (context.isAutoCommit()) {
// 自动提交的消息
switch (status) {
case COMMIT:
case ROLLBACK:
log.warn("the message queue consume result is illegal, we think you want to ack these message {}",
consumeRequest.getMessageQueue());
case SUCCESS:
// 消息消费成功,提交到消息处理队列
commitOffset = consumeRequest.getProcessQueue().commit();
// 统计
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
break;
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
// 统计
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
// 计算是否暂时挂起消费N毫秒,默认10ms
if (checkReconsumeTimes(msgs)) {
// 设置消息重新消费
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
// 提交延时消费请求
this.submitConsumeRequestLater(
consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
} else {
commitOffset = consumeRequest.getProcessQueue().commit();
}
break;
default:
break;
}
} else {
switch (status) {
case SUCCESS:
// 统计
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
break;
case COMMIT:
// 提交消息已消费成功到消息处理队列
commitOffset = consumeRequest.getProcessQueue().commit();
break;
case ROLLBACK:
// 设置消息重新消费
consumeRequest.getProcessQueue().rollback();
this.submitConsumeRequestLater(
consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
break;
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
// 统计
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
// 计算是否暂时挂起消费N毫秒,默认:10ms
if (checkReconsumeTimes(msgs)) {
// 设置消息重新消费
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
// 提交延迟消费请求
this.submitConsumeRequestLater(
consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
}
break;
default:
break;
}
}
// 消息处理队列未dropped,提交有效消费进度
if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
}
return continueConsume;
}
private int getMaxReconsumeTimes() {
// default reconsume times: Integer.MAX_VALUE
if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {
return Integer.MAX_VALUE;
} else {
return this.defaultMQPushConsumer.getMaxReconsumeTimes();
}
}
// 计算是否暂时挂起消费N毫秒,不暂停条件:存在消息都超过最大消费次数并且都发回broker成功,返回是否要暂停
private boolean checkReconsumeTimes(List<MessageExt> msgs) {
boolean suspend = false;
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
if (msg.getReconsumeTimes() >= getMaxReconsumeTimes()) {
MessageAccessor.setReconsumeTime(msg, String.valueOf(msg.getReconsumeTimes()));
if (!sendMessageBack(msg)) {
suspend = true;
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
}
} else {
suspend = true;
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
}
}
}
return suspend;
}
// 消息发回Broker,对应的队列时死信队列
public boolean sendMessageBack(final MessageExt msg) {
try {
// 超出最大重新消耗次数(默认 :16次),然后发送到死信队列。
Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
String originMsgId = MessageAccessor.getOriginMessageId(msg);
MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
newMsg.setFlag(msg.getFlag());
MessageAccessor.setProperties(newMsg, msg.getProperties());
MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes()));
MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(newMsg);
return true;
} catch (Exception e) {
log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
}
return false;
}
此方法根据不同的消费结果状态执行不同的逻辑,消费结果状态共有四种:
public enum ConsumeOrderlyStatus {
/**
* 消费成功但不提交
*/
SUCCESS,
/**
* 消费失败,消费回滚(已过期,仅适用于binlog同步)
*/
@Deprecated
ROLLBACK,
/**
* 消费成功提交并且提交(已过期,仅适用于binlog同步)
*/
@Deprecated
COMMIT,
/**
* 消费失败,挂起一会,稍后继续消费
*/
SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
四种状态中ROLLBACK和COMMIT仅用于binlog同步,所以已经标注了@Deprecated。
普通顺序消息,也就是并发消费时,消费失败,Consume会将消费失败的消息发回Broker,等下次拉取再消费。
严格顺序消息,为保证顺序,只能挂起队列(上面代码的第99行函数),延迟一会再次消费。
如果失败次数达到上限(默认 :16次),Consumer会将此消息发回Broker的死信队列。
类ProcessQueue的签名是:队列消费快照。
用来存储消息队列的消费进度,也叫作消息处理队列。
其中的关键属性和方法为:
// 消息银蛇,key为消息队列的位置
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
/**
* msgTreeMap 的子集,仅在有序使用时使用
*/
private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<Long, MessageExt>();
// 回滚消费中的消息
public void rollback() {
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
// consumingMsgOrderlyTreeMap 子集中的消息全部放回msgTreeMap
this.msgTreeMap.putAll(this.consumingMsgOrderlyTreeMap);
// 并清空 consumingMsgOrderlyTreeMap
this.consumingMsgOrderlyTreeMap.clear();
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("rollback exception", e);
}
}
// 提交消费中的消息为消费成功,返回消费进度
public long commit() {
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
// 消费进度
Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());
for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) {
msgSize.addAndGet(0 - msg.getBody().length);
}
this.consumingMsgOrderlyTreeMap.clear();
// 返回消费进度,偏移量+1
if (offset != null) {
return offset + 1;
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("commit exception", e);
}
return -1;
}
// 指定消息重新消费
public void makeMessageToCosumeAgain(List<MessageExt> msgs) {
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
for (MessageExt msg : msgs) {
this.consumingMsgOrderlyTreeMap.remove(msg.getQueueOffset());
this.msgTreeMap.put(msg.getQueueOffset(), msg);
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("makeMessageToCosumeAgain exception", e);
}
}
// 获取前batchSize条消息
public List<MessageExt> takeMessags(final int batchSize) {
List<MessageExt> result = new ArrayList<MessageExt>(batchSize);
final long now = System.currentTimeMillis();
try {
this.lockTreeMap.writeLock().lockInterruptibly();
this.lastConsumeTimestamp = now;
try {
if (!this.msgTreeMap.isEmpty()) {
for (int i = 0; i < batchSize; i++) {
Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
if (entry != null) {
result.add(entry.getValue());
// 同时put进consumingMsgOrderlyTreeMap子集中,代表正在消费的消息
consumingMsgOrderlyTreeMap.put(entry.getKey(), entry.getValue());
} else {
break;
}
}
}
if (result.isEmpty()) {
consuming = false;
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("take Messages exception", e);
}
return result;
}
ProcessQueue是消息处理队列,记录消费的进度,封装对于消息的流程控制方法。
本篇分析了RocketMQ对于普通顺序消息和严格顺序消息的消费流程。