RocketMQ5.0消息发送流程

发布时间:2024年01月03日

前言

RocketMQ 5.0 引入了新的 Proxy 组件,为了便于多语言客户端 SDK 的开发维护,客户端的很多功能也都下沉到了 Proxy,客户端因此变得更加轻量化了,新版客户端源码简洁易懂。
源码相较于 4.x 版本变化很大,本文分析 RocketMQ 5.0 Producer 客户端和 Proxy 的消息发送整体流程。
服务端源码版本基于 5.0.0、客户端源码版本基于 java-5.0.4。
服务端:https://github.com/apache/rocketmq
客户端:https://github.com/apache/rocketmq-clients

客户端发消息

image.png
新版客户端依赖ClientServiceProvider接口,它为客户端服务提供实现,RocketMQ 通过 Java SPI 来加载实现类ClientServiceProviderImpl

public interface ClientServiceProvider {
    // SPI 加载实现类
    static ClientServiceProvider loadService() {
        final ServiceLoader<ClientServiceProvider> loaders = ServiceLoader.load(ClientServiceProvider.class);
        final Iterator<ClientServiceProvider> iterators = loaders.iterator();
        if (iterators.hasNext()) {
            return iterators.next();
        }
        throw new UnsupportedOperationException("Client service provider not found");
    }
}

新版 SDK 大量使用建造者模式,例如通过ProducerBuilder来构建生产者。新版 SDK 全面使用 gRPC 协议且只客户端和 Proxy 通信,如果你配置的是 Broker 的地址协议会不通,构建会报错。配置好所需的参数后,就可以调用build()开始构建了:

public Producer build() {
    checkNotNull(clientConfiguration, "clientConfiguration has not been set yet");
    final ProducerImpl producer = new ProducerImpl(clientConfiguration, topics, maxAttempts, checker);
    producer.startAsync().awaitRunning();
    return producer;
}

ProducerImpl 是生产者的默认实现,实例化的时候主要是配置了一些线程池,随后马上会调用startAsync()启动生产者,主要是触发一些事件,核心是父类ClientImpl#startUp()
生产者的职责就是发消息,所以它关心的是:该把消息发给哪个 Broker?所以就需要一份 Topic 的路由信息。路由信息可以在启动时就提前拉取以减小发消息的延迟,也会开启定时任务每 30 秒更新一次。

protected void startUp() throws Exception {
    this.clientManager.startAsync().awaitRunning();
    // 启动时缓存Topic路由
    for (String topic : topics) {
        final ListenableFuture<TopicRouteData> future = fetchTopicRoute(topic);
        future.get();
    }
	// 30s更新一次路由信息
    final ScheduledExecutorService scheduler = clientManager.getScheduler();
    this.updateRouteCacheFuture = scheduler.scheduleWithFixedDelay(() -> {
        try {
            updateRouteCache();
        } catch (Throwable t) {
            log.error("Exception raised while updating topic route cache, clientId={}", clientId, t);
        }
    }, 10, 30, TimeUnit.SECONDS);
}

获取路由信息是要请求 Proxy 的,所以此时会和 Proxy 建立网络连接,连接会被封装为RpcClient客户端对象。Proxy 往往是集群化部署的,所以会对应多个 Address,这一批 Address 会被封装为Endpoints对象,代表一批 Proxy 端点,再构建RpcClient对象,gRPC 底层会对这一批 Proxy 做负载均衡调用。

private RpcClient getRpcClient(Endpoints endpoints) throws ClientException {
    RpcClient rpcClient;
    // 读锁
    rpcClientTableLock.readLock().lock();
    try {
        // 缓存有直接返回
        rpcClient = rpcClientTable.get(endpoints);
        if (null != rpcClient) {
            return rpcClient;
        }
    } finally {
        rpcClientTableLock.readLock().unlock();
    }
    // 还没创建客户端,拿写锁
    rpcClientTableLock.writeLock().lock();
    try {
        rpcClient = rpcClientTable.get(endpoints);
        if (null != rpcClient) {
            return rpcClient;
        }
        try {
            // 创建客户端
            rpcClient = new RpcClientImpl(endpoints);
        } catch (SSLException e) {
            log.error("Failed to get RPC client, endpoints={}, clientId={}", endpoints, client.getClientId(), e);
            throw new ClientException("Failed to generate RPC client", e);
        }
        // 缓存起来
        rpcClientTable.put(endpoints, rpcClient);
        return rpcClient;
    } finally {
        rpcClientTableLock.writeLock().unlock();
    }
}

