Kafka(四)Broker

发布时间:2024年01月07日


Kafka Broker是Apache Kafka中的一个重要组件,它负责接收、存储和转发消息。Kafka Broker是一个分布式系统,可以在多台服务器上运行,以实现高可用性和水平扩展性。

Kafka Broker通过分区将消息存储在主题中,并且可以处理多个生产者和消费者之间的消息传递。它还负责管理消息的持久性和复制,以确保消息不会丢失,并且可以在发生故障时进行恢复。

Kafka Broker使用Zookeeper来协调集群中的各个节点,以确保数据的一致性和可靠性。它还提供了一系列的API,使得开发者可以方便地与Kafka Broker进行交互,包括生产消息、消费消息和管理主题等操作。

总的来说,Kafka Broker是Apache Kafka中的核心组件,它为分布式消息系统提供了高性能、可靠性和可扩展性的基础。

1 配置Broker

1.1 Broker的配置

broker.id=0

标识符,任意整数,默认值为0

listerers

对外提供服务的URI列表,格式为:

<protocol>://<hostname>:<port>

例如:

listeners=PLAINTEXT://172.26.143.96:9092,SSL://172.26.143.96:9093,SASL_SSL://172.26.143.96:9094

zookeeper.connect

ZooKeeper的地址列表,格式为:

<hostname>:<port>/path

其中path为可选
例如:

127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002

log.dirs

日志片段的存放目录,格式为逗号分隔的本地文件系统路径。
例如:

/tmp/kafka-logs,/tmp/kafka-logs-1,/tmp/kafka-logs-2

log.dir=/tmp/kafka-logs

如果没有指定log.dirs,日志片段保存路径由它指定。
例如:

/tmp/kafka-logs

num.recovery.threads.per.data.dir=1

处理每一个日志片段目录的线程数,默认为1。此线程的职责:

  1. 服务正常启动时打开日志片段
  2. 服务崩溃重启时,检查和截短每个分区的日志片段
  3. 服务器正常关闭时,用于关闭日志片段

auto.create.topics.enable=true

默认为true,以下情况会自动创建主题:

  1. 生产者开始写入消息时
  2. 消费者开始读取消息时
  3. 客户端发送获取元数据请求时

auto.leader.rebalance.enable=true, leader.imbalance.check.interval.seconds=300, leader.imbalance.per.broker.percentage=10

为了确保主题分区的首领权不会集中在一个broker身上,如果设置为true,有一个后台线程会定期(leader.imbalance.check.interval.seconds)扫描broker是否超过了比重(leader.imbalance.per.broker.percentage),如果超过了,就会触发一次领袖再均衡。
在首领再均衡的过程中,Kafka会检查首选首领是否是同步的,当前首领是否时首选首领。如果同步,但不是当前首领,就会触发首领选举,让首选首领成为当前首领。

delete.topic.enable=true

是否允许删除主题。如果关闭,管理工具将无法删除主题。

broker.rack

用来指定broker的机架。用于机架感知副本分配,以实现容错。将broker部署在不同的机架上,同一个分区的不同副本可以分配给位于不同机架上的broker。
例如: ‘RACK1‘, ’us-east-1d‘

replica.lag.time.max.ms=30000 (30 seconds)

如果一个follower这段时间内没有发送任何fetch请求,或者没有消费leader最新偏移量的消息,那么leader将从isr中删除该follower。

zookeeper.session.timeout.ms=18000 (18 seconds)

允许broker不向ZooKeeper发送心跳的时间间隔。如果超过这个时间不向ZK发送心跳,ZK会认为broker已经死亡,会将其移除出集群。

flush.messages=9223372036854775807

此设置允许指定一个间隔,在该间隔,我们将强制对写入日志的数据进行fsync。例如,如果将其设置为1,我们将在每条消息之后进行fsync;如果是5,我们将在每5条消息之后进行fsync。通常,我们建议您不要设置此项,并使用复制以提高耐用性,并允许操作系统的后台刷新功能,因为它更高效。此设置可以在每个主题的基础上覆盖(请参阅每个主题配置部分)。

flush.ms=9223372036854775807

此设置允许指定一个时间间隔,在该时间间隔内,我们将强制对写入日志的数据进行fsync。例如,如果将其设置为1000,我们将在1000毫秒后进行fsync。通常,我们建议您不要设置此项,并使用复制以提高耐用性,并允许操作系统的后台刷新功能,因为它更高效。

