RocketMQ Source Code Reading: Broker Message Reception

Published: 2024-01-14


The Broker receives the messages sent by the Producer.

In RocketMQ, the Broker is also a standalone module, rocketmq-broker.

The Broker's core class for receiving messages is SendMessageProcessor.

1. Starting from the unit tests

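As before, we start from the unit tests to follow how the Broker receives a message.
The unit test class for SendMessageProcessor is org.apache.rocketmq.broker.processor.SendMessageProcessorTest.
The screenshot of this class (image.png) lists its methods: init() is the setup method, and the others exercise individual test flows.
We first look at how init() sets up the Broker.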

2. Broker startup flow

The init() method in the unit test is:

@Before
public void init() {
    brokerController.setMessageStore(messageStore);
    when(messageStore.now()).thenReturn(System.currentTimeMillis());
    Channel mockChannel = mock(Channel.class);
    when(mockChannel.remoteAddress()).thenReturn(new InetSocketAddress(1024));
    when(handlerContext.channel()).thenReturn(mockChannel);
    when(messageStore.lookMessageByOffset(anyLong())).thenReturn(new MessageExt());
    sendMessageProcessor = new SendMessageProcessor(brokerController);
}

init() sets a messageStore on the brokerController and then creates a SendMessageProcessor, passing the brokerController to its constructor:

  • BrokerController: the Broker's controller, which manages its components
  • MessageStore: the interface for message storage
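The wiring performed by init() can be illustrated without Mockito. The following is only a minimal sketch with hand-written stubs: StubStore, StubController, and StubProcessor are hypothetical stand-ins, not RocketMQ classes, and they model nothing beyond the relationship processor -> controller -> store. The fixed clock value imitates the fact that thenReturn(System.currentTimeMillis()) captures the time once, when the stub is set up.

```java
class InitWiringSketch {
    // Hypothetical stand-in for the MessageStore interface (only now() is modeled).
    interface StubStore {
        long now();
    }

    // Hypothetical stand-in for BrokerController: it simply holds the store.
    static class StubController {
        private StubStore messageStore;

        void setMessageStore(StubStore store) {
            this.messageStore = store;
        }

        StubStore getMessageStore() {
            return messageStore;
        }
    }

    // Hypothetical stand-in for SendMessageProcessor: it reaches the store through the controller.
    static class StubProcessor {
        private final StubController controller;

        StubProcessor(StubController controller) {
            this.controller = controller;
        }

        long storeTime() {
            return controller.getMessageStore().now();
        }
    }

    // Same order as init(): set the store on the controller, then build the processor.
    static StubProcessor wire(long fixedNow) {
        StubController controller = new StubController();
        controller.setMessageStore(() -> fixedNow); // fixed value, evaluated once at setup
        return new StubProcessor(controller);
    }

    public static void main(String[] args) {
        System.out.println(wire(System.currentTimeMillis()).storeTime());
    }
}
```

Because the processor only keeps a reference to the controller, a test can swap the store without touching the processor.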

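Next, we follow the message reception flow, which enters at SendMessageProcessor#processRequest and continues into SendMessageProcessor#sendMessage.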

3. Broker message reception

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
                                      RemotingCommand request) throws RemotingCommandException {
    SendMessageContext mqtraceContext;
    switch (request.getCode()) {
        case RequestCode.CONSUMER_SEND_MSG_BACK:
            return this.consumerSendMsgBack(ctx, request);
        default:
            // Parse the request header
            SendMessageRequestHeader requestHeader = parseRequestHeader(request);
            if (requestHeader == null) {
                return null;
            }

            // Build the send-message context, which is used by the hooks
            mqtraceContext = buildMsgContext(ctx, requestHeader);
            // Hook: run the logic registered for "before send message"
            this.executeSendMessageHookBefore(ctx, request, mqtraceContext);

            RemotingCommand response;
            if (requestHeader.isBatch()) {
                // Handle a batch message
                response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
            } else {
                // Handle a single message
                response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
            }

            this.executeSendMessageHookAfter(response, mqtraceContext);
            return response;
    }
}
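The control flow of processRequest above can be condensed into a small sketch. This is a simplified model under stated assumptions, not the RocketMQ implementation: the request codes, the boolean parameters, and the string responses are all hypothetical placeholders, and the trace list exists only to make the order of steps visible.

```java
import java.util.ArrayList;
import java.util.List;

class DispatchSketch {
    // Hypothetical request codes; the real constants live in RequestCode.
    static final int SEND_MESSAGE = 1;
    static final int CONSUMER_SEND_MSG_BACK = 2;

    // Records the order of the steps so the control flow can be inspected.
    final List<String> trace = new ArrayList<>();

    String processRequest(int code, boolean headerParsable, boolean batch) {
        if (code == CONSUMER_SEND_MSG_BACK) {
            trace.add("consumerSendMsgBack");
            return "sendBackResponse";
        }
        if (!headerParsable) {
            return null; // mirrors "requestHeader == null": no hook runs and no response is built
        }
        trace.add("hookBefore");
        String response;
        if (batch) {
            trace.add("sendBatchMessage");
            response = "batchResponse";
        } else {
            trace.add("sendMessage");
            response = "singleResponse";
        }
        trace.add("hookAfter");
        return response;
    }

    public static void main(String[] args) {
        DispatchSketch processor = new DispatchSketch();
        processor.processRequest(SEND_MESSAGE, true, false);
        System.out.println(processor.trace);
    }
}
```

The sketch highlights two properties of the original: the before and after hooks always bracket the actual send, and the send-back path bypasses the hooks entirely.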

As shown, processRequest parses the RPC request and, for a non-batch message, ultimately calls SendMessageProcessor#sendMessage:

private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
                                    final RemotingCommand request,
                                    final SendMessageContext sendMessageContext,
                                    final SendMessageRequestHeader requestHeader) throws RemotingCommandException {

    // Initialize the response
    final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
    final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();

    response.setOpaque(request.getOpaque());

    response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
    response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));

    log.debug("receive SendMessage request command, {}", request);

    // If the broker has not started accepting send requests yet, return a SYSTEM_ERROR response
    final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
    if (this.brokerController.getMessageStore().now() < startTimstamp) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
        return response;
    }

    // Validate against the topic configuration; the code stays -1 only if msgCheck reports no error
    response.setCode(-1);
    super.msgCheck(ctx, requestHeader, response);
    if (response.getCode() != -1) {
        return response;
    }

    final byte[] body = request.getBody();

    // If queueId is negative, pick one of the topic's write queues at random
    int queueIdInt = requestHeader.getQueueId();
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

    if (queueIdInt < 0) {
        queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
    }

    // Build the MessageExtBrokerInner
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(requestHeader.getTopic());
    msgInner.setQueueId(queueIdInt);

    if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
        return response;
    }

    msgInner.setBody(body);
    msgInner.setFlag(requestHeader.getFlag());
    MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
    msgInner.setPropertiesString(requestHeader.getProperties());
    msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
    msgInner.setBornHost(ctx.channel().remoteAddress());
    msgInner.setStoreHost(this.getStoreHost());
    msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
    PutMessageResult putMessageResult = null;
    Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
    String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    // Reject transactional messages if the broker is configured to refuse them
    if (traFlag != null && Boolean.parseBoolean(traFlag)) {
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(
                "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                    + "] sending transaction message is forbidden");
            return response;
        }
        putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
    } else {
        // Store the message
        putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
    }
    // Convert the put result into the response
    return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);

}
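The random queue selection in sendMessage above is worth isolating. The sketch below reuses the same arithmetic in a standalone class; QueueSelectSketch and its method names are hypothetical and exist only for illustration. Taking the remainder before calling Math.abs matters: Math.abs(Integer.MIN_VALUE) is still negative, whereas the remainder modulo 99999999 always lies strictly between -99999999 and 99999999, so its absolute value is safe.

```java
import java.util.Random;

class QueueSelectSketch {
    // Same arithmetic as the quoted snippet: reduce first, then take the absolute value.
    static int pick(int rawRandom, int writeQueueNums) {
        return Math.abs(rawRandom % 99999999) % writeQueueNums;
    }

    // A non-negative queueId from the producer is kept; a negative one triggers random selection.
    static int resolve(int requestedQueueId, int rawRandom, int writeQueueNums) {
        return requestedQueueId < 0 ? pick(rawRandom, writeQueueNums) : requestedQueueId;
    }

    public static void main(String[] args) {
        Random random = new Random();
        System.out.println(resolve(-1, random.nextInt(), 4));
    }
}
```

With this ordering, every possible int from the random generator maps to a valid index in the range 0 to writeQueueNums - 1.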

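As shown, sendMessage validates the topic configuration and builds a MessageExtBrokerInner. The message is then stored mainly by calling MessageStore#putMessage, while transactional prepare messages go through getTransactionalMessageService().prepareMessage instead.
MessageStore is an interface; its default implementation is DefaultMessageStore (screenshot: image.png).
The DefaultMessageStore#putMessage method: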

public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    if (this.shutdown) {
        log.warn("message store has shutdown, so putMessage is forbidden");
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }

    // A slave node does not accept writes
    if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) {
            log.warn("message store is slave mode, so putMessage is forbidden ");
        }

        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }
    // Check whether the store is currently writeable
    if (!this.runningFlags.isWriteable()) {
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) {
            log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
        }

        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    } else {
        this.printTimes.set(0);
    }
    // Topic name too long
    if (msg.getTopic().length() > Byte.MAX_VALUE) {
        log.warn("putMessage message topic length too long " + msg.getTopic().length());
        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
    }
    // Message properties string too long
    if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
        log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
        return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
    }

    if (this.isOSPageCacheBusy()) {
        return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
    }

    long beginTime = this.getSystemClock().now();
    // Append the message to the CommitLog
    PutMessageResult result = this.commitLog.putMessage(msg);

    long eclipseTime = this.getSystemClock().now() - beginTime;
    if (eclipseTime > 500) {
        log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
    }
    this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);

    if (null == result || !result.isOk()) {
        this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
    }

    return result;
}

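As shown, putMessage first checks whether the Broker can accept writes (a store that has shut down, a slave node, or a non-writeable store all return SERVICE_NOT_AVAILABLE), then validates the topic length and the properties length, checks whether the OS page cache is busy, and finally stores the message through CommitLog#putMessage.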
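The order of these checks can be sketched as a chain of early returns. This is a simplified model rather than the real store: the status names mirror the PutMessageStatus values visible in the quoted code, while StoreState, its fields, and the OK value (which stands in for a successful CommitLog write) are assumptions made for illustration.

```java
class PutCheckSketch {
    // Names mirror the PutMessageStatus values in the quoted code; OK is a placeholder for success.
    enum Status { OK, SERVICE_NOT_AVAILABLE, MESSAGE_ILLEGAL, PROPERTIES_SIZE_EXCEEDED, OS_PAGECACHE_BUSY }

    // Hypothetical snapshot of the store state that the checks consult.
    static final class StoreState {
        boolean shutdown;
        boolean slave;
        boolean writeable = true;
        boolean pageCacheBusy;
    }

    static Status check(StoreState state, String topic, String properties) {
        if (state.shutdown) {
            return Status.SERVICE_NOT_AVAILABLE;
        }
        if (state.slave) {
            return Status.SERVICE_NOT_AVAILABLE;
        }
        if (!state.writeable) {
            return Status.SERVICE_NOT_AVAILABLE;
        }
        if (topic.length() > Byte.MAX_VALUE) { // more than 127 characters
            return Status.MESSAGE_ILLEGAL;
        }
        if (properties != null && properties.length() > Short.MAX_VALUE) { // more than 32767 characters
            return Status.PROPERTIES_SIZE_EXCEEDED;
        }
        if (state.pageCacheBusy) {
            return Status.OS_PAGECACHE_BUSY;
        }
        return Status.OK; // the real method would call commitLog.putMessage(msg) at this point
    }

    public static void main(String[] args) {
        System.out.println(check(new StoreState(), "TopicTest", null));
    }
}
```

Because the availability checks come first, a slave node reports SERVICE_NOT_AVAILABLE even for a message that would also fail the length checks.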

4. Sequence diagram of Broker message reception

(Sequence diagram: SendMessageProcessor_processRequest.png)

5. Summary

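After the Producer, acting as the client, sends a message, the Broker, acting as the server, must receive and store it.
This article traced how the Broker receives a message: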

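  • SendMessageProcessor#processRequest is the entry point invoked over RPC when a message arrives; it is mainly responsible for parsing the RPC request
  • processRequest calls SendMessageProcessor#sendMessage, which validates the topic configuration and builds a MessageExtBrokerInner; the message is then stored mainly by calling DefaultMessageStore#putMessage
  • DefaultMessageStore#putMessage checks whether the Broker can accept writes (a slave node cannot), runs a series of format and size checks on the message, and finally stores it through CommitLog#putMessage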

Message storage itself is handled by CommitLog#putMessage, which is covered in the next article, "RocketMQ Source Code Reading: Broker Message Storage".

Source: https://blog.csdn.net/weixin_43155866/article/details/135584745