同步副本需满足的条件:
broker级别配置:default.replication.factor=1
topic级别配置:replication.factor=1
建议非关键数据小于3
建议把broker分布在多个不同的机器上
unclean.leader.election.enable=false
指示是否启用非同步副本可以被选为首领,作为首领选举的最后手段,即使这样做可能会导致数据丢失
min.insync.replicas=1
最小同步副本数。min.insync.replicas(默认值为1)代表了正常写入生产者数据所需要的最少ISR个数, 当ISR中的副本数量小于min.insync.replicas时,Leader停止写入生产者生产的消息,并向生产者抛出NotEnoughReplicas异常,阻塞等待更多的 Follower 赶上并重新进入ISR, 因此能够容忍min.insync.replicas-1个副本同时宕机。当与min.insync.replicas和acks一起使用时,可以实现更大的耐用性保证。一个典型的场景是创建一个复制因子为3的主题,将min.insync.replicas设置为2,并使用acks “all”进行生产。
replica.lag.time.max.ms=30000 (30 seconds)
如果一个follower这段时间内没有发送任何fetch请求,或者没有消费leader最新偏移量的消息,那么leader将从isr中删除该follower。
zookeeper.session.timeout.ms=18000 (18 seconds)
允许broker不向ZooKeeper发送心跳的时间间隔。如果超过这个时间不向ZK发送心跳,ZK会认为broker已经死亡,会将其移除出集群。
Kafka会在重启之前和关闭日志片段的时候将消息冲刷到磁盘上,或者等Linux系统页面缓存被填满时冲刷。拥有不同机架上的副本的多个磁盘比只写入首领磁盘更加安全。不过,也可以让broker更频繁的写入磁盘。
flush.messages=9223372036854775807
此设置允许指定一个间隔,在该间隔,我们将强制对写入日志的数据进行fsync。例如,如果将其设置为1,我们将在每条消息之后进行fsync;如果是5,我们将在每5条消息之后进行fsync。通常,我们建议您不要设置此项,并使用复制以提高耐用性,并允许操作系统的后台刷新功能,因为它更高效。此设置可以在每个主题的基础上覆盖(请参阅每个主题配置部分)。
flush.ms=9223372036854775807
此设置允许指定一个时间间隔,在该时间间隔内,我们将强制对写入日志的数据进行fsync。例如,如果将其设置为1000,我们将在1000毫秒后进行fsync。通常,我们建议您不要设置此项,并使用复制以提高耐用性,并允许操作系统的后台刷新功能,因为它更高效。
acks参数指定了生产者在多少个分区副本收到消息的情况下才会认为消息写入成功。允许以下设置:
acks=0。如果设置为零,则生产者根本不会等待来自服务器的任何确认。该记录将立即添加到套接字缓冲区,并被视为已发送。在这种情况下,无法保证服务器已收到记录,重试配置也不会生效(因为客户端通常不会知道任何故障)。为每条记录返回的偏移量将始终设置为-1。
acks=1。表示只要首领收到消息,并将记录成功写入其本地日志,就返回成功响应,不等待所有追随者的确认。在这种情况下,如果首领在确认成功后,追随者复制之前崩溃,则记录将会丢失。
acks=all。表示首领将等待同步复制集合中所有的副本都确认收到了记录。这保证了只要至少有一个同步复制副本保持活动状态,记录就不会丢失。这是最有力的保证。这相当于acks=-1的设置。
请注意,启用幂等性要求此配置值为“all”。如果设置了冲突的配置并且没有显式启用幂等性,则会禁用幂等性。
设置自动重试,并使用默认重试次数。
将delivery.timeout.ms设置成愿意等待的时长,生产者会在这段时间内重试。
例如:
group.id
auto.offset.reset
如果所有的处理逻辑都是在轮询里进行的,并且不需要维护轮询之间的状态(比如为了聚合数据),那么就很简单,可以使用自动提交,在轮询结束时提交偏移量。
enable.auto.commit
无法控制应用应用程序可能重复处理的消息的数量
如果应用程序把消息交给另一个后台线程处理,那么只能使用手动提交了
auto.commit.interval.ms
自动提交的频率过大,会增加重复的概率;过小,会增加额外开销
手动提交偏移量增加了灵活性,但也增加了复杂度并且有可能对性能产生影响,所以可能需要考虑如下事项:
如果所有的处理逻辑都是在轮询里进行的,就很简单,选择一个合适的提交频率;
如果涉及额外线程,该如何呢?
一定要在处理完后在提交偏移量
如何在分区被撤销之前提交偏移量;
如何在应用程序被分配到新分区并清理状态时提交偏移量
如果遇到批次中的部分消息需要稍后处理。因为消费者不能针对每一条消息提交偏移量,而是提交最后一条成功的偏移量,所以需要借助额外的工具来处理。
有两种模式来解决这个问题:
1.在提交偏移量的同时,状态存入另一个主题中,可以开启事务来保证一致性。当一个线程重新启动时,就可以读取状态和从偏移量处读取消息。
2. 使用流式处理框架
测试场景:
测试场景:
生产者:
消费者:
broker:
To be continued …