本文介绍了Kafka的基本概念和常用命令,包括Kafka的架构、特点和应用场景,以及Topics、Producer、Consumer和Groups命令的使用方法和常用选项。通过学习本文,您可以了解Kafka的基本原理和使用方法,以及如何使用命令行工具管理和操作Kafka集群。
Kafka是一个开源的分布式流处理平台,最初由LinkedIn开发并于2011年开源。它被设计用于高吞吐量、低延迟的数据传输,以及处理实时数据流。因其常被应用于消息队列,所以又被叫做分布式消息队列。
Kafka
集群中的每个节点都是一个独立的 Broker
,它负责存储和处理消息。每个 Broker
可以在一个或多个Topic
上发布和订阅消息。
消息在 Kafka
中以Topic
的形式进行组织和分类。一个Topic
可以有多个Partition
,每个Partition
在不同的 Broker
上进行复制,以实现高可用性和容错性。
Topic
可以被分为多个Partition
,每个Partition
在磁盘上以一个日志文件进行存储。Partition允许消息在集群中进行并行处理,提高了吞吐量和扩展性。offset
是用来唯一标识Partition
中消息的位置信息。每个Partition
都有自己的offset
序列,用于标识消息在Partition
中的顺序。Kafka
使用一种称为log segment
(日志段)的文件结构来保存消息。每个Topic
的每个Partition
都有一个或多个log segment
,每个log segment
都是一个连续的消息序列。当消息被写入Kafka
时,它们会被追加到当前活动的log segment
中。Kafka
使用index file
(索引文件)来加快消息的查找。index file
(索引文件)包含了消息offset
(偏移量)和物理文件
位置之间的映射关系,以便能够快速定位特定offset
(偏移量)的消息。Producer
负责将消息发布到 Kafka
集群中的指定Topic
。Producer
可以选择将消息发送到特定的Partition
,也可以使用Partitioner
(分区器)根据某种策略自动选择分区。
Consumer
可以订阅一个或多个Topic
,并从Partition
中拉取消息进行处理。消费者可以以不同的方式进行消息消费,例如,按照时间顺序、按照分区顺序或者以并行的方式消费消息。
多个Consumer
(消费者)可以组成一个Consumer Group
(消费者组),共同消费一个Topic
的消息。每个Partition
只能由同一个Consumer Group
(消费者组)中的一个Consumer
(消费者)进行消费,这样可以实现负载均衡和水平扩展。
Kafka能够处理大规模的实时数据流,并具有非常高的吞吐量。它通过将数据分布在多个分区中,并允许并行写入和读取操作,实现了高度的并发性和可伸缩性。
Kafka将数据持久化到磁盘上,以确保数据的可靠性和持久性。它使用了顺序写入的方式来提高写入性能,并使用复制机制来保证数据的冗余备份,从而提供了高可靠性的数据存储。
Kafka的设计目标之一是能够轻松地扩展到大规模的集群。它通过分区和副本的概念来实现数据的分布和冗余备份,可以根据需求增加或减少分区和副本的数量,以适应不断增长的数据流量和负载。
Kafka是一个实时数据流平台,能够以毫秒级的延迟处理数据。它采用了发布-订阅模式,允许实时地将数据发布到主题(topics)中,并允许消费者实时订阅和处理这些数据。
Kafka通过复制机制和分布式协调服务(如ZooKeeper)来提供高可靠性的数据存储和处理。它能够自动处理故障,包括节点故障、网络故障等,并保证数据的一致性和可用性。
Kafka可以用作数据管道,将实时生成的数据流传输到不同的数据处理系统中。它可以接收大量的数据并将其持久化,同时允许多个消费者以实时方式订阅和处理这些数据。这种能力使得Kafka非常适合用于构建实时数据流处理和分析平台。
Kafka可以用作日志收集和聚合的中间件。应用程序可以将日志消息发送到Kafka主题中,然后使用消费者来聚合、分析和存储这些日志数据。这种方式可以实现高可靠性的日志收集和处理,并支持实时监控和分析。
Kafka作为消息队列系统,可以用于构建异步通信和解耦应用程序组件之间的通信。应用程序可以将消息发送到Kafka主题中,然后其他应用程序可以通过订阅这些主题来接收和处理消息。这种方式可以实现松耦合的系统架构,提高系统的可伸缩性和可靠性。
Kafka可以与流处理框架(如Apache Flink、Apache Spark等)结合使用,构建实时流处理应用程序。Kafka作为数据源和数据接收器,可以提供可靠的数据传输和持久化,同时流处理框架可以通过Kafka的分区机制实现水平扩展和并行处理。
Kafka可以作为事件驱动架构的核心组件,用于实现事件的发布和订阅。应用程序可以将事件发送到Kafka主题中,然后其他应用程序可以通过订阅这些主题来接收和处理事件。这种方式可以实现松耦合、可扩展和可靠的事件驱动系统。
副本机制是指将消息主题的分区数据复制到多个Broker
上,以确保数据的冗余和可靠性。在Kafka中,每个Partition
可以有多个副本,其中一个副本被称为leader
(领导者),其他副本被称为follower
(追随者)。
以下是Kafka副本机制详解:
领导者和追随者角色
每个Partition
都有一个leader
和零个或多个follower
。leader负责处理所有的读写请求,而follower只负责复制leader的数据。
数据复制
leader
将消息写入本地日志,并将消息的副本发送给follower
。follower接收到消息后,将其写入本地日志,并向leader发送确认消息。leader在收到足够数量的确认消息后,将消息标记为已提交。
ISR(In-Sync Replicas)
每个Partition
的所有follower
中,与leader
保持同步的副本被称为ISR
。只有ISR中的副本才能成为新的leader。如果一个follower与leader的同步延迟太大或无法与leader保持连接,它将被移出ISR。
副本选举
如果leader
发生故障或无法正常工作,Kafka会自动进行副本选举,选择一个新的leader。副本选举过程中,只有ISR
中的副本才有资格成为新的leader。
容错性
通过将消息的副本分布在多个Broker上,Kafka实现了容错性。即使某个Broker发生故障,其他副本仍然可以继续提供服务,确保数据的可用性和持久性。
扩展性
通过增加分区和副本的数量,Kafka可以实现水平扩展。更多的分区和副本可以提供更高的吞吐量和更好的负载均衡。
Topics命令用于创建、列出和删除Kafka主题。您可以使用该命令来创建新的主题、查看现有主题的列表以及删除不再需要的主题。
kafka-topics.sh --option1 value1 --option2 value2 ...
下面是Topics命令的常用选项:
--alter
:修改主题的分区数、副本分配和/或配置。
--bootstrap-server <String: server to connect to>
:必需选项,指定要连接的Kafka服务器。如果提供了此选项,则不需要直接连接到Zookeeper。
--command-config <String: command config property file>
:指定包含要传递给Admin Client的配置的属性文件。此选项仅与--bootstrap-server
选项一起使用,用于描述和修改代理配置。
--config <String: name=value>
:为要创建或修改的主题提供配置覆盖。以下是一些有效的配置选项:
cleanup.policy
compression.type
delete.retention.ms
file.delete.delay.ms
flush.messages
flush.ms
follower.replication.throttled.replicas
index.interval.bytes
leader.replication.throttled.replicas
max.message.bytes
message.downconversion.enable
message.format.version
message.timestamp.difference.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas
preallocate
retention.bytes
retention.ms
segment.bytes
segment.index.bytes
segment.jitter.ms
segment.ms
unclean.leader.election.enable
请参阅Kafka文档以获取有关主题配置的完整详细信息。此选项仅在使用--bootstrap-server
选项时与--create
结合使用。
--create
:创建新的主题。
--delete
:删除主题。
--delete-config <String: name>
:删除现有主题的配置覆盖。不支持与--bootstrap-server
选项一起使用。
--describe
:列出给定主题的详细信息。
--disable-rack-aware
:禁用机架感知的副本分配。
--exclude-internal
:在运行列表或描述命令时排除内部主题。默认情况下,内部主题将被列出。
--force
:禁止控制台提示。
--help
:打印使用信息。
--if-exists
:如果设置了此选项并且要更改或删除或描述的主题存在,则仅执行操作。不支持与--bootstrap-server
选项一起使用。
--if-not-exists
:如果设置了此选项并且要创建的主题尚不存在,则仅执行操作。不支持与--bootstrap-server
选项一起使用。
--list
:列出所有可用的主题。
--partitions <Integer: # of partitions>
:要创建或修改的主题的分区数(警告:如果增加具有键的主题的分区数,将影响分区逻辑或消息的顺序)。
--replica-assignment <String: broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...>
:为要创建或修改的主题提供手动分区到代理的分配列表。
--replication-factor <Integer: replication factor>
:要创建的主题中每个分区的副本因子。
--topic <String: topic>
:要创建、修改、描述或删除的主题。它还可以接受正则表达式,除了--create
选项。将主题名称放在双引号中,并使用\
前缀来转义正则表达式符号,例如:“test.topic”。
--topics-with-overrides
:如果在描述主题时设置了此选项,则仅显示具有覆盖配置的主题。
--unavailable-partitions
:如果在描述主题时设置了此选项,则仅显示其领导者不可用的分区。
--under-replicated-partitions
:如果在描述主题时设置了此选项,则仅显示未充分复制的分区。
--zookeeper <String: hosts>
:已弃用选项,指定Zookeeper连接字符串,格式为host:port。可以提供多个主机以实现故障转移。
以下是常用的Topics命令示例及其解释:
创建一个名为my_topic
的主题:
kafka-topics.sh --create --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic --partitions 3 --replication-factor 2
解释:使用--create
选项创建一个名为my_topic
的主题,该主题具有3个分区和2个副本。
--alter
:修改名为my_topic
的主题的分区数为5:
kafka-topics.sh --alter --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic --partitions 5
解释:使用--alter
选项修改名为my_topic
的主题的分区数为5。
--bootstrap-server <String: server to connect to>
:连接到Kafka服务器集群:
kafka-topics.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --list
解释:使用--bootstrap-server
选项连接到Kafka集群,并列出所有可用的主题。
--command-config <String: command config property file>
:使用名为admin.properties
的属性文件连接到Kafka服务器:
kafka-topics.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --list --command-config admin.properties
解释:使用--command-config
选项指定名为admin.properties
的属性文件,该文件包含连接到Kafka服务器所需的配置信息,并列出所有可用的主题。
--describe
:查看名为my_topic
的主题的详细信息:
kafka-topics.sh --describe --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic
解释:使用--describe
选项查看名为my_topic
的主题的详细信息,包括分区和副本信息。
--delete
:删除名为my_topic
的主题:
kafka-topics.sh --delete --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic
解释:使用--delete
选项删除名为my_topic
的主题。
--exclude-internal
:列出所有不包含内部主题的主题:
kafka-topics.sh --list --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --exclude-internal
解释:使用--exclude-internal
选项列出所有不包含内部主题的主题。
--force
:删除名为my_topic
的主题时禁止确认提示:
kafka-topics.sh --delete --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic --force
解释:使用--force
选项删除名为my_topic
的主题时,禁止确认提示。
--topics-with-overrides
:仅显示具有覆盖配置的主题的详细信息:
kafka-topics.sh --describe --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topics-with-overrides
解释:使用--topics-with-overrides
选项仅显示具有覆盖配置的主题的详细信息。
--unavailable-partitions
:仅显示其领导者不可用的分区的详细信息:
kafka-topics.sh --describe --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --unavailable-partitions
解释:使用--unavailable-partitions
选项仅显示其领导者不可用的分区的详细信息。
--under-replicated-partitions
:仅显示未充分复制的分区的详细信息:
kafka-topics.sh --describe --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --under-replicated-partitions
解释:使用--under-replicated-partitions
选项仅显示未充分复制的分区的详细信息。
--zookeeper <String: hosts>
:连接到Zookeeper服务器集群:
kafka-topics.sh --list --zookeeper "192.168.145.103:2181,192.168.145.104:2181,192.168.145.105:2181"
解释:使用--zookeeper
选项连接到Zookeeper服务器集群,并列出所有可用的主题。
--help
:打印帮助信息:
kafka-topics.sh --help
解释:使用--help
选项打印关于Topics命令的帮助信息。
kafka-console-producer.sh
命令用于从命令行向Kafka主题发送消息。可以使用该命令将消息发送到指定的主题,以便进行测试和调试。
kafka-console-producer.sh --broker-list <broker-list> --topic <topic>
--batch-size <Integer: size>
:如果消息不是同步发送的,指定一次发送的消息批量大小。默认值为200。--broker-list <String: broker-list>
:必需选项,指定Kafka服务器的地址和端口列表,格式为HOST1:PORT1,HOST2:PORT2。--compression-codec [String: compression-codec]
:指定消息的压缩编解码器,可选值为’none’、‘gzip’、‘snappy’、‘lz4’或’zstd’。如果没有指定值,则默认为’gzip’。--line-reader <String: reader_class>
:指定用于从标准输入读取行的类名。默认情况下,每行被读取为一个单独的消息。--max-block-ms <Long: max block on send>
:生产者在发送请求期间阻塞的最长时间(以毫秒为单位)。默认值为60000。--max-memory-bytes <Long: total memory in bytes>
:生产者用于缓冲等待发送到服务器的记录的总内存大小。默认值为33554432字节(32MB)。--max-partition-memory-bytes <Long: memory in bytes per partition>
:为每个分区分配的缓冲区大小(以字节为单位)。当接收到小于此大小的记录时,生产者将尝试将它们乐观地组合在一起,直到达到此大小。默认值为16384字节(16KB)。--message-send-max-retries <Integer>
:代理服务器可能因多种原因而无法接收消息,而且临时不可用只是其中之一。此属性指定在生产者放弃并丢弃此消息之前的重试次数。默认值为3。--metadata-expiry-ms <Long: metadata expiration interval>
:在未看到任何领导者更改的情况下,强制刷新元数据的时间间隔(以毫秒为单位)。默认值为300000毫秒(5分钟)。--producer-property <String: producer_prop>
:以key=value形式传递自定义属性给生产者。--producer.config <String: config file>
:生产者配置属性文件。注意,[producer-property]优先于此配置。--property <String: prop>
:以key=value形式传递自定义属性给消息读取器。这允许为用户定义的消息读取器进行自定义配置。--request-required-acks <String: request required acks>
:生产者请求的必需确认级别。默认值为1。--request-timeout-ms <Integer: request timeout ms>
:生产者请求的确认超时时间。值必须为非负且非零值。默认值为1500毫秒。--retry-backoff-ms <Integer>
:在每次重试之前,生产者刷新相关主题的元数据。由于领导者选举需要一些时间,此属性指定生产者在刷新元数据之前等待的时间量。默认值为100毫秒。--socket-buffer-size <Integer: size>
:TCP接收缓冲区的大小。默认值为102400字节(100KB)。--sync
:如果设置了该选项,消息发送请求将以同步方式发送到代理服务器,一次只发送一个消息。--timeout <Integer: timeout_ms>
:如果设置了该选项,并且生产者运行在异步模式下,它指定了消息在等待足够的批量大小时排队的最大时间。值以毫秒为单位,默认为1000毫秒。--topic <String: topic>
:必需选项,指定要发送消息的目标主题。--help
:打印使用信息。--broker-list <String: broker-list>
:必需选项,指定Kafka服务器的地址和端口列表,格式为HOST1:PORT1,HOST2:PORT2。
kafka-console-producer.sh --broker-list "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092",192.168.145.104:9092,192.168.145.105:9092 --topic my_topic
解释:指定Kafka服务器的地址和端口列表,连接到包含三个Kafka服务器的集群。
--batch-size <Integer: size>
:如果消息不是同步发送的,指定一次发送的消息批量大小。默认值为200。
kafka-console-producer.sh --broker-list "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic --batch-size 100
解释:将消息批量大小设置为100,每次发送100条消息。
--compression-codec [String: compression-codec]
:指定消息的压缩编解码器,可选值为’none’、‘gzip’、‘snappy’、‘lz4’或’zstd’。如果未指定值,则默认为’gzip’。
kafka-console-producer.sh --broker-list "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic --compression-codec snappy
解释:将消息的压缩编解码器设置为’snappy’,以使用Snappy压缩算法进行消息压缩。
--max-block-ms <Long: max block on send>
:生产者在发送请求期间阻塞的最大时间(以毫秒为单位)。默认值为60000。
kafka-console-producer.sh --broker-list "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic --max-block-ms 5000
解释:将生产者在发送请求期间阻塞的最大时间设置为5000毫秒。
--max-memory-bytes <Long: total memory in bytes>
:生产者用于缓冲等待发送到服务器的记录的总内存大小(以字节为单位)。默认值为33554432(32MB)。
kafka-console-producer.sh --broker-list "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic --max-memory-bytes 67108864
解释:将生产者用于缓冲记录的总内存大小设置为67108864字节(64MB)。
--max-partition-memory-bytes <Long: memory in bytes per partition>
:为每个分区分配的缓冲区大小(以字节为单位)。当接收到小于该大小的记录时,生产者会尝试将它们进行乐观地分组,直到达到该大小。默认值为16384(16KB)。
kafka-console-producer.sh --broker-list "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic --max-partition-memory-bytes 32768
解释:将每个分区分配的缓冲区大小设置为32768字节(32KB)。
--message-send-max-retries <Integer>
:在多种情况下,代理服务器可能因为多种原因无法接收消息,而临时不可用只是其中之一。该属性指定在生产者放弃并丢弃消息之前的重试次数。默认值为3。
kafka-console-producer.sh --broker-list "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic --message-send-max-retries 5
解释:将消息发送的最大重试次数设置为5次。
--metadata-expiry-ms <Long: metadata expiration interval>
:在没有看到任何领导者更改的情况下,强制刷新元数据的时间间隔(以毫秒为单位)。默认值为300000(5分钟)。
kafka-console-producer.sh --broker-list "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic --metadata-expiry-ms 600000
解释:将元数据刷新的时间间隔设置为600000毫秒(10分钟)。
--producer-property <String: producer_prop>
:以key=value形式传递用户定义的属性给生产者。
kafka-console-producer.sh --broker-list "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic --producer-property acks=all
解释:将acks
属性设置为all
,以确保所有副本都确认接收消息。
--producer.config <String: config file>
:生产者配置属性文件。注意,--producer-property
选项优先于此配置。
kafka-console-producer.sh --broker-list "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic --producer.config producer.properties
解释:使用producer.properties
文件中的配置属性连接到Kafka服务器。
--property <String: prop>
:以key=value形式传递用户定义的属性给消息读取器。这允许为用户定义的消息读取器进行自定义配置。
kafka-console-producer.sh --broker-list "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic --property key=value
解释:将自定义的属性key=value
传递给消息读取器。
--request-required-acks <String: request required acks>
:生产者请求的所需确认级别。默认值为1。
kafka-console-producer.sh --broker-list "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic --request-required-acks -1
解释:将请求的所需确认级别设置为-1,表示生产者等待所有副本都确认接收消息。
--request-timeout-ms <Integer: request timeout ms>
:生产者请求的确认超时时间。值必须为非负且非零值。默认值为1500。
kafka-console-producer.sh --broker-list "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic --request-timeout-ms 2000
解释:将请求的确认超时时间设置为2000毫秒。
--retry-backoff-ms <Integer>
:在每次重试之前,生产者会刷新相关主题的元数据。由于领导者选举需要一些时间,该属性指定生产者在刷新元数据之前等待的时间量。默认值为100。
kafka-console-producer.sh --broker-list "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic --retry-backoff-ms 200
解释:将生产者在刷新元数据之前等待的时间量设置为200毫秒。
--socket-buffer-size <Integer: size>
:TCP接收缓冲区的大小。默认值为102400(100KB)。
kafka-console-producer.sh --broker-list "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic --socket-buffer-size 204800
解释:将TCP接收缓冲区的大小设置为204800字节(200KB)。
--sync
:如果设置了该选项,消息发送请求将以同步方式发送到代理服务器,一次只发送一个消息。
kafka-console-producer.sh --broker-list "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic --sync
解释:设置消息发送请求为同步方式,一次只发送一个消息。
--timeout <Integer: timeout_ms>
:如果设置了该选项,并且生产者运行在异步模式下,它指定了消息在等待足够的批量大小时排队的最大时间。值以毫秒为单位,默认为1000。
kafka-console-producer.sh --broker-list "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic --timeout 500
解释:将消息排队的最大等待时间设置为500毫秒。
--topic <String: topic>
:必需选项,指定要发送消息的目标主题。
kafka-console-producer.sh --broker-list "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic
解释:指定要发送消息的目标主题为my_topic
。
Consumer命令用于从Kafka主题中消费消息并在命令行中显示。通过该命令,您可以订阅指定的主题并实时查看消息的内容。
kafka-console-consumer.sh --bootstrap-server <server:port> --topic <topic> [options]
--bootstrap-server <String: server to connect to>
:REQUIRED: 要连接的Kafka服务器地址和端口。--consumer-property <String: consumer_prop>
:以键值对形式传递自定义的消费者属性。--consumer.config <String: config file>
:消费者配置属性文件。注意,[consumer-property]优先于此配置。--enable-systest-events
:记录消费者的生命周期事件,除了记录消费的消息之外。(这仅用于系统测试。)--formatter <String: class>
:用于格式化Kafka消息显示的类名。(默认值:kafka.tools.DefaultMessageFormatter)--from-beginning
:如果消费者没有已建立的偏移量,从日志中最早的消息开始消费,而不是最新的消息。--group <String: consumer group id>
:消费者所属的消费者组ID。--help
:打印帮助信息。--isolation-level <String>
:设置为"read_committed"以过滤未提交的事务消息。设置为"read_uncommitted"以读取所有消息。(默认值:read_uncommitted)--key-deserializer <String: deserializer for key>
:键的反序列化器。--max-messages <Integer: num_messages>
:消费的最大消息数量。如果未设置,将持续消费。--offset <String: consume offset>
:要消费的偏移量ID(非负数),或者"earliest"表示从开头开始,或者"latest"表示从末尾开始。(默认值:latest)--partition <Integer: partition>
:要消费的分区。如果未指定"–offset",则从分区末尾开始消费。--property <String: prop>
:用于初始化消息格式化程序的属性。默认属性包括:
print.timestamp=true|false
:是否打印消息的时间戳。print.key=true|false
:是否打印消息的键。print.value=true|false
:是否打印消息的值。key.separator=<key.separator>
:键的分隔符。line.separator=<line.separator>
:行分隔符。key.deserializer=<key.deserializer>
:键的反序列化器。value.deserializer=<value.deserializer>
:值的反序列化器。--skip-message-on-error
:如果在处理消息时出现错误,跳过该消息而不是停止消费。--timeout-ms <Integer: timeout_ms>
:如果指定,当在指定的时间间隔内没有可消费的消息时,退出消费。--topic <String: topic>
:要消费的主题。--value-deserializer <String: deserializer for values>
:值的反序列化器。--whitelist <String: whitelist>
:使用正则表达式指定要包含在消费中的主题的白名单。--bootstrap-server <String: server to connect to>
:指定连接到的Kafka服务器地址和端口。
kafka-console-consumer.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic
解释:使用--bootstrap-server
选项连接到Kafka集群中的任意一个服务器,例如连接到地址为"192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092"的Kafka服务器。
--consumer-property <String: consumer_prop>
:以键值对形式传递自定义的消费者属性。
kafka-console-consumer.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic --consumer-property group.id=my_consumer_group
解释:使用--consumer-property
选项传递自定义的消费者属性,例如设置消费者组ID为"my_consumer_group"。
--consumer.config <String: config file>
:消费者配置属性文件。
kafka-console-consumer.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic --consumer.config consumer.properties
解释:使用--consumer.config
选项指定消费者配置属性文件,该文件包含消费者的配置信息。
--enable-systest-events
:记录消费者的生命周期事件,除了记录消费的消息之外。
kafka-console-consumer.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic --enable-systest-events
解释:使用--enable-systest-events
选项记录消费者的生命周期事件,以及消费的消息。
--formatter <String: class>
:用于格式化Kafka消息显示的类名。
kafka-console-consumer.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic --formatter kafka.tools.DefaultMessageFormatter
解释:使用--formatter
选项指定用于格式化Kafka消息显示的类名,例如使用kafka.tools.DefaultMessageFormatter
进行格式化。
--from-beginning
:从日志中最早的消息开始消费,而不是最新的消息。
kafka-console-consumer.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic --from-beginning
解释:使用--from-beginning
选项从指定主题的最早消息开始消费,而不是从最新消息开始。
--group <String: consumer group id>
:消费者所属的消费者组ID。
kafka-console-consumer.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic --group my_consumer_group
解释:使用--group
选项指定消费者所属的消费者组ID,以便进行消费者组的管理和协调。
--isolation-level <String>
:设置为"read_committed"以过滤未提交的事务消息。设置为"read_uncommitted"以读取所有消息。
kafka-console-consumer.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic --isolation-level read_committed
解释:使用--isolation-level
选项设置消费者的隔离级别,可以选择只消费已提交的事务消息或者消费所有消息。
--key-deserializer <String: deserializer for key>
:键的反序列化器。
kafka-console-consumer.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic --key-deserializer org.apache.kafka.common.serialization.StringDeserializer
解释:使用--key-deserializer
选项指定键的反序列化器,以便正确解析和显示键的内容。
--max-messages <Integer: num_messages>
:消费的最大消息数量。
kafka-console-consumer.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic --max-messages 100
解释:使用--max-messages
选项指定要消费的最大消息数量,消费达到指定数量后将停止消费。
--offset <String: consume offset>
:要消费的偏移量ID(非负数),或者"earliest"表示从开头开始,或者"latest"表示从末尾开始。
kafka-console-consumer.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic --offset earliest
解释:使用--offset
选项指定要消费的偏移量,可以是具体的偏移量ID,或者使用"earliest"表示从开头开始,或者使用"latest"表示从末尾开始。
--partition <Integer: partition>
:要消费的分区。
kafka-console-consumer.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic --partition 0
解释:使用--partition
选项指定要消费的分区,可以指定分区的编号进行消费。
--property <String: prop>
:用于初始化消息格式化程序的属性。
kafka-console-consumer.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic --property print.timestamp=true --property print.key=true
解释:使用--property
选项初始化消息格式化程序的属性,例如设置打印消息的时间戳和键。
--skip-message-on-error
:如果在处理消息时出现错误,跳过该消息而不是停止消费。
kafka-console-consumer.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic --skip-message-on-error
解释:使用--skip-message-on-error
选项在处理消息时出现错误时跳过该消息,继续消费下一条消息。
--timeout-ms <Integer: timeout_ms>
:如果指定,当在指定的时间间隔内没有可消费的消息时,退出消费。
kafka-console-consumer.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic --timeout-ms 5000
解释:使用--timeout-ms
选项设置超时时间,如果在指定的时间间隔内没有可消费的消息,则退出消费。
--topic <String: topic>
:要消费的主题。
kafka-console-consumer.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic
解释:使用--topic
选项指定要消费的主题。
--value-deserializer <String: deserializer for values>
:值的反序列化器。
kafka-console-consumer.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --topic my_topic --value-deserializer org.apache.kafka.common.serialization.StringDeserializer
解释:使用--value-deserializer
选项指定值的反序列化器,以便正确解析和显示值的内容。
--whitelist <String: whitelist>
:使用正则表达式指定要包含在消费中的主题的白名单。
kafka-console-consumer.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --whitelist "topic1|topic2"
解释:使用--whitelist
选项使用正则表达式指定要包含在消费中的主题的白名单,例如匹配"topic1"和"topic2"的主题。
--help
:打印帮助信息。
kafka-console-consumer.sh --help
解释:使用--help
选项打印关于Consumer命令的帮助信息,包括所有可用选项和示例。
kafka-consumer-groups.sh
命令用于管理和查看Kafka消费者组。您可以使用该命令列出消费者组、查看消费者组的偏移量以及重置消费者组的偏移量等操作。
kafka-consumer-groups.sh --bootstrap-server <server:port> [options]
--bootstrap-server <String: server to connect to>
:指定连接到的Kafka服务器地址和端口。--command-config <String: command config property file>
:指定包含命令配置属性的属性文件。--delete
:删除消费者组的偏移量和所有权信息。--describe
:描述消费者组并列出与给定组相关的偏移量差距(尚未处理的消息数)。--dry-run
:仅显示结果,而不执行对消费者组的更改。--execute
:执行操作。--export
:将操作执行结果导出到CSV文件。--from-file <String: path to CSV file>
:从CSV文件中重置偏移量值。--group <String: consumer group>
:要操作的消费者组。--help
:打印帮助信息。--list
:列出所有消费者组。--members
:描述消费者组的成员信息。--offsets
:描述消费者组并列出组中所有主题分区及其偏移量差距。--reset-offsets
:重置消费者组的偏移量。--state
:描述消费者组的状态。--timeout <Long: timeout (ms)>
:设置某些用例的超时时间。--to-current
:将偏移量重置为当前偏移量。--to-datetime <String: datetime>
:将偏移量重置为指定日期时间之后的偏移量。--to-earliest
:将偏移量重置为最早的偏移量。--to-latest
:将偏移量重置为最新的偏移量。--to-offset <Long: offset>
:将偏移量重置为指定的偏移量。--topic <String: topic>
:要删除消费者组信息或包含在重置偏移量过程中的主题。--verbose
:提供附加信息,例如在描述组时显示更多详细信息。--bootstrap-server <String: server to connect to>
:指定连接到的Kafka服务器地址和端口。
kafka-consumer-groups.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --list
解释:使用--bootstrap-server
选项连接到Kafka服务器,此示例连接到kafka集群的9092端口,并显示消费者信息。
--command-config <String: command config property file>
:指定包含命令配置属性的属性文件。
kafka-consumer-groups.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --command-config consumer.properties --list
解释:使用--command-config
选项指定包含命令配置属性的属性文件,此示例使用名为consumer.properties
的属性文件。
--delete
:删除消费者组的偏移量和所有权信息。
kafka-consumer-groups.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --delete --group my_consumer_group
解释:使用--delete
选项删除名为my_consumer_group
的消费者组的偏移量和所有权信息。
--describe
:描述消费者组并列出与给定组相关的偏移量差距(尚未处理的消息数)。
kafka-consumer-groups.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --describe --group my_consumer_group
解释:使用--describe
选项描述名为my_consumer_group
的消费者组,并列出与该组相关的偏移量差距。
--dry-run
:仅显示结果,而不执行对消费者组的更改。
kafka-consumer-groups.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --reset-offsets --group my_consumer_group --to-earliest --all-topics --dry-run
解释:使用--dry-run
选项在重置名为my_consumer_group
的消费者组的偏移量之前,仅显示计划的更改,而不实际执行更改。
--execute
:执行操作。
kafka-consumer-groups.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --reset-offsets --group my_consumer_group --to-earliest --all-topics --execute
解释:使用--execute
选项执行重置名为my_consumer_group
的消费者组的偏移量的操作。
--export
:将操作执行结果导出到CSV文件。
kafka-consumer-groups.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --reset-offsets --group my_consumer_group --to-earliest --all-topics --export --export-file reset_offsets.csv
解释:使用--export
选项将重置名为my_consumer_group
的消费者组的偏移量操作的结果导出到名为reset_offsets.csv
的CSV文件。
--from-file <String: path to CSV file>
:从CSV文件中重置偏移量值。
kafka-consumer-groups.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --reset-offsets --group my_consumer_group --from-file reset_offsets.csv --execute
解释:使用--from-file
选项从名为reset_offsets.csv
的CSV文件中读取偏移量值,并重置名为my_consumer_group
的消费者组的偏移量。
--group <String: consumer group>
:要操作的消费者组。
kafka-consumer-groups.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --describe --group my_consumer_group
解释:使用--group
选项指定要操作的消费者组,此示例描述名为my_consumer_group
的消费者组。
--help
:打印帮助信息。
kafka-consumer-groups.sh --help
解释:使用--help
选项打印关于kafka-consumer-groups.sh
命令的帮助信息,包括所有可用选项和示例。
--list
:列出所有消费者组。
kafka-consumer-groups.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --list
解释:使用--list
选项列出连接到"192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092"
的Kafka服务器上的所有消费者组。
--members
:描述消费者组的成员信息。
kafka-consumer-groups.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --describe --group my_consumer_group --members
解释:使用--members
选项显示名为my_consumer_group
的消费者组的成员信息。
--offsets
:描述消费者组并列出组中所有主题分区及其偏移量差距。
kafka-consumer-groups.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --describe --group my_consumer_group --offsets
解释:使用--offsets
选项显示名为my_consumer_group
的消费者组的偏移量信息。
--reset-offsets
:重置消费者组的偏移量。
kafka-consumer-groups.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --reset-offsets --group my_consumer_group --to-earliest --all-topics --execute
解释:使用--reset-offsets
选项重置名为my_consumer_group
的消费者组的偏移量为最早的偏移量,并应用更改。
--state
:描述消费者组的状态。
kafka-consumer-groups.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --describe --group my_consumer_group --state
解释:使用--state
选项显示名为my_consumer_group
的消费者组的状态信息。
--timeout <Long: timeout (ms)>
:设置某些用例的超时时间。
kafka-consumer-groups.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --describe --group my_consumer_group --timeout 10000
解释:使用--timeout
选项设置描述名为my_consumer_group
的消费者组时的超时时间为10,000毫秒。
--to-current
:将偏移量重置为当前偏移量。
kafka-consumer-groups.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --reset-offsets --group my_consumer_group --to-current --all-topics --execute
解释:使用--to-current
选项将名为my_consumer_group
的消费者组的偏移量重置为当前偏移量,并应用更改。
--to-datetime <String: datetime>
:将偏移量重置为指定日期时间之后的偏移量。
kafka-consumer-groups.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --reset-offsets --group my_consumer_group --to-datetime "2023-01-01T00:00:00.000" --all-topics --execute
解释:使用--to-datetime
选项将名为my_consumer_group
的消费者组的偏移量重置为指定日期时间(2023年1月1日00:00:00.000之后)的偏移量,并应用更改。
--to-earliest
:将偏移量重置为最早的偏移量。
kafka-consumer-groups.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --reset-offsets --group my_consumer_group --to-earliest --all-topics --execute
解释:使用--to-earliest
选项将名为my_consumer_group
的消费者组的偏移量重置为最早的偏移量,并应用更改。
--to-latest
:将偏移量重置为最新的偏移量。
kafka-consumer-groups.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --reset-offsets --group my_consumer_group --to-latest --all-topics --execute
解释:使用--to-latest
选项将名为my_consumer_group
的消费者组的偏移量重置为最新的偏移量,并应用更改。
--to-offset <Long: offset>
:将偏移量重置为指定的偏移量。
kafka-consumer-groups.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --reset-offsets --group my_consumer_group --to-offset 100 --topic my_topic --execute
解释:使用--to-offset
选项将名为my_consumer_group
的消费者组的偏移量重置为指定的偏移量(100),并应用更改。
--topic <String: topic>
:要删除消费者组信息或包含在重置偏移量过程中的主题。
kafka-consumer-groups.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --delete --group my_consumer_group --topic my_topic
解释:使用--topic
选项指定要删除消费者组信息或包含在重置偏移量过程中的主题(my_topic)。
--verbose
:提供附加信息,例如在描述组时显示更多详细信息。
kafka-consumer-groups.sh --bootstrap-server "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092" --describe --group my_consumer_group --verbose
解释:使用--verbose
选项在描述名为my_consumer_group
的消费者组时提供更多详细信息。
Kafka安装教程:Kafka安装与配置-shell脚本一键安装配置(集群版)
本文介绍了Kafka的基本概念和常用命令,包括Kafka的架构、特点和应用场景,以及Topics、Producer和Consumer命令的使用方法和常用选项。通过这些命令,可以方便地管理和操作Kafka集群,包括创建和删除主题、发送和消费消息,以及管理消费者组的偏移量等。
总的来说,Kafka是一个功能强大的分布式流处理平台,适用于处理大规模的实时数据流。通过合理使用Kafka的命令和功能,可以构建高吞吐量、低延迟的数据处理系统,实现实时数据流处理、日志收集和聚合、消息队列等应用场景。
希望本文对您有所帮助!如有任何疑问或问题,请随时在评论区留言。感谢阅读!