Kafka(八)使用Kafka构建数据管道

发布时间:2024年01月21日

@[toc](目录)

1 使用场景

使用Kafka构建数据管道时,通常有两种应用场景:

  1. 将Kafka作为数据管道的两个端点之一
    把Kafka中的数据移动到S3,或者把MongoDB中的数据移动到Kafka。
  2. 把Kafka作为数据管道的两个端点的中间媒介
    为了把Twitter中的数据移动到ElasticSerach,需要先将他们移动到Kafka,然后再从Kafka中移动ElasticSerach。

Kafka作为数据管道的主要价值在于:
3. 作为大型数据缓冲区;
4. 解耦数据生产者和消费者,让同一个数据源的数据被多个具有不同可用性需求的系统和应用程序使用。

2 构建数据管道时需要考虑的问题

2.1 及时性

有些系统希望每天一次性接收大量数据,有些系统则希望在数据生成几毫秒之后就能获取它们。大部分数据管道介于这两者之间。良好的数据集成系统可以支持各种数据管道的及时性需求,而且在业务需求发生变更时能够轻松地在不同的时间表之间迁移。
作为一个基于流的数据平台,Kafka提供了可靠且可伸缩的数据存储,可以支持从几近实时的数据管道到基于天的批处理。生产者既可以频繁地向Kafka写入数据,也可以根据实际需要写入。消费者可以在数据到达的第一时间就读取它们,也可以进行批处理:每小时运行一次,连接到Kafka,并读取前一小时积压的数据。
Kafka本身就针对生产者使用了回压策略(必要时延后向生产者发送确认),因为读取速度完全是由消费者决定的。

2.2 可靠性

高可用

我们要避免单点故障,并能够自动从各种故障中快速恢复。

可靠性数据传递

数据传递保证是可靠性的另一个重要考量因素。有些系统允许数据丢失,但在大多数情况下,它们要求至少一次传递,也就是要求源系统的每一个事件都必须到达目的地,只是有时候重试操作可能造成重复传递。有些系统甚至要求精确一次传递系统的每一个事件都必须到达目的地,不允许丢失,也不允许重复。
Kafka本身支持至少一次传递,如果再结合支持事务模型或唯一键的外部存储系统,那么Kafka也能实现精确一次传递。因为大部分端点是数据存储系统,它们提供了精确一次传递的原语支持,所以基于Kafka的数据管道也能实现精确一次传递。值得一提的是,Connect API为集成外部系统提供了处理偏移量的API,我们可以很方便地构建出支持精确一次传递的端到端数据管道。

2.3 高吞吐量

为了满足现代数据系统的需求,数据管道需要支持非常高的吞吐量。更重要的是,在某些情况下,数据管道还要能够应对突发的吞吐量增长。
和果将Kafka作为生产者和消费者之间的缓冲区,那么消费者吞吐量和生产者吞吐量就不会耦合在一起。没有必要实现复杂的回压机制,因为如果生产者吞吐量超过了消费者丢吐量,则可以把数据积压在Kafka中,等待消费者追赶上来。Kafka支持单独增加额外的消费者或生产者,我们可以在数据管道的任何一端进行动态伸缩,以便满足不断变化的需求。

  • Kafka是高吞吐量的分布式系统,一个一般规模的集群每秒可以处理数百兆数据,所以根本无须担心数据管道无法满足伸缩性需求。
  • Connect API还擅长并行处理,既可以在单节点上进行,也可以扩展到多个节点,具体取决于系统的需求。
  • Kafka支持多种类型的压缩算法,在吞吐量增加时,Kafka用户和管理员可以通过压缩来控制对网络和存储资源的使用。

2.4 数据格式

