Kafka集群详解

发布时间:2023年12月31日


Kafka介绍

Apache Kafka是一个开源流处理平台,由Scala和Java编写。它最初由LinkedIn公司开发,并于2011年初开源。Kafka是一个高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,Kafka是一个可行的解决方案。

Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。在这个平台上可以发布和订阅数据流,并将它们保存起来进行处理。Kafka可以存储和持续处理大型的数据流,并且具有流处理能力,可以高效地处理数据。

Kafka中,消息用主题进行分类,主题下有若干个分区,有新消息会以追加的形式写入分区。由于主题会有多个分区,所以在整个主题范围内无法保证消息顺序。分区可以分布在不同的服务器上,实现数据冗余和伸缩。消费者可以订阅一个或多个主题,并且每次读取一条数据时,偏移量会增加1。此外,Kafka集群由多个Kafka实例组成,每个实例称为broker。无论是Kafka集群、producer还是consumer都依赖于Zookeeper集群来保存一些元信息,以保持系统的可用性。


Kafka集群介绍

Kafka是一个分布式的发布/订阅消息系统,使用Scala语言编写,最初由LinkedIn公司发布。它主要用于处理活跃的数据,如登录、浏览、点击、分享等用户行为产生的数据。

Kafka集群由多个Kafka服务节点组成,每个节点称为一个Broker。在Kafka集群中,没有“中心主节点”的概念,集群中的所有节点都是对等的。每个Broker就是一个Kafka服务实例,多个Broker构成一个Kafka集群。生产者发布的消息将保存在Broker中,消费者将从Broker中拉取消息进行消费。

在Kafka中,消息用主题进行分类,每个主题包含一个或多个分区。在创建主题时,需要指定包含的分区数据。分区可以提高负载,因为每个分区是不同的磁盘。此外,每个分区可以有多个副本,分布在不同的Broker上。Kafka会选出一个副本作为Leader,所有的读写请求都会通过Leader完成。当Leader宕机后,会从副本中选出一个新的Leader继续提供服务,实现故障自动转移。

Kafka集群是一个具有高吞吐量、高水平扩展性的分布式消息系统。通过将数据分区并分布在多个节点上,Kafka集群可以处理大规模的数据流并保证系统的可用性和可靠性。

在这里插入图片描述

Kafka集群特点

Kafka集群具有以下特点:

  1. 高吞吐量、低延迟 :Kafka每秒可以处理几十万条消息,延迟最低只有几毫秒。
  2. 横向扩展 :Kafka集群支持热扩展,可以方便地增加或减少节点。
  3. 分区与副本 :每个topic可以分多个partition,且每个partition都有多个副本,副本用于实现数据冗余和故障转移。
  4. 持久性、可靠性 :消息被持久化到本地磁盘,并且支持数据备份防止数据丢失。
  5. 容错性 :允许集群中节点失败,若副本数量为n,则允许n-1个节点失败。
  6. 高并发 :支持数千个客户端同时读写。

此外,Kafka还广泛应用于日志收集、消息系统、运营指标等场景。


Kafka集群搭建

Kafka集群搭建的步骤如下:

  1. 环境准备:选择适合的Linux操作系统,并安装好JDK。同时,需要准备好Zookeeper集群,因为Kafka集群依赖于Zookeeper进行协调管理。
  2. 下载并解压Kafka安装包:从Apache Kafka官网下载最新版本的Kafka安装包,并解压到合适的目录。
  3. 配置Kafka集群:进入Kafka配置目录,找到server.properties文件,并进行相应的配置。主要配置内容包括Broker的ID、监听的端口、日志存储路径、Zookeeper地址等。在配置时,需要确保每个Broker的ID在集群中是唯一的。
  4. 启动Kafka集群:在每个节点上启动Kafka服务。可以使用Kafka自带的脚本启动服务,并检查服务是否启动成功。
  5. 验证集群状态:使用Kafka自带的命令行工具或者其他客户端工具,验证集群状态是否正常。可以查看集群中的主题、分区、副本等信息,以及生产者、消费者的状态。

在搭建过程中,需要注意以下几点:

  1. 确保所有节点的时钟同步,避免出现时间戳不一致的问题。
  2. 在配置Zookeeper地址时,需要使用Zookeeper集群的地址,而不是单个节点的地址。
  3. 在启动服务前,需要确保所有节点的配置文件已经正确配置,并且具有相同的配置内容。
  4. 在验证集群状态时,需要确保生产者、消费者能够正常连接到集群,并且能够正常读写数据。

