在开始介绍Rebalance操作的实现细节之前,我们需要明确在哪几种情况下会触发Rebalance操作:
Rebalance操作的第一步就是查找GroupCoordinator,这个阶段消费者会向Kafka集群中的任意一个Broker发送GroupCoordinatorRequest请求,并处理返回的GroupCoordinatorResponse响应。
GroupCoordinatorRequest消息体的格式比较简单,只包含了Consumer Group的id。GroupCoordinatorResponse消息体包含了错误码(short类型)、coordinator的节点Id(int类型)、GroupCoordinator的host(String类型)、GroupCoordinator的端口号(int类型)。
发送GroupCoordinatorRequest请求的入口是ConsumerCoordinator的ensureCoordinatorReady方法,其流程如图所示。
首先检测是否需要重新查找GroupCoordinator,主要是检查coordinator字段是否为空以及与GroupCoordinator之间的连接是否正常。
查找集群负载最低的Node节点,并创建GroupCoordinatorRequest请求。调用client.send方法将请求放入unsent队列中等待发送,并返回RequestFuture对象。返回的RequestFuture对象经过了compose方法适配,原理同HeartbeatCompletionHandler。
调用ConsumerNetworkClient.poll(future)方法,将GroupCoordinatorRequest请求发送出去。此处使用阻塞的方式发送,直到收到GroupCoordinatorResponse响应或异常完成,才从此方法返回。
检测检查RequestFuture对象的状态。如果出现RetriableException异常,则调用ConsumerNetworkClient.awaitMetadataUpdate()方法阻塞更新Metadata中记录的集群元数据后跳转到步骤1继续执行。如果不是RetriableException异常则直接报错。
如果成功找到GroupCoordinator节点,但是网络连接失败,则将其unsent中对应的请求清空,并将coordinator字段置为null,准备重新查找GroupCoordinator,退避一段时间后跳转到步骤1继续执行。
下面介绍处理GroupCoordinatorResponse的相关操作。通过对sendGroupCoordinatorRequest方法的分析我们知道,handleGroupMetadataResponse)方法是处理GroupCoordinatorResponse的入口,其步骤如下:
在成功查找到对应的GroupCoordinator之后进入Join Group阶段。在此阶段,消费者会向GroupCoordinator发送JoinGroupRequest请求,并处理响应。先来了解JoinGroupRequest和JoinGroupResponse的消息体格式,如图所示。
了解了JoinGroupRequest和JoinGroupResponse的格式之后,再来分析第二阶段的相关处理流程,其入口函数是ensurePartitionAssignment方法。
ensurePartitionAssignment方法的流程如图所示。
调用SubscriptionState.partitionsAutoAssigned方法,检测Consumer的订阅是否是AUTO_TOPICS或AUTO_PATTERN。因为USER_ASSIGNED不需要进行Rebalance操作,而是由用户手动指定分区。
如果订阅模式是AUTO_PATTERN,则检查Metadata是否需要更新。
在前面提到过,在ConsumerCoordinator的构造函数中为Metadata添加了监听器。当Metadata更新时就会使用SubscriptionState中的正则表达式过滤Topic,并更改SubscriptionState中的订阅信息。同时,也会使用metadataSnapshot字段记录当前的Metadata的快照。这里要更新Metadata的原因,是为了防止因使用过期的Metadata进行Rebalance操作而导致多次连续的Rebalance操作。
调用ConsumerCoordinator.needRejoin()方法判断是要发送JoinGroupRequest加入ConsumerGroup,其实现是检测是否使用了AUTO_TOPICS或AUTO_PATTERN模式,检测rejoinNeeded和needsPartitionAssignment两个字段的值。
调用onJoinPrepare方法进行发送JoinGroupRequest请求之前的准备,做了三件事:一是如果开启了自动提交offset则进行同步提交offset,提交offset的内容后面会详细介绍,此步骤可能会阻塞线程;二是调用注册在SubscriptionState中的ConsumerRebalanceListener上的 回调方法;三是将SubscriptionState的needsPartitionAssignment字段设置为true并收缩groupSubscription集合。
再次调用needRejoin方法检测,之后调用ensureCoordinatorReady方法检测已经找到GroupCoordinator且与之建立了连接。
如果还有发往GroupCoordinator所在Node的请求,则阻塞等待这些请求全部发送完成并收到响应(即等待unsent及InFlightRequests的对应队列为空),然后返回步骤5继续执行,主要是为了避免重复发送JoinGroupRequest请求。
调用sendJoinGroupRequest方法创建JoinGroupRequest请求,并调用ConsumerNetworkClient.send方法将请求放入unsent中缓存,等待发送。
在步骤7返回的RequestFuture对象上添加RequestFutureListener。
调用ConsumerNetworkClient.poll方法发送JoinGroupRequest,这里会阻塞等待,直到收到JoinGroupResponse或出现异常。
检测RequestFuture.fail。如果出现RetriableException异常则进行重试,其他异常则报错。如果无异常,则整个第二阶段操作完成。
通过前面对JoinGroupRequest发送流程的分析,我们了解到JoinGrouResponse处理流程的入口是JoinGroupResponseHandler:handle()方法,其中还包括了SyncGroupRequest发送的操作。
JoinGrouResponse的处理流程如图所示。
完成分区分配之后就进入了Synchronizing Group State 阶段,主要逻辑是向GroupCoordinator 发送 SyncGroupRequest 请求并处理 SyncGroupResponse 响应。
先来了解SyncGroupRequest 和 SyncGroupResponse 的消息体格式。
SyncGroupRequest 中各个字段的含义如表
SyncGroupResponse 中各个字段的含义如表
通过前面对onJoinLeader方法分析,我们知道发送 SyncGroupRequest 请求的逻辑紧接在分区分配操作之后,也是在 onJoinLeader方法中完成的。下面是其流程:
从SyncGroupResponse中得到的分区分配结果最终由ConsumerCoordinator.onJoinComplete()方法处理,调用此方法的是在第二阶段ensureActiveGroup方法的步骤8中添加的RequestFutureListener中调用。onJoinComplete()方法的流程如图所示。