建立连接后,就可以发起 gRPC 调用查询 Topic 路由信息了,Proxy 返回的是QueryRouteResponse,包含 Topic 下所有的队列信息,以及队列所属的 Broker 信息。

@Override
public ListenableFuture<QueryRouteResponse> queryRoute(Metadata metadata,
    QueryRouteRequest request, Executor executor, Duration duration) {
    this.activityNanoTime = System.nanoTime();
    // gRPC调用 查询路由信息
    return futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
        .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).queryRoute(request);
}

有了路由信息,生产者就可以开始发消息了,RocketMQ 支持批量发消息。首先把 Message 转换成 PublishingMessageImpl,此时会对消息体做检验,不能超过 4MB,同时生成 MessageID 等。

List<PublishingMessageImpl> pubMessages = new ArrayList<>();
for (Message message : messages) {
    try {
        final PublishingMessageImpl pubMessage = new PublishingMessageImpl(message, publishingSettings,
            txEnabled);
        pubMessages.add(pubMessage);
    } catch (Throwable t) {
        future.setException(t);
        return future;
    }
}

之后校验批量发送的消息必须属于同一 Topic、且消息类型是一致的,不能有的是普通消息,有的是顺序消息等。
校验通过后,RocketMQ 会基于路由信息构建一个消息发布的负载均衡器PublishingLoadBalancer,它的职责是从一批队列里选择一个队列发送。

private PublishingLoadBalancer(AtomicInteger index, TopicRouteData topicRouteData) {
    this.index = index;
    final List<MessageQueueImpl> mqs = topicRouteData.getMessageQueues().stream()
            .filter((Predicate<MessageQueueImpl>) mq -> mq.getPermission().isWritable() &&
                    Utilities.MASTER_BROKER_ID == mq.getBroker().getId())
            .collect(Collectors.toList());
    if (mqs.isEmpty()) {
        throw new IllegalArgumentException("No writable message queue found, topiRouteData=" + topicRouteData);
    }
    this.messageQueues = ImmutableList.<MessageQueueImpl>builder().addAll(mqs).build();
}

负载均衡的策略很简单,就是轮询。因为消息发送失败后需要重试,重试的时候需要规避掉失败的 Broker,所以这里做了一些处理,就不展开了。
注意:消息实际写入哪个队列,不是生产者决定的,客户端理论上只是选择一个 Proxy 发消息。如果 Proxy 是 Local 部署消息会写入本机队列,如果是 Cluster 部署,那么客户端把消息发给哪个 Proxy 都无所谓。

因为客户端是用 gRPC 协议通信的,所以请求要先转换成 Protobuf 消息,发消息请求对应的类是SendMessageRequest,里面包含一批要发送的消息,消息也会被转换成支持 Protobuf 序列化的对象apache.rocketmq.v2.Message

private SendMessageRequest wrapSendMessageRequest(List<PublishingMessageImpl> pubMessages, MessageQueueImpl mq) {
    final List<apache.rocketmq.v2.Message> messages = pubMessages.stream()
        .map(publishingMessage -> publishingMessage.toProtobuf(mq)).collect(Collectors.toList());
    return SendMessageRequest.newBuilder().addAllMessages(messages).build();
}

请求构建好以后,就可以通过RpcClient把请求发出去了

public ListenableFuture<SendMessageResponse> sendMessage(Metadata metadata,
    SendMessageRequest request, Executor executor, Duration duration) {
    this.activityNanoTime = System.nanoTime();
    return futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
        .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).sendMessage(request);
}

客户端发消息的流程到此就结束了。

Proxy发消息

Proxy 启动时会开启一个 gRPC 服务GrpcServer,默认监听 8081 端口,核心是请求处理器GrpcMessagingApplication

我们先看 Proxy 是怎么处理客户端查询路由信息请求的:
image.png
入口方法是GrpcMessagingApplication#queryRoute