在这里插入图片描述

Kafka集群如何进行故障切换

在Kafka集群中,故障切换主要依赖于Zookeeper组件的协调。Zookeeper是一个分布式协调服务,它可以监控Kafka集群中各个Broker(服务器节点)的状态。当Leader节点宕机时,Zookeeper会触发新的Leader选举。

在选举新Leader的过程中,Zookeeper会考虑各个Follower的同步状态,优先选择数据最新、最完整的Follower作为新的Leader。这样可以尽量保证数据的一致性,避免数据丢失。一旦新的Leader被选举出来,所有的读写请求就会被自动转发到新的Leader,对客户端来说,这个过程是透明的。

此外,Kafka通过多副本机制实现故障自动转移。在Kafka中发生复制时确保partition的预写式日志有序地写到其他节点上。其中一个replica为leader,其他都为follower,leader处理partition的所有读写请求,与此同时,follower会被动定期地去复制leader上的数据。当leader发生故障或挂掉时,一个新leader被选举并接收客户端的消息成功写入。

Kafka还允许将分区复制到多个Broker进行故障转移,以提高系统的可用性和容错性。通过以上机制,Kafka集群可以在发生故障时进行自动切换,保证服务的可用性和数据的可靠性。


Kafka集群Leader的选举

Kafka集群中的Leader选举是维护集群稳定性和数据一致性的重要机制。当某个分区的Leader节点出现故障时,集群会从其他副本中选择一个新的Leader,以保证服务的连续性。

Kafka使用Zookeeper作为其分布式协调服务,每个主题的分区都有一个对应的Zookeeper路径,用于存储分区的元数据和ISR(In-Sync Replicas)集合。ISR集合是当前与Leader同步的副本集合,只有当副本加入ISR集合后,才会被选为新的Leader。

在Leader选举过程中,Kafka会根据副本的同步状态和在Zookeeper中的选举结果来选择新的Leader。如果ISR集合中的副本数量不足一半,Kafka会等待ISR中的任意一个副本恢复,并重新进行选举。如果ISR集合中的副本数量超过一半,Kafka会从ISR集合中随机选择一个副本作为新的Leader。

此外,Kafka还提供了unclean.leader.election.enable配置项,允许在ISR集合中没有合适的副本时选择非同步副本作为新的Leader。不过,这可能会导致数据的不一致性,因此需要谨慎使用。

Kafka集群的Leader选举机制通过Zookeeper的协调和ISR集合的选择,保证了集群的稳定性和数据的一致性。在故障发生时,能够快速地进行故障切换,保证服务的可用性。


Kafka集群如何快速横向拓展

Kafka集群的横向拓展可以通过增加Broker节点来实现。每个Broker节点是Kafka服务的一个实例,通过增加Broker节点,可以增加Kafka集群的处理能力和存储能力。

在增加Broker节点时,需要确保新的节点与现有的节点具有相同的配置,包括端口号、日志存储路径等。同时,需要将新的节点加入到Zookeeper集群中,以便进行协调管理。

在Kafka中,数据分区和副本是分布在不同的Broker节点上。因此,通过增加Broker节点,可以将更多的分区和副本分布到新的节点上,提高集群的处理能力和容错性。

需要注意的是,在增加Broker节点后,需要重新平衡集群中的分区和副本分布。Kafka提供了工具和命令来重新分配分区和副本,以确保集群的负载均衡和数据一致性。

总之,通过增加Broker节点,Kafka集群可以实现快速横向拓展,提高处理能力和存储能力。在增加节点时,需要注意配置一致性和负载均衡问题,以保证集群的稳定性和可靠性。


Kafka集群搭建最佳实践

