RocketMQ 源码解析:生产者投递消息 DefaultMQProducer#send(一)

发布时间:2024年01月18日

在这里插入图片描述

🔭 嗨,您好 👋 我是 vnjohn,在互联网企业担任 Java 开发,CSDN 优质创作者
📖 推荐专栏:Spring、MySQL、Nacos、Java,后续其他专栏会持续优化更新迭代
🌲文章所在专栏:RocketMQ
🤔 我当前正在学习微服务领域、云原生领域、消息中间件等架构、原理知识
💬 向我询问任何您想要的东西,ID:vnjohn
🔥觉得博主文章写的还 OK,能够帮助到您的,感谢三连支持博客🙏
😄 代词: vnjohn
? 有趣的事实:音乐、跑步、电影、游戏

目录

前言

RocketMQ 专栏篇:

从零开始:手把手搭建 RocketMQ 单节点、集群节点实例

保护数据完整性:探索 RocketMQ 分布式事务消息的力量

RocketMQ 分布式事务消息实战指南:确保数据一致性的关键设计

RocketMQ 生产者源码分析:DefaultMQProducer、DefaultMQProducerImpl

RocketMQ MQClientInstance、生产者实例启动源码分析

RocketMQ 投递消息方式以及消息体结构分析:Message、MessageQueueSelector

在这里插入图片描述
在 send 方法中分为三种方式单向、同步、异步,而在异步方式投递消息时,采用的是异步线程池的方式去进行处理的,在初始化 DefaultMQProducerImpl 实例时,会构造一个默认的线程池,核心最大线程数为 CPU 核数,当然如果自定义了线程池就会采用自定义的线程池进行处理,如下:

// 初始化默认异步线程池
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
  Runtime.getRuntime().availableProcessors(),
  Runtime.getRuntime().availableProcessors(),
  1000 * 60,
  TimeUnit.MILLISECONDS,
  this.asyncSenderThreadPoolQueue,
  new ThreadFactory() {
    private AtomicInteger threadIndex = new AtomicInteger(0);

    @Override
    public Thread newThread(Runnable r) {
      return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
    }
});

// 选择自定义线程池或默认的线程池
public ExecutorService getAsyncSenderExecutor() {
  return null == asyncSenderExecutor ? defaultAsyncSenderExecutor : asyncSenderExecutor;
}

Send 方法还提供了更多选项,支持自定义选择的 MessageQueue 以及 MessageQueueSelector

MessageQueue:选择好 Topic 下指定的一个 MessageQueue 进行消息投递,而普通模式下是轮询或随机选择一个可用的 MessageQueue

MessageQueueSelector:通过自定义的 MessageSelector 按自定义算法将业务上需要按顺序执行的消息分配到同一个 MessageQueue 下进行存储

同时还提供 request 方法可以保证 RPC 通信方式,request-response 之间通过阻塞的方式等待消息投递以及消息处理成功

围绕在生产者、消费者两侧处理完成以后才返回.

Normal Message Send

当前消息属于普通消息时,比如采用同步、异步、单向

同步:send(Message msg)

异步:send(Message msg, SendCallback sendCallback)

单向:sendOneway(Message msg)

SYNC

/**
 * DEFAULT ASYNC -------------------------------------------------------
 */
public void send(Message msg,
                 SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
  send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
}

同步方式:发送消息的超时时间 sendMsgTimeout,默认为 3 秒钟,支持自定义

ASYNC

public void send(final Message msg, final SendCallback sendCallback, final long timeout)
  throws MQClientException, RemotingException, InterruptedException {
  final long beginStartTime = System.currentTimeMillis();
  ExecutorService executor = this.getAsyncSenderExecutor();
  try {
    executor.submit(() -> {
      long costTime = System.currentTimeMillis() - beginStartTime;
      if (timeout > costTime) {
        try {
          sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
        } catch (Exception e) {
          sendCallback.onException(e);
        }
      } else {
        sendCallback.onException(
          new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
      }
    });
  } catch (RejectedExecutionException e) {
    throw new MQClientException("executor rejected ", e);
  }
}

异步方式:要自定义一个 SendCallback 回调函数来处理消息投递成功或失败时要做的处理,在异步情况下,会对消息进行重试两次.

ONEWAY

/**
 * DEFAULT ONEWAY -------------------------------------------------------
 */
public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
  try {
    this.sendDefaultImpl(msg, CommunicationMode.ONEWAY, null, this.defaultMQProducer.getSendMsgTimeout());
  } catch (MQBrokerException e) {
    throw new MQClientException("unknown exception", e);
  }
}

