通过前面的分析我们知道,主线程通过KafkaProducer.send方法将消息放入RecordAccumulator中缓存,并没有实际的网络I/O操作。网络I/O操作是由Sender线程统一进行的。
我们先来了解一下Sender线程发送消息的整个流程:
首先,它根据RecordAccumulator的缓存情况,筛选出可以向哪些Node节点发送消息,即上一节介绍的RecordAccumulatorready方法;
然后,根据生产者与各个节点的连接情况由NetworkClient管理,过滤Node节点;
之后,生成相应的请求,这里要特别注意的是,每个Node节点只生成一个请求;
最后,调用NetWorkClient将请求发送出去。图展示了Sender依赖的三个比较关键的组件。
Sender实现了Runnable接口,并运行在单独的ioThread中。Sender的run方法调用了其重载run(long),这才是Sender线程的核心方法,也是发送消息的关键流程,其时序图如图所示。
下面简述run(long)方法的流程:
在Protocol类中罗列了全部请求和响应的格式,请求和响应有多个不同的版本。
首先,介绍生产者向服务端追加消息时使用的请求和响应,它们分别是ProduceRequest(Version:2)和Produce Response(Version:2),结构如图所示。
Produce Request(Version:2)的请求头和请求体各个字段的含义如表所示.
Produce Response(Version:2)各个字段与含义如表所示.
Sender.sendProduceRequests()方法的功能是将待发送的消息封装成ClientRequest。
不管一个Node对应有多少个RecordBatch,也不管这些RecordBatch是发给几个分区的,每个Node至多生成一个ClientRequest对象。创建ClientRequest的核心逻辑如下:
下面来看Sender.sendProduceRequests()方法的具体实现:
在介绍NetworkClient之前,我们先来了解NetworkClient的整个结构,以及其依赖的他组件,如图所示。
需要注意的是,图中的Selector的类型并不是java.nio.channels.Selector,而是
org.apache.kafka.common.network.Selector,为了方便区分和描述,将其简称为KSelect。
KSelector使用NIO异步非阻塞模式实现网络I/O操作,KSelector使用一个单独的线程可以管理多条网络连接上的连接、读、写等操作。
下面介绍KSelector的核心字段和方法,如图所示。
下面先介绍KSelector的字段。
下面介绍KSelector的核心方法。KSelector.connect方法主要负责创建KafkaChannel,并添加到channels集合中保存。其代码如下:
KSelector.send方法是将之前创建的RequestSend对象缓存到KafkaChannel的send字段中,并开始关注此连接的OP_WRITE事件,并没有发生网络I/O。
在下次调用KSelector.poll时,才会将RequestSend对象发送出去。如果此KafkaChannel的send字段上还保存着一个未完全发送成功的RequestSend请求,为防止覆盖数据,则会抛出异常。也就是说,每个KafkaChannel一次poll过程中只能发送一个Send请求。
KSelectorpoll方法真正执行网络VO的地方,它会调用nioSelector.select方法等待VO事件发生。
当Channel可写时,发送KafkaChannel.send字段(切记,一次最多只发送一个RequestSend,有时候一个RequestSend也发送不完,需要多次poll才能发送完成);
Channel可读时,读取数据到KafkaChannel.receive,读取一个完整的NetworkReceive后,会将其缓存到stagedReceives中,当一次pollSelectionKeys完成后会将stagedReceives中的数据转移到completedReceives。
最后调用maybeCloseOldestConnection方法,根据IruConnections记录和connectionsMaxldleNanos最大空闲时间,关闭长期空闲的连接。
下面是KSelector.poll方法的代码:
KSelector.pollSelectionKeys()方法是处理I/O操作的核心方法,其中会分别处理OP_CONNECT、OP_READ、OP_WRITE事件,并且会检测连接状态。下面是其代码:
最终,读写操作还是交给了KafkaChannel,下面来分析其相关的方法:
InFlightRequests队列的主要作用是缓存了已经发出去但没收到响应的ClientRequest.其底层是通过一个Map<String,Deque>对象实现的,key是Nodeld,value是发送到对应Node的ClientRequest对象集合。
InFlightRequests提供了很多管理这个缓存队列的方法,还通过配置参数,限制了每个连接最多缓存的ClientRequest个数。
InFlightRequests的结构如图所示。
InFlightRequests.canSendMore()方法比较重要,NetworkClient调用此方法是用于判断是否可以向指定Node发送请求的条件之一,其代码如下:
此外,队头的消息与对应KafkaChannel.send字段指向的是同一个消息,为了避免未发送的消息被覆盖,也不能让KafkaChannel.send字段指向新请求。最后queue.size<this.maxInFlightRequestsPerConnection)条件则是为了判断InFlightRequests队列中是否堆积过多请求。如果Node已经堆积了很多未响应的请求,说明这个节点负载可能较大或是网络连接有问题,继续向其发送请求,则可能导致请求超时。
MetadataUpdater接口是一个辅助NetworkClient更新的Metadata的接口,它有两个实现类,如图所示。
ManualMetadataUpdater是个空实现,DefaultMetadataUpdater是NetworkClient使用的默认实现,下面介绍其三个字段。
MetadataRequest请求发送之前,要将metadataFetchInProgress置为true,然后从所有Node中选择负载最小的Node节点,向其发送更新请求。
这里的负载大小是通过每个Node在InFlightRequests队列中未确认的请求决定的,未确认请求越多则认为负载越大。
剩余的步骤与普通请求的发送方式一样,先将请求添加到InFlightRequests队列中,然后设置到KafkaChannel的send字段中,通过KSelector.poll方法将MetadataRequest请求发送出去。下面是DefaultMetadataUpdater.maybeUpdate()方法的具体代码:
在收到MetadataResponse之后,会先调用MetaUpdater.handleSuccessfulResponse方法检测是否为MetadataResponse,如果是,则调用handleResponse()解析响应,并构造Cluster对象更新Metadata.cluster字段。
注意,Cluster是不可变对象,所以更新集群元数据的方式是:创建新的Cluster对象,并覆盖Metadata.cluster字段。具体代码如下:
NetworkClient中所有连接的状态由ClusterConnectionStates管理,它底层使用Map<String,NodeConnectionState>实现,key是Nodeld,value是NodeConnectionState对象,其中使用ConnectionState枚举表示连接状态,还记录了最近一次尝试连接的时间戳。
前面已经介绍完了NetworkClient依赖的组件,下面来看一下NetworkClient的实现。NetworkClient是一个通用的网络客户端实现,不只用于生产者发送消息,也可以用于消费者消费消息以及服务端Broker之间的通信。
下面介绍NetworkClient的核心方法。NetworkClient.ready方法用来检查Node是否准备好接收数据。首先通过NetworkClientisReady方法检查是否可以向一个Node发送请求,需要符合以下三个条件,则表示Node已准备好:
NetworkClient.initiateConnect方法会修改在ClusterConnectionStates中的连接状态,并调用Selectorconnect()方法发起连接。
之后调用Selector.pollSelectionKeys()方法时,判断连接是否建立。如果建立成功,则会将ConnectionState设置为CONNECTED。
NetworkClient.send方法主要是将请求设置到KafkaChannel.send字段,同时将请求添加到InFlightRequests队列中等待响应。
NetworkClient.poll()方法调用KSelector.poll进行网络I/O(参考KSelector小节),并使用handle*()方法对KSelector.poll产生的各种数据和队列进行处理。
下面来看一下handle*()方法的处理逻辑:
经过一系列handle*()方法处理后,NetworkClient.poll()方法中产生的全部ClientResponse已经被收集到responses列表中。
之后,遍历responses调用每个ClientRequest中记录的回调,如果是异常响应则请求重发,如果是正常响应则调用每个消息的自定义Callback。
在前面的createProduceRequests方法中提到过,这里调用的Callback回调对象,也就是RequestCompletionHandler对象,其onComplete方法最终调用Sender.handleProduceResponse()方法,其逻辑如下: