11、Kafka ------ Kafka 核心API 及 生产者API 讲解

发布时间:2024年01月20日

Kafka核心API 及 生产者API讲解

官方文档

★ Kafka的核心API

Kafka包含如下5类核心API:

在这里插入图片描述

Producer API(生产者API):
应用程序通过该API向主题发布消息。

Consumer API(消费者API):
应用程序通过该API订阅一个或多个主题,并从所订阅的主题中拉取消息(记录)

Streams API(流API):
应用程序可通过该API实现流处理器,可以将一个主题的消息“导流”到另一个主题,并能地对消息进行任意自定义的转换。

类似于 RabbitMQ 的 Exchange

Connector API(连接器API):
应用程序可通过这套API来实现连接器,这些连接器不断地从源系统或应用程序导入数据到Kafka,反过来也可将Kafka消息不断地导入某个接收系统或应用程序。

通过这个API,可以让应用程序和Kafka这个消息系统进行一个实时的交互,我们的系统可以不断的接收来自Kafka的消息,也可以让我们的程序不断的把数据导入到Kafka的消息系统中,就像是一个通道,所以叫连接API。

应用场景:我们的应用程序要和Kafka之间保持实时的数据流的时候,就可以用这个连接API。

AdminAPI(管理API):
应用程序可通过该API管理和检查主题、Broker和其他Kafka实体。

在这里插入图片描述



这5套API中,只有流API使用的是专门的JAR包。

其他都用的是org.apache.kafka:kafka-clients依赖库。

而流API用的是org.apache.kafka:kafka-streams依赖库。



★ 生产者API


在这里插入图片描述

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>3.6.1</version>
</dependency>

生产者API 的核心类是 KafkaProducer,它提供了一个 send()方法 来发送消息,该方法需要传入一个 ProducerRecord<K,V>对象。

ProducerRecord 代表了一条消息,Kafka 的消息是包含了key、value、timestamp。

ProducerRecord定义了如下6个构造器:

- ProducerRecord(String topic, Integer partition, K key, V value):
  创建一条发送到指定主题和指定分区的消息。

- ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers):
  创建一条发送到指定主题和指定分区的消息,且包含多个消息头。
  
- ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value):
  创建一条发送到指定主题和指定分区的消息,且使用给定的时间戳。
  
- ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers):
  创建一条发送到指定主题和指定分区的消息、使用给定的时间戳,且包含多个消息头。
  
- ProducerRecord(String topic, K key, V value):
  创建一条发送到指定主题的消息。

- ProducerRecord(String topic, V value):
  创建一条发送到指定主题的、只带value,不带key的消息。

通过查 API 文档可看这个 ProducerRecord 消息对象 的6个构造器:

在这里插入图片描述

Kafka 的API 文档

Kafka 的API 文档

在这里插入图片描述

★ 使用生产者API发送消息

使用生产者API发送消息很简单,基本只要两步:

1、创建KafkaProducer对象,创建该对象时要传入Properties对象,用于对该生产者进行配置。

2、调用KafkaProducer对象的send()方法发送消息,调用ProducerRecord的构造器即可创建不同的消息。

3、发送完成后,关闭KafkaProducer对象。



为何Kafka的KafkaProducer需要一个Properties来来创建KafkaProducer?

因为Kafka的Producer API提供了海量的配置选项——如果你将这些配置选项每个都定义长方法,那将是一件让人无比痛苦的事情。

所以Kafka在设计该API时,就直接用了一个Properties来封装所有的配置属性。

文章来源:https://blog.csdn.net/weixin_44411039/article/details/135709469
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。