Kafka从0.8版本开始引入副本(Replica)的机制,其目的是为了增加Kafka集群的高可用性。
Kafka实现副本机制之后,每个分区可以有多个副本,并且会从其副本集合(Assigned Replica,AR)中选出一个副本作为Leader副本,所有的读写请求都由选举出的Leader副本处理。
剩余的其他副本都作为Follower副本,Follower副本会从Leader副本处获取消息并更新到自己的Log中。
我们可以认为Follower副本是Leader副本的热备份。
一般情况下,同一分区的多个副本会被均匀地分配到集群中的不同Broker上,当Leader副本的所在的Broker出现故障后,可以重新选举新的Leader副本继续对外提供服务。
通过这种方式提高了Kafka集群的可用性。
在一个分区的Leader副本中会维护自身以及所有Follower副本的相关状态,而Follower副本只维护自己的状态。
此外,还有“本地副本”和“远程副本”两个概念需要读者注意,“本地副本”是指副本对应的Log分配在当前的Broker上,“远程副本”则是指副本对应的Log分配在其他的Broker上,在当前Broker上仅仅维护了副本的LEO等信息。
一个副本是“本地副本”还是“远程副本”与它是Leader副本还是Follower副本没有直接联系,如图所示。
Kafka服务端使用Replica表示副本以及Replica中维护的信息,其中的partition字段指向了副本所属的分区。
服务端使用Partition表示分区,Partition负责管理每个副本对应的Replica对象,进行Leader副本的切换,ISR集合的管理以及调用日志存储子系统完成写入消息,它还提供了一些其他的辅助方法。
Partition中的核心字段的含义如下所述。
Partition中的方法按照功能可以划分为下列五类。
getOrCreateReplica()方法主要负责在AR集合(assignedReplicaMap)中查找指定副本的Replica对象,如果查找不到则创建Replica对象并添加到AR集合中管理。
如果创建的是Local Replica,还会创建(或恢复)对应的Log并初始化(或恢复)HW。
HW与Log.recoveryPoint类似,也会需要记录到文件中保存,在每个lig目录下都有一个replication-offset-checkpoint文件记录了此目录下每个分区的HW。
在ReplicaManager启动时会读取此文件到highWatermarkCheckpoints这个Map中,之后会定时更新replication-offset-checkpoint文件。
Partition除了对副本的Leader/Follower角色进行管理,还需要管理ISR集合。
随着Follower副本不断与Leader副本进行消息同步,Follower副本的LEO会逐渐后移,并最终追赶上Leader副本的LEO,此时该Follower副本就有资格进入ISR集合。
Partition.maybeExpandIsr()方法实现了扩张ISR集合的功能,其调用栈如图所示,它是在updateFollowerLogReadResults()方法中被调用的,在前面介绍DelayedFetch的处理流程时提到过此方法的功能,该方法用于处理来自Follower的FetchRequest。
在分区中,只有Leader副本能够处理读写请求。
Partition.appendMessagesToLeader方法提供了向Leader副本对应的Log中追加消息的功能。
在前面介绍的DelayedProduce处理流程中,ReplicaManager.appendToLocalLog()方法就是基于此方法实现的。
在一个Broker上可能分布着多个Partition的副本信息,ReplicaManager的主要功能是管理一个Broker范围内的Partition信息。
ReplicaManager的实现依赖于前面介绍的日志存储子系统、DelayedOperationPurgatory、KafkaScheduler等组件,底层依赖于Partition和Replica。
ReplicaManager中各个字段的含义和功能如下所述。
在Kafka集群中会选举一个Broker成为KafkaController的Leader,它负责管理整个Kafka集群。
Controller Leader根据Partition的Leader副本和Follower副本的状态向对应的Broker节点发送LeaderAndIsrRequest,这个请求主要用于副本的角色切换,即指导Broker将其上的哪些分区的副本切换成Leader角色,哪些分区的副本切换成Follower介绍。
LeaderAndIsrRequest首先由KafkaAPis.handleLeaderAndIsrRequest()方法进行处理,其核心逻辑是通过ReplicaManager提供的becomeLeaderOrFollower方法实现的,而becomeLeaderOrFollower又依赖于上一小节介绍的Partition.makeLeader方法和makeFollower方法,上述调用关系如图所示。
在开始分析becomeLeaderOrFollower方法前,先来介绍一下LeaderAndIsrRequest和LeaderAndIsrResponse的格式,如图所示。在LeaderAndIsrRequest中比较重要的是partition_states集合这个字段,其中包含了每个分区的Leader副本所在的Brokerld、ISR集合、AR集合以及zk_version等信息。在LeaderAndIsrResponse的partitions集合字段中记录了每个分区的副本在当前Broker上的切换结果。
ReplicaManager.becomeLeaderOrFollower方法的主要逻辑是:获取(或创建)指定的Partition对象,根据partitionStates的信息对其切换成Leader/Follower的副本进行分类,并分别调用makeLeader和makeFollowers方法完成切换。
之后会启动highwatermark-checkpoint任务,然后关闭空闲的Fetcher线程,调用onLeadershipChange回调函数。
当Local Replica切换为Leader副本之后,就可以处理生产者发送的ProducerRequest,将消息写入到Log中。
在前面分析DelayedProduce的处理流程时,简单介绍了ReplicaManager.appendMessages方法,当时着重关注了与DelayedProduce相关的处理以及sendResponseCallback回调方法的实现。
这里详细分析appendToLocalLog方法的实现,它首先会检测消息要写入的Topic是否为Kafka的内部Topic(目前Kafka只有OffsetsTopic一个内部Topic),如果是内部Topic则需要检测是否允许对内部Topic进行追加,最终调用Partition.appendMessagesToLeader()方法完成消息追加。
appendToLocalLog方法的第二个参数记录了每个分区需要追加的消息集合。
Follower副本与Leader副本同步的功能由ReplicaFetcherManager组件实现。
ReplicaFetcherManager继承了AbstractFetcherManager。
AbstractFetcherManager的继承和依赖关系如图所示。
在AbstractFetchManager中使用fetcherThreadMap字段(HashMap[BrokerAndFetcherld,AbstractFetcherThread]类型)管理AbstractFetcherThread,该Map的key值是BrokerAndFetcherld类型对象,其中封装了Broker的网络位置信息(brokerld、host、port等)以及对应的Fetcher线程的id。
AbstractFetcherManager中还提供了addFetcherForPartitions方法、removeFetcherForPartitions方法和shutdownldleFetcherThreads方法对fetcherThreadMap集合进行管理。
AbstractFetcherManager.addFetcherForPartitions()方法会让Follower副本从指定的offset开始与Leader副本进行同步。
该方法的参数涉及BrokerAndInitialOffset类,它其中封装了Broker的网络位置信息以及同步的起始offset。
具体的同步逻辑交由ReplicaFetcherThread线程处理。
在Follower发送ListOffsetRequest期间,新Leader可能不断追加消息,新Leader的LEO落后于Follower的LEO的场景得到改变,此时就不再需要进行截断操作了,Follower可以继续从其LEO与Leader进行同步。
这样新Leader与Follower的消息可能存在不一致的情况,如图所示。
当Broker接收到来自KafkaController的StopReplicaRequest请求时,会关闭其指定的副本,并根据StopReplicaRequest中的字段决定是否删除副本对应的Log。
在分区的副本进行重新分配、关闭Broker等过程中都会使用到此请求,但是需要注意的是,StopReplicaRequest并不代表一定会删除副本对应的Log,例如shutdown的场景下就没有必要删除Log。
而在重新分配Partition副本的场景下,就需要将旧副本及其Log删除。
先来介绍StopReplicaRequest、StopReplicaResponse的格式,如图所示。
StopReplicaRequest中的delete_partitions字段是一个boolean类型的值,表示是否要删除副本及其Log,partitions集合字段中记录待关闭的分区信息。
StopReplicaResponse的partitions集合记录了每个分区对应的处理结果。
API层接收到StopReplicaRequest后直接调用了ReplicaManager.stopReplicas()方法进行处理。
stopReplicas方法首先检查请求中的controllerEpoch值,之后停止指定分区的同步操作,最后遍历partitions集合根据delete_partitions的值决定是否对Log进行删除。
在ReplicaManager中总 共 有highwatermark-checkpoint、isr-expiration、isr-change-propagation三个定时任务。
highwatermark-checkpoint任务会周期性地记录每个Replica的HW并保存到其log目录中的replication-offset-checkpoint文件中。
isr-expiration任务会周期性地调用maybeShrinkIsr()方法检测每个分区是否需要缩减其ISR集合。
isr-change-propagation任务会周期性地将ISR集合发生变化的分区记录到ZooKeeper中。
如果检测到highwatermarkcheckpoint任务未启动,会调用startHighWaterMarksCheckPointThread方法启动highwatermark-checkpoint任务。
MetadataCache是Broker用来缓存整个集群中全部分区状态的组件。
KafkaController通过向集群中的Broker发送UpdateMetadataRequest来更新其MetadataCache中缓存的数据,每个Broker在收到该请求后会异步更新MetadataCache中的数据。
MetadataCache中各字段的含义和功能如下所述。
在开始分析Kafka处理UpdateMetadataRequest请求更新MetadataCache的流程之前,先来看一下UpdateMetadataRequest和UpdateMetadataResponse的格式,如图所示。