【Spring连载】使用Spring访问 Apache Kafka(四)----监听器容器属性

发布时间:2024年01月16日

表1.ContainerProperties 属性

属性默认值描述
ackCount1当ackMode为COUNT或COUNT_TIME时,提交待处理偏移量之前的记录数
adviceChainnull包裹消息监听器的Advice对象链(例如,MethodInterceptor around advice),按顺序调用。
ackModeBATCH控制提交偏移的频率
ackTime5000当ackMode为TIME 或者COUNT_TIME时,在多少毫秒之后提交待处理的偏移量
assignmentCommitOptionLATEST_ONLY _NO_TX是否在分配分区的时候提交初始位置;默认情况下,初始偏移量只在ConsumerConfig.AUTO_OFFSET_RESET_CONFIG是latest的时候提交,并且即使存在事务管理器,它也不会在事务中运行
asyncAcksfalse开启无序提交;consumer被暂停,并且提交被推迟,直到收到缺少的acknowledgments 为止
authExceptionRetryIntervalnull当不为null时,当Kafka客户端抛出AuthenticationException或AuthorizationException时,poll()之间的睡眠持续时间。当为null时,此类异常被认为是严重的,容器将停止
clientId(空字符串)覆盖consumer工厂的client.id属性;在并发的容器里,会为每一个consumer实例加一个-n的后缀
checkDeserExWhenKeyNullfalse设置为true可在收到空key时始终检查DeserializationException header。当consumer代码无法确定已配置ErrorHandlingDeserializer时(例如使用委托deserializer时),这一点很有用
checkDeserExWhenValueNullfalse设置为true可在收到空value时始终检查DeserializationException header。当consumer代码无法确定已配置ErrorHandlingDeserializer时(例如使用委托deserializer时),这一点很有用
commitCallbacknull当存在且syncCommits为false时,将在提交完成后调用回调
offsetAndMetadataProvidernullOffsetAndMetadata的提供者;默认情况下,提供者创建一个偏移量和空的元数据。提供者给了一种自定义元数据的方法
commitLogLevelDEBUG与提交偏移量有关的日志级别
consumerRebalanceListenernull再平衡监听器
consumerStartTimout30s在日志记录错误之前等待consumer启动的时间;这可能发生,比如你使用了一个没有足够线程的task executor
consumerTaskExecutorSimpleAsyncTaskExecutor一个执行consumer线程的task executor。默认的executor创建的线程以-C-n命名。对于KafkaMessageListenerContainer,name就是bean名称;对于ConcurrentMessageListenerContainer,名称是以-n作为后缀的bean名称,其中每个子容器的n递增
deliveryAttemptHeaderfalse见处理异常中的Delivery Attempts Header
eosModeV2精确一次语义模式Exactly Once Semantics mode
fixTxOffsetsfalse当消费事务生产者生成的记录,并且consumer位于分区的末尾时,由于用于指示事务提交/回滚的伪记录,以及可能存在的回滚记录,lag 可能会被错误地报告为大于零。这在功能上不会影响consumer,但一些用户表示担心“lag”是非零的。将此属性设置为true,容器将更正此类误报的偏移量。该检查在下一次poll之前执行,以避免给提交处理增加显著的复杂性。只有当consumer配置为“isolation.level=read_committed”并且max.poll.records大于1时,才会更正lag
groupIdnull覆盖consumer的group.id属性;由@KafkaListener的id 或者 groupId自动设置
idleBeforeDataMultiplier5.0idleEventInterval的系数,在接收任何记录之前应用。在接收到记录之后,不再应用系数
idleBetweenPolls0用于通过在polls之间休眠线程来减慢传递速度。处理一个batch记录的时间加上此值必须小于max.poll.interval.ms的consumer 属性
idleEventIntervalnull设置后,启用ListenerContainerIdleEvent的发布,请参阅应用程序事件(Application Events)和检测空闲和无响应消费者(Detecting Idle and Non-Responsive Consumers)。另请参阅idleBeforeDataMultiplier
idlePartitionEventIntervalnull设置后,启用ListenerContainerIdlePartitionEvent的发布,请参阅应用程序事件(Application Events)和检测空闲和无响应消费者(Detecting Idle and Non-Responsive Consumers)
kafkaConsumerPropertiesNone用于覆盖在consumer工厂上配置的任意consumer属性
logContainerConfigfalse设置为true以在INFO级别记录所有容器属性
messageListenernull消息监听器
micrometerEnabledtrue是否为consumer线程维护Micrometer计时器
micrometerTagsempty要添加到Micrometer metrics中的静态标记的map
micrometerTagsProvidernull一种基于consumer记录提供动态标记的函数
missingTopicsFatalfalse当为true时,如果broker上不存在已配置的主题,则阻止容器启动
monitorInterval30s为NonResponsiveConsumerEvent事件检查consumer线程状态的频率。请参阅noPollThreshold和pollTimeout
noPollThreshold3.0乘以pollTimeOut以确定是否发布NonResponsiveConsumerEvent。请参阅monitorInterval
onlyLogRecordMetadatafalse设置为false以记录完整的consumer记录(错误、调试日志等),而不是仅记录topic-partition@offset
pauseImmediatefalse当容器暂停时,在当前记录之后停止处理,而不是在处理上一次poll的所有记录之后;剩余的记录保留在内存中,并将在容器恢复时传递给监听器
pollTimeout5000传入Consumer.poll()方法的超时时间(以毫秒为单位)
pollTimeoutWhilePaused100容器处于暂停状态时传递给Consumer.poll()方法的超时时间(以毫秒为单位)
restartAfterAuthExceptionsfalse如果容器因“authorization/authentication”异常而停止,为True可以重新启动容器
schedulerThreadPoolTaskScheduler运行consumer监视任务的调度程序
shutdownTimeout10000在所有consumers停止之前并且在发布容器停止事件之前,阻塞stop()方法的最大时间(以毫秒为单位)
stopContainerWhenFencedfalse如果抛出ProducerFencedException,则停止监听器容器
stopImmediatefalse当容器停止时,在处理完当前记录之后停止处理,而不是在处理完前一次poll的所有记录之后
subBatchPerPartitionfalse在使用batch监听器时,如果为true,则调用该监听器,并将poll的结果分成sub batch,每个分区一个
syncCommitTimeoutnullsyncCommits为true时使用的超时。如果没有设置,容器将尝试确定default.api.timeout.ms的consumer 属性并使用它;否则它将使用60秒
syncCommitstrue对于偏移量,是使用同步提交还是异步提交;请参阅commitCallback
topics topicPattern topicPartitionsn/a配置的topics、topic pattern或显式分配的topics/partitions。相互排斥;必须提供至少一个;由ContainerProperties构造函数强制执行
transactionManagernull请参阅事务Transactions

