Kafka将状态信息保存在Zookeeper中,这些状态信息记录了每个Kafka的Broker服务与另外的Broker服务 有什么不同。通过这些差异化的功能,共同体现出集群化的业务能力。这些数据,需要在集群中各个Broker 之间达成共识,因此,需要存储在一个所有集群都能共同访问的第三方存储中。
Kafka的整体集群结构:
Kafka的集群中,最为主要的状态信息有两个:
在多个Broker中,需要选举出一个Broker,担任 Controller角色。由Controller角色来管理整个集群中的分区和副本状态。
在同一个Topic下的多个 Partition中,需要选举出一个Leader角色。由Leader角色的Partition来负责与客户端进行数据交互。
Zookeeper数据整体分布图:
Zookeeper客户端工具: prettyZoo。下载地址:Releases · vran-dev/PrettyZoo · GitHub
在Kafka集群进行工作之前,需要选举出一个Broker来担任Controller角色,负责整体管理集群内的分区和副本状态。选举Controller的过程就是通过抢占Zookeeper的/controller节点来实现的。
当一个集群内的Kafka服务启动时,就会尝试往Zookeeper上创建一个/controller临时节点,并将自己的 brokerid写入这个节点。而Zookeeper会保证在一个集群中,只会有一个broker能够成功创建这个节点。这个注册成功的broker就成 了集群当中的Controller节点。节点内容:
{"version":2,"brokerid":0,"timestamp":"1702350416126","kraftControllerEpoch":-1}
临时节点:长时间断开连接,会被自动删除
监听机制:监听节点的状态,节点被删除后给所有监听器广播节点状态变化事件
Controller节点,负责监听Zookeeper中的其他一些关键节点,触发集群的相关管理工作:
监听Zookeeper中的/brokers/ids节点,感知Broker增减变化。
监听/brokers/topics,感知topic以及对应的partition的增减变化。
监听/admin/delete_topic节点,处理删除topic的动作。
另外,Controller还需要负责将元数据推送给其他Broker。
在Kafka中,一个Topic下的所有消息,是分开存储在不同的Partition中的。
使用kafka-topics.sh脚本创 建Topic时
通过--partitions
参数指定Topic下包含多少个Partition
通过--replication-factors
参数指定每个Partition有几个备份
在一个Partition的众多备份中,需要选举出一个Leader Partition,负责对接所有的客户端请求,并将消息优先保存,然后再通知其他Follower Partition来同步消息。
涉及的基础概念:
AR: Assigned Repllicas。 表示Kafka分区中的所有副本(存活的和不存活的)
ISR: 表示在所有AR中,服务正常且保持与Leader同步的Follower集合。如果Follower长时间没有向Leader发送通信请求(超时时间由replica.lag.time.max.ms参数设定,默认30S),那么这个Follower就会被踢出ISR。(在老版本的Kafka中,还会考虑Partition与Leader Partition之间同步的消息差值,大于参数replica.lag.max.messages条就会被移除ISR。现在版本已经移除了这个参数)
OSR:表示从ISR中踢出的节点。记录那些服务有问题,延迟过多的副本。
# 查看状态命令
bin/kafka-topics.sh -bootstrap-server localhost:9092 --describe --topic disTopic
在选举Leader Partition时,会按照AR中的排名顺序,靠前的优先选举。只要当前Partition在ISR列表中,也就是是存活的,那么这个节点就会被选举成为Leader Partition。(AR就是Replicas列中的Broker集合)
默认情况下,Kafka会尽量将Leader Partition分配到不同的Broker节点上,用以保证整个集群的性能压力能够比较平均。但是,经过Leader Partition选举后,这种平衡就有可能会被打破,让Leader Partition过多的集中到同一个Broker上,影响集群的整体性能。为此,Kafka设计了Leader Partition自动平衡机制,当发现Leader分配不均衡时,自动进行Leader Partition调整。
Leader Partition自平衡逻辑:会认为AR当中的第一个节点就应该是Leader节点。这种选举结果称为preferred election 理想选举结果。Controller会定期检测集群的Partition平衡情况,在开始检测时,Controller会依次检查所有的Broker。当发现这个Broker上的不平衡的Partition比例高于leader.imbalance.per.broker.percentage阈值时,就会触发一次Leader Partiton的自平衡。
官方文档的部分截图:
涉及Broker中server.properties配置文件的几个重要参数:
#1 自平衡开关。默认true
auto.leader.rebalance.enable
Enables auto leader balancing. A background thread checks the distribution of partition leaders at regular intervals, configurable by `leader.imbalance.check.interval.seconds`. If the leader imbalance exceeds `leader.imbalance.per.broker.percentage`, leader rebalance to the preferred leader for partitions is triggered.
Type: boolean
Default: true
Valid Values:
Importance: high
Update Mode: read-only
?
#2 自平衡扫描间隔
leader.imbalance.check.interval.seconds
The frequency with which the partition rebalance check is triggered by the controller
Type: long
Default: 300
Valid Values: [1,...]
Importance: high
Update Mode: read-only
?
#3 自平衡触发比例
leader.imbalance.per.broker.percentage
The ratio of leader imbalance allowed per broker. The controller would trigger a leader balance if it goes above this value per broker. The value is specified in percentage.
Type: int
Default: 10
Valid Values:
Importance: high
Update Mode: read-only
注意:使用集群时,要修改集群中所有broker的文件,并且重启Kafka服务才能生效
# 手动触发所有Topic的Leader Partition自平衡
bin/kafka-leader-election.sh --bootstrap-server worker1:9092 --election-type preferred ?--topic secondTopic --partition 2
Leader Partition自平衡的过程是一个非常重的操作,因为要涉及到大量消息的转移与同步。并且,在这个过程中,会有丢消息的可能。所以在很多对性能要求比较高的线上环境,会选择将参数auto.leader.rebalance.enable设置为false,关闭Kafka的Leader Partition自平衡操作,用其他运维的方式,在业务不繁忙的时间段,手动进行Leader Partiton自平衡,尽量减少自平衡过程对业务的影响。
当一组Partition中选举出了一个Leader节点后,这个Leader节点就会优先写入并保存Producer传递过来的消息,然后再同步给其他Follower。当Leader Partition所在的Broker服务发生宕机时,Kafka就会触发Leader Partition的重新选举。
Kafka为了消息能够在多个Parititon中保持数据同步,内部记录了两个关键数据:
LEO(Log End Offset): 每个Partition的最后一个Offset
每个Partition都会记录自己保存的消息偏移量。leader partition收到并记录了生产者发送的一条消息,就将LEO加1。follower partition从leader partition同步消息,每同步到一个消息,自己的LEO就加1。通过LEO值,就知道各个follower partition与leader partition之间的消息差距。
HW(High Watermark): 一组Partiton中最小的LEO。
follower partition每次往leader partition同步消息时,都会同步自己的LEO给leader partition。这样leader partition就可以计算出这个HW值,并最终会同步给各个follower partition。leader partition认为这个HW值以前的消息,都是在所有follower partition之间完成了同步的,是安全的。这些安全的消息可以被消费者拉取。而HW值之后的消息,是不安全的,可能丢失的。这些消息如果被消费者拉取消费了,就有可能造成数据不一致。
当服务出现故障时,如果是Follower发生故障,不会影响消息写入,只是少了一个备份而已。Kafka会做如下处理:
1、将故障的Follower节点临时踢出ISR集合。其他Leader和Follower继续正常接收消息。
2、出现故障的Follower节点恢复后,不会立即加入ISR集合。该Follower节点会读取本地记录的上一次的HW,将自己的日志中高于HW的部分信息全部删除掉,然后从HW开始,向Leader进行消息同步。
3、等到该Follower的LEO大于等于整个Partiton的HW后,就重新加入到ISR集合中。也就是说这个Follower的消息进度追上了Leader。
如果是Leader节点出现故障,Kafka为了保证消息的一致性,会做如下处理:
1、Leader发生故障,会从ISR中进行选举,将一个原本是Follower的Partition提升为新的Leader。这时,消息有可能没有完成同步,所以新的Leader的LEO会低于之前Leader的LEO。
2、Kafka中的消息都只能以Leader中的备份为准。其他Follower会将各自Log文件中高于HW的部分全部清理掉,然后从新的Leader中同步数据。
3、旧的Leader恢复后,将作为Follower节点,进行数据恢复。
在这个过程当中,Kafka注重的是保护多个副本之间的数据一致性。但是消息的安全性就得不到保障。
思考:HW和LEO是一个分布式的值,怎么保证HW在多个Broker中是一致的呢?