Producer(生产者): 生产并发送消息到Broker(推送)
// 0.9之后的版本是基于Java实现(之前是Scala实现)
Producer客户端发送消息大致逻辑:
构造Producer必填的3个参数:
参数 | 说明 |
---|---|
bootstrap.servers | 引导程序的服务地址 格式: 地址1:端口1,地址N:端口N (建议指定两个以上的Broker地址以保证稳定性, 且使用主机名形式) |
key.serializer | 发送时对Key调用的序列化器 Broker仅能接受字节数组形式的消息 byte[] |
value.serializer | 发送时对Value调用的序列化器 Broker仅能接受字节数组形式的消息 byte[] |
// 序列化器必须以全限定名方式指定, Java的ProducerConfig类中包含所有的配置参数
ProducerRecord(构建消息): Producer每次发送的消息体
ProducerRecord定义:
public class ProducerRecord<K, V> {
private final String topic; // Topic(必填)
private final Integer partition; // Partition
// 消息头部(0.11版本引入)
// 指定与应用相关信息(可忽略)
private final Headers headers;
// 键(附加信息)
// 其会用于计算Partition(二次归类)
private final K key;
// 值(消息体, 必填)
// 为空则代表: 墓碑消息
private final V value;
// 消息时间戳
// 细分为CreateTime(消息创建时间)和LogAppendTime(追加日志时间)
private final Long timestamp;
......
}
Send(发送消息): Producer构建ProducerRecord之后发送给Broker
Send()方法的定义:
public Future<RecordMetadata> send(ProducerRecord<K, V> record);
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
RecordMetadata
对象)// 不可重试异常发生时会直接抛出并结束
常见的重试异常为:
可重试异常 | 说明 |
---|---|
NewworkException | 网络异常 |
LeaderNotAvailableException | 副本的leader不可用 (可能正在选举leader) |
UnknownTopicOrPartitionException | Topic或Partition异常 |
NotEnoughReplicasException | 副本数量不足 |
NotCoordinatorException | 协调器异常 |
Send()方法中的Callback定义:
public interface Callback {
void onCompletion(RecordMetadata var1, Exception var2);
}
Close(结束发送):回收Producer实例
Close()方法的定义:
public void close();
public void close(long timeout, TimeUnit timeUnit);
Producer的发送消息由两个线程完成:
如: Producer发送消息链路图
// Interceptor和Partitioner可选择性处理, 但必须经Serializer处理
Producer发送ProducerRecord的流程:
<分区, <Deque<ProducerBatch>>
形式变为<Node, List<ProducerBatch>>
<Node, Request>
形式Map<nodeId, Deque<Request>>
缓存Request// 形式转换是为完成应用逻辑层到网络I/O层的转换
RecordAccumulator内存复用原则:
java.io.ByteBuffer
和BufferPool
实现内存复用// BufferPool可避免频繁的申请和释放内存
InFlightRequest中包含leastLoadedNode
// 元数据: Broker、Topic、Partition、leader和follower副本所在的Broker等
如: Sender线程维护leatLoadedNode信息
ProducerInterceptor(拦截器): 消息发送前/后的进行的操作
interceptor.classes
参数指定Producer所使用的ProducerInterceptorProducerInterceptor定义:
public interface ProducerInterceptor<K, V> extends Configurable {
// 发送前进行的操作
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
// 发送后被应答之后或失败进行的操作
// 优先于Send()方法中定义的Callback前执行
// 由于该方法运行于Producer的IO线程中, 应简洁
public void onAcknowledgement(RecordMetadata metadata, Exception exception);
// 关闭拦截器
public void close();
}
// 抛出的任何异常都会被记录到日志中, 并不再向上抛
Serializer(序列化器): 将特定数据转换成字节数组(byte[]
)
Serializer定义:
public interface Serializer<T> extends Closeable {
// 配置序列化器
// 常用于指定编码类型(默认UTF-8)
void configure(Map<String, ?> configs, boolean isKey);
// 执行序列化
byte[] serialize(String topic, T data);
// 关闭序列化器
// 需保证幂等性
void close();
}
// 不建议使用自定义Serializer或DeSerializer, 会增加耦合度
Partitioner(分区器): ProducerRecord分区的默认规则
partitioner.class
参数指定Producer所使用的PartitionerPartitioner定义:
public interface Partitioner extends Configurable, Closeable {
// 计算并返回分区号
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
// 关闭分区器
public void close();
}
public interface Configurable {
// 获取配置信息并初始化数据
void configure(Map<String, ?> configs);
}
默认的Partitioner: org.apache.kafka.clients.producer.internals.DefaultPartitioner
close()
方法默认为空// 消息相同的情况下会写入相同的分区(存在消息互相覆盖的情况)
事务(Transaction): Producer操作的最小原子单位(可跨Partition)
enable.idempotence
)Producer中常用的事务方法:
// 初始化事务
void initTransactions();
// 开启事务
void beginTransaction();
// 事务内的位移提交
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId)
// 提交事务
void commitTransaction();
// 终止事务(回滚)
void abortTransaction();
事务协调器(TransactionCoordinator): 负责事务中的各类操作
__transaction_state
如: 事务的执行流程
InitProducerIdRequest
请求获取该事务IDConsumer的事务受以下限制: