Consumer源码解读

发布时间:2024年01月22日

Consumer源码解读

本课程的核心技术点如下:

1、consumer初始化
2、如何选举Consumer Leader
3、Consumer Leader是如何制定分区方案

4、Consumer如何拉取数据
5、Consumer的自动偏移量提交

Consumer初始化

image.png

从KafkaConsumer的构造方法出发,我们跟踪到核心实现方法

image.png

这个方法的前面代码部分都是一些配置,我们分析源码要抓核心,我把核心代码给摘出来

NetworkClient

Consumer与Broker的核心通讯组件

image.png

ConsumerCoordinator

协调器,在Kafka消费中是组消费,协调器在具体进行消费之前要做很多的组织协调工作。

image.png

Fetcher

提取器,因为Kafka消费是拉数据的,所以这个Fetcher就是拉取数据的核心类

image.png

而在这个核心类中,我们发现有很多很多的参数设置,这些就跟我们平时进行消费的时候配置有关系了,这里我们挑一些核心重点参数来讲一讲

fetch.min.bytes

每次fetch请求时,server应该返回的最小字节数。如果没有足够的数据返回,请求会等待,直到足够的数据才会返回。缺省为1个字节。多消费者下,可以设大这个值,以降低broker的工作负载。

fetch.max.bytes

每次fetch请求时,server应该返回的最大字节数。这个参数决定了可以成功消费到的最大数据。

比如这个参数设置的是50M,那么consumer能成功消费50M以下的数据,但是最终会卡在消费大于50M的数据上无限重试。fetch.max.bytes一定要设置到大于等于最大单条数据的大小才行。

默认是50M

image.png

fetch.wait.max.ms

如果没有足够的数据能够满足fetch.min.bytes,则此项配置是指在应答fetch请求之前,server会阻塞的最大时间。缺省为500个毫秒。和上面的fetch.min.bytes结合起来,要么满足数据的大小,要么满足时间,就看哪个条件先满足。

这里说一下参数的默认值如何去找:

image.png

image.png

max.partition.fetch.bytes

指定了服务器从每个分区里返回给消费者的最大字节数,默认1MB。

假设一个主题有20个分区和5个消费者,那么每个消费者至少要有4MB的可用内存来接收记录,而且一旦有消费者崩溃,这个内存还需更大。注意,这个参数要比服务器的message.max.bytes更大,否则消费者可能无法读取消息。

备注:1、Kafka入门笔记

image.png

max.poll.records

控制每次poll方法返回的最大记录数量。

默认是500

image.png

如何选举Consumer Leader

回顾之前的内容

image.png

那么如何完成以上的逻辑的,我们跟踪代码:

image.png

1、消费者协调器与组协调器的通讯

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

对Broker的响应进行处理

image.png

image.png

1、消费者协调器发起入组请求

image.png

image.png

image.png

image.png

image.png

Consumer Leader如何制定分区方案

回顾之前的内容

image.png

消费者分区策略

消费者参数

partition.assignment.strategy

分区分配给消费者的策略。默认为Range。允许自定义策略。

Range

把主题的连续分区分配给消费者。(如果分区数量无法被消费者整除、第一个消费者会分到更多分区)

RoundRobin

把主题的分区循环分配给消费者。

image.png

StickyAssignor

初始分区和RoundRobin是一样

粘性分区:每一次分配变更相对上一次分配做最少的变动.

目标:

1、分区的分配尽量的均衡

2、每一次重分配的结果尽量与上一次分配结果保持一致

当这两个目标发生冲突时,优先保证第一个目标

比如有3个消费者(C0、C1、C2)、4个topic(T0、T1、T2、T34),每个topic有2个分区(P1、P2)

image.png

C0: T0P0、T1P1、T3P0

C1: T0P1、T2P0、T3P1

C2: T1P0、T2P1

如果C1下线 、如果按照RoundRobin

image.png

C0: T0P0、T1P0、T2P0、T3P0

C2: T0P1、T1P1、T2P1、T3P1

对比之前

image.png

如果C1下线 、如果按照StickyAssignor

image.png

C0: T0P0、T1P1、T2P0、T3P0

C2: T0P1、T1P0、T2P1、T3P1

对比之前

image.png

image.png

自定义策略

extends 类AbstractPartitionAssignor,然后在消费者端增加参数:

properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,类.class.getName());

即可。

消费者分区策略源码分析

接着上个章节的代码。

image.png

image.png

image.png

image.png

image.png

Consumer拉取数据

这里就是拉取数据,核心Fetch类

image.png

image.png

image.png

自动提交偏移量

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

当然,自动提交auto.commit.interval.ms

image.png

默认5s

image.png

从源码上也可以看出

maybeAutoCommitOffsetsAsync 最后这个就是poll的时候会自动提交,而且没到auto.commit.interval.ms间隔时间也不会提交,如果没到下次自动提交的时间也不会提交。

这个autoCommitIntervalMs就是auto.commit.interval.ms设置的

文章来源:https://blog.csdn.net/qq_44981526/article/details/135745775
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。