@Override
public void queryRoute(QueryRouteRequest request, StreamObserver<QueryRouteResponse> responseObserver) {
    Function<Status, QueryRouteResponse> statusResponseCreator = status -> QueryRouteResponse.newBuilder().setStatus(status).build();
    ProxyContext context = createContext();
    try {
        validateContext(context);
        this.addExecutor(this.routeThreadPoolExecutor,
            context,
            request,
            // 查询Topic路由信息
            () -> grpcMessingActivity.queryRoute(context, request)
                    // 完成时发送响应结果给客户端
                .whenComplete((response, throwable) -> writeResponse(context, request, response, responseObserver, throwable, statusResponseCreator)),
            responseObserver,
            statusResponseCreator);
    } catch (Throwable t) {
        // 返回异常信息
        writeResponse(context, request, null, responseObserver, t, statusResponseCreator);
    }
}

Proxy 是无状态的,它并不保存 Topic 路由信息,它会在启动时和定时任务里去请求 Namesrv 拉取然后缓存起来,代码在TopicRouteService的构造函数里:

this.topicCache = Caffeine.newBuilder().maximumSize(config.getTopicRouteServiceCacheMaxNum()).
refreshAfterWrite(config.getTopicRouteServiceCacheExpiredInSeconds(), TimeUnit.SECONDS).
executor(cacheRefreshExecutor).build(new CacheLoader<String, MessageQueueView>() {
    @Override public @Nullable MessageQueueView load(String topic) throws Exception {
        try {
            TopicRouteData topicRouteData = topicRouteCacheLoader.loadTopicRouteData(topic);
            if (isTopicRouteValid(topicRouteData)) {
                MessageQueueView tmp = new MessageQueueView(topic, topicRouteData);
                return tmp;
            }
            return MessageQueueView.WRAPPED_EMPTY_QUEUE;
        } catch (Exception e) {
            if (TopicRouteHelper.isTopicNotExistError(e)) {
                return MessageQueueView.WRAPPED_EMPTY_QUEUE;
            }
            throw e;
        }
    }
});

请求最终会委托route.RouteActivity#queryRoute处理,主要步骤:

  • 提取客户端请求 Proxy 的地址,用于后面替换 Broker 的地址
  • Proxy 从本地缓存取出路由信息,但是还不能直接返回,因为客户端不能和 Broker 直接通信
  • Proxy 把 Broker 的 IP 端口替换成自己的,再返回
public CompletableFuture<QueryRouteResponse> queryRoute(ProxyContext ctx, QueryRouteRequest request) {
    CompletableFuture<QueryRouteResponse> future = new CompletableFuture<>();
    try {
        validateTopic(request.getTopic());
        // 客户端请求Proxy的IP地址
        List<org.apache.rocketmq.proxy.common.Address> addressList = this.convertToAddressList(request.getEndpoints());
        String topicName = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getTopic());
        /**
         * 本地缓存拿Topic路由 Proxy会定时从Namesrv拉取
         * Proxy把Broker的IP端口替换成Proxy自己的再返回,让客户端只和Proxy通信
         */
        ProxyTopicRouteData proxyTopicRouteData = this.messagingProcessor.getTopicRouteDataForProxy(
                ctx, addressList, topicName);
        List<MessageQueue> messageQueueList = new ArrayList<>();
        Map<String, Map<Long, Broker>> brokerMap = buildBrokerMap(proxyTopicRouteData.getBrokerDatas());
        // Topic消息类型 Local模式本地直接取 Cluster模式定时任务拉取
        TopicMessageType topicMessageType = messagingProcessor.getMetadataService().getTopicMessageType(topicName);
        for (QueueData queueData : proxyTopicRouteData.getQueueDatas()) {
            String brokerName = queueData.getBrokerName();
            Map<Long, Broker> brokerIdMap = brokerMap.get(brokerName);
            if (brokerIdMap == null) {
                break;
            }
            for (Broker broker : brokerIdMap.values()) {
                messageQueueList.addAll(this.genMessageQueueFromQueueData(queueData, request.getTopic(), topicMessageType, broker));
            }
        }
        // 构建响应结果
        QueryRouteResponse response = QueryRouteResponse.newBuilder()
                .setStatus(ResponseBuilder.getInstance().buildStatus(Code.OK, Code.OK.name()))
                .addAllMessageQueues(messageQueueList)
                .build();
        future.complete(response);
    } catch (Throwable t) {
        future.completeExceptionally(t);
    }
    return future;
}

