消息生产者 MQ producer 即消息的生产发送方,主要负责将生产方产生的消息投递到 Broker 节点。它主要的源码实现架构如下图,继承了MQAdmin 管理组件接口,内部又依赖了DefaultMQProducerImpl 内部实现类实现所有内部逻辑,DefaultMQProducerImpl 实现了 创建 topic、获取消息队列偏移量、查看消息、发送消息、故障容错、事务消息、顺序消息等能力,最底层通过 NettyRemotingClient netty网络通信客户端组件建立网络连接,发起和接受网络通信请求。
源码版本:4.9.3
源码架构图
// 默认MQ生产者
public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Wrapping internal implementations for virtually all methods presented in this class.
* 这个类是对所有方法的内部实现的包装。
*/
protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
}
// 默认MQProducer实现类
public class DefaultMQProducerImpl implements MQProducerInner {
// 默认MQProducer 实例
private final DefaultMQProducer defaultMQProducer;
// 重要 topic 对应的路由数据映射表
private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
new ConcurrentHashMap<String, TopicPublishInfo>();
// 重要 mq客户端实例
private MQClientInstance mQClientFactory;
// 重要 故障切换策略
private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
}
// mq客户端实例
public class MQClientInstance {
private final NettyClientConfig nettyClientConfig;
// 重要 mq客户端API
private final MQClientAPIImpl mQClientAPIImpl;
// 重要 mqAdmin客户端
private final MQAdminImpl mQAdminImpl;
// 重要 网络客户端处理器
private final ClientRemotingProcessor clientRemotingProcessor;
// 内部生产者与生产者组映射表
private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
// 内部消费者与消费者组映射表
private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
// 内部管理器与管理器组映射表
private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
// topic路由信息映射表
private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
// broker高可用分组映射表
private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
new ConcurrentHashMap<String, HashMap<Long, String>>();
// broker版本号映射表
private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
new ConcurrentHashMap<String, HashMap<String, Integer>>();
/*****以下是消费者使用到的数据结构,可以暂不关心*******/
// 拉取消息服务
private final PullMessageService pullMessageService;
// 重平衡服务
private final RebalanceService rebalanceService;
// 生产者实例
private final DefaultMQProducer defaultMQProducer;
// 消费者统计管理器
private final ConsumerStatsManager consumerStatsManager;
// 发送心跳次数统计
private final AtomicLong sendHeartbeatTimesTotal = new AtomicLong(0);
// 服务状态
private ServiceState serviceState = ServiceState.CREATE_JUST;
private Random random = new Random();
}
MQClientAPIImpl 数据结构
到这一步,就涉及RocketMQ 内部包装的 Netty网络通信实现了,主要就是进行 一些同步、异步、OneWay请求的发送和接受。此处不深入展开。
// mq网络通信客户端API
public class MQClientAPIImpl {
// netty网络通信客户端
private final RemotingClient remotingClient;
// 重要 客户端网络通信处理器
private final ClientRemotingProcessor clientRemotingProcessor;
}