Kafka集群搭建的最佳实践包括以下几个方面:

  1. 硬件和系统配置:根据业务需求和数据量,选择适当的硬件配置,包括CPU、内存、存储和网络等。同时,确保操作系统和Kafka版本之间的兼容性。
  2. 版本选择:选择稳定且适合业务需求的Kafka版本。新版本可能包含新功能和性能改进,但也可能存在一些未解决的问题。因此,建议在生产环境中使用经过充分测试和验证的稳定版本。
  3. 配置优化:根据实际需求,对Kafka的配置进行优化。例如,调整Kafka的并发度和吞吐量、设置合理的日志存储大小和时间、调整Zookeeper的连接参数等。在生产环境中,建议进行性能测试和调优,以找到最优的配置参数。
  4. 数据备份和恢复:制定适当的数据备份和恢复计划,以防止数据丢失和灾难恢复。定期备份Kafka数据,并确保备份数据的可用性和可恢复性。
  5. 安全性和可靠性:确保Kafka集群的安全性,采取适当的措施保护数据隐私和安全。同时,为了确保数据的可靠性和可用性,可以采用多副本和分布式部署等方案。
  6. 监控和维护:建立Kafka集群的监控系统,实时监测集群的状态、性能和异常。及时发现和处理问题,保证集群的稳定性和可用性。同时,定期进行集群的维护和清理工作,保持集群的良好状态。
  7. 扩展性和灵活性:在设计Kafka集群时,考虑到未来业务的发展和变化,确保集群具有良好的扩展性和灵活性。可以采取分区、复制和横向扩展等方案,以满足不断增长的业务需求。
  8. 最佳实践遵循:在实施过程中,遵循Kafka社区的最佳实践和建议。关注社区动态和技术进展,了解最新的解决方案和技术趋势,不断完善和优化集群的搭建和运维。

总之,Kafka集群搭建的最佳实践需要综合考虑硬件配置、版本选择、配置优化、数据备份、安全性和可靠性、监控和维护、扩展性和灵活性等方面。在实际操作中,可以根据具体情况进行调整和优化,以达到最佳的效果。


Kafka集群可以使用单节点Zookeeper吗

Kafka集群可以使用单节点Zookeeper,但一般不推荐。因为Zookeeper是分布式协调服务,它主要负责维护和协调Kafka集群中的各个节点。在生产环境下,为了确保系统的可用性和可靠性,建议使用多节点Zookeeper集群。


Kafka集群的消费者信息保存在那里

Kafka集群的消费者信息保存在Kafka内部的topic中,这个topic被命名为__consumer_offsets。每个消费者都会在Kafka中保存其消费的进度,也就是offset,这些信息被存储在__consumer_offsets的partition中。在默认情况下,Kafka会为每个消费者组在__consumer_offsets中创建一个分区,并使用消费者的group ID、主题名称和分区编号作为key来存储对应的offset值。

Kafka集群的消费者信息保存在Kafka集群中的broker节点上,而不是保存在Zookeeper或其他外部系统中。这种设计是为了提高系统的可用性和可靠性。因为即使Kafka集群中的Zookeeper出现故障,消费者的消费进度也不会受到影响,因为它们都存储在Kafka的broker节点上。

此外,消费者可以通过Kafka提供的API来提交其消费进度。每次提交进度时,Kafka都会将新的offset值写入__consumer_offsets的相应partition中。消费者还可以通过查询__consumer_offsets来获取其消费进度,或者通过Kafka提供的命令行工具来查看和验证其消费进度。

Kafka集群的消费者信息保存在Kafka内部的__consumer_offsets topic中,这些信息存储在broker节点上,并通过API进行提交和查询。这种设计提高了系统的可用性和可靠性,使得即使Zookeeper出现故障,消费者的消费进度也不会受到影响。

Kafka集群的Topic的分区数的设置规则

Kafka集群的Topic分区数的设置规则需要考虑多个因素,包括业务需求、数据量、性能和可用性等。

首先,Topic的分区数量应大于或等于Broker的数量,最好是broker的数量乘以每一台机器上可用的核数,以提高吞吐率。如果一个Topic的分区数量过少,可能会导致性能瓶颈,因为每个分区只能由一个生产者写入,而多个消费者可以并行地从该生产者读取数据。

其次,在生产环境中,为了获得更高的吞吐量,可以基于目标吞吐量来设置分区数量。具体来说,可以通过计算单个生产者和消费者所能实现的所有性能来设置分区数量,公式为max(t/p, t/c),其中t是目标吞吐量,p是生产者性能,c是消费者性能。

另外,分区数的设置还需要考虑应用程序的需求。例如,如果应用程序需要处理大量数据,并且要求数据按照某个键进行排序和聚合,那么可以使用更多的分区来提高处理效率。如果应用程序需要更快的读取速度,可以使用较少的分区来减少消费者之间的竞争。

