一些kafka的基础使用以及说明请参考上一篇文章kafka的基础入门。这篇文章主要是写kafka的一些高级特性、存储结构以及原理。
高可用是很多分布式系统中必备的特征之一,Kafka的高可用是通过基于 leader-follower的多副本同步实现的
kafka中topic的每个partition有一个预写式日志文件,每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中,partition中的每个消息都有一个连续的序列号叫做offset,确定他在partition中的唯一位置。
日志末端位移,记录了该副本对象底层日志文件中下一条消息的位移值,副本写入消息的时候,会自动更新 LEO 值。leader分区和follower分区的LEO值是不一定相同的
高水位值,小于 HW 值的消息被认为是“已提交”或“已备份”的消息,并对消费者可见,在HW值以内的消息是绝对安全的。HW以下的消息才会对消费者可见。HW取的是LEO和remote LEO的最小值,统计的是ISR队列中的follower副本节点,follower副本由于自身原因到了OSR队列之前的remote LEO会删除。
leader分区副本会记录自己的LEO以及远程的follower分区副本的LEO值也就是(remote LEO)。HW的值取的就是这两个LEO的最小值。remote LEO和HW在follower向leader拉取消息的时候更新。
生产者发送了一条消息到leader副本分区,写入该条消息后 leader 更新为 LEO = 1,follower副本在发送 fetch 请求同步leader数据时携带当前最新的 offset = 0,leader 处理 fetch 请求时,更新 remoteLEO = 0,对比 LEO 值最小为 0,所以 HW = 0。follower写入后自己的LEO变为1 发送第二轮 fetch 请求,携带当前最新的 offset = 1,leader 处理 fetch 请求时,更新remote LEO = 1,对比 LEO 值最小为 1,所以 HW = 1,
leader 中保存的 remote LEO 值的更新总是需要额外一轮 fetch RPC 请求才能完成,这意味着在 leader 切换过程中,会存在数据丢失以及数据不一致的问题。
第一轮follower节点发送fetch请求的时候,follower节点提交的offset值为1,更新的主节点的hw为1 leo为2 remote leo为1,这时候follower节点的hw为1 leo为2, 在follower节点第二轮发送fetch请求的时候,更新的主节点的hw为2 leo为2 remote leo为2,在第二轮请求响应的时候主节点宕机了,这时候重新选取主节点这个follower节点被选取为主节点这时候本应该是hw为2 leo为2 remote leo为2的主节点变成了hw为1 leo为2 hw本应该是2却变成了1,这时候为了保证消息一致性会截断日志将主节点变成hw为1leo为1,数据也就丢失了。
leader节点有3条消息,m1 m2 m3,leader节点已经完成了HW更新但是却没有同步到follower节点,follower节点的hw值仍然是2。这时候机房整体宕机。follower节点先恢复的,这时候follower节点先恢复启动,follower节点被选取为主节点,因为它的hw值是2为了保证数据的一致性会截断数据,导致主节点的消息变为m1 m2 这时候之前的leader节点恢复了它的消息是m1 m2 m3 这时候又来了一个消息 m4 主节点变为m1 m2 m4 它跟follower节点一比对它们的hw值都一样不做数据同步 这时候主节点的第三个offset存的值是m3 从节点的第三个offset存的是m4出现了消息数据不一致的情况。
就是添加一个纪元值和leo值,follower epoch会发起一个leaderepochRequest请求判断leader 和 follower的epoch是否一致,如果不一致通过leader-epoch-checkpoint处理这个问题。
Epoch:一个单调增加的版本号,每当副本领导权发生变更时,都会增加该版本号,小版本号的Leader 被认为是过期 Leader,不能再行使 Leader 权力
起始位移(Start Offset):Leader 副本在该 Epoch 值上写入的首条消息的位移(LEO。
不过实际上消息丢失和不一致的概率比较小可能1年都丢不了1条。看具体业务而定。不过kafka在设置了安全性参数以后性能会变差一些,如果真的考虑以安全性为主考虑用rocketmq吧,如果主打的是性能用kafka才是比较好的选择。
kafka在处理重要不可以丢失数据的时候要设置ack = all,分区副本数量和最小同步副本数量都要要大于等于2,并且要做消息的幂等性处理来保证消息不重复。kafka自身只提供生产者到broker的消息幂等处理kafka默认已经配置了这个参数不过这里的幂等也是单分区幂等性,消费者端是需要自己写代码控制的。
kafka只能保证单分区的数据有序
kafka1.0配置max.in.flight.requests.per.connection的值为1以内才能解决单分区数据有序的问题,1.0版本以后默认配置了消息发送的幂等性配置max.in.flight.requests.per.connection的值为1到5都可以。因为启用幂等后,Kafka服务器会在服务器缓存最近的五个请求的元数据,因而无论如何,kafka可以保证最近5个request的数据都是有序的。每次将Broker内存中的元数据落盘前,都会和缓存中的元数据对比,如果不是连续的 seqNumber 数据无法落盘,也就无法正确返回 ack,request 将无法得到响应而阻塞。同时,因为服务器只会缓存Producer发来的最近5个request的幂等性元数据,故而服务器最大能够处理排序的消息也是五个,sender线程保证顺序能发送到Broker中的单分区数据也必须小于五个,故而需要设置 max.in.flight.requests.per.connection=[1,5] 。
kafka作为业务上的单纯的mq使用功能不完善主要体现在消费信息的方面
一个broker中的topic的partiition对于一个消费者组只能消费一次,如果一个topic所有的消费者都在一个消费者组就变成了队列模型类似rabbitmq的直连交换器,如果一个topic所有消费者都在不同的组那么就完全变成了发布订阅模式劣势rabbitmq的扇形交换器。
kafka消费者组内部有很多消费者,这些消费者都公用一个id(group id),一个组内的所有消费者共同协作,完成对订阅的topic的所有partition消费,其中一个主题的一个分区只能由一个消费者消费。
一个consumer group中有多个consumer,一个topic有多个partition,所以必然会涉及到partition的分配问题,即确定哪个partition由哪个consumer来消费,Kafka提供了3种消费者分区分配策略:RangeAssigor、RoundRobinAssignor、StickyAssignor。Kafka默认采用RangeAssignor的分配算法
相邻的分区尽量分配给同一个消费者
分区总数/消费线程数,如果有余,则表明有的消费线程之间分配的分区不均匀,那么这个多出来的分区会给前几个消费线程处理。
比如7个分区,3个comsumer,则7/3=2,余1,这个表明如果3个消费线程均分7个分区还会多出1个分区,那么这个多出的额外分区就会给前面的消费线程处理,所以它会把第一个分区先给到contomer-1 消费线程消费
contomer-1 partition 0,partition 1,partition 2
contomer-2 partition 3,partition 4
contomer-3 partition 5,partition 6
按照consumer的顺序以轮询的方式进行分配
contomer-1 partition 0,partition 3,partition 6
contomer-2 partition 1,partition 4
contomer-3 partition 2,partition 5
黏性分配策略
前两个分配策略比如再加一台消费者contomer-4或者由一台消费者宕机了,所有的分区和消费者关系要再均衡,根据前两个分配策略的算法消费者和topic分区的关系要发生很多变动。
黏性分配策略是尽可能的每一次分配变更相对上一次分配做最少的变动,尽可能保证分区分配均衡(即分配给consumers的分区数最大相差为1),当发生分区重分配再均衡时,尽可能多的保留现有的分配结果
kafka的java consumer新版本中是由用户主线程和心跳线程的双线程设计。用户主线程就是业务代码中消费者启动的那个线程,而心跳线程值服务定期发送心跳给broker保证自己是存活的。因为心跳线程只是保证自己的存活所以实际消息的处理还是主线程完成的也可以理解为是个单线程设计。
兼容其他语言、设计简单、单线程+轮询的机制,这种设计能够较好的实现非阻塞式的消息获取
要注意catch吃掉异常,因为是单线程设计所以报错就终止了。
消费者里面写多线程是非线程安全的。
rocketmq rabbitmq 都是多线程的不需要额外处理。
kafka本身没有重试队列以及死信队列,要完成这些功能需要自己写业务代码完成比如自己手动创建一个队列或者引用一些第三方jar包kafka本身没有这样的功能。spring框架集成kafka有此类功能。
比如消费者开启offset自动提交,consumer默认5s提交一次offset,不过2s的时候消费者了,宕机的这时候我们消费者实际消费了n条消息,不过broker的offset还没有更新,重启消费者后还是从broker的offset的指针处开始消费,这样就出现了消息重复的问题。每次增加消费者服务器或者消费者服务器宕机消费策略再均衡的时候也会出现重复消费的问题。
比如消费者开启offset手动提交,当offset被提交时数据还在内存中未处理,比如有8个消息消费一半就给提交了,broker的offset更新为8,不过这时候消费者宕机了就丢失了一半的消息。
消费者的消费速率小于生产者的生产速率。
消费能力不足的时候应该考虑增加topic的partition,同时增加消费者数量,因为一个partition只能被消费者组的一个消费者消费所以应该同时增加partition和消费者数量。
可以提高批次拉取消息的数量配置fetch.max.bytes、max.poll.records
消费者在消费的过程中需要记录自己消费了多少数据,即消费位置信息,在kafka的broker中用offset标识,offset记录的是下一个消费位置。比如消费到9记录的是10。
分区位移是发送到partition时,这个消息的偏移量offset是多少。
消费位移是消费者消费到哪里,指向的是下一个消费消息位置的offset。
Kafka0.9版本前这个offset是保存在zookeeper的,这个offset要频繁的写zk不太适合频繁的写更新,这样会拖慢zk集群的性能
kafka0.9后offset是保存在__consumer_offsets主题中。
enable.auto.commit参数配置自动提交或者手动提交,kafka消费者端的设计是单线程的没有额外的线程来处理提交偏移量这个操作。
默认每5s提交一次上次处理的偏移量。poll的时候会检测是否达到5s,如果达到5s会提交目前消费者处理的最大偏移量,比如消费了0到100直接提交的offset就是101,broker的__consumer_offsets收到101消费者指针就指向101。
即使当前位移主题没有消息可以消费了,位移主题中还是会不停地写入最新位移的消息使用日志压缩可以节省我们的磁盘空间。
生产环境中如果出现过位移主题无限膨胀占用过多磁盘空间的问题,建议去检查一下 Log Cleaner 线程的状态是不是挂掉了。
需要设计enable.auto.commit = false来配置消费者手动提交。可以指定主题分区偏移量一个个提交
提交之后消费者服务器宕机会消息丢失
提交之前消费者服务器宕机会消息重复
commitSync()方法会阻塞,tps不高,必须等待broker服务器响应。
commitAsync()tps高,不需要等待服务器结果。不过如果第一次提交的offset是20 第二次提交的offset是100 ,第一次失败,第二次成功了了会从20还是重新消费会有重复消费的问题。
先异步指定位移提交最后finally中同步位移提交
可以新建个jvm退出前的守护线程hook调用wake up方法使kafka消费者while(true) 的 poll 也结束在finally中调用同步位移提交方法提交偏移量。
消费者数量或者分区数量因为某些原因发生变化,消费者和分区的绑定关系会根据消费者分配策略进行再平衡重新分配就是kafka的分区再均衡。 partition被重新分配给一个消费者时,消费者当前的读取状态会丢失。
消费者订阅主题的时候传入再均衡监听器,再均衡监听器实现ConsumerRebalanceListener接口
public void onPartitionRevoked(Collection partitions)
可以在这个方法中代码实现同步提交偏移量
public void onPartitionAssigned(Collection partitions)
kafka 使用日志文件的方式来保存生产者和发送者的消息,每条消息都有一个 offset 值来表示它在分区中的偏移量,一个分片并不是直接对应在一个磁盘上的日志文件,而是对应磁盘上的一个目录,这个目录的命名规则是 <topic_name>-<partition_id> ,例如 test-0
topic – 多个partition – log – 多个segment – (.log、 .index 、 .timeindex)
kafka数据存放位置是在config/server.properties中的log.dirs配置。
为了方便管理partition的数据,将一个partition拆分成多个大小相等的数据段也就是segment,每个segment文件大小相等消息数量不一定相同
偏移量索引文件
时间戳索引文件
日志文件,存储生产者生产的数据
快照文件,记录事务信息
保存每一个leader开始写入消息时候的offset,follwer被选为leader时会根据这个确认哪些消息可用
为消息数据创建了2种索引,一种是方便时间查找的稀疏索引.timestamp,一种是方便offset查找的.稀疏索引.index
offset是逻辑偏移量,position是物理偏移量。segment标志了大体的偏移量比如1000.log 0 - 1000,.index标志了再具体一些的偏移量比如 100、200、300…,根据稀疏索引找到offset的位置然后找到磁盘对应的地址也就是position的值之后去顺序读。
kafka提供了2种日志清理策略,日志删除和日志压缩
kafka采用了批量发送的方式,通过将多条消息按照分区进行分组,然后每次发送一个消息集合从而减少网络传输的开销。
数据量越大,压缩效果才越好
通常有gzip snappy lz4 等压缩方式
比如protobuf、avro来减少实际网络传输量以及磁盘存储量,提高吞吐量。
磁盘顺序写的性能远远高于磁盘随机写,甚至高于内存随机写。
Kafka 用到了零拷贝(Zero-Copy)技术来提升性能,所谓的零拷贝是指数据直接从磁盘文件复制到网卡设备,而无需经过应用程序,减少了内核和用户模式之间的上下文切换。
为了减少用户态和内核态的过多交互就有了DMA技术,他是零拷贝的基石,通过DMA硬件可以绕过CPU自己去直接访问系统主内存