RocketMQ MQClientInstance、生产者实例启动源码分析

发布时间:2024年01月06日

在这里插入图片描述

🔭 嗨,您好 👋 我是 vnjohn,在互联网企业担任 Java 开发,CSDN 优质创作者
📖 推荐专栏:Spring、MySQL、Nacos、Java,后续其他专栏会持续优化更新迭代
🌲文章所在专栏:RocketMQ
🤔 我当前正在学习微服务领域、云原生领域、消息中间件等架构、原理知识
💬 向我询问任何您想要的东西,ID:vnjohn
🔥觉得博主文章写的还 OK,能够帮助到您的,感谢三连支持博客🙏
😄 代词: vnjohn
? 有趣的事实:音乐、跑步、电影、游戏

目录

前言

从 RocketMQ 源码来看,在生产者、消费者上都是使用的一个相同的客户端实例类:MQClientInstance,在该篇博文会分析该类里面核心的结构信息以及介绍生产者它的一个启动过程.

DefaultProducetImpl、DefaultLitePullConsumerImpl、DefaultMQPushConsumerImpl

RocketMQ 专栏篇:

从零开始:手把手搭建 RocketMQ 单节点、集群节点实例

保护数据完整性:探索 RocketMQ 分布式事务消息的力量

RocketMQ 分布式事务消息实战指南:确保数据一致性的关键设计

RocketMQ 生产者源码分析:DefaultMQProducer、DefaultMQProducerImpl

类结构

MQClientInstance

MQClientInstance 作为生产者、消费者共同使用的 MQ 客户端实例类,先来分析一下它内部重要的属性信息:

// 客户端id > IP+实例名称
private final String clientId;

// 生产者组 -> 生产者实例
private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
// 消费者组 -> 消费者实例
private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
// Admin 组 -> MQ 管理实现类,比如:RocketMQ Dashboard,通过它来获取 MQ 集群、MQ Broker、MQ Topic 等元数据信息
private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
// Topic -> Topic Queue、Broker 信息
private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();

// NettyRemoteClient 使用的配置
// MQClientAPIImpl#start 启动就是启动 NettyRemoteClient 与 NettyServer 建立 TCP 连接,进行 Epoll 网络传输
// ClientRemotingProcessor 处理发起请求以及响应服务端请求的处理类,它是 MQClientAPIImpl 内的属性,由它负责发起请求
private final NettyClientConfig nettyClientConfig;
private final ClientRemotingProcessor clientRemotingProcessor;
private final MQClientAPIImpl mQClientAPIImpl;

// 从 nameserver 获取 Topic 元数据的同步锁|定时获取 broker 心跳,确保在同一个应用程序内多个生产者、多个消费者同时触发调用
private final Lock lockNamesrv = new ReentrantLock();
private final Lock lockHeartbeat = new ReentrantLock();

// 单线程定时任务调度
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
  @Override
  public Thread newThread(Runnable r) {
    return new Thread(r, "MQClientFactoryScheduledThread");
  }
});

// 消费推模式所使用的核心类、消费者数量发生重平衡问题如何再均衡分派的核心类
private final PullMessageService pullMessageService;
private final RebalanceService rebalanceService;

以下是相关的类结构图

在这里插入图片描述

属性:ConcurrentMap<String /* Group */, MQProducerInner> producerTable 作用于生产者

属性:ConcurrentMap<String /* Group */, MQConsumerInner> consumerTable、PullMessageService、RebalanceService 作用于消费者

RebalanceService 具体的实现类会在实例化消费者时指定好.

生产者、消费者都是承当 NettyRemoteClient 角色,它里面的处理类型包含了生产者、消费者的请求,相关的请求编码可查看:ClientRemoteProcessor

TopicRouteData

生产者、消费者都会使用到 Topic 路由信息,同时都会定时从 nameserver 拉取更新这些元数据信息,以下是其内部属性结构:

// 多个 broker ; 分割
private String orderTopicConf;
// BrokerName、读队列数、写队列数、队列权限级别(读、写)
private List<QueueData> queueDatas;
// BrokerName、Broker-Id、Broker-Addr
private List<BrokerData> brokerDatas;
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

这些属性都会提供对应的 getter/setter 方法

QueueData

它属于主题元数据的子类,队列数据:Broker-Name、读取队列、写队列、权限级别、主题标签

private String brokerName;
private int readQueueNums;
private int writeQueueNums;
// 代表权限级别:读-4、写-2
private int perm;
// 主题标签:默认为 0,在创建主题时重试 RETRY 为 2、其余主题为 1
private int topicSysFlag;