1.2 主题的默认配置

Kafka为新创建的主题提供了很多默认配置参数,想要修改这些参数,必须通过管理工具或程序来修改。

num.partition=1

要修改分区数,只能增加,不能减少。如果要创建分区数小于默认值的主题,必须手动创建。

如何选择分区数量

  • 主题的吞吐量
  • 单个分区读取的吞吐量
  • 单个分区写入的吞吐量
  • 按键写入分区,未来预期吞吐量
  • 每个broker的分区数,可用磁盘,网络带宽
  • 避免使用太多分区,消耗过多资源和首领选举时间
  • 镜像吞吐量
    云服务有IOPS的限制吗

经验数据:将分区每天保留的数据限制在6G以内

default.replication.factor=1

自动创建主题的默认复制因子。建议为min.insync.replicas+1或+2

min.insync.replicas=1

最小同步副本数。min.insync.replicas(默认值为1)代表了正常写入生产者数据所需要的最少ISR个数, 当ISR中的副本数量小于min.insync.replicas时,Leader停止写入生产者生产的消息,并向生产者抛出NotEnoughReplicas异常,阻塞等待更多的 Follower 赶上并重新进入ISR, 因此能够容忍min.insync.replicas-1个副本同时宕机。当与min.insync.replicas和acks一起使用时,可以实现更大的耐用性保证。一个典型的场景是创建一个复制因子为3的主题,将min.insync.replicas设置为2,并使用acks “all”进行生产。

log.retention.hours=168

数据保留的时间:小时。数据保留时间时根据检查日志片段文件的最后修改时间来开始计算的。一般是日志片段文件关闭时间,如果在分区间移动文件,可能导致时间不准确。

log.retention.minutes

数据保留的时间:分钟。

log.retention.ms

数据保留的时间:毫秒。

log.retention.bytes=-1

数据保留的字节数,对应的是每一个分区。-1代表无限期保留数据。它和时间保留策略哪一个先满足哪一个生效。

log.segment.bytes=1073741824 (1 gibibyte)

日志片段文件达到多大会关闭当前文件,打开一个新的文件。

按照时间戳获取偏移量时,会查找在时间戳之前打开,之后关闭的日志片段文件,然后获取开头的偏移量。

log.roll.hours=168

多长时间后日志片段可以被关闭:单位小时。

log.roll.ms

多长时间后日志片段可以被关闭:单位毫秒。

message.max.bytes=1048588

Kafka允许的最大记录批处理大小(如果启用压缩,则在压缩之后)。如果增加了这个值,并且有0.10.2以上的消费者,那么消费者的提取大小也必须增加,这样他们才能提取这么大的记录批次。在最新的消息格式版本中,为了提高效率,总是将记录分组为批。在以前的消息格式版本中,未压缩的记录不会分组为批,在这种情况下,此限制仅适用于单个记录。这可以通过主题级别的max.message.bytes配置为每个主题设置。

compression.type

指定给定主题的最终压缩类型。此配置接受标准压缩编解码器(“zip”、“snappy”、“lz4”、“zstd”)。它还接受“uncompressed”,这相当于没有压缩;以及“producer”,即保留生产者设置的原始压缩编解码器。

metadata.max.age.ms=300000 (5 minutes)

以毫秒为单位的时间段,在此之后,即使我们没有看到任何分区领导的变化,我们也会强制刷新元数据,以主动发现任何新的broker或分区。

delete.retention.ms

min.compaction.lag.ms

max.compaction.lag.ms

min.cleanable.ratio

log.cleanup.policy

replication.factor

1.3 默认的producer配置

quota.producer.default=10485760(memoved from 3.0)

https://issues.apache.org/jira/browse/KAFKA-12591

quota.consumer.default=10485760(memoved from 3.0)

1.4 默认的consumer配置

group.min.session.timeout.ms=6000 (6 seconds)

已注册消费者允许的最小会话超时。较短的超时导致更快的故障检测,而代价是更频繁的消费者心跳,这可能会使代理资源不堪重负。

group.max.session.timeout.ms=1800000 (30 minutes)

已注册消费者允许的最大会话超时值。更长的超时时间使消费者有更多的时间在检测信号之间处理消息,而检测故障的时间更长。

replica.selector.class