最后,在设置分区数时还需要注意一些限制和约束。例如,每个分区只能由一个生产者写入,而多个消费者可以并行地从该生产者读取数据。因此,如果一个Topic的分区数量过多,可能会导致生产者性能瓶颈。另外,分区数的设置还需要考虑到Kafka集群的可用性和容错性。例如,如果Kafka集群中的Broker节点出现故障,需要保证剩余的Broker节点能够继续提供服务。

Kafka集群的Topic分区数的设置规则需要根据业务需求、数据量、性能和可用性等因素综合考虑。在实践中,建议根据实际需求进行测试和调整,以找到最优的设置。


Kafka集群如何提高吞吐量

Kafka集群可以通过以下几种方式提高吞吐量:

  1. 增加生产者和消费者的数量:增加生产者和消费者的数量可以并行地增加生产和消费的速率,从而提高吞吐量。确保生产者和消费者的数量与分区数量相匹配,以避免瓶颈。
  2. 调整生产者和消费者的并发度:通过调整生产者和消费者的并发度,可以控制同时进行生产和消费的线程或进程数量。根据实际需求和系统资源来合理配置并发度,可以提高吞吐量。
  3. 优化生产者和消费者的性能:优化生产者和消费者的性能可以减少单次生产和消费的时间,从而提高吞吐量。可以对生产者和消费者的代码进行性能分析和优化,例如减少序列化和反序列化的时间、优化网络传输等。
  4. 调整Kafka的配置参数:通过调整Kafka的配置参数,可以优化Kafka集群的性能和吞吐量。例如,可以增加缓冲区大小、调整批处理大小、设置合理的副本因子等。
  5. 使用压缩数据:Kafka支持使用压缩数据来减少存储空间和网络带宽的使用,从而提高吞吐量。可以选择使用不同的压缩算法,例如gzip、snappy等,根据实际需求和压缩效果来选择合适的压缩算法。
  6. 优化磁盘I/O性能:Kafka的磁盘I/O性能是影响Kafka性能和吞吐量的关键因素之一。可以通过优化磁盘I/O性能来提高Kafka的吞吐量。例如,使用SSD硬盘、调整磁盘I/O参数、优化文件存储结构等。

通过多种方式综合优化可以提高Kafka集群的吞吐量。在实际应用中,建议根据实际情况进行测试和调整,以找到最优的设置和配置。


Kafka数据压缩类型

Kafka支持多种数据压缩类型,包括:

  1. Gzip :一种常用的压缩算法,可以有效地减小数据的大小,但压缩和解压缩速度相对较慢。
  2. Snappy :一种快速的压缩算法,比Gzip更快的压缩和解压缩速度,但压缩率略低。
  3. LZ4 :一种非常快速的压缩算法,具有很高的压缩和解压缩速度,但压缩率相对较低。
  4. Zstandard(Zstd) :一种新的压缩算法,由Facebook开发,具有很高的压缩率和压缩速度,被认为是目前最快的压缩算法之一。

Kafka数据压缩的原理基于特定的压缩算法,将消息本身进行压缩并存储,待消费时再解压。压缩可以减少数据在传输过程中的大小,从而减轻网络传输的压力。在大数据处理场景中,瓶颈往往体现在网络传输上,而非CPU资源,因此数据压缩可以在一定程度上提高系统的整体性能。

Kafka支持以集合(batch)为单位发送消息,并在此基础上支持对消息集合进行压缩。Producer端可以通过GZIP或Snappy格式对消息集合进行压缩。当Producer端进行压缩后,Consumer端需进行解压以还原原始数据。

在Kafka中,消息的头部会添加一个描述压缩属性的字节,该字节的后两位表示消息的压缩采用的编码。如果后两位为0,则表示消息未被压缩。

需要注意的是,虽然压缩可以减少传输的数据量,但压缩和解压过程会消耗CPU资源。因此,在选择是否启用压缩功能时,需要根据实际需求和性能要求进行权衡。

这些压缩类型都可以在Kafka的生产者和消费者中使用。生产者可以在配置中指定压缩类型,而消费者可以自动处理压缩数据。根据实际需求和性能要求选择合适的压缩类型可以提高Kafka的性能和吞吐量。

Kafka的数据压缩方式主要有两种:基于消息的压缩和基于日志的压缩。

