Kafka 生产者是 Apache Kafka 中的一个组件,用于将数据发布到 Kafka 集群中的主题(topic)中。生产者负责将消息发送到 Kafka 集群,并且可以指定消息的键(key)和分区(partition)。生产者可以采用异步或同步的方式发送消息,并且可以配置消息的压缩、序列化和批处理等属性。
Kafka 生产者可以通过 Kafka 的 API 或者客户端库来实现,常见的客户端库包括 Java、Python、Go、C++ 等。生产者可以在分布式环境中部署,并且可以通过多个线程同时发送消息,以提高生产效率和吞吐量。
Kafka 生产者的主要作用是将数据快速、可靠地发送到 Kafka 集群中,以供消费者消费。生产者的高性能和可靠性是 Kafka 的关键特性之一,使得 Kafka 在大数据处理和实时数据流处理中得到广泛应用。
下图描述了生产者和broker之间的交互过程:
I will all comments for this diagram later …
这个参数是常用的KafkaProducer和KafkaConsumer用来连接Kafka集群的入口参数,这个参数对应的值通常是Kafka集群中部分broker的地址,比如:host1:9092,host2:9092,不同的broker地址之间用逗号隔开。这个参数使用的比较频繁,久而久之的就会认为这个参数配置的是所要连接的Kafka集群的broker地址,包括很多Kafka的初学者而言也会Keep这个观点,其实这个是不准确的。bootstrap.servers这个参数是用来配置发现Kafka集群信息的,这个意味着什么呢?
KafkaProducer与Kafka集群建立连接的过程是:
一个类名,用来序列化消息键为字节数组。Broker接收的键和值都是字节数组。
一个类名,用来序列化消息值为字节数组。
发出请求时要传递给服务器的id字符串。这样做的目的是通过允许在服务器端请求日志中包含逻辑应用程序名称,能够跟踪ip/端口以外的请求源。
此参数指定了生产者在多少个分区副本收到消息的情况下才会认为消息写入成功。允许以下设置:
acks=0。如果设置为零,则生产者根本不会等待来自服务器的任何确认。该记录将立即添加到套接字缓冲区,并被视为已发送。在这种情况下,无法保证服务器已收到记录,重试配置也不会生效(因为客户端通常不会知道任何故障)。为每条记录返回的偏移量将始终设置为-1。
acks=1。表示只要首领收到消息,并将记录成功写入其本地日志,就返回成功响应,不等待所有追随者的确认。在这种情况下,如果首领在确认成功后,追随者复制之前崩溃,则记录将会丢失。
acks=all。表示首领将等待同步复制集合中所有的副本都确认收到了记录。这保证了只要至少有一个同步复制副本保持活动状态,记录就不会丢失。这是最有力的保证。这相当于acks=-1的设置。
请注意,启用幂等性要求此配置值为“all”。如果设置了冲突的配置并且没有显式启用幂等性,则会禁用幂等性。
生产者可用于缓冲等待发送到服务器的记录的总内存字节数。如果记录的发送速度快于它们传递到服务器的速度,则生产者将阻止max.block.ms,之后将引发异常。
此设置应大致对应于生产者将使用的总内存,但不是硬绑定的,因为并非生产者使用的所有内存都用于缓冲。一些额外的内存将用于压缩(如果启用了压缩)以及维护飞行中的请求。
生产者生成的所有数据的压缩类型。默认值为none(即无压缩)。有效值为none、gzip、snappy、lz4或zstd。压缩是全批数据,因此批处理的效果也会影响压缩比(批处理越多,压缩效果越好)。
每当多个记录被发送到同一个分区时,生产者将尝试将记录批处理成更少的请求。这有助于提高客户端和服务器的性能。此配置控制以字节为单位的默认批处理大小。
不会尝试批处理大于此大小的记录。
发送到代理的请求将包含多个批,每个分区一个批,其中包含可发送的数据。
小批量会降低批处理的普遍性,并可能降低吞吐量(零批量会完全禁用批处理)。非常大的批处理大小可能会更加浪费内存,因为我们总是会分配指定批处理大小的缓冲区,以期待更多的记录。
注意:此设置提供要发送的批次大小的上限。如果我们为这个分区累积的字节少于这个数量,我们将“逗留”一段时间,等待更多记录出现。此linger.ms设置默认为0,这意味着即使累积的批量大小在此batch.size设置下,我们也会立即发送一条记录。
这个参数指定了生产者在收到服务器响应(阻塞)之前可以向单个连接发送多少个消息批次。
请注意,如果此配置设置为大于1并且enable.idempotence设置为false,则由于重试(即,如果启用了重试)而导致发送失败后,存在消息重新排序的风险;如果禁用重试或enable.idempotence设置为true,则将保留排序。此外,启用幂等性要求该配置的值小于或等于5。如果设置了冲突的配置并且没有显式启用幂等性,则会禁用幂等性。
请求的最大大小(以字节为单位)。此设置将限制生产者在单个请求中发送的记录批次数,以避免发送巨大的请求。这也是对最大未压缩记录批大小的有效限制。请注意,服务器对记录批大小有自己的上限(如果启用了压缩,则在压缩后),这可能与此不同。
TCP socket接收数据包缓冲区大小。如果值是-1,会使用操作系统默认值。
TCP socket发送数据包缓冲区大小。如果值是-1,会使用操作系统默认值。
当设置为“true”时,生产者将确保在流中只写入每条消息的一个副本。如果为“false”,则由于代理失败等原因导致的生产者重试可能会在流中写入重试消息的副本。请注意,启用幂等性要求max.in.flight.requests.per-connection小于或等于5(为任何允许的值保留消息顺序),重试次数大于0,acks必须为“all”。
如果没有设置冲突的配置,默认情况下会启用幂等。如果设置了冲突的配置并且没有显式启用幂等性,则会禁用幂等性。如果显式启用了幂等性并设置了冲突的配置,则抛出ConfigException。
确定在生成记录时将记录发送到哪个分区。可用选项包括:
当设置为“true”时,生产者不会使用记录键来选择分区。如果为“false”,则生产者将在存在键时根据密钥的哈希来选择分区。注意:如果使用自定义分区器,则此设置无效。
要用作拦截器的类的列表。通过实现org.apache.kafka.clients.producer.ProducerInterceptor接口,您可以在生产者接收到的记录发布到kafka集群之前拦截(并可能改变)这些记录。默认情况下,没有拦截器。
消息传递时间分布:
配置控制KafkaProducer的发送消息方法的阻塞时间:send(), partitionsFor(), initTransactions(), sendOffsetsToTransaction(), commitTransaction() and abortTransaction()。对于send(),此超时限制了等待元数据获取和缓冲区分配的总时间(用户提供的序列化程序或分区程序中的阻塞不计入此超时)。对于partitionsFor(),此超时限制了在元数据不可用时等待元数据所花费的时间。与事务相关的方法总是阻塞,但如果无法发现事务协调器或在超时时间内没有响应,则可能会超时。
调用send()返回后报告成功或失败的时间上限。这限制了记录在发送之前延迟的总时间、等待来自代理的确认的时间(如果预期的话)以及允许重试发送失败的时间。如果遇到不可恢复的错误、重试次数已用完,或者记录被添加到提前到达交货到期截止日期的批中,则生产者可能会报告未能在该配置之前发送记录。此配置的值应大于或等于request.timeout.ms和linger.ms的总和
设置一个大于零的值将导致客户端重新发送任何发送失败并可能出现暂时错误的记录。请注意,此重试与客户端在收到错误后重新发送记录没有什么不同。如果delivery.timeout.ms配置的超时在确认成功之前首先过期,那么在重试次数用完之前,Produce请求将失败。用户通常应该不设置此配置,而是使用delivery.timeout.ms来控制重试行为。
启用幂等性要求此配置值大于0。如果设置了冲突的配置并且没有显式启用幂等性,则会禁用幂等性。
在将enable.idempotence设置为false和将max.in.flight.requests.per-connection设置为大于1时允许重试可能会更改记录的顺序,因为如果将两个批发送到单个分区,并且第一个批失败并重试,但第二个成功,则第二个批中的记录可能会首先出现。
尝试重试对给定主题分区的失败请求之前等待的时间。这避免了在某些失败场景下以紧密循环的方式重复发送请求。
配置控制客户端等待请求响应的最长时间。如果在超时之前没有收到响应,客户端将在必要时重新发送请求,或者在重试次数用完时使请求失败。
这个参数指定了生产者在发送消息批次之前等待更多消息加如批次的时间。通常情况下,只有当记录到达速度快于发送速度时,才会在加载时发生这种情况。然而,在某些情况下,即使在中等负载下,客户端也可能希望减少请求的数量。此设置通过添加少量人工延迟来实现这一点——也就是说,生产者将等待指定的延迟,以允许发送其他记录,从而可以将发送分批在一起,而不是立即发送记录。这可以被认为类似于TCP中的Nagle算法。此设置提供了批处理延迟的上限:一旦我们获得一个分区的batch.size大小的记录,无论此设置如何,它都将立即发送,但是,如果我们为该分区累积的字节数少于此数量,我们将“逗留”指定的时间,等待更多记录出现。此设置默认为0(即无延迟)。例如,设置linger.ms=5可以减少发送的请求数量,但在没有负载的情况下,发送记录的延迟将增加5ms。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("delivery.timeout.ms", 30000);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
发送消息有3种模式:
把消息发送给broker,并不关心发送是否成功。
ProducerRecord<String, String> record = new ProducerRecord<String, String>("my-topic", "key", "value");
try {
producer.send(record);
} catch (Exception e) {
logger.error("", e);
}
producer.flush();
producer.close();
ProducerRecord<String, String> record = new ProducerRecord<String, String>("my-topic", "key", "value");
try {
RecordMetadata metadata = producer.send(record).get();
} catch (Exception e) {
logger.error("", e);
}
producer.close();
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");
kafkaProducer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (null == exception) {
} else {
logger.error("", exception);
}
kafkaProducer.close();
}
});
一般Producer会出现两种错误:
package com.qupeng.demo.kafka.kafkaapache.producer;
import org.apache.kafka.common.serialization.Serializer;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
public class CustomizedSerializer implements Serializer<Product> {
@Override
public byte[] serialize(String topic, Product product) {
byte[] name = product.getName().toString().getBytes(StandardCharsets.UTF_8);
ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + name.length);
buffer.putInt(product.getId());
buffer.putInt(name.length);
buffer.put(name);
return buffer.array();
}
}
Avro是一种与语言无关的序列化格式,并使用schema来定义格式,用JSON来描述模式。因为Kafaka保存记录是不关心格式的,都作为二进制数组处理,所以Avro非常适合Kafka的客户端用来处理特定格式的消息。
使用Avro格式必须要注意:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props. put("key.serializer", "io.confluent.kafka.serializer.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializer.KafkaAvroSerializer");
props.put("schema.registry.url", "...");
Producer<String, Product> producer = new KafkaProducer(props);
while (true) {
Product product = Product.newBuilder().build();
ProducerRecord<String, Product> record = new ProducerRecord("product", product.getName(), product);
producer.send(record);
}
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props. put("key.serializer", "io.confluent.kafka.serializer.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializer.KafkaAvroSerializer");
props.put("schema.registry.url", "...");
Producer<String, GenericRecord> producer = new KafkaProducer(props);
String schemaString = "{\n" +
" \"namespace\": \"com.qupeng.demo.kafka.kafkaspringbootproducer.avro\",\n" +
" \"type\": \"record\",\n" +
" \"name\": \"Product\",\n" +
" \"fields\": [\n" +
" {\n" +
" \"type\": \"int\",\n" +
" \"name\": \"id\",\n" +
" \"default\": 0\n" +
" },\n" +
" {\n" +
" \"type\": \"string\",\n" +
" \"name\": \"name\",\n" +
" \"default\": \"\"\n" +
" }\n" +
" ]\n" +
"}";
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(schemaString);
GenericRecord genericRecord = new GenericData.Record(schema);
genericRecord.put("id", 0);
genericRecord.put("name", "iPhone 17");
ProducerRecord<String, GenericRecord> producerRecord = new ProducerRecord<>("product", "0", genericRecord);
producer.send(producerRecord);
由配置参数partitioner.class指定分区器类,除了内置分区器,还可以自定义分区器:
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
String msgValue = value.toString();
return msgValue.contains("0") ? 0 : 1;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
使用配置参数interceptor.classes指定拦截器类。
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
public class MyProducerInterceptor implements ProducerInterceptor {
private Logger logger = LoggerFactory.getLogger(MyProducerInterceptor.class);
@Override
public ProducerRecord onSend(ProducerRecord record) {
record.headers().add("correlationId", UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
logger.info(metadata.toString());
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
覆盖默认配置的选项在3.0版本之后已经删除,只能使用动态配置来修改。
quota.producer.default=10485760
quota.consumer.default=10485760
# 用命令动态修改配额
kafka-configs.sh --bootstrap-server "172.26.143.96:9092" --alter --add-config 'producer_byte_rate=1024, consumer_byte_rate=2048' --entity-type clients --entity-name rest-api-1
# 用命令查看配额
kafka-configs.sh --bootstrap-server "172.26.143.96:9092" --describe --entity-type clients --entity-name rest-api-1