RocketMQ 5.0 引入了新的 Proxy 组件,为了便于多语言客户端 SDK 的开发维护,客户端的很多功能也都下沉到了 Proxy,客户端因此变得更加轻量化了,新版客户端源码简洁易懂。
源码相较于 4.x 版本变化很大,本文分析 RocketMQ 5.0 Producer 客户端和 Proxy 的消息发送整体流程。
服务端源码版本基于 5.0.0、客户端源码版本基于 java-5.0.4。
服务端:https://github.com/apache/rocketmq
客户端:https://github.com/apache/rocketmq-clients
新版客户端依赖ClientServiceProvider
接口,它为客户端服务提供实现,RocketMQ 通过 Java SPI 来加载实现类ClientServiceProviderImpl
。
public interface ClientServiceProvider {
// SPI 加载实现类
static ClientServiceProvider loadService() {
final ServiceLoader<ClientServiceProvider> loaders = ServiceLoader.load(ClientServiceProvider.class);
final Iterator<ClientServiceProvider> iterators = loaders.iterator();
if (iterators.hasNext()) {
return iterators.next();
}
throw new UnsupportedOperationException("Client service provider not found");
}
}
新版 SDK 大量使用建造者模式,例如通过ProducerBuilder
来构建生产者。新版 SDK 全面使用 gRPC 协议且只客户端和 Proxy 通信,如果你配置的是 Broker 的地址协议会不通,构建会报错。配置好所需的参数后,就可以调用build()
开始构建了:
public Producer build() {
checkNotNull(clientConfiguration, "clientConfiguration has not been set yet");
final ProducerImpl producer = new ProducerImpl(clientConfiguration, topics, maxAttempts, checker);
producer.startAsync().awaitRunning();
return producer;
}
ProducerImpl 是生产者的默认实现,实例化的时候主要是配置了一些线程池,随后马上会调用startAsync()
启动生产者,主要是触发一些事件,核心是父类ClientImpl#startUp()
。
生产者的职责就是发消息,所以它关心的是:该把消息发给哪个 Broker?所以就需要一份 Topic 的路由信息。路由信息可以在启动时就提前拉取以减小发消息的延迟,也会开启定时任务每 30 秒更新一次。
protected void startUp() throws Exception {
this.clientManager.startAsync().awaitRunning();
// 启动时缓存Topic路由
for (String topic : topics) {
final ListenableFuture<TopicRouteData> future = fetchTopicRoute(topic);
future.get();
}
// 30s更新一次路由信息
final ScheduledExecutorService scheduler = clientManager.getScheduler();
this.updateRouteCacheFuture = scheduler.scheduleWithFixedDelay(() -> {
try {
updateRouteCache();
} catch (Throwable t) {
log.error("Exception raised while updating topic route cache, clientId={}", clientId, t);
}
}, 10, 30, TimeUnit.SECONDS);
}
获取路由信息是要请求 Proxy 的,所以此时会和 Proxy 建立网络连接,连接会被封装为RpcClient
客户端对象。Proxy 往往是集群化部署的,所以会对应多个 Address,这一批 Address 会被封装为Endpoints
对象,代表一批 Proxy 端点,再构建RpcClient
对象,gRPC 底层会对这一批 Proxy 做负载均衡调用。
private RpcClient getRpcClient(Endpoints endpoints) throws ClientException {
RpcClient rpcClient;
// 读锁
rpcClientTableLock.readLock().lock();
try {
// 缓存有直接返回
rpcClient = rpcClientTable.get(endpoints);
if (null != rpcClient) {
return rpcClient;
}
} finally {
rpcClientTableLock.readLock().unlock();
}
// 还没创建客户端,拿写锁
rpcClientTableLock.writeLock().lock();
try {
rpcClient = rpcClientTable.get(endpoints);
if (null != rpcClient) {
return rpcClient;
}
try {
// 创建客户端
rpcClient = new RpcClientImpl(endpoints);
} catch (SSLException e) {
log.error("Failed to get RPC client, endpoints={}, clientId={}", endpoints, client.getClientId(), e);
throw new ClientException("Failed to generate RPC client", e);
}
// 缓存起来
rpcClientTable.put(endpoints, rpcClient);
return rpcClient;
} finally {
rpcClientTableLock.writeLock().unlock();
}
}
建立连接后,就可以发起 gRPC 调用查询 Topic 路由信息了,Proxy 返回的是QueryRouteResponse
,包含 Topic 下所有的队列信息,以及队列所属的 Broker 信息。
@Override
public ListenableFuture<QueryRouteResponse> queryRoute(Metadata metadata,
QueryRouteRequest request, Executor executor, Duration duration) {
this.activityNanoTime = System.nanoTime();
// gRPC调用 查询路由信息
return futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
.withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).queryRoute(request);
}
有了路由信息,生产者就可以开始发消息了,RocketMQ 支持批量发消息。首先把 Message 转换成 PublishingMessageImpl,此时会对消息体做检验,不能超过 4MB,同时生成 MessageID 等。
List<PublishingMessageImpl> pubMessages = new ArrayList<>();
for (Message message : messages) {
try {
final PublishingMessageImpl pubMessage = new PublishingMessageImpl(message, publishingSettings,
txEnabled);
pubMessages.add(pubMessage);
} catch (Throwable t) {
future.setException(t);
return future;
}
}
之后校验批量发送的消息必须属于同一 Topic、且消息类型是一致的,不能有的是普通消息,有的是顺序消息等。
校验通过后,RocketMQ 会基于路由信息构建一个消息发布的负载均衡器PublishingLoadBalancer
,它的职责是从一批队列里选择一个队列发送。
private PublishingLoadBalancer(AtomicInteger index, TopicRouteData topicRouteData) {
this.index = index;
final List<MessageQueueImpl> mqs = topicRouteData.getMessageQueues().stream()
.filter((Predicate<MessageQueueImpl>) mq -> mq.getPermission().isWritable() &&
Utilities.MASTER_BROKER_ID == mq.getBroker().getId())
.collect(Collectors.toList());
if (mqs.isEmpty()) {
throw new IllegalArgumentException("No writable message queue found, topiRouteData=" + topicRouteData);
}
this.messageQueues = ImmutableList.<MessageQueueImpl>builder().addAll(mqs).build();
}
负载均衡的策略很简单,就是轮询。因为消息发送失败后需要重试,重试的时候需要规避掉失败的 Broker,所以这里做了一些处理,就不展开了。
注意:消息实际写入哪个队列,不是生产者决定的,客户端理论上只是选择一个 Proxy 发消息。如果 Proxy 是 Local 部署消息会写入本机队列,如果是 Cluster 部署,那么客户端把消息发给哪个 Proxy 都无所谓。
因为客户端是用 gRPC 协议通信的,所以请求要先转换成 Protobuf 消息,发消息请求对应的类是SendMessageRequest
,里面包含一批要发送的消息,消息也会被转换成支持 Protobuf 序列化的对象apache.rocketmq.v2.Message
。
private SendMessageRequest wrapSendMessageRequest(List<PublishingMessageImpl> pubMessages, MessageQueueImpl mq) {
final List<apache.rocketmq.v2.Message> messages = pubMessages.stream()
.map(publishingMessage -> publishingMessage.toProtobuf(mq)).collect(Collectors.toList());
return SendMessageRequest.newBuilder().addAllMessages(messages).build();
}
请求构建好以后,就可以通过RpcClient
把请求发出去了
public ListenableFuture<SendMessageResponse> sendMessage(Metadata metadata,
SendMessageRequest request, Executor executor, Duration duration) {
this.activityNanoTime = System.nanoTime();
return futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
.withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).sendMessage(request);
}
客户端发消息的流程到此就结束了。
Proxy 启动时会开启一个 gRPC 服务GrpcServer
,默认监听 8081 端口,核心是请求处理器GrpcMessagingApplication
。
我们先看 Proxy 是怎么处理客户端查询路由信息请求的:
入口方法是GrpcMessagingApplication#queryRoute
@Override
public void queryRoute(QueryRouteRequest request, StreamObserver<QueryRouteResponse> responseObserver) {
Function<Status, QueryRouteResponse> statusResponseCreator = status -> QueryRouteResponse.newBuilder().setStatus(status).build();
ProxyContext context = createContext();
try {
validateContext(context);
this.addExecutor(this.routeThreadPoolExecutor,
context,
request,
// 查询Topic路由信息
() -> grpcMessingActivity.queryRoute(context, request)
// 完成时发送响应结果给客户端
.whenComplete((response, throwable) -> writeResponse(context, request, response, responseObserver, throwable, statusResponseCreator)),
responseObserver,
statusResponseCreator);
} catch (Throwable t) {
// 返回异常信息
writeResponse(context, request, null, responseObserver, t, statusResponseCreator);
}
}
Proxy 是无状态的,它并不保存 Topic 路由信息,它会在启动时和定时任务里去请求 Namesrv 拉取然后缓存起来,代码在TopicRouteService
的构造函数里:
this.topicCache = Caffeine.newBuilder().maximumSize(config.getTopicRouteServiceCacheMaxNum()).
refreshAfterWrite(config.getTopicRouteServiceCacheExpiredInSeconds(), TimeUnit.SECONDS).
executor(cacheRefreshExecutor).build(new CacheLoader<String, MessageQueueView>() {
@Override public @Nullable MessageQueueView load(String topic) throws Exception {
try {
TopicRouteData topicRouteData = topicRouteCacheLoader.loadTopicRouteData(topic);
if (isTopicRouteValid(topicRouteData)) {
MessageQueueView tmp = new MessageQueueView(topic, topicRouteData);
return tmp;
}
return MessageQueueView.WRAPPED_EMPTY_QUEUE;
} catch (Exception e) {
if (TopicRouteHelper.isTopicNotExistError(e)) {
return MessageQueueView.WRAPPED_EMPTY_QUEUE;
}
throw e;
}
}
});
请求最终会委托route.RouteActivity#queryRoute
处理,主要步骤:
public CompletableFuture<QueryRouteResponse> queryRoute(ProxyContext ctx, QueryRouteRequest request) {
CompletableFuture<QueryRouteResponse> future = new CompletableFuture<>();
try {
validateTopic(request.getTopic());
// 客户端请求Proxy的IP地址
List<org.apache.rocketmq.proxy.common.Address> addressList = this.convertToAddressList(request.getEndpoints());
String topicName = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getTopic());
/**
* 本地缓存拿Topic路由 Proxy会定时从Namesrv拉取
* Proxy把Broker的IP端口替换成Proxy自己的再返回,让客户端只和Proxy通信
*/
ProxyTopicRouteData proxyTopicRouteData = this.messagingProcessor.getTopicRouteDataForProxy(
ctx, addressList, topicName);
List<MessageQueue> messageQueueList = new ArrayList<>();
Map<String, Map<Long, Broker>> brokerMap = buildBrokerMap(proxyTopicRouteData.getBrokerDatas());
// Topic消息类型 Local模式本地直接取 Cluster模式定时任务拉取
TopicMessageType topicMessageType = messagingProcessor.getMetadataService().getTopicMessageType(topicName);
for (QueueData queueData : proxyTopicRouteData.getQueueDatas()) {
String brokerName = queueData.getBrokerName();
Map<Long, Broker> brokerIdMap = brokerMap.get(brokerName);
if (brokerIdMap == null) {
break;
}
for (Broker broker : brokerIdMap.values()) {
messageQueueList.addAll(this.genMessageQueueFromQueueData(queueData, request.getTopic(), topicMessageType, broker));
}
}
// 构建响应结果
QueryRouteResponse response = QueryRouteResponse.newBuilder()
.setStatus(ResponseBuilder.getInstance().buildStatus(Code.OK, Code.OK.name()))
.addAllMessageQueues(messageQueueList)
.build();
future.complete(response);
} catch (Throwable t) {
future.completeExceptionally(t);
}
return future;
}
生产者拿到路由信息后,就会轮询出一个队列,然后给队列所属的 Broker 发请求,请求实际还是会发给 Proxy。
Proxy 处理发消息的流程如下:
请求最终会委托给ProducerProcessor#sendMessage
处理,主要步骤:
public CompletableFuture<List<SendResult>> sendMessage(ProxyContext ctx, QueueSelector queueSelector,
String producerGroup, int sysFlag, List<Message> messageList, long timeoutMillis) {
CompletableFuture<List<SendResult>> future = new CompletableFuture<>();
try {
Message message = messageList.get(0);
String topic = message.getTopic();
if (ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck()) {
if (topicMessageTypeValidator != null) {
// Do not check retry or dlq topic
if (!NamespaceUtil.isRetryTopic(topic) && !NamespaceUtil.isDLQTopic(topic)) {
TopicMessageType topicMessageType = serviceManager.getMetadataService().getTopicMessageType(topic);
TopicMessageType messageType = parseFromMessageExt(message);
topicMessageTypeValidator.validate(topicMessageType, messageType);
}
}
}
/**
* 选择一个Queue
* - Local:返回本机Queue视图,轮询一个(避免多一次网络调用)
* - Cluster:返回全局Queue视图,轮询一个
*/
AddressableMessageQueue messageQueue = queueSelector.select(ctx,
this.serviceManager.getTopicRouteService().getCurrentMessageQueueView(topic));
if (messageQueue == null) {
throw new ProxyException(ProxyExceptionCode.FORBIDDEN, "no writable queue");
}
// 构建Remoting协议的消息发送请求头
SendMessageRequestHeader requestHeader = buildSendMessageRequestHeader(messageList, producerGroup, sysFlag, messageQueue.getQueueId());
/**
* 发送消息
* - Local:直接调本地SendMessageProcessor#processRequest
* - Cluster:Remoting协议发送请求到Broker
*/
future = this.serviceManager.getMessageService().sendMessage(
ctx,
messageQueue,
messageList,
requestHeader,
timeoutMillis)
.thenApplyAsync(sendResultList -> {
for (SendResult sendResult : sendResultList) {
int tranType = MessageSysFlag.getTransactionValue(requestHeader.getSysFlag());
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus()) &&
tranType == MessageSysFlag.TRANSACTION_PREPARED_TYPE &&
StringUtils.isNotBlank(sendResult.getTransactionId())) {
fillTransactionData(producerGroup, messageQueue, sendResult, messageList);
}
}
return sendResultList;
}, this.executor);
} catch (Throwable t) {
future.completeExceptionally(t);
}
return FutureUtils.addExecutor(future, this.executor);
}
消息发送最终会委托给消息服务 MessageService,由于 Proxy 有 Local 和 Cluster 两种部署模式,所以也对应了两种实现方案。
Local 模式下,因为目标队列是在本机里面选的,所以直接调用本地方法存储消息即可,少了一次网络调用,延迟会更低;Cluster 模式下,目标队列绝对不在本机,所以索性就从全局队列里面选,Proxy 再根据 Remoting 协议去调用 Broker 存储消息。
至此,Proxy 发消息的流程也结束了,后续流程就是 Broker 如何存储消息了。
引入 Proxy 组件后,客户端 SDK 代码更简洁易懂了。生产者先是向 Proxy 查询 Topic 路由信息,然后轮询一个队列,再把消息发给对应的 Proxy,Proxy 再根据部署模式从本机/全局队列里面轮询出一个目标队列,最后把 gRPC 适配成 Remoting 协议发给 Broker 存储消息。
为什么要轮询两次呢?实际上,消息最终写入哪个队列并不是生产者和 Proxy 决定的。我的理解,如果 Proxy 是 Cluster 模式部署的话,生产者的轮询是没有意义的,直接发给 Proxy 就好了。Proxy 虽然也轮询了一次,但它的目的是选出目标 Broker,实际上它发给 Broker 的 queueId 是固定的值 -1,消息最终写入哪个队列,是由 Broker 决定的。