在创建主题时会给主题打上对应的标签

int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
  // RETRY_GROUP_TOPIC_PREFIX = "%RETRY%"
  if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
  } else {
    topicSysFlag = TopicSysFlag.buildSysFlag(true, false);
  }
}

读取、写入队列数量不指定时默认都是 4:DefaultMQProducer#defaultTopicQueueNums

BrokerData

在这里会存储好 Broker-Name、Broker-Id、Broker-Address

private String cluster;
private String brokerName;
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;

总的可以通过 RocketMQ Dashboard 控制台打开查看 Topic Router 信息

在这里插入图片描述

在 RocketMQ 第一篇文章搭建 RocketMQ 集群时,采用的是「多 Master 多 Slave 模式—异步复制」集群模式,所以在上面能够看到两个 BrokerId -> Broker Address

从零开始:手把手搭建 RocketMQ 单节点、集群节点实例

DefaultMQProducer#start

一开始的入口就是要调用该方法启动生产者实例才能运用生产者投递消息

在这里插入图片描述

public void start() throws MQClientException {
  // 组装生产者组
  this.setProducerGroup(withNamespace(this.producerGroup));
  this.defaultMQProducerImpl.start();
  // 追踪消息日志的一个异步守护线程,通过 enableMsgTrace 参数开启
  if (null != traceDispatcher) {
    try {
      traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
    } catch (MQClientException e) {
      log.warn("trace dispatcher start failed ", e);
    }
  }
}

主要关注的是在 DefaultProducerImpl#start 做的事情

DefaultProducerImpl#start

在生产者实例的实现类来看,它主要是将客户端实例先进行实例化、初始化,然后再将其启动起来.

public void start(final boolean startFactory) throws MQClientException {
  switch (this.serviceState) {
      // 默认状态
    case CREATE_JUST:
      this.serviceState = ServiceState.START_FAILED;
      // 检查生产者组名是否满足要求
      this.checkConfig();

      if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
        this.defaultMQProducer.changeInstanceNameToPID();
      }
      // 创建一个客户端实例
      this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
      // 在一个应用程序内通过不同的生产者组名来区分不同的 MQ 生产者,若出现一样的情况下会出现以下异常
      boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
      if (!registerOK) {
        this.serviceState = ServiceState.CREATE_JUST;
        throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                                    null);
      }
      // 自动创建 Topic 开启时指定的默认 Key,将其存在到 Topic 信息表中
      this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
      // 外部使用的客户端实例需要将其启动使用
      if (startFactory) {
        mQClientFactory.start();
      }
      log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
               this.defaultMQProducer.isSendMessageWithVIPChannel());
      // 将服务状态调整为运行态
      this.serviceState = ServiceState.RUNNING;
      break;
    case RUNNING:
    case START_FAILED:
    case SHUTDOWN_ALREADY:
      throw new MQClientException("The producer service state not OK, maybe started once, "
                                  + this.serviceState
                                  + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                                  null);
    default:
      break;
  }

  this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

  RequestFutureHolder.getInstance().startScheduledTask(this);

}