数据管道需要协调各种数据格式和数据类型,这是其在构建时需要考虑的一个非常重要的方面。数据类型取决于不同的数据库和存储系统。你可以通过Avro将XML或关系数据加载到Kafka中,然后将它们转成JSON写入ElasticSearch,或者转成Parquet写入HDFS,或者转成CSV写入S3。

  • Kafka 本身和Connect API与数据格式无关。
  • Connect API有自己的包括数据类型和模式的内存对象模型,我们也可以使用一些可插拔的转换器将这些对象保存成任意格式。
  • 很多数据源和数据池提供了模式,我们从数据源读取数据时会一并读取模式,并把它们保存起来,用于验证数据格式兼容性或更新数据池的模式。
  • 在将Kafka中的数据写入外部系统时,数据池连接器需要负责处理数据格式。有些连接器的数据格式处理过程是可插拔的,比如S3的连接器就支持Avro和Parquet。
    通用的数据集成框架不仅要支持各种不同的数据类型,还要处理好不同数据源和数据池之间的行为差异。

2.5 转换

数据转换比其他方面的需求更具争议性。构建数据管道有两种方式,即ETL和ELT。

ETL

ETL表示提取-转换-加载(extract-transform-load)当数据流经数据管道时,数据管道负责修改它们。这种方式为我们节省了时间和存储空间,因为不需要经历保存数据、修改数据、再保存数据这样的过程。这种方式最主要的不足在于,在数据管道中进行数据转换会对数据管道下游的应用程序造成一些限制,特别是当下游应用程序希望对数据做进一步处理的时候。

ELT

ELT表示提取-加载-转换(extract-load-transform)。在这种模式下,数据管道只做少量的转换(主要是数据类型转换),确保到达数据池的数据尽可能与数据源保持相似。在这种数据管道架构中,目标系统会收集“原始数据”,并完成所有的处理任务。
这种架构的好处在于,它为目标系统用户提供了最大的灵活性,因为它们可以访问到完整的数据。在这种架构中诊断问题也更加容易,因为数据处理被限定在一个系统中,而不是分散在数据管道和其他应用程序中。这种架构的不足在于,数据的转换占用了目标系统太多的CPU和存储资源。有时候,目标系统造价高昂,如果有可能,人们希望将计算任务移出这些系统。
Connect提供了单一消息转换功能,在将消息从源系统复制到Kafka或从Kafka复制到目标系统时,可以对消息进行转换,包括将消息路由到不同的主题、过滤消息、修改数据类型、修改特定字段,等等。涉及连接和聚合的复杂转换可以通过Kafka Streams来完成。

2.6 安全性

对于数据管道的安全性,我们主要关心以下几个方面。

  1. 谁有权限访问写入Kafka的数据?
  2. 能否保证流经数据管道的数据是经过加密的?这是跨数据中心数据管道通常要考虑的一个关键问题。
  3. 谁有权限修改数据管道?
  4. 如果数据管道需要从一个不受信任的系统读取或写入数据,那么是否有适当的身份验证机制?
  5. PII(个人识别信息)的存储、访问和使用是否符合法律和监管的要求?
  • Kafka支持加密传输数据,从数据源到Kafka,再从Kafka到数据池。
  • 它还支持身份验证(通过SASL来实现)和授权,所以你可以确信,如果一个主题包含了敏感信息,那么在不经授权的情况下,数据不会流到不安全的系统中。
  • Kafka还提供了审计日志来跟踪未经授权的访问。通过编写额外的代码,还可能跟踪到每条消息来自哪里以及谁修改了消息,从而可以对每条消息进行溯源。
  • Connect及其连接器需要在身份验证之后连接到外部数据系统,因此连接器的配置信息当中需要包含身份验证凭证。
  • 不建议将凭证保存在配置文件中,因为如果这样就不得不格外小心地保护好这些文件。一种常见的解决方案是使用外部密钥管理系统,比如HashiCorp Vault。Connect支持外部密钥配置。Kafka只提供了允许接入外部可插拔配置提供程序的框架和一个从文件中读取配置的提供程序示例。Kafka社区开发了很多外部配置提供程序,可以与Vault、AWS和Azure 集成。

2.7 故障处理

不能总是假设数据是完美的,而是要事先做好应对故障的准备。我们能否总是把缺损的数据挡在数据管道之外?我们能否恢复无法解析的记录?我们能否修复(或许由人工手动修复)并重新处理缺损的数据?如果在若干天之后才发现原先看起来正常的数据其实是缺损数据,该怎么办?
因为Kafka可以长时间保留数据,所以如果有必要,那么可以在适当的时候回过头来重新处理出错的数据。如果目标系统错过了一些消息,那么可以重放保存在Kafka中的事件。

2.8 耦合性和灵活性

数据管道最重要的作用之一是解耦数据源和数据池。数据管道会在很多情况下出现耦合。

临时数据管道

有些公司会为每一对相互连接的应用程序建立单独的数据管道。
它们将数据管道与特定的端点耦合起来,并创建了大量的集成点,这需要额外的部
署、维护和监控。当有新的系统加入,它们需要构建额外的数据管道,从而增加了采用新技术的成本,也遏制了创新。

元数据丢失

如果数据管道没有保留模式元数据,并且不允许模式发生变更,那么最终会导致生产者和消费者之间发生紧密的耦合。如果数据管道支持模式变更,那么各个应用程序就可以随意修改自己的代码,无须担心对整个系统造成破坏。

末端处理

数据管道难免要对数据做一些处理工作。
不过,如果数据管道过多地处理数据,则会将下游系统与在构建数据管道时所做
的决策联系在一起,比如如何保留字段、如何聚合数据等等。如果下游系统发生变化,那么数据管道就要做出相应的修改,这种方式不仅不灵活,低效且不安全,
更为灵活的方式是尽量保留原始数据的完整性,让下游系统自己决定如何处理和聚合数据。

3 使用Connect API

Kafka客户端是内嵌到应用程序中的,应用程序用它向Kafka 写入数据或从Kafka读取数据。如果可以修改与Kafka相连接的应用程序的代码,或者想要将数据推送到Kafka或从Kafka读取数据,那么就可以使用Kafka客户端。
如果要将Kafka连接到数据存储系统,而这些系统不是你开发的,你无法或者不想修改它们的代码,那么可以使用Connect。Connect可用于从外部数据存储系统读取数据,或者将数据推送到外部存储系统。要使用Connect,首先要有数据存储系统连接器,现在有很多可用的连接器。在实际使用时,Connect用户只需提供配置文件即可。
如果要连接的数据存储系统没有相应的连接器,则可以考虑使用客户端APIConnect API开发一个。建议首选Connect,因为它不仅提供了像配置管理、偏移量存储、并行处理、错误处理这样的开箱即用的特性,而且支持多种数据类型和标准的REST管理API。

3.1 Connect的数据处理流程

![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/bb8ab30338154ade9029368370da9b38.png#pic_center)

source

数据源,外部提供数据来源的系统。

sink

数据池,接收数据的目标系统,HDFS,hbase等。

connectors

连接器主要负责处理以下3件事情。

  1. 确定需要运行多少个任务。
  2. 确定如何根据任务拆分数据复制工作。
  3. 从worker获取任务配置信息并传给任务。

例如,JDBC连接器会连接到数据库,统计需要复制的数据表,并确定需要多少个任
务,然后在配置参数 tasks.max和实际的数据表数量之间选择数值较小的那个作为任务数。在确定了任务数之后,连接器会为每一个任务生成一个配置,配置信息里包含了连接器的配置项(如connection.url)和这个任务需要复制的数据表。taskConfigs()方法会返回一个map(包含每一个任务的配置信息)列表。worker负责启动任务并将配置信息传给它们。每一个任务负责复制配置信息里指定的数据表。需要注意的是,当你通过REST API启动连接器时,它可能会在任意节点上启动,随后它启动的任务也会在这个节点上执行。

Tasks

任务负责将数据移入或移出Kafka。在初始化任务时,worker会为其分配上下文信息。
源系统的上下文中包含了一个对象,任务可以将源系统记录的偏移量保存在这个对象
里(例如,文件连接器的偏移量就是文件中的一个位置,JDBC连接器的偏移量可以
是表中的一个时间戳)。目标系统的上下文提供了一些方法,连接器可以用它们控制从Kafka接收到的数据,比如应用回压策略、进行错误重试,或者通过将偏移量保存到外部系统来实现精确一次传递。任务在初始化之后会按照连接器指定的配置(包含在一个Properties 对象里)开始工作。在任务开始之后,源系统任务将对外部系统进行轮询并返回一些记录,然后由worker发送给Kafka,数据池任务则通过 worker 接收来自Kafka的记录,然后将它们写入外部系统。

Workers

worker是执行连接器和任务的“容器”,是一个计算机进程。它们负责处理用于定义连接器及其配置的HTTP请求、将连接器的配置保存到内部Kafka主题上、启动连接器和任务,并把配置信息传给任务。如果一个worker停止工作或发生崩溃,那么集群中的其他worker就会感知到(根据Kafka消费者协议的心跳机制来判断),并会将发生崩溃的worker的连接器和任务重新分配给其他worker。如果有新的worker加入集群,那么其他worker也会感知到,并会将自己的连接器和任务分配给新worker,确保工作负载均衡地分布到少worker上。worker 还负责自动将偏移量提交到内部的Kafka王题上,当任务抛出常时会进行重试。
可以这样理解:连接器和任务负责“移动数据”,worker则负贡处理REST API请求、和
置管理、可靠性、高可用性、伸缩性和负载均衡。
这种关注点分离是Connect API 给我们带来的最大好处,而这种好处是普通客户端API所不具备的。

Transformer

单一消息转换,SMT(Single Message Transformation),在复制消息时,对单条消息进行转换。
Kafka支持的SMT有:

名称说明
Cast改变一个字段的数据类型。
MaskField将一个字段的内容替换成null。这在移除敏感信息或个人识别数据时非常有用。
Filter丢弃或包含符合指定条件的记录。内置的条件包括匹配主题名称、匹配特定的标头、消息是否为墓碑消息(值为null)。
Flatten将嵌套的数据结构扁平化,也就是将所有字段的名字连接成路径的形式。
HeaderFrom将消息里的字段移动或复制到标头里。
InsertHeader在每一条消息的标头里加入一个固定的字符串。
InsertField在消息里添加一个字段,字段的值既可以来自元数据(如偏移量),也可以是一个固定的值。
RegexRouter使用正则表达式和替换字符串改变目标主题。
ReplaceField移除或重命名消息里的字段。
TimestampConverter修改一个字段的时间格式,比如将Unix Epoch转成字符串。
TimestampRouter根据消息的时间戳来改变主题。这在数据池连接器中很有用,当我们希望基于消息的时间戳将它们复制到特定的分区表中时,主题将被用于查找目标系统的相关数据集。
errors.tolerance配置死信队列
errors.tolerance=all
errors.deadletterqueue.topic.name=test_error
errors.deadletterqueue.context.headers.enable = true
errors.deadletterqueue.topic.replication.factor=1

Converters

数据模型和转换器是Connect API需要讨论的最后一部分内容。Connect提供了一组数据API,包括数据对象和用于描述数据的模式。例如,JDBC连接器会从数据库读取一个字段,并基于这个字段的数据类型创建一个Connect Schema对象。然后,连接器会基于模式创建一个包含了所有数据库字段的Struct。我们保存了每一个字段的名字和它们的值。其他源连接器所做的事情都很相似:从源系统读取记录,为每条记录生成Schema和Value。
目标连接器正好相反,它们用Schema解析Value,并把值写入目标系统。
源连接器负责基于数据API生成数据对象,那么worker是如何将这些数据对象保存到
Kafka中的呢?这个时候,转换器就派上用场了。用户在配置worker或连接器时可以使用合适的转换器。目前可用的转换器有原始类型转换器、字节数组转换器、字符串转换器、Avro 转换器、JSON转换器、JSON模式转换器和Protobuf转换器。JSON转换器既可以在转换结果中带上模式,也可以不带,这样就可以支持结构化和半结构化的数据。连接器会通过数据API将数据返回给 worker,然后worker会使用指定的转换器将数据转换成Avro对象、JSON对象或字符串,转换后的数据会被写入Kafka。
对目标连接器来说,过程刚好相反。在从Kafka读取数据时,worker会使用指定的转换器将各种格式(也就是原始类型、字节数组、字符串、Avro、JSON、JSON模式和Protobuf)的数据转换成数据API格式的对象,然后将它们传给目标连接器,目标连接器再将它们插入目标系统中。
因此,Connect API可以支持多种类型的数据,并独立于连接器的实现。(也就是说,只要有可用的转换器,连接器和数据类型可以自由组合)

偏移量管理

偏移量管理是worker为连接器提供的便捷服务之一(除了可以通过REST API进行部署和配置管理之外)。连接器需要知道哪些数据是已经处理过的,它们可以通过Kafka提供的API来维护已处理的消息的偏移量。
对源连接器来说,连接器返回给worker的记录里包含了一个逻辑分区和一个逻辑偏移量。
它们并非Kafka的分区和偏移量,而是源系统的分区和偏移量。例如,对文件源来说,”
区可以是一个文件,偏移重可以是文件中的一个行号或字符号。对JDBC源来说,分区可以是一个数据库表,偏移量可以是表中一条记录的ID或时间戳。在设计源连接器时,需更着重考虑如何对源系统的数据进行分区以及如何跟踪偏移量,这将影响连接器的并行能力,也决定了是否能够实现至少一次传递或精确一次传递。

源连接器将返回一些记录,记录里包含了分区和偏移量信息,worker会将这些记录发送给Kafka。如果Kafka明认记录保存成功,那么worker就会把偏移量保存下来。如果连接器发生崩溃,则在重启之后可以从最近保存的偏移量位置继续处理数据。偏移量的存储机制是可插拔的,通常会使用Kafka主题,我们可以通过 offset.storage.topic参数来指定主题。另外,Connect也使用Kafka主题来保存连接器的配置信息和每个连接器的运行状态,它们分别通过 config.storage.topic和status. storage.topic来指定。
目标连接器的处理过程恰好相反,不过也很相似。它们会从Kafka上读取包含了主题、分区和偏移量信息的记录,然后调用连接器的put()方法,将记录保存到目标系统中。如果保存成功,就用消费者提交偏移量的方式将偏移量提交到Kafka上。

框架提供的偏移量跟踪机制简化了连接器的开发工作,并在使用不同的连接器时保证了一定程度的行为一致性。

3.2 运行Connect

Connect随Kafka一起发布,所以无须单独安装。如果你打算在生产环境中用Connect来移动大量数据或运行多个连接器,那么最好把Connect部署在单独的服务器上。你可以在所有的服务器上安装Kafka,只启动部分服务器上的broker,然后启动剩余服务器上的Connect。
Connect启动可分为单机模式和分布式模式:

  • 单机模式Standalone
    是最简单的模式,用单一进程负责执行所有connector和task。对于在本地计算机上开发和测试 Kafka Connect 非常有用。它还可用于通常使用单个代理的环境(例如,将 Web 服务器日志发送到 Kafka)。
  • 分布式模式Distributed
    Distributed模式为Kafka Connect提供了可扩展性和自动容错能力。在分布式模式下,你可以使用相同的组启动许多worker进程。它们自动协调以跨所有可用的worker调度connector和task的执行。

启动worker与启动broker差不多,只需在调用启动脚本时传入一个属性配置文件即可:

# 分布式模式
bin/connect-distributed.sh config/connect-distributed.properttes
#单机模式
bin/connect-standalone.sh config/connect-standalone.properttes

3.2.1 配置Connect

worker有以下几个重要的配置参数:

bootstrap.servers

这个参数列出了将要与Connect协同工作的broker服务器,连接器会向这些broker写入
数据或从它们那里读取数据。无须为这个参数提供所有broker的地址,不过建议至少
指定3个。

group.id

具有相同组ID的worker 属于同一个Connect集群。集群的连接器和它的任务可以运行
在任意一个工作节点上。

plugin.path

Connect采用了一种可插拔的架构,支持添加连接器、转换器、转换和密钥提供程序。
可以配置一个或多个目录,Connect可以在这些目录中找到连接器及其依赖项。例如,
可以配置 plugin.path=/opt/connectors,/home/gwenshap/connectors。我们通常会在其中的一个目录中为每个连接器创建一个子目录。例如,创建/opt/connectors/jdbc和/opt/connectors/elastic这两个子目录。在每个子目录中,我们会放置连接器的JAR包及其所有的依赖项。如果连接器是作为胖JAR包发行的,那么就可以直接把它放在plugin.path指定的目录中,不需要创建子目录。但需要注意的是,将依赖项放在顶级目录中是无效的。
还可以将连接器及其所有依赖项添加到Connect的类路径中。但不建议这么做,因为如果连接器的依赖项与Kafka的某个依赖项冲突,则可能会出现错误。推荐使用plugin.path配置参数。

key.converter和value.converter

Connect可以处理多种数据格式。这两个参数分别指定了消息的键和值所使用的转换器。默认使用的是Kafka提供的JSONConverter,当然也可以配置成Confluent模式注册表提供的AvroConverter、ProtobufConverter 或JscoSchemaConverter。
有些转换器还提供了特定的配置参数。如果想使用这些配置参数,则需要给它们加上
key.converter.或 value.converter.这样的前缀。举个例子,JSON格式的消息既可
以包含模式也可以不包含模式,只需分别将key.converter.schema.enable设置成true
或者false即可。消息的值类似,只需分别将 value.converter.schema.enable设置出
true或false即可。Avro消息也包含了模式,不过需要使用 key.converter.schems
registry.url和value.converter.schema.registry.url来指定模式注册表的位置。

rest.host.name fe rest.port

一般通过Connect的REST API来配置和监控连接器。可以为REST API指定特定的
端口。

3.2.2 启动Connect

运行命令启动Connect:

connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties

在启动worker集群之后,可以通过REST API来验证它们是否运行正常。

curl http://localhost:8083
{"version":"3.6.0","commit":"60e845626d8a465a","kafka_cluster_id":"qt2GSuryTQSYWle2Yxt2tg"}

还可以检查已经安装好的连接器插件:

curl http://localhost:8083/connector-plugins

[
    {
        "class": "org.apache.kafka.connect.file.FileStreamSinkConnector""type": "sink",
        "version": "3.6.0"
    }{
        "class": "org.apache.kafka.connect.file.FileStreamSourceConnector""type": "source",
        "version": "3.6.0"
    }{
        "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector""type": "source",
        "version": "3.6.0"
    }{
        "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector""type": "source",
        "version": "3.6.0"
    }{
        "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector""type": "source",
        "version": "3.6.0"
    }
]

3.2.3 管理连接器

Conect提供了如下API来管理Connector:

REST API说明
GET /connector-plugins返回所有可用的connector插件
GET /connectors返回所有正在运行的connector
POST /connectors新建一个connector;请求体必须是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。
GET /connectors/{name}
GET /connectors/{name}/config获取指定connector的配置信息
PUT /connectors/{name}/config更新指定connector的配置信息
GET /connectors/{name}/status获取指定connector的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。
GET /connectors/{name}/tasks获取指定connector正在运行的task。
GET /connectors/{name}/tasks/{taskid}/status获取指定connector的task的状态信息
PUT /connectors/{name}/pause暂停connector和它的task,停止数据处理知道它被恢复。
PUT /connectors/{name}/resume恢复一个被暂停的connector
POST /connectors/{name}/restart重启一个connector,尤其是在一个connector运行失败的情况下比较常用
POST /connectors/{name}/tasks/{taskld}/restart重启一个task,一般是因为它运行失败才这样做
DELETE /connectors/{name}删除一个connector,停止它的所有task并删除配置

3.3 文件连接器示例

文件连接器是一个最简单的连接器示例,用来在开发环境理解连接器的执行过程。Kafka在新版本中(如本文的3.6.0)取消了内置的文件连接器,所以需要自行从源码构建并安装此连接器。下面咱们就一起来搭建文件连接器。

3.3.1创建连接器

首先从Github下载Kafka文件连接器的源码:
[https://github.com/apache/kafka/tree/trunk/connect/file/src/main/java/org/apache/kafka/connect/file](https://github.com/apache/kafka/tree/trunk/connect/file/src/main/java/org/apache/kafka/connect/file)
在本地创建项目并构建出jar包:
![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/2b53cdfd44844e5cbbea6e92be54631f.png)
源码参见:
[https://gitee.com/qp1886358/kafka-connect-file-connector](https://gitee.com/qp1886358/kafka-connect-file-connector)

3.3.2 安装连接器插件

创建插件目录,将构建好的jar包和依赖库拷贝到此目录下:

sudo mkdir -p /opt/connectors/file

sudo cp -rf  /mnt/d/00_programming/02_source_code/idea/kafka-springboot/kafka-connect-file-connector/target/kafka-connect-file-connector-1.0-SNAPSHOT.jar  /opt/connectors/file/kafka-connect-file-connector-1.0-SNAPSHOT.jar

sudo cp -rf /mnt/d/00_programming/02_source_code/idea/kafka-springboot/kafka-connect-file-connector/target/lib /opt/connectors/file/lib

打开Connect的配置文件,修改配置:

sudo vim $KAFKA_HOME/config/connect-standalone.properties

修改如下内容:

bootstrap.servers=172.26.143.96:9092,172.26.143.96:9093,172.26.143.96:9094
plugin.path=/opt/connectors

启动Connect:

connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties

确认插件已成功安装,能看到FileStreamSourceConnector和FileStreamSinkConnector说明插件可用。

curl http://localhost:8083/connector-plugins
[
    {
        "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "type": "sink",
        "version": "3.6.0"
    },
    {
        "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "type": "source",
        "version": "3.6.0"
    },
    {
        "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
        "type": "source",
        "version": "3.6.0"
    },
    {
        "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
        "type": "source""version": "3.6.0"
    },
    {
        "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "type": "source",
        "version": "3.6.0"
    }
]

3.3.3 启动数据源连接器

接下来创建数据源文件:

echo "Hello Connect!" >> /home/peter/kafka-connect-source.txt

添加源连接器:

echo '{
    "name": "local-file-source-1",
    "config": {
        "connector.class": "FileStreamSource",
        "file": "/home/peter/kafka-connect-source.txt",
        "topic": "connect-test",
		"transforms": "InsertHeader",
		"transforms.InsertHeader.type":
		"org.apache.kafka.connect.transforms.InsertHeader",
		"transforms.InsertHeader.header": "MessageSource",
		"transforms.InsertHeader.value.literal": "local-file-source-1"
    }
}' | curl -X POST -d @- http://localhost:8083/connectors -H "Content-Type: application/json"

可以选择启动一个控制台消费者,观察主题中是否有新消息:

kafka-console-consumer.sh --topic connect-test --bootstrap-server 172.26.143.96:9092 --from-beginning

向文件中写入一行文本:

echo "What's up!" >> /home/peter/kafka-connect-source.txt

消费者立刻收到一条消息:

$ kafka-console-consumer.sh --topic connect-test --bootstrap-server 172.26.143.96:9092 --property print.headers=true
MessageSource:local-file-source-1	{"schema":{"type":"string""optional":false},"payload":"What's up!"}

3.3.4 启动数据池连接器

通过API添加数据池连接器:

echo '{
    "name": "local-file-sink-1",
    "config": {
        "connector.class": "FileStreamSink",
        "file": "/home/peter/kafka-connect-sink.txt",
        "topics": "connect-test"
    }
}' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"

查看数据池文件:

cat /home/peter/kafka-connect-sink.txt

What's up!

最后,可以选择删除连接器:

curl -X DELETE http://localhost:8083/connectors/local-file-source-1
curl -X DELETE http://localhost:8083/connectors/local-file-sink-1

3.3.5 使用配置文件启动连接器

使用配置文件可以在启动Connect worker同时添加文件连接器:

connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-file-source.properties $KAFKA_HOME/config/connect-file-sink.properties

可以将连接器的配置写在文件中:

# $KAFKA_HOME/config/connect-file-source.properties
name=local-file-source
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=/home/peter/kafka-connect-source.txt
topic=connect-test
#$KAFKA_HOME/config/connect-file-sink.properties
name=local-file-source
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=/home/peter/kafka-connect-source.txt
topic=connect-test
文章来源:https://blog.csdn.net/yunyun1886358/article/details/135711114
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。