Broker在RocketMQ中也是一个独立的Model,rocketmq-broker。
Broker的核心类为SendMessageProcessor。
同样从单元测试入手,看Broker接收消息的流程。
SendMessageProcessor的单元测试类为org.apache.rocketmq.broker.processor.SendMessageProcessorTest。
包含上面这些方法,其中init()方法是启动类,其他是测试流程的方法。
先看init()方法中Broker的启动流程。
单元测试中的inti()方法为:
@Before
public void init() {
brokerController.setMessageStore(messageStore);
when(messageStore.now()).thenReturn(System.currentTimeMillis());
Channel mockChannel = mock(Channel.class);
when(mockChannel.remoteAddress()).thenReturn(new InetSocketAddress(1024));
when(handlerContext.channel()).thenReturn(mockChannel);
when(messageStore.lookMessageByOffset(anyLong())).thenReturn(new MessageExt());
sendMessageProcessor = new SendMessageProcessor(brokerController);
}
init方法新建一个SendMessageProcessor,并一个传入brokerController,指定一个messageStore:
继续看接收消息的流程SendMessageProcessor#sendMessage。
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
SendMessageContext mqtraceContext;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.consumerSendMsgBack(ctx, request);
default:
// 解析请求
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null) {
return null;
}
// 发送请求Context。在hook场景下使用
mqtraceContext = buildMsgContext(ctx, requestHeader);
// hook:处理发送消息前逻辑
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
RemotingCommand response;
if (requestHeader.isBatch()) {
// 处理批量发消息逻辑
response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
} else {
// 处理发送消息逻辑
response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
}
this.executeSendMessageHookAfter(response, mqtraceContext);
return response;
}
}
可以看到,此方法负责解析RPC请求,最终是调用SendMessageProcessor#sendMessage方法:
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
// 初始化响应
final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
response.setOpaque(request.getOpaque());
response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
log.debug("receive SendMessage request command, {}", request);
// 如果未开始接收消息,抛出系统异常
final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
if (this.brokerController.getMessageStore().now() < startTimstamp) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
return response;
}
// 消息配置(Topic配置)校验
response.setCode(-1);
super.msgCheck(ctx, requestHeader, response);
if (response.getCode() != -1) {
return response;
}
final byte[] body = request.getBody();
// 如果队列小于0,从可用队列随机选择
int queueIdInt = requestHeader.getQueueId();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (queueIdInt < 0) {
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
}
// 创建MessageExtBrokerInner
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
return response;
}
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
msgInner.setPropertiesString(requestHeader.getProperties());
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
PutMessageResult putMessageResult = null;
Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
// 校验是否不允许发送事务消息
if (traFlag != null && Boolean.parseBoolean(traFlag)) {
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return response;
}
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
// 添加消息
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
// 处理result
return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
}
可以看到,此方法进行了topic的校验,并创建创建MessageExtBrokerInner,随后添加消息的流程主要是调用了MessageStore#putMessage方法:
MessageStore是接口,其默认实现为DefaultMessageStore:
DefaultMessageStore#putMessage方法:
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
if (this.shutdown) {
log.warn("message store has shutdown, so putMessage is forbidden");
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
}
// 从节点不允许写入
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("message store is slave mode, so putMessage is forbidden ");
}
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
}
// store是否允许写入
if (!this.runningFlags.isWriteable()) {
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
}
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
} else {
this.printTimes.set(0);
}
// 消息过长
if (msg.getTopic().length() > Byte.MAX_VALUE) {
log.warn("putMessage message topic length too long " + msg.getTopic().length());
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
// 消息附加属性过长
if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
}
if (this.isOSPageCacheBusy()) {
return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
}
long beginTime = this.getSystemClock().now();
// 添加消息到commitLog
PutMessageResult result = this.commitLog.putMessage(msg);
long eclipseTime = this.getSystemClock().now() - beginTime;
if (eclipseTime > 500) {
log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
}
this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
}
return result;
}
可以看到,首先是检查Broker是否可以写入,从节点不能写入,然后消息经过一系列的格式与大小校验,最终通过CommitLog.putMessage进行存储。
Producer作为客户端发送消息后,Broker作为服务端需要接收消息并存储消息。
本篇分析了broker接收消息的流程:
消息存储是通过CommitLong#putMessage进行的,这个流程在下一篇《RocketMQ源码阅读-Broker消息存储》学习。