实现ReplicaSelector的完全限定类名。这被代理用来查找首选的读取复制副本。默认情况下,我们使用一个返回leader的实现。
如果对于跨区域,数据中心,为了让消费者从同一区域的分区副本读取消息,需要实现机架感知,需要配置为:apache.kafka.common.replica.RackAwareReplicaSelector。并且Consumer端需要配置client.rack来标识其物理位置,并且与broker的broker.rack参数相匹配。

2 为broker选择硬件

磁盘

  • 吞吐连
  • 容量

内存

系统页面缓存

网络

CPU

3 云端Kafka

4 Kafka集群

4.1 broker的数量

一个aKafka集群需要多少个broker由以下因素决定:

  • 磁盘容量
    需要多少磁盘容量来保存数据?单个broker的磁盘容量
  • 单个broker的复制容量
    每个broker的复制系数会导致磁盘容量成倍增长
  • CPU
    如果有很多客户端连接和请求,可能CPU会称为瓶颈
  • 网络
    需要关注网络接口流量。
    官方建议每个broker的分区副本不超过14000个,每个集群的分区副本不超过100万个。

4.2 操作系统调优

  1. 虚拟内存
    尽量避免内存交换
    vm.swappiness=1 除非发生内存溢出,否则不进行从内存交换
    vm.dirty_background_ratio=5 脏页站系统内存百分比。 是内存可以填充“脏数据”的百分比。这些“脏数据”在稍后是会写入磁盘的,pdflush/flush/kdmflush这些后台进程会稍后清理脏数据。举一个例子,我有32G内存,那么有3.2G的内存可以待着内存里,超过3.2G的话就会有后来进程来清理它。
    vm.dirty_ratio=60到80之间 脏页站系统内存百分比。 是绝对的脏数据限制,内存里的脏数据百分比不能超过这个值。如果脏数据超过这个数量,新的IO请求将会被阻挡,直到脏数据被写进磁盘。这是造成IO卡顿的重要原因,但这也是保证内存中不会存在过量脏数据的保护机制。
    vm.mx_map_count=40w到60w vm.max_map_count是一个与内核虚拟内存子系统相关的参数,用于控制进程可以拥有的内存映射区域的最大数量。它通常用于限制一个进程可以打开的文件数量,特别是在使用大量内存映射文件的情况下。
    vm.overcommit_momery=0 #vm.overcommit_memory 表示内核在分配内存时候做检查的方式。这个变量可以取到0,1,2三个值。默认值为:0
    从内核文档里得知,该参数有三个值,分别是:
    0:当用户空间请求更多的的内存时,内核尝试估算出剩余可用的内存。
    1:当设这个参数值为1时,内核允许超量使用内存直到用完为止,主要用于科学计算
    2:当设这个参数值为2时,内核会使用一个决不过量使用内存的算法,即系统整个内存地址空间不能超过swap+50%的RAM值,50%参数的设定是在overcommit_ratio中设定。
  2. 磁盘
    文件系统:XFS
    挂载点:noatime。禁用最后访问时间
  3. 网络
    所有协议的socket缓冲区:
    net.core.wmem_default=131,072(128KB)
    net.core.rmem_default=131,072(128KB)
    net.core.wmem_max=2,097,152(2MB)
    net.core.rmem_max=2,097,152(2MB)
    TCP协议缓冲区:
    net.ipv4.tcp_wmem=4096 65536 2048000(最小值4KB,默认值64KB,最大值2MB)
    net.ipv4.tcp_rmem=4096 65536 2048000(最小值4KB,默认值64KB,最大值2MB)

net.ipv4.tcp_window_scaling=1 TCP时间窗口扩展
net.ipv4.tcp_max_syn_backlog=1024(default)
net.core.netdev_max_backlog=1000(default)

5 生产环境

5.1 垃圾回收

G1 GC
export KAFKA_JVM_PERFORMANCE_OPTS=“-server -Xmx6g -Xms6g
-XX:MetaspaceSize=96m -XX:+UseG1GC
-XX:MaxGCPauseMillis=20 -XX: InitiatingHeapOccupancyPercent=35
-XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatto=50
-XX:MaxMetaspaceFreeRatio=80 -XX:+ExplicitGCInvokesConcurrent”

/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

5.2 数据中心布局

机架感知(rack awareness):broker.rack
broker安装在不同的机架上

5.3 共享Zookeeper

文章来源:https://blog.csdn.net/yunyun1886358/article/details/135433789
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。