生产者拿到路由信息后,就会轮询出一个队列,然后给队列所属的 Broker 发请求,请求实际还是会发给 Proxy。
Proxy 处理发消息的流程如下:
image.png
请求最终会委托给ProducerProcessor#sendMessage处理,主要步骤:

  • 检查发送的消息是否符合 Topic 的消息类型
  • 轮询一个队列,Local 模式会从本机队列轮询,Cluster 模式会从全局队列轮询
  • 构建 Remoting 协议的请求,发送给队列所属的 Broker
  • 响应发送结果
public CompletableFuture<List<SendResult>> sendMessage(ProxyContext ctx, QueueSelector queueSelector,
    String producerGroup, int sysFlag, List<Message> messageList, long timeoutMillis) {
    CompletableFuture<List<SendResult>> future = new CompletableFuture<>();
    try {
        Message message = messageList.get(0);
        String topic = message.getTopic();
        if (ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck()) {
            if (topicMessageTypeValidator != null) {
                // Do not check retry or dlq topic
                if (!NamespaceUtil.isRetryTopic(topic) && !NamespaceUtil.isDLQTopic(topic)) {
                    TopicMessageType topicMessageType = serviceManager.getMetadataService().getTopicMessageType(topic);
                    TopicMessageType messageType = parseFromMessageExt(message);
                    topicMessageTypeValidator.validate(topicMessageType, messageType);
                }
            }
        }
        /**
         * 选择一个Queue
         * - Local:返回本机Queue视图,轮询一个(避免多一次网络调用)
         * - Cluster:返回全局Queue视图,轮询一个
         */
        AddressableMessageQueue messageQueue = queueSelector.select(ctx,
            this.serviceManager.getTopicRouteService().getCurrentMessageQueueView(topic));
        if (messageQueue == null) {
            throw new ProxyException(ProxyExceptionCode.FORBIDDEN, "no writable queue");
        }
        // 构建Remoting协议的消息发送请求头
        SendMessageRequestHeader requestHeader = buildSendMessageRequestHeader(messageList, producerGroup, sysFlag, messageQueue.getQueueId());
        /**
         * 发送消息
         * - Local:直接调本地SendMessageProcessor#processRequest
         * - Cluster:Remoting协议发送请求到Broker
         */
        future = this.serviceManager.getMessageService().sendMessage(
            ctx,
            messageQueue,
            messageList,
            requestHeader,
            timeoutMillis)
            .thenApplyAsync(sendResultList -> {
                for (SendResult sendResult : sendResultList) {
                    int tranType = MessageSysFlag.getTransactionValue(requestHeader.getSysFlag());
                    if (SendStatus.SEND_OK.equals(sendResult.getSendStatus()) &&
                        tranType == MessageSysFlag.TRANSACTION_PREPARED_TYPE &&
                        StringUtils.isNotBlank(sendResult.getTransactionId())) {
                        fillTransactionData(producerGroup, messageQueue, sendResult, messageList);
                    }
                }
                return sendResultList;
            }, this.executor);
    } catch (Throwable t) {
        future.completeExceptionally(t);
    }
    return FutureUtils.addExecutor(future, this.executor);
}

消息发送最终会委托给消息服务 MessageService,由于 Proxy 有 Local 和 Cluster 两种部署模式,所以也对应了两种实现方案。
Local 模式下,因为目标队列是在本机里面选的,所以直接调用本地方法存储消息即可,少了一次网络调用,延迟会更低;Cluster 模式下,目标队列绝对不在本机,所以索性就从全局队列里面选,Proxy 再根据 Remoting 协议去调用 Broker 存储消息。
至此,Proxy 发消息的流程也结束了,后续流程就是 Broker 如何存储消息了。

尾巴

引入 Proxy 组件后,客户端 SDK 代码更简洁易懂了。生产者先是向 Proxy 查询 Topic 路由信息,然后轮询一个队列,再把消息发给对应的 Proxy,Proxy 再根据部署模式从本机/全局队列里面轮询出一个目标队列,最后把 gRPC 适配成 Remoting 协议发给 Broker 存储消息。
为什么要轮询两次呢?实际上,消息最终写入哪个队列并不是生产者和 Proxy 决定的。我的理解,如果 Proxy 是 Cluster 模式部署的话,生产者的轮询是没有意义的,直接发给 Proxy 就好了。Proxy 虽然也轮询了一次,但它的目的是选出目标 Broker,实际上它发给 Broker 的 queueId 是固定的值 -1,消息最终写入哪个队列,是由 Broker 决定的。

文章来源:https://blog.csdn.net/qq_32099833/article/details/135370824
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。