基于消息的压缩方式,也称为端到端压缩,是在消息发送到Kafka时,对每条消息进行独立压缩,然后将压缩后的数据发送到Kafka。这种方式的优点在于,压缩和解压缩操作可以在消费者端进行,不需要Kafka支持。此外,由于每条消息都被独立压缩,因此可以充分利用压缩算法对不同消息进行差异化压缩,提高压缩率。但是,这种方式的缺点在于,如果每条消息大小差异很大,可能会导致压缩后的数据大小差异也很大,从而影响存储和网络传输效率。

基于日志的压缩方式,也称为日志压缩,是在整个Kafka日志层面进行压缩,而不是对每条消息进行压缩。这种方式的优点在于,由于是整个日志进行压缩,因此可以避免因消息大小差异大而导致的存储和传输效率问题。此外,由于Kafka本身支持压缩操作,因此可以减少消费者端的处理压力。但是,这种方式的缺点在于,如果日志量很大,可能会导致压缩和解压缩操作成为性能瓶颈。

Kafka的数据压缩方式各有优缺点,需要根据实际应用场景和需求进行选择。如果需要充分利用压缩算法对不同消息进行差异化压缩,且每条消息大小差异较大,可以选择基于消息的压缩方式;如果需要避免因消息大小差异导致的存储和传输效率问题,且日志量较大,可以选择基于日志的压缩方式。


Kafka开启数据压缩

要开启Kafka数据压缩,需要配置Kafka生产者或消费者来指定压缩算法。以下是一个简单的示例代码,展示如何在Kafka生产者中开启数据压缩:

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        // 设置Kafka生产者配置
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); // 开启数据压缩
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // 创建Kafka生产者实例
        Producer<String, String> producer = new KafkaProducer<>(props);

        // 发送消息
        String topic = "test-topic";
        String key = "key";
        String value = "value";
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
        producer.send(record);

        // 关闭生产者实例
        producer.close();
    }
}

在上述示例中,通过设置ProducerConfig.COMPRESSION_TYPE_CONFIG属性为gzip,开启了数据压缩功能。你可以根据需要选择不同的压缩算法,例如snappylz4等。同时,确保Kafka生产者和消费者版本兼容,并使用正确的压缩算法和依赖库。


Kafka消费者中开启数据压缩

在Kafka消费者中开启数据压缩,需要在消费者配置中指定压缩类型。以下是一个简单的示例代码,展示如何在Kafka消费者中开启数据压缩:

import org.apache.kafka.clients.consumer.*;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        // 设置Kafka消费者配置
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 从最早的消息开始消费
        props.put(ConsumerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); // 开启数据压缩

        // 创建Kafka消费者实例
        Consumer<String, String> consumer = new KafkaConsumer<>(props);

        // 订阅主题
        consumer.subscribe(Arrays.asList("test-topic"));

        // 消费消息
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Received message: key = %s, value = %s%n", record.key(), record.value());
            }
        }
    }
}

在上述示例中,通过设置ConsumerConfig.COMPRESSION_TYPE_CONFIG属性为gzip,开启了数据压缩功能。你可以根据需要选择不同的压缩算法,例如snappylz4等。同时,确保Kafka生产者和消费者版本兼容,并使用正确的压缩算法和依赖库。


通过KRaft和Zookeeper启动Kafka的区别

Kafka是一种高吞吐量的分布式发布订阅消息系统,可以处理消费者在网站中的所有动作流数据。其核心组件包含Producer、Broker、Consumer,以及依赖的Zookeeper集群。其中Zookeeper集群是Kafka用来负责集群元数据的管理、控制器的选举等。

然而,由于重度依赖Zookeeper集群,当Zookeeper集群性能发生抖动时,Kafka的性能也会收到很大的影响。因此,为了解决这个问题,Kafka引入了KRaft新内部功能,取消对Zookeeper的依赖。

在Kafka引入KRaft新内部功能后,对Zookeeper的依赖将会被取消。在KRaft中,一部分broker被指定为控制器,这些控制器提供过去由ZooKeeper提供的共识服务。这样做的好处有以下几点:

  1. Kafka不再依赖外部框架,而是能够独立运行;
  2. controller管理集群时,不再需要从Zookeeper中先读取数据,集群性能上升;
  3. 由于不依赖Zookeeper,集群扩展时不再受到Zookeeper读写能力限制;
  4. controller不再动态选举,而是由配置文件规定。

