在进行消费者正常消费过程中以及Rebalance操作开始之前,都会提交一次offset记录Consumer当前的消费位置。提交offset的功能也是由ConsumerCoordinator实现的。
先来了解OffsetCommitRequest和OffsetCommitResponse的消息体格式,如图所示。
OffsetCommitRequest中各个字段的含义如表所示。
OffsetCommitResponse中各个字段的含义如表所示。
图展示了ConsumerCoordinator中与提交offset相关的四个方法以及它们之间的调用关系。
在SubscriptionState中使用TopicPartitionState记录了每个TopicPartition的消费状况,TopicPartitionState.position字段则记录了消费者下次要从服务端获取的消息的offset。
当没有明确指定待提交的offset值时,则将TopicPartitionState.position作为待提交offset,组织成集合,形成ConsumerCoordinator.commitOffset*()方法的第一个参数。
commitOffsetsSync()方法与commitOffsetsAsync()方法的实现类似,也是调用sendOffsetCommitRequest()方法创建并缓存OffsetCommitRequest,使用OffsetCommitResponseHandler处理OffsetCommitResponse。
但是有两点不同:
一是commitOffsetsSync()方法在发送OffsetCommitRequest时使用了ConsumerCoordinator.poll(future)阻塞等待OffsetCommitResponse处理完成,这样才实现了同步提交的功能;
二是commitOffsetsSync()方法在检测到RetriableException异常时会进行重试。commitOffsetsSync()方法的具体代码就不贴出来了。maybeAutoCommitOffsetsSync()方法会根据enable.auto.commit配置项的值决定是否调用commitOffsetsAsync()方法。
AutoCommitTask是一个定时任务,它周期性地调用commitOffsetsAsync()方法,实现了自动提交offset的功能。开启自动提交offset功能后,业务逻辑中就可以不用手动调用commitOffsets*()方法提交offset了。AutoCommitTask的代码比较简单。
OffsetCommitResponseHandler.handle方法是处理OffsetCommitResponse的入口。
在Rebalance操作结束之后,每个消费者都确定了其需要消费的分区。在开始消费之前,消费者需要确定拉取消息的起始位置。假设之前已经将最后的消费位置提交到了GroupCoordinator,GropeCoordinator将其保存到了Kafka内部的Offsets Topic中,此时消费者可以通过OffsetFetchRequest请求获取上次提交offset并从此处继续消费。
refreshCommittedOffsetsIfNeeded方法的主要功能是发送OffsetFetchRequest请求,从服务端拉取最近提交的offset集合,并更新到Subscriptions集合中。
Fetcher类的主要功能是发送FetchRequest请求,获取指定的消息集合,处理FetchResponse,更新消费位置。图是Fetcher类依赖的组件。
先来了解Fetcher的核心字段。
records中第一个消息的offset,partition记录了消息对应的TopicPartition。
Fetcher的核心方法可以分为三类:fetch消息的相关方法,用于从Kafka获取消息;更新offset相关的方法,用于更新TopicPartitionState中的position字段;获取Metadata信息的方法,用于获取指定Topic的元信息。
首先来了解FetchRequest和FetchResponse的消息体的格式,如图所示。
FetchRequest中的字段如表所示。
FetchResponse中的字段如表所示。
createFetchRequests()方法负责创建FetchRequest请求,其返回值是Map<Node,FetchRequest>类型,key是Node,value是发往对应Node的FetchRequest集合,其核心逻辑如下:
sendFetches方法的主要功能是将FetchRequest添加到unsent集合中等待发送,并注册FetchResponse处理函数。
FetchResponse的处理主要是解析FetchResponse后按照TopicPartition分类,将获取到的消息数据(未解析的byte数组)和offset组装成CompletedFetch对象并添加到completedFetches。
存储在completedFetches队列中的消息数据还是未解析的FetchResponse.PartitionData对象。
在fetchedRecords方法中会将CompletedFetch中的消息数据进行解析,得到Record集合并返回,同时还会修改对应TopicPartitionState的position,为下次fetch操作做好准备。
在有些场景下,例如第一次消费某个Topic的分区,服务端的内部Offsets Topic中并没有记录当前消费者在此分区上的消费位置,所以消费者无法从服务端获取最近提交的offset。
此时如果用户手动指定消费的起始offset,则可以从指定offset开始消费,否则就需要重置TopicPartitionState.position字段。
重置TopicPartitionState.position字段的过程中涉及OffsetsRequest和OffsetsResponse,先来介绍其格式,如图所示。
在OffsetsRequest中需要说明的字段是timestamp,取值为-1或-2,分别表示LATEST、EARLIEST两种重置策略。
在OffsetsResponse中需要说明的字段是offsets,它是服务端返回的offset集合。
Fetcher.updateFetchPositions方法中实现了重置TopicPartitionState.position字段的功能,其具体逻辑如下:
在Fetcher中还提供了获取Metadata信息的相关方法。涉及sendMetadataRequest、getTopicMetadata、getAllTopicMetadata三个方法,其调用关系如图所示。
基本逻辑是发送MetadataRequest请求到负载最小的Node节点,并阻塞等待MetadataResponse,正常收到响应后对其解析,得到需要的集群元数据。
需要注意的是,Fetcher提供的这三个获取集群元数据的相关方法并不会更新Fetcher.metadata字段中保存的集群元数据。