RocketMQ源码 Producer生产者源码架构分析

发布时间:2024年01月05日

前言

消息生产者 MQ producer 即消息的生产发送方,主要负责将生产方产生的消息投递到 Broker 节点。它主要的源码实现架构如下图,继承了MQAdmin 管理组件接口,内部又依赖了DefaultMQProducerImpl 内部实现类实现所有内部逻辑,DefaultMQProducerImpl 实现了 创建 topic、获取消息队列偏移量、查看消息、发送消息、故障容错、事务消息、顺序消息等能力,最底层通过 NettyRemotingClient netty网络通信客户端组件建立网络连接,发起和接受网络通信请求。

源码版本:4.9.3

源码架构图

源码分析

DefaultMQProducer 数据结构

// 默认MQ生产者
public class DefaultMQProducer extends ClientConfig implements MQProducer {

    /**
     * Wrapping internal implementations for virtually all methods presented in this class.
     * 这个类是对所有方法的内部实现的包装。
     */
    protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
}

DefaultMQProducerImpl 数据结构

// 默认MQProducer实现类
public class DefaultMQProducerImpl implements MQProducerInner {
    // 默认MQProducer 实例
    private final DefaultMQProducer defaultMQProducer;

    // 重要 topic 对应的路由数据映射表
    private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
        new ConcurrentHashMap<String, TopicPublishInfo>();

    // 重要 mq客户端实例
    private MQClientInstance mQClientFactory;
    // 重要 故障切换策略
    private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
}

MQClientInstance 数据结构

// mq客户端实例
public class MQClientInstance {

    private final NettyClientConfig nettyClientConfig;
    // 重要 mq客户端API
    private final MQClientAPIImpl mQClientAPIImpl;
    // 重要 mqAdmin客户端
    private final MQAdminImpl mQAdminImpl;
    // 重要 网络客户端处理器
    private final ClientRemotingProcessor clientRemotingProcessor;


    // 内部生产者与生产者组映射表
    private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
    // 内部消费者与消费者组映射表
    private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
    // 内部管理器与管理器组映射表
    private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
    // topic路由信息映射表
    private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
    // broker高可用分组映射表
    private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
        new ConcurrentHashMap<String, HashMap<Long, String>>();
    // broker版本号映射表
    private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
        new ConcurrentHashMap<String, HashMap<String, Integer>>();


    /*****以下是消费者使用到的数据结构,可以暂不关心*******/

    // 拉取消息服务
    private final PullMessageService pullMessageService;
    // 重平衡服务
    private final RebalanceService rebalanceService;
    // 生产者实例
    private final DefaultMQProducer defaultMQProducer;
    // 消费者统计管理器
    private final ConsumerStatsManager consumerStatsManager;
    // 发送心跳次数统计
    private final AtomicLong sendHeartbeatTimesTotal = new AtomicLong(0);
    // 服务状态
    private ServiceState serviceState = ServiceState.CREATE_JUST;
    private Random random = new Random();
}

MQClientAPIImpl 数据结构

到这一步,就涉及RocketMQ 内部包装的 Netty网络通信实现了,主要就是进行 一些同步、异步、OneWay请求的发送和接受。此处不深入展开。

// mq网络通信客户端API
public class MQClientAPIImpl {

    // netty网络通信客户端
    private final RemotingClient remotingClient;
    
    // 重要 客户端网络通信处理器
    private final ClientRemotingProcessor clientRemotingProcessor;
}

文章来源:https://blog.csdn.net/hzwangmr/article/details/135403124
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。