总的来说,通过KRaft和Zookeeper启动Kafka的区别在于,Zookeeper是Kafka用来负责集群元数据的管理和控制器选举的外部框架,而KRaft则是Kafka引入的新内部功能,旨在取消对Zookeeper的依赖,提高集群性能和可扩展性。


Kafka集群可以不依赖Zookeeper吗

在Kafka 2.8之前,Kafka重度依赖于Zookeeper集群做元数据管理和集群的高可用(即所谓的共识服务)。在Kafka 2.8之后,引入了基于Raft协议的KRaft模式,支持取消对Zookeeper的依赖。在此模式下,一部分Kafka Broker被指定为Controller,另一部分则为Broker。这些Controller的作用就是以前由Zookeeper提供的共识服务,并且所有的元数据都将存储在Kafka主题中并在内部进行管理。

总体而言,使用KRaft的好处如下:Kafka不用再依赖外部框架,能够做到独立运行。类似于Redis的Sentinel,它的本质仍然是一个Kafka实例。Controller管理集群时,不再需要从Zookeeper中先读取数据,因此集群的性能得到一定的提升。由于不再依赖Zookeeper,Kafka集群扩展时不用再受到Zookeeper读写能力的限制。Controller不再动态选举,而是由配置文件规定。这样可以有针对性的加强Controller节点的配置,而不是像以前一样对随机Controller节点的高负载束手无策。

因此,Kafka集群可以不依赖Zookeeper


Kafka集群中Zookeeper的作用

在Kafka集群中,Zookeeper扮演了关键的角色,主要体现在以下几个方面:

  1. 维护集群元数据:Zookeeper负责维护Kafka集群的元数据,这包括broker的状态、topic的分区配置以及consumer group的消费状态等。这些元数据的维护和管理对于Kafka集群的正常运行至关重要。
  2. Broker注册:由于Broker是分布式部署且相互独立的,需要有一个注册系统来管理整个集群中的Broker。Zookeeper就扮演了这个角色。每个Broker在启动时都会在Zookeeper上进行注册,创建属于自己的节点,并将自己的IP地址和端口信息记录到该节点中。这样,Kafka集群就能够通过Zookeeper来追踪和管理所有的Broker。
  3. 选举Leader:在Kafka集群中,每个分区都有一个leader和多个follower。当leader出现故障或不可用时,Zookeeper会进行leader选举,重新选举一个新的leader来处理该分区的数据。这是确保Kafka集群高可用性的关键机制之一。
  4. 集群配置管理:Kafka还通过Zookeeper来管理集群的配置,例如broker的配置信息、topic的配置信息等。这些配置信息对于Kafka集群的运行和性能优化都非常重要。

总的来说,Zookeeper在Kafka集群中扮演了元数据存储、Broker管理、Leader选举以及集群配置管理等关键角色,是确保Kafka集群正常运行和高可用性的重要组成部分。


Kafka中的Controller的作用

Kafka中的Controller是Kafka集群的核心组件,用于管理和协调整个Kafka集群。以下是Controller的主要作用:

  1. 选举Leader和ISR:Controller从ZK的/brokers/topics加载一个topic所有分区的所有副本,从分区副本列表中选出一个作为该分区的leader,并将该分区对应所有副本置于ISR列表,其他分区类似。
  2. 监控Broker变化:Controller启动时就起一个监视器监视ZK/brokers/ids/子节点,用于监听和处理器Broker的加入和删除。
  3. 监控Topic变化:Controller启动时就起一个监视器监视ZK/brokers/topics/子节点,用于监听和处理器Topic的创建和删除。
  4. Topic维护:Controller帮助我们完成对Kafka主题的创建、删除以及分区增加的操作。换句话说,当我们执行kafka-topics脚本时,大部分的后台工作都是Controller来完成的。
  5. 分区重分配:kafka-reassign-partitions脚本提供的对已有主题分区进行细粒度的分配功能,也是Controller实现的。

Kafka中的Controller作用主要是管理和协调整个Kafka集群,包括选举Leader和ISR、监控Broker和Topic的变化、维护主题以及分区重分配等。

Kafka使用指南

文章来源:https://blog.csdn.net/zhangzehai2234/article/details/135311410
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。