一开始 ServiceState 默认就是 CREATE_JUST,如果是多次调用 start 方法就会出现: MQClientException("The producer service state not OK, maybe started once

它的主要作用是为了将客户端实例 MQClientInstance 给启动起来,在它里面承载了向 Broker 发起请求以及元数据同步的事情,启动完成以后,会向所有的 Broker 发起请求建立 TCP 连接维持健康心跳.

MQClientInstance#start

public void start() throws MQClientException {
  synchronized (this) {
    switch (this.serviceState) {
      case CREATE_JUST:
        this.serviceState = ServiceState.START_FAILED;
        // If not specified,looking address from name server
        if (null == this.clientConfig.getNamesrvAddr()) {
          this.mQClientAPIImpl.fetchNameServerAddr();
        }
        // 开启客户端请求-响应的通道
        this.mQClientAPIImpl.start();
        // 定时任务调度更新信息
        this.startScheduledTask();
        // 启动推送消息的服务.
        this.pullMessageService.start();
        // 启动重平衡服务
        this.rebalanceService.start();
        // 不作为,在高版本中被删减,参数为 FALSE
        this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
        log.info("the client factory [{}] start OK", this.clientId);
        this.serviceState = ServiceState.RUNNING;
        break;
      case START_FAILED:
        throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
      default:
        break;
    }
  }
}

维持好一个既承当生产者、消费者的客户端实例,它里面包含了针对生产者的工作也包含了对消费者的工作处理

1、维护好客户端 API 调用 Broker 服务端的链 > MQClientAPIImpl

2、通过定时任务从 nameserver 获取 Topic 元数据信息、定时维持好与 Broker 之间的心跳 > startScheduledTask

3、生产者、消费者都是 Netty 客户端角色,无阻塞方式建立好 TCP 连接,接受来自 Broker 读、写以及向 Broker 发起读、写事件 > MQClientAPIImpl.NettyRemotingClient

MQClientAPIImpl 启动的工作就是将在实例化时构建好的 NettyRemotingClient 实例启动,以下是 NettyRemotingClient#start 启动时的源码:

public void start() {
  this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
    // 默认线程数为 4
    nettyClientConfig.getClientWorkerThreads(),
    new ThreadFactory() {
      private AtomicInteger threadIndex = new AtomicInteger(0);
      @Override
      public Thread newThread(Runnable r) {
        return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
      }
    });
  // 后续会发起请求时会通过 eventLoopGroupWorker 去建立 Socket 连接与服务端之间进行读、写交互
  // NioSocketChannel 代表的就是非阻塞的 SocketChannel
  Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
    // 数据包组装为更大的帧然后进行发送
    .option(ChannelOption.TCP_NODELAY, true)
    // 定时发送探测包来探测连接的对端是否存活
    .option(ChannelOption.SO_KEEPALIVE, false)
    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
    .handler(new ChannelInitializer<SocketChannel>() {
      @Override
      public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        if (nettyClientConfig.isUseTLS()) {
          if (null != sslContext) {
            pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
            log.info("Prepend SSL handler");
          } else {
            log.warn("Connections are insecure as SSLContext is null!");
          }
        }
        // DefaultEventExecutorGroup 用来执行以下五个 ChannelHandler
        pipeline.addLast(
          defaultEventExecutorGroup,
          // 编码 -> 处理请求
          new NettyEncoder(),
          // 解码 -> 处理响应
          new NettyDecoder(),
          new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
          new NettyConnectManageHandler(),
          // 远程调用->请求、响应处理器
          new NettyClientHandler());
      }
    });
  // 操作系统客户端发送缓冲区的大小
  if (nettyClientConfig.getClientSocketSndBufSize() > 0) {
    log.info("client set SO_SNDBUF to {}", nettyClientConfig.getClientSocketSndBufSize());
    handler.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize());
  }
  // 操作系统客户端接收缓冲区的大小
  if (nettyClientConfig.getClientSocketRcvBufSize() > 0) {
    log.info("client set SO_RCVBUF to {}", nettyClientConfig.getClientSocketRcvBufSize());
    handler.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());
  }
  if (nettyClientConfig.getWriteBufferLowWaterMark() > 0 && nettyClientConfig.getWriteBufferHighWaterMark() > 0) {
    log.info("client set netty WRITE_BUFFER_WATER_MARK to {},{}",
             nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark());
    handler.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
      nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark()));
  }
  // Timer 定时执行哪些请求过期的事件,每隔 3 秒
  this.timer.scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
      try {
        NettyRemotingClient.this.scanResponseTable();
      } catch (Throwable e) {
        log.error("scanResponseTable exception", e);
      }
    }
  }, 1000 * 3, 1000);
  // 客户端一般为空,在 nameserver 与 Broker 交互时会使用到,做一些连接、关闭、异常、死亡状态的回调处理
  if (this.channelEventListener != null) {
    this.nettyEventExecutor.start();
  }
}

在这里会将 Netty Worker Group Selector 创建好,等到客户端实例发起请求时再去建立 TCP 连接,事件到来有五个需要执行的类,区分为请求和响应

NettyEncoder:编码器-处理请求的事件

NettyDecoder:解码器-处理响应的事件

IdleStateHandler:当客户端空闲时,会处理的事件

NettyConnectManageHandler:处理连接的事件,当事件监听器不为空时同时会给监听器也派发连接的事件

NettyClientHandler:客户端的读写处理逻辑,非常重要!!

客户端都是以非阻塞的方式与服务端建立 TCP 连接的,如下图:

在这里插入图片描述

关于更多 RocketMQ 网络通信模型的核心类属性结构以及方法、请求-响应之间如何协调在后续文章再具体分析.

在 MQClientInstance#start 方法还会启动五个定时任务调度 MQClientInstance#startScheduledTask,用于更新元数据、维持健康心跳等

