目录
2.6、生产经验—Leader Partition 负载均衡
? ? ? ? 今天学习 Kafka 的第二部分 Broker,相比较 Flink ,Kafka 的流式数据处理还是比较好理解的。
环境准备:
我们先看看 Zookeeper 中存储了 Kafka 的哪些信息,它俩是如何协调工作的,
我们需要在任意节点启动 Zookeeper 客户端:
# 启动zookeeper客户端
bin/zkCli.sh
# 查看信息
ls /kafka/brokers/ids
/kafka/brokers/ids?
可以看到,我们当前启动了三个 Kafka broker节点。
/kafka/brokers/topics/
这个目录下我们可以看到指定主题的分区信息:
/kafka/consumers/
/kafka/controller
辅助节点选举,这个 controller 是谁取决于哪个节点先在 /kafka/controller 中注册的。在Kafka中,controller负责管理整个集群中所有分区和副本的状态。当检测到某个分区的ISR集合(ISR 就是所有 leader 和 follower 节点之间同步正常的节点集合)发生变化时,controller会负责通知所有broker更新其元数据信息。
参数名称 | 描述 |
replica.lag.time.max.ms | ISR中,如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值,默认30s。 |
auto.leader.rebalance.enable | 默认是true。 自动Leader?Partition 平衡。 |
leader.imbalance.per.broker.percentage | 默认是10%。每个broker允许的不平衡的leader的比率。如果每个broker超过了这个值,控制器会触发leader的平衡。 |
leader.imbalance.check.interval.seconds | 默认值300秒。检查leader负载是否平衡的间隔时间。 |
log.segment.bytes | Kafka中log日志是分成一块块存储的,此配置是指log日志划分 成块的大小,默认值1G。 |
log.index.interval.bytes | 默认4kb,kafka里面每当写入了4kb大小的日志(.log),然后就往index文件里面记录一个索引。 |
log.retention.hours | Kafka中数据保存的时间,默认7天。 |
log.retention.minutes | Kafka中数据保存的时间,分钟级别,默认关闭。 |
log.retention.ms | Kafka中数据保存的时间,毫秒级别,默认关闭。 |
log.retention.check.interval.ms | 检查数据是否保存超时的间隔,默认是5分钟。 |
log.retention.bytes | 默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最早的segment。 |
log.cleanup.policy | 默认是delete,表示所有数据启用删除策略; 如果设置值为compact,表示所有数据启用压缩策略。 |
num.io.threads | 默认是8。负责写磁盘的线程数。整个参数值要占总核数的50%。 |
num.replica.fetchers | 副本拉取线程数,这个参数占总核数的50%的1/3 |
num.network.threads | 默认是3。数据传输线程数,这个参数占总核数的50%的2/3 。 |
log.flush.interval.messages | 强制页缓存刷写到磁盘的条数,默认是long的最大值,9223372036854775807。一般不建议修改,交给系统自己管理。 |
log.flush.interval.ms | 每隔多久,刷数据到磁盘,默认是null。一般不建议修改,交给系统自己管理。 |
?AR = ISR + OSR
ISR,表示和Leader保持同步的Follower集合。如果 Follower 长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR(踢出到 OSR)。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。Leader发生故障之后,就会从 ISR 中选举新的Leader。
OSR,表示Follower与Leader副本同步时,延迟过多的副本(特定条件下会恢复到 ISR)。
????????Kafka集群中有一个 broker 的 Controller 会被选举为 Controller?Leader,负责管理集群broker的上下线,所有 topic 的分区副本分配和Leader选举等工作。
????????Controller的信息同步工作是依赖于Zookeeper的。
接下来我们来验证一下:
1. 创建一个新的topic,3个分区,3个副本
./kafka-topics.sh --bootstrap-server hadoop102:9092 -create --topic buy --partitions 3 -replication-factor 3
2. 查看 Leader 分布情况
注意区分 AR 和 ISR:?
?
3.?停止掉hadoop103 的 kafka 进程,并查看Leader分区情况
上面的图中我们可以看到hadoop103上的副本一共有 3 个 ,分别存在节点 [2,0,1] 上,并且当前的副本 leader 是 2,现在我们把 hadoop103 节点的 kafka 进程停掉,相当于保存在分区 1?的副本全部故障,而我们分区 3 中的副本的 leader 刚好是节点1,所以要把副本的 leader 进行更新,要按照理论应该是 2?号节点继续上任副本 leader 的位置。
?
我们发现,我们分区 3 的副本 leader??原本存在分区2 中,但是现在分区 2?挂掉了,所以它会从自己的 AR 中选一个在前面的节点 [2,0],于是 分区2 中的 副本 leader 变成了节点 2。
4. 我们再恢复一下hadoop103
[lyh@hadoop103 bin] ./kafka-server-start.sh -daemon ../config/server.properties
概念:
注意:只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复!
比如上面,我们的 leader 一开始是节点0,但是 leader 挂掉之后,leader 变成了节点1,但是新的leader 显然比旧的 leader 的数据少了3条,所以这部分数据就丢失了(毕竟自己处理问题总不能让人家生产者再发送一遍)。
????????如果kafka服务器只有3个节点,那么设置kafka 的分区数大于服务器台数,在kafka底层如何分配存储副本呢?
? ? ? ? 注意:分区数可以大于机器数,毕竟大不了一个机器上多存几个分区,但是副本数不能大于机器数,那样没有意义,而且 Kafka 会报错:
我有三台机器,上面试着创建了 5 个副本的一个 topic,报错不能超过机器数量。
1)创建12分区,3个副本
./kafka-topics.sh -bootstrap-server hadoop102:9092 --create --topic card --partitions 12 --replication-factor 3
2)查看分区和副本信息
我们可以看到,Kafka 使得我们每个副本的 Leader 都尽可能的不一样,这样很好地分担了读写压力,毕竟Kafka 生产者和消费者都是只对分区 leader 进行操作的。每4个分区对应一个副本,而且每一个副本的 AR 顺序 Kafka 都尽量使它们不一样,这样可以尽可能做到负载均衡。
????????生产环境中,每台服务器的配置和性能都不一致,但是 Kafka 只会根据自己的代码创建对应的分区副本,就会导致个别服务器存储压力比较大。所以需要手动调整分区副本的存储。
? ? ? ? 需求:创建一个新的 topic ,4个分区,两个副本。我们将该 topic 的所有副本都存储到 hadoop102 和 hadoop103 上,hadoop104 不存储任何数据。
1)创建一个新的 topic 叫做 card ,4个分区,2个副本
2)查看分区副本信息
3)创建副本存储计划(所有副本都指定存储在hadoop102、hadoop103中)。
4)执行副本存储计划
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-repliaction-factor.json --execute
5)验证副本存储计划
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-repliaction-factor.json --verify
6)查看分区副本存储情况
? ? ? ? 正常情况下,Kafka 本身会自动把 Leader Partition 均匀分散在各个机器上,来保证每台节点的吞吐量都是均匀的。但是如果某些 broker 节点宕机,会导致 leader partition 过于集中在其他少部分几台 broker 节点上,这会导致少数几台 broker 节点的读写压力过高,而且即使这些宕机的 broker 再次恢复上线,也只是 follower partition ,不会再次恢复为 leader partition,造成集群负载不均衡。
在生产环境中,通常不建议开启自动平衡,因为这可能会影响性能。
? ? ? ? 假设我们创建了一个主题并设置副本数为 1 ,但是后来我们发现这部分数据特别重要,于是想要增加副本数,怎么办呢?通过命令行是不行的,得像我们上面 2.5 手动调整分区副本 一样,通过 json 文件来修改。
手动增加副本存储
1)创建副本存储计划(所有副本都指定存储在hadoop102、hadoop103、hadoop104中)
vim increase-replication-factor.json
# 输入下面的内容
{"version":1,"partitions":[{"topic":"four","partition":0,"replicas":[0,1,2]},{"topic":"four","partition":1,"replicas":[0,1,2]},{"topic":"four","partition":2,"replicas":[0,1,2]}]}
2)执行副本存储计划
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
? ? ? ? Topic是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应一个 log 文件,该 log 文件中存储的就是 producer 产生的数据。producer 产生的数据会被不断追加(追加是 Kafka 能够高效读写的一个重要原因)到该 log 文件末端,且每条数据都有自己的 offset 。消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分区和索引机制,将每个 partition 分为多个 segment 。每个 segment 包括多个:“.index”文件、“.log”文件和 .timeindex 等文件(.timeindex 是一个时间戳索引文件,描述了文件保存时间,因为 Kafka 中的数据默认为保存 7 天才会自动删除,而判断文件日期是否达到7天就需要判断这个文件)。这些文件位于一个文件夹下,文件夹的命名规则为:topic名称+分区序号,例如:like-0。
# 000000~170409
00000000000000000000.index
00000000000000000000.log
# 170410~239429
00000000000000170410.index
00000000000000170410.log
# 239430~
00000000000000239430.index
00000000000000239430.log
验证:
1. 查看 kafka 的 data 目录下的 topic 数据
2. 这些文件因为都是经过序列化的所以都是乱码,需要使用 Kafka 提供的一个工具来看:?
# 查看 .index 文件
kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000015.index
# 查看 .log 文件
kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000015.log
现有索引文件 000000.index,0000522.index,000001005.index ,如何在 log 文件中定位到 offset=600 的record?
可以看到 587 就是我们的 index 文件名中的基础offset+相对offset得到的,计算得到 587 后我们发现要找的 offset=600 是大于 587 小于 639 的,说明要找的 record 就在这一行。于是查看 position 得到 6410,接着查看 log文件。
找到?000000522.index 对应的 000000522.log 文件,这个 log 文件同样有一个 position 属性,我们要找的 6410 刚好在log文件中就有一条记录的position=?6410,这就找到了。如果我们要找的position=6415,那么我们就得找到介于这个值中间的数据,因为 6410 < 6415 <10090 所以我们要找的 position=6415的数据就在 6410这一行。
Kafka中默认的日志保存时间为7天,可以通过调整如下参数修改保存时间。
这四个参数的优先级从上到下越来越高,也就是说 当我们设置了日志保存参数为 ms 级别时,前面设置的 hours 和 minutes 级别的参数就都失效了。
那么日志一旦超过了设置的时间,怎么处理呢?
Kafka中提供的日志清理策略有?delete 和 compact?两种。
(1)基于时间:默认打开。以segment中所有记录中的最大时间戳作为该文件时间戳。
(2)基于大小:默认关闭。超过设置的所有日志总大小,删除最早的segment。
log.retention.bytes(long),默认等于-1,表示无穷大。
思考:如果一个segment中有一部分数据过期,一部分没有过期,怎么处理?
?当然是以segment中所有记录中的最大时间戳作为该文件时间戳,所以即使数据有99.9%是旧的,只要有0.01%是新的数据,就得等它过期了才能删除。
compact 日志压缩:对于相同的 key 的不同 value 只保留最后一个版本。
要使用这个功能,只需要修改配置
????????这种压缩只能用于特定场景,比如消息的 key 是用户id,value是用户的资料,通过这个压缩,整个消息集里就保存了所有用户最新的信息。
1)Kafka本身是分布式集群,可以采用分区技术,并行度高
2)读数据采用稀疏索引,可以快速定位要消费的数据
3)顺序写磁盘
????????Kafka的producer生产数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到600M/s,而随机写只有100K/s。这与磁盘的机械结构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。
4)页缓存 + 零拷贝技术
零拷贝:Kakfa 把对数据操作的步骤放到了 生产者和消费者当中(生产者和消费者可以在拦截器来对数据进行处理),所以 Kafka Broker 应用层并不关心数据的存储,所以数据都不需要走应用层,直接走网卡就可以传输费 消费者,传输效率高。
pageCache 页缓存:Kafka 重度依赖底层操作系统提供的 PageCache 功能。当上层有写操作时,操作系统只是将数据写入 pageCache。当读操作发生时,先从 pageCache 中查找,如果找不到,再去磁盘读取。实际上 pageCache 是尽可能把更多的空闲内存空间都当做了磁盘缓存来使用。
参数 | 描述 |
log.flush.interval.messages | 强制页缓存刷写到磁盘的条数,默认是long的最大值,9223372036854775807。一般不建议修改,交给系统自己管理。 |
log.flush.interval.ms | 每隔多久,刷数据到磁盘,默认是null。一般不建议修改,交给系统自己管理。 |
? ? ? ? 这一节用到的 prettyZoo 挺震撼我的,JavaFX 能开发出如此漂亮使用的一款软件?,实在让我想不到,希望自己写的软件有一天也可以为百千人使用。