单向方式:无返回值无回调,无论消息是否投递成功,不作任何处理.

sendDefaultImpl

从以上三种方式发送时,都同样调用了 sendDefaultImpl 方法,该方法就是如何处理普通消息处理的相关源码:

private SendResult sendDefaultImpl(
  Message msg,
  final CommunicationMode communicationMode,
  final SendCallback sendCallback,
  final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  // 检查生产者服务是否正常启动
  this.makeSureStateOK();
  // 校验 Topic 是否符合命名规则、Message 大小是否符合条件
  Validators.checkMessage(msg, this.defaultMQProducer);
  final long invokeID = random.nextLong();
  long beginTimestampFirst = System.currentTimeMillis();
  long beginTimestampPrev = beginTimestampFirst;
  long endTimestamp = beginTimestampFirst;
  // 优先从本地缓存中获取 Topic 元数据信息,不存在时在从 nameserver 中进行拉取
  TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
  if (topicPublishInfo != null && topicPublishInfo.ok()) {
    boolean callTimeout = false;
    MessageQueue mq = null;
    Exception exception = null;
    SendResult sendResult = null;
    int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
    int times = 0;
    String[] brokersSent = new String[timesTotal];
    for (; times < timesTotal; times++) {
      String lastBrokerName = null == mq ? null : mq.getBrokerName();
      // 从 Topic 中选择好一个队列
      MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
      if (mqSelected != null) {
        mq = mqSelected;
        brokersSent[times] = mq.getBrokerName();
        try {
          beginTimestampPrev = System.currentTimeMillis();
          if (times > 0) {
            //Reset topic with namespace during resend.
            msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
          }
          // 计算花费时间,是否调用超时
          long costTime = beginTimestampPrev - beginTimestampFirst;
          if (timeout < costTime) {
            callTimeout = true;
            break;
          }
          // 发起真正的请求
          sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
          endTimestamp = System.currentTimeMillis();
          this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
          switch (communicationMode) {
            case ASYNC:
              return null;
            case ONEWAY:
              return null;
            case SYNC:
              if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                // MQ 提供的容错机制,针对 sendLatencyFaultEnable 参数是否开启故障转移的功能
                // 当该参数未开启配置时,RetryAnotherBroker 参数是无意义的.
                // Producer:retryAnotherBrokerWhenNotStoreOK -> true、Broker:sendLatencyFaultEnable -> true  = OK
                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                  continue;
                }
              }
              return sendResult;
            default:
              break;
          }
        } catch (RemotingException e) {
          endTimestamp = System.currentTimeMillis();
          this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
          log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
          log.warn(msg.toString());
          exception = e;
          continue;
        } catch (MQClientException e) {
          endTimestamp = System.currentTimeMillis();
          this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
          log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
          log.warn(msg.toString());
          exception = e;
          continue;
        } catch (MQBrokerException e) {
          endTimestamp = System.currentTimeMillis();
          this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
          log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
          log.warn(msg.toString());
          exception = e;
          if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {
            continue;
          } else {
            if (sendResult != null) {
              return sendResult;
            }
            throw e;
          }
        } catch (InterruptedException e) {
          endTimestamp = System.currentTimeMillis();
          this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
          log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
          log.warn(msg.toString());

          log.warn("sendKernelImpl exception", e);
          log.warn(msg.toString());
          throw e;
        }
      } else {
        break;
      }
    }
    if (sendResult != null) {
      return sendResult;
    }

    String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                                times,
                                System.currentTimeMillis() - beginTimestampFirst,
                                msg.getTopic(),
                                Arrays.toString(brokersSent));

    info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

    MQClientException mqClientException = new MQClientException(info, exception);
    if (callTimeout) {
      throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
    }

    if (exception instanceof MQBrokerException) {
      mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
    } else if (exception instanceof RemotingConnectException) {
      mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
    } else if (exception instanceof RemotingTimeoutException) {
      mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
    } else if (exception instanceof MQClientException) {
      mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
    }

    throw mqClientException;
  }
  // 校验 nameserver 配置是否存在,不存在抛出异常:No name server address, please set it.
  validateNameServerSetting();
  // 否则提示无法找到该 Topic
  throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
                              null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