private void startScheduledTask() {
  if (null == this.clientConfig.getNamesrvAddr()) {
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        try {
          MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
        } catch (Exception e) {
          log.error("ScheduledTask fetchNameServerAddr exception", e);
        }
      }
    }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
  }
  this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
      try {
        MQClientInstance.this.updateTopicRouteInfoFromNameServer();
      } catch (Exception e) {
        log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
      }
    }
  }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
  this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
      try {
        MQClientInstance.this.cleanOfflineBroker();
        MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
      } catch (Exception e) {
        log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
      }
    }
  }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
  this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
      try {
        MQClientInstance.this.persistAllConsumerOffset();
      } catch (Exception e) {
        log.error("ScheduledTask persistAllConsumerOffset exception", e);
      }
    }
  }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

  this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
      try {
        MQClientInstance.this.adjustThreadPool();
      } catch (Exception e) {
        log.error("ScheduledTask adjustThreadPool exception", e);
      }
    }
  }, 1, 1, TimeUnit.MINUTES);
}
  1. MQClientAPIImpl#fetchNameServerAddr:若 nameserver 信息为空时,则 2 分钟更新一次 nameserver 信息

  2. MQClientAPIImpl#getTopicRouteInfoFromNameServer(java.lang.String, long)默认每隔 30 秒检查|更新一次 Topic 元数据信息 > TopicRouteData

    生产者:producerTable
    消费者:consumerTable
    判别是否需要更新「生产者-发布信息、消费者-订阅信息」取决于元数据是否变更

  3. MQClientAPIImpl#sendHearbeat默认每隔 30 秒检查|更新 broker 信息,对比是否 Topic 元数据中的 broker 匹配

  4. MQConsumerInner#persistConsumerOffset:取决于消费者的不同实现,默认每隔 5 秒持久化消费者的偏移量信息,向本地文件或其他 broker 发出请求同步持久

  5. DefaultMQPushConsumerImpl#adjustThreadPool:默认为 1 分钟执行一次动态调整「推」消费模式的核心线程数.

PullMessageService:一个单独的线程,当它启动以后,会从一个阻塞队列中获取元素,当元素不为空时就向 broker 发起拉取消息的请求

它有两个核心的方法:PullMessageService#executePullRequestLater、PullMessageService#executePullRequestImmediately

两个方法都是由推送模式的消费者实例调起:DefaultMQPushConsumerImpl#executePullRequestImmediately、DefaultMQPushConsumerImpl#executePullRequestLater

executePullRequestImmediately:直接往阻塞队列中塞入一个元素,元素就是拉取的请求信息 > PullRequest

executePullRequestLater:当消息在客户端未处理完或发生了异常,就延迟一会时间再进行拉取,这一块具体在讲解消费者源码时再详细说明,在 PullMessageService 就是通过定时任务延迟再调用 executePullRequestImmediately 方法

RebalanceService:一个单独的线程,当消费者数量变更以后,由 Broker 向消费者发起请求,消费者实例收到响应后再延迟一段时间触发重平衡的工作,在重平衡时处理时会区分消费模式属于集群模式还是广播模式,集群模式会选择对应的分配策略:AllocateMessageQueueStrategy 重新为 Topic 下 MessageQueue 重新均衡分配消息.

重平衡是一个很漫长的过程,会采用 ProcessQueue 加锁的方式为 Topic 下所有 Queue 重新分配以及维护好 Offset 工作.

总结

该篇博文主要介绍 MQClientInstance 客户端实例以及 TopicRouteData 主题元数据信息,结合 Dashboard 看会更加的清晰,后续介绍了生产者实例启动的一个过程,其中介绍了几个核心的地方:NettyRemotingClient Netty 客户端、MQClientInstance#startScheduledTask 涉及到的定时调度任务、PullMessageService 推送模式作用的类、RebalanceService 重平衡服务,希望该篇博文你能够喜欢,感谢三连支持??

后续文章会分享生产者如何发送消息至 Broker 以及它如何处理返回的.

🌟🌟🌟愿你我都能够在寒冬中相互取暖,互相成长,只有不断积累、沉淀自己,后面有机会自然能破冰而行!

博文放在 RocketMQ 专栏里,欢迎订阅,会持续更新!

如果觉得博文不错,关注我 vnjohn,后续会有更多实战、源码、架构干货分享!

推荐专栏:Spring、MySQL,订阅一波不再迷路

大家的「关注?? + 点赞👍 + 收藏?」就是我创作的最大动力!谢谢大家的支持,我们下文见!

文章来源:https://blog.csdn.net/vnjohn/article/details/135376242
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。