目录
????????Kafka中消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向topic的。
????????topic 只是逻辑上的概念,而 partition 是物理上的概念。每个partition对应于一个log文件,该 log 文件中存储的就是 producer 生产的数据。Producer生产的数据会被不断追加到该 log 文件末端,且每条数据都有自己的 offset 。消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。
????????由于生产者生产的消息会不断追加到log文件末尾,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个 partition 分为多个 segment 。每个 segment 对应两个文件 ——“.index”文件和“.log”文件(一个存储当前文件的索引范围,一个存储真正的数据)。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。例如,like?这个 topic 有三个分区,则其对应的文件夹为 like-0,like-1,like-2:
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log
这种存储规则很像我们 Spark Shuffle 阶段产生的文件的存储规则,顺便回忆一下 Spark 的 Shuffle 阶段:
????????Shuffle 过程中每个 Map 任务会产生两个文件,即数据文件和索引文件。其中,数据文件是存储当前 Map 任务的输出结果,而索引文件中则存储了数据文件中的数据的分区信息。下一个阶段的 Reduce 任务就是根据索引文件来获取属于自己处理的那个分区的数据。
? ? ? ? 其实 Spark 常见的有两种 Shuffle 策略:HashShuffle 和 SortShuffle ,但是这属于 Spark 调优的内容了,现在不必深究。
????????我们知道,Kafka 的三大组成部分:Producer、Broker、Consumer。接下来我们要学习的部分就是 Kafka 的生产者是如何把数据发送给 Kafka 集群(Broker)的。
????????在消息发送的过程中,涉及到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka?Broker。
main 线程:
sender线程:
ACK=0:生产者在消息发送后不会等待来自服务器的任何确认。这意味着生产者无法知道消息是否成功存储在Kafka集群中。这个级别提供了最高的吞吐量,但在可靠性方面是最低的,因为可能会丢失消息。
ACK=1:生产者会等待直到消息的领导者副本(Leader Replica)确认接收到消息。一旦领导者副本存储了消息,生产者会收到一个确认。这个级别在性能和数据可靠性之间提供了一个平衡。但如果领导者副本在确认后发生故障,而消息还未复制到追随者副本(Follower Replicas),则消息可能会丢失。
ACK=all或ACK=-1(默认级别):生产者会等待消息被所有的同步副本(ISR,In-Sync Replicas)确认。这意味着只有当所有的同步副本都已经接收并存储了消息,生产者才会收到一个确认。这个级别提供了最高的数据可靠性,但可能会牺牲一些性能,因为需要等待所有副本的确认。
如果数据发送成功,就把 InFlighRequests 队列中的请求缓存给清除掉,并且把对应 RecordAccuulator 中的数据清除掉。
参数名称 | 描述 |
---|---|
bootstrap.servers | 生产者连接集群所需的broker地址清单。例如hadoop102:9092,hadoop103:9092,hadoop104:9092,可以设置1个或者多个,中间用逗号隔开。注意这里并非需要所有的broker地址,因为生产者从给定的broker里查找到其他broker信息。 |
key.serializer和value.serializer | 指定发送消息的key和value的序列化类型。一定要写全类名。 |
buffer.memory | RecordAccumulator缓冲区总大小,默认32m。 |
batch.size | 缓冲区一批数据最大值,默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。 |
linger.ms | 如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。生产环境建议该值大小为5-100ms之间。 |
0:生产者发送过来的数据,不需要等数据落盘应答。 1:生产者发送过来的数据,Leader收到数据后应答。 -1(all):生产者发送过来的数据,Leader+和isr队列里面的所有节点收齐数据后应答。默认值是-1,-1和all是等价的。 | |
max.in.flight.requests.per.connection | 允许最多没有返回ack的次数,默认为5,开启幂等性要保证该值是 1-5的数字。 |
当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是int最大值,2147483647。 如果设置了重试,还想保证消息的有序性,需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。 | |
retry.backoff.ms | 两次重试之间的时间间隔,默认是100ms。 |
是否开启幂等性,默认true,开启幂等性。 | |
compression.type | 生产者发送的所有数据的压缩方式。默认是none,也就是不压缩。 支持压缩类型:none、gzip、snappy、lz4和zstd。 |
send 方法有两种传参方式,一种有回调函数,一种不带回调函数。?
????????所谓的异步发送指的是外部的数据使用异步的方式吧数据发送到 RecordAccumulator 的内存队列当中去,异步的体现就是外部数据发送到队列中后,并不会等待?RecordAccumulator 把数据传给 Broker 节点并返回成功的消息才继续发送;而是直接把数据扔到?RecordAccumulator 的内存队列后就撒手不管了。
导入依赖:
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
注意:使用 kafka 前必须启动 zookeeper,不然报错无法使用。
public class CustomProducer {
public static void main(String[] args) {
Properties properties = new Properties();
// 连接集群 bootstrap.servers 多写几个主机地址 防止一个客户端挂掉
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
// 指定对应的 key 和 value 的序列化类型 key.serialize
// properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.Serializer");
// 这两个是等价的
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 1. 创建 Kafka 生产者对象
// 需要指定键值的类型
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 2. 发送数据
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("like","test"+i));
}
// 3. 关闭资源
kafkaProducer.close();
}
}
我们可以看到,一条消息就是一个 ProducerRecord 对象,不管你的消息多么短,哪怕是一个标点,它也会被包装进一个对象里面去。?
运行结果:?
????????我们发现,我们的编程步骤和我们上面Kafka的发送原理中生产者的发送步骤是一致的,只不过我们这里没有设置拦截器和分区器,这是一个最简单的 Kafka 生产者程序了。
现在我们来使用带回调函数的异步发送:
只需要修改send方法即可:
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("like", "test" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null){ // 如果异常为空 说明正常执行
System.out.println("topic: "+recordMetadata.topic()+",partition: "+recordMetadata.partition());
}
}
});
}
运行结果:?
这种带回调函数的方法可以让我们知道数据更多的元数据信息(比如主题、分区...)。
所谓的同步发送,就是外部数据发送到?RecordAccumulator 的内存队列后,必须等待数据被 Selector 发送到 Broker 并返回响应信息才能继续发送。代码的实现很简单,只需要在send方法之后加一个 get() 即可:
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("like","test"+i)).get();
}
分区的好处我们太清楚了,之前的 Hadoop、Spark、Flink 都有分区的概念,在 Shuffle 的时候、在 keyBy 的时候,分区的好处显然可以增加并行度,提高我们数据的处理效率;可以负载均衡,不会出现服务器涝的涝死,旱的旱死。
所以,从大数据的存储和计算的角度来看,分区有这么两种好处:
我们可以看到,在初始化 ProducerRecord 时,有 6 种初始化的传参方式:
接下来我们就利用上面的回调函数可以返回数据的元数据信息,来测试一下是不是像它说的这样:
1、指定分区:?
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("like", 0,"","test" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null){ // 如果异常为空 说明正常执行
System.out.println("topic: "+recordMetadata.topic()+",partition: "+recordMetadata.partition());
}
}
});
}
运行结果:
topic: like,partition: 0
topic: like,partition: 0
topic: like,partition: 0
topic: like,partition: 0
topic: like,partition: 0
面试题:如何把一张 MySQL 表的数据都放到一个 Kafka 的分区当中去?
答:生产者在发送数据时指定数据的分区为 表名?。
2、不指定分区,只指定 key:
kafkaProducer.send(new ProducerRecord<>("like", ""+i,"test" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null){ // 如果异常为空 说明正常执行
System.out.println("topic: "+recordMetadata.topic()+",partition: "+recordMetadata.partition());
}
}
});
?我们指定 key 为 i (i 的值为0,1,2,3,4),运行结果:
topic: like,partition: 0
topic: like,partition: 2
topic: like,partition: 2
topic: like,partition: 2
topic: like,partition: 1
3,不指定 partition?也不指定 key。这种方式其实我们上面在学异步发送的时候已经演示过了,它默认会等这一批都满了(16K)或者达到 linger.ms(默认0ms)才会发送。
我们这里在循环的时候,让线程睡眠 2ms,这样就不会因为默认的 linger.ms 太短立即发送,导致数据量太小,发送太快看不出来粘性分区的特点:
for (int i = 0; i < 25; i++) {
kafkaProducer.send(new ProducerRecord<>("like", ""+i,"test" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null){ // 如果异常为空 说明正常执行
System.out.println("topic: "+recordMetadata.topic()+",partition: "+recordMetadata.partition());
}
}
});
Thread.sleep(2);
}
运行结果:?
topic: like,partition: 0
topic: like,partition: 0
topic: like,partition: 2
topic: like,partition: 2
topic: like,partition: 2
topic: like,partition: 1
topic: like,partition: 1
topic: like,partition: 0
topic: like,partition: 0
topic: like,partition: 2
topic: like,partition: 0
topic: like,partition: 1
topic: like,partition: 1
topic: like,partition: 1
topic: like,partition: 1
topic: like,partition: 0
topic: like,partition: 2
topic: like,partition: 0
topic: like,partition: 1
topic: like,partition: 1
topic: like,partition: 0
topic: like,partition: 1
topic: like,partition: 0
topic: like,partition: 0
topic: like,partition: 1
我们可以看到,粘性分区是这样的:它每次的分区和上一次的分区是不一样的,并且每次的数据尽可能会放到一个分区去。?
在工作当中,一些特殊场景我们现有的分区策略是无法实现的,这就需要我们自定义来实现分区器了。
? ? ? ? 实现一个分区器,发送过来的数据如果包含 "大傻春" 就发往 0 号分区,否则发往 1 号分区。
1. 实现 Partitioner 接口
2. 重写 partition() 方法
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
String val = value.toString();
if (val.contains("大傻春"))
return 0;
return 1;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
使用自定义分区器:
// 关联自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class.getName());
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 发送数据
for (int i = 0; i < 5; i++) {
ProducerRecord<String, String> record;
if (i==3)
record = new ProducerRecord<>("like", "" + i, "大傻春");
else
record = new ProducerRecord<>("like", ""+i,"test" + i);
kafkaProducer.send(record, (RecordMetadata recordMetadata, Exception e)-> {
if (e == null){ // 如果异常为空 说明正常执行
System.out.println("topic:"+recordMetadata.topic()+",partition:"+recordMetadata.partition());
}
});
}
????????我们外部的数据是放到?RecordAccumulator 的内存队列当中等待发送的,队列中每份数据的大小(batch.size)默认是16K,但我们知道,如果数据迟迟不能达到 batch.size 的话,会根据默认的配置?linger.ms (默认是 0ms)来发送。但事实上,0ms就意味着不需要数据达到 batch.size 也就是 batch.size 这一配置形同虚设不起作用,也就是只要它察觉到有数据就立即发送,这样的效率其实并不高。
? ? ? ? 为了提高我们的吞吐量,我们当然需要调整这两个配置的参数大小:
????????当然这个参数的值我们需要慎重考虑,就像我们 Flink 当中水位线允许迟到的时间一样,不能说为了保证数据的迟到率最低,就把等待时间设置为几秒,那样 Flink 辛辛苦苦实现的毫秒级延迟有啥用呢。
public class CustomProducerParameters {
public static void main(String[] args) {
// 0. 配置信息
Properties properties = new Properties();
// 连接 kafka
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
// key 和 value 的序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// batch.size 单位: KB
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
// linger.ms
properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
// 压缩
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy"); //默认none,可配置值gzip、snappy、lz4和zstd
// 缓冲区大小
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432*2); // 修改为64MB
// 1. 创建生产者
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
// 2. 发送数据
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("like","value"+i));
}
// 3. 关闭资源
kafkaProducer.close();
}
}
? ? ? ? 数据的可靠性,指的其实就是当 Selector 把数据发送到 Broker 之后,是否等待响应(ack)之后再发送数据。
我们知道,ack 的值有三种:
?
????????在生产环境中,acks=0 很少用;acks=1:一般用于传输普通日志,允许丢个别数据;acks=-1:一般用于传输和钱有关的数据,用于对可靠性的要求比较高的场景。
完全可靠性在这种情况下尽管概率特别低,但是仍然不能排除,我们会在下面的数据去重去学习。
代码中配置 ACK 级别:?
// acks
properties.put(ProducerConfig.ACKS_CONFIG,"1");
// 重试次数
properties.put(ProducerConfig.RETRIES_CONFIG,3);
就像我们 Flink 中的时间语义,Kafka 生产者也有它的数据传递语义:
Kafka 0.11 版本后,引入了一个重要的特性:幂等性和事务。
????????重复数据的判断标准就是数据的三个属性必须唯一(PID、Partition、SeqNumber),其中 PID 是 Kafka 每次启动自动生成的,Partition是分区号,SeqNumber 是一个单调递增的数。
? ? ? ? 幂等性只能保证数据在单分区内不会重复。
配置幂等性
enable.idempotence 默认就是 true,flase 关闭。
我们知道,幂等性只能保证数据在单分区内不会重复,但是还是不能保证绝对的唯一,比如 Kafka 挂掉了需要重启,那么重启之后之前数据的?PID 就失效了,所以当有重复的数据时,并不能识别出来。这就需要事务的方式来解决了。
说明:开启事务必须开启幂等性。
其实,Kafka 实现精确一次的保证机制和我们 Flink 在保证端到端一致性时输出端的保证方式是很相似的(幂等写入和事务写入)
Kafka 的事务一共如下 5 个 API:
// 1初始化事务
void initTransactions();
// 2开启事务
void beginTransaction() throws ProducerFencedException;
// 3在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException;
// 4提交事务
void commitTransaction() throws ProducerFencedException;
// 5放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;
测试:
public class CustomProducerTransaction {
public static void main(String[] args) {
Properties properties = new Properties();
// 连接集群 bootstrap.servers 多写几个主机地址 防止一个客户端挂掉
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
// 指定对应的 key 和 value 的序列化类型 key.serialize
// properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.Serializer");
// 这两个是等价的
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 指定事务id
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaction_id_01");
// 1. 创建 Kafka 生产者对象
// 需要指定键值的类型
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
try {
// 2. 发送数据
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("like","test"+i));
}
kafkaProducer.commitTransaction();
}catch (Exception e){
kafkaProducer.abortTransaction(); // 终止事务
}finally {
// 3. 关闭资源
kafkaProducer.close();
}
}
}
注意:一定要记得手动指定事务id(保证唯一)。我们要把发送数据的代码写进 try-catch 中,如果有异常那么久终止事务。
????????数据有序的保证一直是流处理领域的一个问题,就像我们的 Flink 通过 水位线和Barrier 对齐算法保证数据容错和有序性。我们 Kafka 中也是一样的,单分区的话我们数据当然是有序的,但因为是多分区,所以分区和分区间的数据不能保证哪个数据先被读取,所以说分区间数据的顺序是无序的。
? ? ? ? 至于多分区要做到有序,可以把每个分区的数据在消费者这里进行排序,但是这样的效率不是很高,所以我们 Kafka 一般都是只保证单分区有序,个人认为,Kafka只要做到数据的尽量有序就可以了,反正Kafka的数据到下游传递时,一般也都是并行读取,比如 Flink 读取 Kafka 的数据就支持多个 Sink 算子,所以数据到了 Flink 的多个算子链里会出现乱序。
????????单分区内数据有序,但是不一定发送过去仍然是有序的,这就需要给它增加一些条件了,也就是接下来要学习的用幂等性解决数据乱序。
Kafka 1.x 版本之后保证数据单分区有序的条件:
解释:
????????为什么一定可以保证单分区内数据有序呢,因为幂等性就是(ProducerID,Partition,SeqNumber),其中 SeqNumber 要保证单调递增,对应我们的request,如果生产者的 RecoedAccumulator 中某个broker对应的请求队列中 request1和 request2 发送成功,但是request3 却失败了,那么当然会去重试,但是此时request4和request5也发送过去了怎么办。因为我们开启了幂等性,所以当 request1和request2 发送到Kafka的服务端(broker)之后,因为它们是不重复且有序的,所以会立即落盘,但是我们的 request3 由于发送失败,此时 request4和request5又发送过来了,但是由于 request4 和 request5 并不满足幂等性要求,所以不会落盘,而是留在内存当中,所以只有当 request3 再次到来之后,满足幂等性并落盘之后,request4和request5才能落盘。????????