CommunicationMode:SYNC、ASYNC、ONEWAY

  1. 检查生产者服务是否正常启动,其实就是是否调用了 start 方法;校验 Topic 是否符合命名规则、Message 大小是否符合条件

    Topic 命名规则对应的正则:regex: ^[%|a-zA-Z0-9_-]+$

    Message 字节大小超过:1024 * 1024 * 4,也不可满足.

  2. 调用 DefaultMQProducerImpl#tryToFindTopicPublishInfo 方法从 nameserver 中获取到 Topic、MessageQueue 元数据信息,主要目的是为了从其中选择一个 MessageQueue 进行消息的存储,将消息落盘到 Topic 指定的一个 MessageQueue 下

  3. 获取 Topic、MessageQueue 元数据信息成功以后,判别当前属于什么模式

    若当前模式属于同步发送时,会将当前循环遍历的总次数设为 3,当消息投递出现异常时,确保能够通过重试的方式将消息进行可靠存储

    
    
  4. 通过 DefaultMQProducerImpl#selectOneMessageQueue 方法轮询或随机选择 Topic 其中一个 MessageQueue 进行消息投递

  5. 最终通过 DefaultMQProducerImpl#sendKernelImpl 方法去组装请求头以及消息体向 Broker 发起请求,该方法的实现后续展开.

  6. 只有同步模式情况下才会一直等待投递结果的到达,分为以下几种情况对消息是否进行重新投递,如下:

    1. 当 Broker 返回不是 SendStatus.SEND_OK 时,判别生产者是否开启 retryAnotherBrokerWhenNotStoreOK「是否在内部重试失败时重试另外一个」 参数配置,未开启时,当前消息就不会投递成功会直接返回给客户端投递失败的结果,开启时还有另外的限制,如下:

      当开启 retryAnotherBrokerWhenNotStoreOK 参数的目的是为了转换 Broker 进行消息的投递,开启它时必须保证 sendLatencyFaultEnable「故障转移的功能」 参数也是开启的,Producer:retryAnotherBrokerWhenNotStoreOK -> true、Broker:sendLatencyFaultEnable -> true = OK,然后在 DefaultMQProducerImpl#selectOneMessageQueue 方法选择 MessagQueue 会转换目标,选择另外一个可用的 Broker 进行消息投递

    2. 当处理消息持久化出现异常:RemotingException、MQClientException、MQBrokerException 时,会进行重试,默认的重试次数为 3

      当出现 MQBrokerException 异常时,在 Broker 端处理时出现如下错误码说明是要进行重试处理的,其他的异常错误码会直接抛出不作后续的工作

      this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode()
      retryResponseCodes = [
      	ResponseCode.TOPIC_NOT_EXIST,
      	ResponseCode.SERVICE_NOT_AVAILABLE,
      	ResponseCode.SYSTEM_ERROR,
      	ResponseCode.NO_PERMISSION,
      	ResponseCode.NO_BUYER_ID,
      	ResponseCode.NOT_IN_CURRENT_UNIT
      
      

tryToFindTopicPublishInfo

在这里插入图片描述
投递的消息是否会向 broker 发起远程调用请求,先要看 Topic 是否存在以及 Broker 参数的配置,分为以下几种情况:

  1. 当 Topic 不存在时,Broker 中开启了 autoCreateTopicEnable 参数配置时,那么会返回默认 Topic:TBW102 元数据信息返回给生产者,默认的读写队列数为 4,未来当请求到达 broker 端时,它会创建当前真实的 Topic 以及队列数
  2. 当 Topic 不存在时,Broker 中未开启 autoCreateTopicEnable 参数配置,在 sendDefaultImpl 方法时就会直接抛出:No route info of this topic: 异常.
  3. 当 Topic 存在时,直接将该 Topic 信息以及队列数返回即可.

核心的源码如下:

/**
 * 优先从本地缓存中获取 Topic 元数据信息,不存在时在从 nameserver 中进行拉取
 * MQInstance 会调用 updateTopicPublishInfo 方法更新本地的 Topic 对应的元数据信息
 * SendMessageProcessor#preSend 该方法
 *
 * @param topic 主题
 * @return 发布的 Topic 信息:Topic Queue、Topic 元数据
*/
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
  TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
  if (null == topicPublishInfo || !topicPublishInfo.ok()) {
    this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
    // 从 nameserver 获取目标 topic 路由信息
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
    topicPublishInfo = this.topicPublishInfoTable.get(topic);
  }
  // 当 Topic 信息为空时,说明该 Topic 处于未创建的状态,就会将默认 Topic:TBW102 赋值过来
  // 当开启自动创建 Topic 时,等到 Broker 处理消息时再调用 SendMessageProcessor#preSend 方法去进行创建
  if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
    return topicPublishInfo;
  } else {
    // 从 nameserver 获取默认 topic 路由信息
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
    topicPublishInfo = this.topicPublishInfoTable.get(topic);
    return topicPublishInfo;
  }
}