表 2. AbstractListenerContainer 属性

属性默认值描述
afterRollbackProcessorDefaultAfterRollbackProcessor事务回滚后调用的AfterRollbackProcessor
applicationEventPublisherapplication context事件发布者
batchInterceptornull在调用batch监听器之前,设置要调用的BatchInterceptor;不适用于record监听器。另请参阅interceptBeforeTx
beanNamebean name容器的bean名称;子容器的后缀为-n
commonErrorHandlerDefaultErrorHandler 或者 null当提供了transactionManager并使用了DefaultAfterRollbackProcessor,默认为DefaultErrorHandler 或者 null。请参阅容器错误处理程序Container Error Handlers
containerPropertiesContainerProperties容器属性实例
groupId见描述containerProperties.groupId(如果存在),否则为consumer工厂的group.id属性
interceptBeforeTxtrue确定是在事务开始之前还是之后调用recordInterceptor
listenerId见描述用户配置的容器bean名称或@KafkaListener的id属性
listenerInfonull要填充在“KafkaHeaders.LISTENER_INFO” header中的值。对于@KafkaListener,这个值是从info属性中获得的。这个header可以在各种地方使用,例如RecordInterceptor、RecordFilterStrategy和listener代码本身
pauseRequested(只读)如果请求了consumer暂停,则为True
recordInterceptornull设置一个RecordInterceptor在调用record监听器之前调用;不适用于batch监听器。参见interceptBeforeTx
topicCheckTimeout30s当missingTopicsFatal容器属性为true时,为describeTopics操作完成等待多久(以秒计)

表3. KafkaMessageListenerContainer 属性

属性默认值描述
assignedPartitions(只读)当前分配给该容器的分区(显式或非显式)
assignedPartitionsByClientId(只读)当前分配给该容器的分区(显式或非显式)
clientIdSuffixnull由并发容器使用,为每个子容器的consumer提供唯一的client.id
containerPausedn/a如果已经请求暂停并且consumer已经实际暂停,则为True

表4. ConcurrentMessageListenerContainer 属性

属性默认值描述
alwaysClientIdSuffixtrue当并发性仅为1时,设置为false可禁止向consumer的“client.id”属性添加后缀
assignedPartitions(只读)当前分配给该容器的子容器KafkaMessageListenerContainer的分区总数(显式或非显式)
assignedPartitionsByClientId(只读)当前分配给该容器的子容器KafkaMessageListenerContainer的分区(显式或非显式),由子容器的consumer的 client.id属性设置key
concurrency1要管理的子KafkaMessageListenerContainer的数量
containerPausedn/a如果已请求暂停并且所有子容器的consumer都已实际暂停,则为True
containersn/a对所有子KafkaMessageListenerContainer的引用
文章来源:https://blog.csdn.net/gabriel_wang_sh/article/details/135588439
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。