????????在消息发送的过程中,涉及到了两个线程——main 线程和Sender 线程。在main 线程中创建了一个双端队列RecordAccumulator。main 线程将消息发送给RecordAccumulator,Sender 线程不断从RecordAccumulator 中拉取消息发送到Kafka Broker。
参数名称 | 描述 |
---|---|
bootstrap.servers | 生产者连接集群所需的broker 地址清单。例如 node-1:9092, node-2:9092, node-3:9092,可以设置1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者可以从给定的 broker 里查找到其他 broker 信息。 |
key.serializer 和 value.serializer | 指定发送消息的key 和 value 的序列化类型。一定要写全类名。 |
buffer.memory | RecordAccumulator 缓冲区总大小, 默认 32m 。 |
batch.size | 缓冲区一批数据最大值,默认 16k 。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。 |
linger.ms | 如果数据迟迟未达到 batch.size sender 等待 linger.time 之后就会发送数据。单位 ms 默认值是 0ms ,表示没有延迟。 生产环境建议该值大小为 5-100ms 之间 。 |
acks | 0:生产者发送过来的数据,不需要等数据落盘应答。 1:生产者发送过来的数据 Leader 收到数据后应答。 -1 all ):生产者发送过来的数据 Leader 和 isr 队列里面的所有节点收齐数据后应答。 默认值是 -1,-1 和all 是等价的。 |
max.in.flight.requests.per.connection | 允许最多没有返回ack 的次数, 默认为 5 ,开启幂等性要保证该值是 1-5 的数字。 |
retries | 当消息发送出现错误的时候,系统会重发消息。retries表示重试次数 。 默认是 int 最大值, 2147483647 。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1 否则在重试此失败消息的时 候,其他的消息可能发送成功了 。 |
retry.backoff.ms | 两次重试之间的时间间隔,默认是100ms。 |
enable.idempotence | 是否开启幂等性 默认 true ,开启 幂等性 。 |
compression.type | 生产者发送的所有数据的压缩方式。默认是 none ,也就是不压缩。支持压缩类型 none 、 gzip 、 snappy 、 lz4 和 zstd 。 |
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.1</version>
</dependency>
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1 、创建kafka生产者对象
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node-1:9092,node-2:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 使用自定义分区
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartition.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// 2、发送数据
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic1", "wei");
producer.send(producerRecord);
// 3、关闭资源
producer.close();
}
?????????回调函数会在producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息( RecordMetadata 和 异常信息( Exception )),如果 Exception 为 null ,说明消息发送成功,如果 Exception 不为 null ,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1 、创建kafka生产者对象
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node-1:9092,node-2:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 使用自定义分区
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartition.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// 2、向指定分区发送数
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic1", "wei");
// 添加callback函数,接收返回值,返回值封装在meta中
Future<RecordMetadata> send = producer.send(producerRecord, (meta, ex) ->{
System.out.println(meta.topic());
System.out.println(meta.partition() + "-----------------------");
});
// 3、关闭资源
producer.close();
}
?????????全部发送成功后再发送下一批数据。
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1 、创建kafka生产者对象
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node-1:9092,node-2:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 使用自定义分区
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartition.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// 2、向指定分区发送数
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic1", "wei");
// 添加callback函数,接收返回值,返回值封装在meta中
producer.send(producerRecord).get();
// 3、关闭资源
producer.close();
}
分区策略有以下三种情况(针对 ProducerRecord 不同的构造方法):
1)、指明 partition 值,直接将数据发送到指定分区;
2)、没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值;
3)、既没有 partition 值又没有 key 的情况下,Kafka 采用Sticky Partiion (黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)。
????????自定义分区,实现Partitioner 接口。
public class CustomPartition implements Partitioner {
@Override
public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
// 将消息中包含 long 的发送到分区1,其余的发送到分区 0
String msg = o1.toString();
if (msg.contains("long")){
return 1;
}
return 0;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
????????在生产者配置中添加指定分区策略。
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartition.class.getName());
? ? ? ? 本文介绍kafka生产者的参数配置以及使用方法,?快速上手kafka生产者的使用。
????????本人是一个从小白自学计算机技术,对运维、后端、各种中间件技术、大数据等有一定的学习心得,想获取自学总结资料(pdf版本)或者希望共同学习,关注微信公众号:it自学社团。后台回复相应技术名称/技术点即可获得。(本人学习宗旨:学会了就要免费分享)