RPC Message Send

RPC 方式就是将生产者与消费者之间采用 RPC 方式通信,当生产者生产完消息以后必须等到消费者将消息消费完成,然后通知到你生产者侧才算整个流程完成。

RPC :Message request(final Message msg, final long timeout)

如下是通过 request 方法 RPC 调用的源码:

public Message request(Message msg,
                       long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException {
  long beginTimestamp = System.currentTimeMillis();
  prepareSendRequest(msg, timeout);
  final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);

  try {
    final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null);
    RequestFutureHolder.getInstance().getRequestFutureTable().put(correlationId, requestResponseFuture);

    long cost = System.currentTimeMillis() - beginTimestamp;
    this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() {
      @Override
      public void onSuccess(SendResult sendResult) {
        requestResponseFuture.setSendRequestOk(true);
      }

      @Override
      public void onException(Throwable e) {
        requestResponseFuture.setSendRequestOk(false);
        requestResponseFuture.putResponseMessage(null);
        requestResponseFuture.setCause(e);
      }
    }, timeout - cost);

    return waitResponse(msg, timeout, requestResponseFuture, cost);
  } finally {
    RequestFutureHolder.getInstance().getRequestFutureTable().remove(correlationId);
  }
}

每当通过 request 发出消息以后会实例化一个 RequestResponseFuture 对象,它内部有一个 CountDownLatch 长度为 1 的属性,用于 waitResponse 方法,它是采用异步的方式将消息发送到 Broker 端,有一个回调机制去处理消费者侧对该条消息作出的回复反应.

当消费者端处理完消息的业务逻辑以后通过 MessageUtil#createReplyMessage 方法构建出回复消息以后,Broker 端会通过 ReplyMessageProcessor#processReplyMessageRequest 方法处理这种回复消息,最终 Broker 端会通过 ReplyMessageProcessor#pushReplyMessage 方法向生产者端推送一条消息,生产者客户端通过 ClientRemotingProcessor#processReplyMessage 方法处理回复的消息,最终就是会 RequestResponseFuture 对象里属性信息:requestResponseFuture.putResponseMessage(replyMsg)

Broker 有一个配置项,参数:storeReplyMessageEnable 是否开启存储回复消息

当开启时,每次的回复消息都会被持久化起来

在生产者通过 request 方法投递消息时,会一直阻塞直到消费者将消息处理完再把回复的消息发出来,它在此期间会一直等待,观察 waitResponse 方法的源码一目了然:

private Message waitResponse(Message msg, long timeout, RequestResponseFuture requestResponseFuture,
                             long cost) throws InterruptedException, RequestTimeoutException, MQClientException {
  // requestResponseFuture.waitResponseMessage = countDownLatch.await(timeout, TimeUnit.MILLISECONDS)
  Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
  if (responseMessage == null) {
    if (requestResponseFuture.isSendRequestOk()) {
      throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
                                        "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
    } else {
      throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
    }
  }
  return responseMessage;
}

MessageQueue Message Send

send 方法带有 MessageQueue 参数时,说明它是要将消息直接存储到某一个指定的 Queue 中,类似我们在投递 Kafka 时只往一个 partition 中塞入消息一样的道理.

