以下是重平衡代码:
private boolean rebalanceByTopic(final String topic, final boolean isOrder) {
boolean balanced = true;
switch (messageModel) {
//广播模式不需要重平衡
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}", consumerGroup, topic, mqSet, mqSet);
}
balanced = mqSet.equals(getWorkingMessageQueue(topic));
} else {
this.messageQueueChanged(topic, Collections.<MessageQueue>emptySet(), Collections.<MessageQueue>emptySet());
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
}
case CLUSTERING: {
//内存中拿出所有queue
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
//GET_CONSUMER_LIST_BY_GROUP这个请求去ns拿出所有的客户端
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
this.messageQueueChanged(topic, Collections.<MessageQueue>emptySet(), Collections.<MessageQueue>emptySet());
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}
if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
//将两个list进行排序
Collections.sort(mqAll);
Collections.sort(cidAll);
//重平衡策略
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
//策略进行重平衡
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("allocate message queue exception. strategy name: {}, ex: {}", strategy.getName(), e);
return false;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
//对比queue是否有变化,并且更新到processQueueTable中
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(
"client rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
balanced = allocateResultSet.equals(getWorkingMessageQueue(topic));
}
break;
}
default:
break;
}
return balanced;
}