/**
 * KERNEL SYNC -------------------------------------------------------
 */
public SendResult send(Message msg, MessageQueue mq)
  throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  return send(msg, mq, this.defaultMQProducer.getSendMsgTimeout());
}

public SendResult send(Message msg, MessageQueue mq, long timeout)
  throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  long beginStartTime = System.currentTimeMillis();
  this.makeSureStateOK();
  Validators.checkMessage(msg, this.defaultMQProducer);
  // 投递消息 Topic != 指定 MessageQueue 所在的 Topic
  if (!msg.getTopic().equals(mq.getTopic())) {
    throw new MQClientException("message's topic not equal mq's topic", null);
  }
  long costTime = System.currentTimeMillis() - beginStartTime;
  if (timeout < costTime) {
    throw new RemotingTooMuchRequestException("call timeout");
  }
  return this.sendKernelImpl(msg, mq, CommunicationMode.SYNC, null, null, timeout);
}

在此处只是对生产者服务状态以及 Topic 命名规则、消息体大小进行校验,随机就采用同步的方式调用 sendKernelImpl 方法进行后续处理,同时它也支持异步、单向的方式进行投递.

MessageSelectorQueue Message Send

通过自定义 MessageSelectorQueue#select 算法选举 Messag Queue 将消息投递到指定的 Message Queue 中,它的相关源码如下:

/**
 * SELECT SYNC -------------------------------------------------------
 */
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
  throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  return send(msg, selector, arg, this.defaultMQProducer.getSendMsgTimeout());
}

public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
  throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout);
}

private SendResult sendSelectImpl(
  Message msg,
  MessageQueueSelector selector,
  Object arg,
  final CommunicationMode communicationMode,
  final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  long beginStartTime = System.currentTimeMillis();
  this.makeSureStateOK();
  Validators.checkMessage(msg, this.defaultMQProducer);
  // 获取 Topic 元数据、MessageQueue 信息
  TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
  if (topicPublishInfo != null && topicPublishInfo.ok()) {
    MessageQueue mq = null;
    try {
      List<MessageQueue> messageQueueList =
        mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
      Message userMessage = MessageAccessor.cloneMessage(msg);
      String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
      userMessage.setTopic(userTopic);
      // 调用自定义 MessageQueueSelector#select 方法选中一个 MessageQueue
      mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
    } catch (Throwable e) {
      throw new MQClientException("select message queue threw exception.", e);
    }

    long costTime = System.currentTimeMillis() - beginStartTime;
    if (timeout < costTime) {
      throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
    }
    if (mq != null) {
      return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
    } else {
      throw new MQClientException("select message queue return null.", null);
    }
  }
  validateNameServerSetting();
  throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}

核心方法:sendSelectImpl,它处理的工作分为以下几步走:

  1. 校验生产者服务状态以及 Topic 命名规则、消息体大小是否满足条件
  2. 通过 DefaultMQProducerImpl#tryToFindTopicPublishInfo 方法从 nameserver 中获取到 Topic、Message Queue 元数据信息
  3. 解析获取到的 MessageQueue 集合,再调用自定义的 MessageQueueSelector#select 方法按规则选中一个 MessageQueue
  4. 最终通过 DefaultMQProducerImpl#sendKernelImpl 方法去组装请求头以及消息体向 Broker 发起请求

同时它也支持异步、单向的方式进行投递.

DefaultMQProducerImpl#sendKernelImpl

无论是单向、同步、异步,指定 MessageQueue、MessageQueueSelector,它最终都是指向的 DefaultMQProducerImpl#sendKernelImpl 方法,接下来我们就分析这个方法的源码做了那一些事情.

/**
 * @param msg               消息内容
 * @param mq                存储 MessageQueue
 * @param communicationMode 交流模式:单向、同步、异步
 * @param sendCallback      异步时要处理的回调函数
 * @param topicPublishInfo  Topic 元数据信息
 * @param timeout           可用时间
 * @return 投递消息的结果
 */
private SendResult sendKernelImpl(final Message msg,
                                  final MessageQueue mq,
                                  final CommunicationMode communicationMode,
                                  final SendCallback sendCallback,
                                  final TopicPublishInfo topicPublishInfo,
                                  final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  long beginStartTime = System.currentTimeMillis();
  // 获取 MASTER Broker 节点地址
  String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
  if (null == brokerAddr) {
    tryToFindTopicPublishInfo(mq.getTopic());
    brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
  }
  SendMessageContext context = null;
  if (brokerAddr != null) {
    brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
    byte[] prevBody = msg.getBody();
    try {
      //for MessageBatch,ID has been set in the generating process
      // 当前类型属于批次消息时,由 MessageClientIDSetter 生成 UNIQ_KEY 属性
      if (!(msg instanceof MessageBatch)) {
        MessageClientIDSetter.setUniqID(msg);
      }
      boolean topicWithNamespace = false;
      if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
        msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
        topicWithNamespace = true;
      }
      int sysFlag = 0;
      boolean msgBodyCompressed = false;
      // 批次消息不支持压缩,单条消息超过 4K 会进行压缩
      if (this.tryToCompressMessage(msg)) {
        sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
        msgBodyCompressed = true;
      }
      // 事务消息设置 sysFlag = 4
      final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
      if (Boolean.parseBoolean(tranMsg)) {
        sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
      }
      if (hasCheckForbiddenHook()) {
        CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
        checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
        checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
        checkForbiddenContext.setCommunicationMode(communicationMode);
        checkForbiddenContext.setBrokerAddr(brokerAddr);
        checkForbiddenContext.setMessage(msg);
        checkForbiddenContext.setMq(mq);
        checkForbiddenContext.setUnitMode(this.isUnitMode());
        this.executeCheckForbiddenHook(checkForbiddenContext);
      }
      // 当生产者实例化时开启了 enableMsgTrace 消息追踪时会触发钩子之前的方法调用
      if (this.hasSendMessageHook()) {
        context = new SendMessageContext();
        context.setProducer(this);
        context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        context.setCommunicationMode(communicationMode);
        context.setBornHost(this.defaultMQProducer.getClientIP());
        context.setBrokerAddr(brokerAddr);
        context.setMessage(msg);
        context.setMq(mq);
        context.setNamespace(this.defaultMQProducer.getNamespace());
        // 事务半消息
        String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (isTrans != null && isTrans.equals("true")) {
          context.setMsgType(MessageType.Trans_Msg_Half);
        }
        // 延迟消息
        if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
          context.setMsgType(MessageType.Delay_Msg);
        }
        this.executeSendMessageHookBefore(context);
      }
      // 组装发送请求头:Topic、QueueId
      SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
      requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
      requestHeader.setTopic(msg.getTopic());
      requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
      requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
      requestHeader.setQueueId(mq.getQueueId());
      requestHeader.setSysFlag(sysFlag);
      requestHeader.setBornTimestamp(System.currentTimeMillis());
      requestHeader.setFlag(msg.getFlag());
      requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
      requestHeader.setReconsumeTimes(0);
      requestHeader.setUnitMode(this.isUnitMode());
      // 判别是否批次发送的方式
      requestHeader.setBatch(msg instanceof MessageBatch);
      // 重试 Topic:设置重试次数
      if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
        String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
        if (reconsumeTimes != null) {
          requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
          MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
        }
        String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
        if (maxReconsumeTimes != null) {
          requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
          MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
        }
      }
      SendResult sendResult = null;
      // 判别发送方式:异步、同步、单向发送,最终都是调用 MQClientAPIImpl#sendMessage 方法
      switch (communicationMode) {
        case ASYNC:
          Message tmpMessage = msg;
          boolean messageCloned = false;
          if (msgBodyCompressed) {
            // If msg body was compressed, msgbody should be reset using prevBody.
            // 使用压缩消息体克隆新消息并恢复原始信息
            // Fix bug:https://github.com/apache/rocketmq-externals/issues/66
            tmpMessage = MessageAccessor.cloneMessage(msg);
            messageCloned = true;
            msg.setBody(prevBody);
          }
          if (topicWithNamespace) {
            if (!messageCloned) {
              tmpMessage = MessageAccessor.cloneMessage(msg);
              messageCloned = true;
            }
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
          }
          long costTimeAsync = System.currentTimeMillis() - beginStartTime;
          if (timeout < costTimeAsync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
          }
          sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            tmpMessage,
            requestHeader,
            timeout - costTimeAsync,
            communicationMode,
            sendCallback,
            topicPublishInfo,
            this.mQClientFactory,
            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
            context,
            this);
          break;
        case ONEWAY:
        case SYNC:
          long costTimeSync = System.currentTimeMillis() - beginStartTime;
          if (timeout < costTimeSync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
          }
          sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            msg,
            requestHeader,
            timeout - costTimeSync,
            communicationMode,
            context,
            this);
          break;
        default:
          assert false;
          break;
      }
      // 当生产者实例化时开启了 enableMsgTrace 消息追踪时会触发钩子之后的方法调用
      if (this.hasSendMessageHook()) {
        context.setSendResult(sendResult);
        this.executeSendMessageHookAfter(context);
      }
      return sendResult;
    } catch (RemotingException | MQBrokerException | InterruptedException e) {
      if (this.hasSendMessageHook()) {
        context.setException(e);
        this.executeSendMessageHookAfter(context);
      }
      throw e;
    } finally {
      msg.setBody(prevBody);
      msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
    }
  }
  throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
  1. 首先获取 Broker-Master 地址,当获取不到时,重新从 nameserver 中进行一次 Topic、MessageQueue 元数据拉取请求,请求返回的这些信息中包含了 Broker 信息,会更新 ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable 集合元素.

  2. 若当前消息类型属于批次消息-MessageBatch 发送时,由 MessageClientIDSetter 生成 UNIQ_KEY 属性

  3. 判断消息大小是否超过 4K,超过了 4K 在此处会对其进行压缩操作,在判断是否属于事务消息类型:TRAN_MSG,是则更新其 sysFlag 标记值

  4. 组装 SendMessageRequestHeader 发送消息体,设置其批次类型、重试次数.

  5. 判别其发送方式:异步、同步、单向发送,在后面最终调用的都是 MQClientAPIImpl#sendMessage 方法

    • 当发送方式为异步时,先判别其发送的消息是否被压缩过,若被压缩过,将其消息 Message 深拷贝为一个 Message 对象,再调用 sendMessage 方法

      在异步方式投递消息时,会出现失败的情况,在对其进行重试时,不是在当前 sendKernelImpl 方法进行重试的,而在当前方法的 finally 块时,会对其进行重新赋值,赋值为未压缩之前的 Message 对象,所以,为了避免在异步发送失败进行重试时所投递的消息不是未被压缩过的,故而言之在这里采用了深拷贝赋值,具体的更多描述

      Fix bug:https://github.com/apache/rocketmq-externals/issues/66

    • 当发送方式为单向、同步时,直接调用 sendMessage 方法

MQClientAPIImpl#sendMessage

MQClientAPIImpl#sendMessage 方法也是所有投递消息的方式都要进行调用的方法,该方法的源码如下:

public SendResult sendMessage(
  final String addr,
  final String brokerName,
  final Message msg,
  final SendMessageRequestHeader requestHeader,
  final long timeoutMillis,
  final CommunicationMode communicationMode,
  final SendCallback sendCallback,
  final TopicPublishInfo topicPublishInfo,
  final MQClientInstance instance,
  final int retryTimesWhenSendFailed,
  final SendMessageContext context,
  final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
  long beginStartTime = System.currentTimeMillis();
  RemotingCommand request = null;
  String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
  // RocketMQ 4.6.0 版本中,新增的 Request-Reply 特性,消费者通过 MessageUtil.createReplyMessage 组装 replyMessage 再通过生产者发送.
  // 允许在调用 Producer#request 方法时以同步或异步的方式等待 consumer 消息完成以后返回响应消息,类似 RPC 调用效果
  boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
  // SendMessageRequestHeaderV2 对比普通的 SendMessageRequestHeader 来说,缩短了字段名长度,可以加载序列化、反序列化的速度
  if (isReply) {
    if (sendSmartMsg) {
      SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
      request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
    } else {
      request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
    }
  } else {
    if (sendSmartMsg || msg instanceof MessageBatch) {
      SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
      request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
    } else {
      request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
    }
  }
  request.setBody(msg.getBody());
  switch (communicationMode) {
    case ONEWAY:
      this.remotingClient.invokeOneway(addr, request, timeoutMillis);
      return null;
    case ASYNC:
      final AtomicInteger times = new AtomicInteger();
      long costTimeAsync = System.currentTimeMillis() - beginStartTime;
      if (timeoutMillis < costTimeAsync) {
        throw new RemotingTooMuchRequestException("sendMessage call timeout");
      }
      this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                            retryTimesWhenSendFailed, times, context, producer);
      return null;
    case SYNC:
      long costTimeSync = System.currentTimeMillis() - beginStartTime;
      if (timeoutMillis < costTimeSync) {
        throw new RemotingTooMuchRequestException("sendMessage call timeout");
      }
      return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
    default:
      assert false;
      break;
  }
  return null;
}

该方法的处理过程大致分为以下几步,如下:

  1. 先获取 Message 的属性:MSG_TYPE,若其属性值为 reply,说明其是采用的 MQ RPC 的方式投递消息的,在生产者侧通过 request 方法调用,在一直阻塞,在消费者接收到该消息时并且处理完以后,再发送一条回复消息给到 Broker,Broker 再将回复消息的内容通知给到生产者端,在不超时的情况下,生产者端正常结束处理.

    属于 RPC 方式时,发出的请求码为 SEND_REPLY_MESSAGE 或 SEND_REPLY_MESSAGE_V2(324、325)

    不采用 RPC 方式时,发出的请求码为 SEND_MESSAGE、SEND_MESSAGE_V2、SEND_BATCH_MESSAGE(10、310、320)

    带 _V2 后缀的与不带的区别只是缩短了字段名长度,可以加快序列化、反序列化的速度,如下代码所示:

    public static SendMessageRequestHeaderV2 createSendMessageRequestHeaderV2(final SendMessageRequestHeader v1) {
      SendMessageRequestHeaderV2 v2 = new SendMessageRequestHeaderV2();
      v2.a = v1.getProducerGroup();
      v2.b = v1.getTopic();
      v2.c = v1.getDefaultTopic();
      v2.d = v1.getDefaultTopicQueueNums();
      v2.e = v1.getQueueId();
      v2.f = v1.getSysFlag();
      v2.g = v1.getBornTimestamp();
      v2.h = v1.getFlag();
      v2.i = v1.getProperties();
      v2.j = v1.getReconsumeTimes();
      v2.k = v1.isUnitMode();
      v2.l = v1.getMaxReconsumeTimes();
      v2.m = v1.isBatch();
      return v2;
    }
    
  2. 发送方式不同,在此处调用的方法也各有不同,当属于单向发送时,调用 NettyRemotingClient#invokeOneway 方法;当属于异步发送时,调用 MQClientAPIImpl#sendMessageAsync 以及 NettyRemotingClient#invokeAsync 方法,此处会传入当异步发送失败时,要进行重试的次数:2,它在 sendMessageAsync 方法内进行执行,后续在介绍;当属于同步方式时,调用 NettyRemotingClient#invokeSync 方法

总结

该篇博文主要先简单分析几种不同方式的投递消息方式:SYNC、ASYNC、ONEWAY,以及支持通过指定的 MessageQueue、MessageQueueSelector 方式来对消息进行投递,同步模式投递情况下是通过 DefaultMQProducerImpl#sendDefaultImpl 方法进行重试的,而在异步模式是通过 MQClientAPIImpl#onExceptionImpl 方法重试的,这在后面一篇文章展开介绍,在消息发送之前,会对消息进行压缩,确保客户端与服务端之间交互的数据包大小是最小的,关于 NettyRemotingClient#invokeOneway、MQClientAPIImpl#sendMessageAsync、MQClientAPIImpl#sendMessageSync 方法在下篇文章再具体介绍.

在这里插入图片描述

博文放在 RocketMQ 专栏里,欢迎订阅,会持续更新!

如果觉得博文不错,关注我 vnjohn,后续会有更多实战、源码、架构干货分享!

推荐专栏:Spring、MySQL,订阅一波不再迷路

大家的「关注?? + 点赞👍 + 收藏?」就是我创作的最大动力!谢谢大家的支持,我们下文见!

文章来源:https://blog.csdn.net/vnjohn/article/details/135677983
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。