表1.ContainerProperties 属性
属性 | 默认值 | 描述 |
---|---|---|
ackCount | 1 | 当ackMode为COUNT或COUNT_TIME时,提交待处理偏移量之前的记录数 |
adviceChain | null | 包裹消息监听器的Advice对象链(例如,MethodInterceptor around advice),按顺序调用。 |
ackMode | BATCH | 控制提交偏移的频率 |
ackTime | 5000 | 当ackMode为TIME 或者COUNT_TIME时,在多少毫秒之后提交待处理的偏移量 |
assignmentCommitOption | LATEST_ONLY _NO_TX | 是否在分配分区的时候提交初始位置;默认情况下,初始偏移量只在ConsumerConfig.AUTO_OFFSET_RESET_CONFIG是latest的时候提交,并且即使存在事务管理器,它也不会在事务中运行 |
asyncAcks | false | 开启无序提交;consumer被暂停,并且提交被推迟,直到收到缺少的acknowledgments 为止 |
authExceptionRetryInterval | null | 当不为null时,当Kafka客户端抛出AuthenticationException或AuthorizationException时,poll()之间的睡眠持续时间。当为null时,此类异常被认为是严重的,容器将停止 |
clientId | (空字符串) | 覆盖consumer工厂的client.id属性;在并发的容器里,会为每一个consumer实例加一个-n的后缀 |
checkDeserExWhenKeyNull | false | 设置为true可在收到空key时始终检查DeserializationException header。当consumer代码无法确定已配置ErrorHandlingDeserializer时(例如使用委托deserializer时),这一点很有用 |
checkDeserExWhenValueNull | false | 设置为true可在收到空value时始终检查DeserializationException header。当consumer代码无法确定已配置ErrorHandlingDeserializer时(例如使用委托deserializer时),这一点很有用 |
commitCallback | null | 当存在且syncCommits为false时,将在提交完成后调用回调 |
offsetAndMetadataProvider | null | OffsetAndMetadata的提供者;默认情况下,提供者创建一个偏移量和空的元数据。提供者给了一种自定义元数据的方法 |
commitLogLevel | DEBUG | 与提交偏移量有关的日志级别 |
consumerRebalanceListener | null | 再平衡监听器 |
consumerStartTimout | 30s | 在日志记录错误之前等待consumer启动的时间;这可能发生,比如你使用了一个没有足够线程的task executor |
consumerTaskExecutor | SimpleAsyncTaskExecutor | 一个执行consumer线程的task executor。默认的executor创建的线程以-C-n命名。对于KafkaMessageListenerContainer,name就是bean名称;对于ConcurrentMessageListenerContainer,名称是以-n作为后缀的bean名称,其中每个子容器的n递增 |
deliveryAttemptHeader | false | 见处理异常中的Delivery Attempts Header |
eosMode | V2 | 精确一次语义模式Exactly Once Semantics mode |
fixTxOffsets | false | 当消费事务生产者生成的记录,并且consumer位于分区的末尾时,由于用于指示事务提交/回滚的伪记录,以及可能存在的回滚记录,lag 可能会被错误地报告为大于零。这在功能上不会影响consumer,但一些用户表示担心“lag”是非零的。将此属性设置为true,容器将更正此类误报的偏移量。该检查在下一次poll之前执行,以避免给提交处理增加显著的复杂性。只有当consumer配置为“isolation.level=read_committed”并且max.poll.records大于1时,才会更正lag |
groupId | null | 覆盖consumer的group.id属性;由@KafkaListener的id 或者 groupId自动设置 |
idleBeforeDataMultiplier | 5.0 | idleEventInterval的系数,在接收任何记录之前应用。在接收到记录之后,不再应用系数 |
idleBetweenPolls | 0 | 用于通过在polls之间休眠线程来减慢传递速度。处理一个batch记录的时间加上此值必须小于max.poll.interval.ms的consumer 属性 |
idleEventInterval | null | 设置后,启用ListenerContainerIdleEvent的发布,请参阅应用程序事件(Application Events)和检测空闲和无响应消费者(Detecting Idle and Non-Responsive Consumers)。另请参阅idleBeforeDataMultiplier |
idlePartitionEventInterval | null | 设置后,启用ListenerContainerIdlePartitionEvent的发布,请参阅应用程序事件(Application Events)和检测空闲和无响应消费者(Detecting Idle and Non-Responsive Consumers) |
kafkaConsumerProperties | None | 用于覆盖在consumer工厂上配置的任意consumer属性 |
logContainerConfig | false | 设置为true以在INFO级别记录所有容器属性 |
messageListener | null | 消息监听器 |
micrometerEnabled | true | 是否为consumer线程维护Micrometer计时器 |
micrometerTags | empty | 要添加到Micrometer metrics中的静态标记的map |
micrometerTagsProvider | null | 一种基于consumer记录提供动态标记的函数 |
missingTopicsFatal | false | 当为true时,如果broker上不存在已配置的主题,则阻止容器启动 |
monitorInterval | 30s | 为NonResponsiveConsumerEvent事件检查consumer线程状态的频率。请参阅noPollThreshold和pollTimeout |
noPollThreshold | 3.0 | 乘以pollTimeOut以确定是否发布NonResponsiveConsumerEvent。请参阅monitorInterval |
onlyLogRecordMetadata | false | 设置为false以记录完整的consumer记录(错误、调试日志等),而不是仅记录topic-partition@offset |
pauseImmediate | false | 当容器暂停时,在当前记录之后停止处理,而不是在处理上一次poll的所有记录之后;剩余的记录保留在内存中,并将在容器恢复时传递给监听器 |
pollTimeout | 5000 | 传入Consumer.poll()方法的超时时间(以毫秒为单位) |
pollTimeoutWhilePaused | 100 | 容器处于暂停状态时传递给Consumer.poll()方法的超时时间(以毫秒为单位) |
restartAfterAuthExceptions | false | 如果容器因“authorization/authentication”异常而停止,为True可以重新启动容器 |
scheduler | ThreadPoolTaskScheduler | 运行consumer监视任务的调度程序 |
shutdownTimeout | 10000 | 在所有consumers停止之前并且在发布容器停止事件之前,阻塞stop()方法的最大时间(以毫秒为单位) |
stopContainerWhenFenced | false | 如果抛出ProducerFencedException,则停止监听器容器 |
stopImmediate | false | 当容器停止时,在处理完当前记录之后停止处理,而不是在处理完前一次poll的所有记录之后 |
subBatchPerPartition | false | 在使用batch监听器时,如果为true,则调用该监听器,并将poll的结果分成sub batch,每个分区一个 |
syncCommitTimeout | null | syncCommits为true时使用的超时。如果没有设置,容器将尝试确定default.api.timeout.ms的consumer 属性并使用它;否则它将使用60秒 |
syncCommits | true | 对于偏移量,是使用同步提交还是异步提交;请参阅commitCallback |
topics topicPattern topicPartitions | n/a | 配置的topics、topic pattern或显式分配的topics/partitions。相互排斥;必须提供至少一个;由ContainerProperties构造函数强制执行 |
transactionManager | null | 请参阅事务Transactions |
表 2. AbstractListenerContainer 属性
属性 | 默认值 | 描述 |
---|---|---|
afterRollbackProcessor | DefaultAfterRollbackProcessor | 事务回滚后调用的AfterRollbackProcessor |
applicationEventPublisher | application context | 事件发布者 |
batchInterceptor | null | 在调用batch监听器之前,设置要调用的BatchInterceptor;不适用于record监听器。另请参阅interceptBeforeTx |
beanName | bean name | 容器的bean名称;子容器的后缀为-n |
commonErrorHandler | DefaultErrorHandler 或者 null | 当提供了transactionManager并使用了DefaultAfterRollbackProcessor,默认为DefaultErrorHandler 或者 null。请参阅容器错误处理程序Container Error Handlers |
containerProperties | ContainerProperties | 容器属性实例 |
groupId | 见描述 | containerProperties.groupId(如果存在),否则为consumer工厂的group.id属性 |
interceptBeforeTx | true | 确定是在事务开始之前还是之后调用recordInterceptor |
listenerId | 见描述 | 用户配置的容器bean名称或@KafkaListener的id属性 |
listenerInfo | null | 要填充在“KafkaHeaders.LISTENER_INFO” header中的值。对于@KafkaListener,这个值是从info属性中获得的。这个header可以在各种地方使用,例如RecordInterceptor、RecordFilterStrategy和listener代码本身 |
pauseRequested | (只读) | 如果请求了consumer暂停,则为True |
recordInterceptor | null | 设置一个RecordInterceptor在调用record监听器之前调用;不适用于batch监听器。参见interceptBeforeTx |
topicCheckTimeout | 30s | 当missingTopicsFatal容器属性为true时,为describeTopics操作完成等待多久(以秒计) |
表3. KafkaMessageListenerContainer 属性
属性 | 默认值 | 描述 |
---|---|---|
assignedPartitions | (只读) | 当前分配给该容器的分区(显式或非显式) |
assignedPartitionsByClientId | (只读) | 当前分配给该容器的分区(显式或非显式) |
clientIdSuffix | null | 由并发容器使用,为每个子容器的consumer提供唯一的client.id |
containerPaused | n/a | 如果已经请求暂停并且consumer已经实际暂停,则为True |
表4. ConcurrentMessageListenerContainer 属性
属性 | 默认值 | 描述 |
---|---|---|
alwaysClientIdSuffix | true | 当并发性仅为1时,设置为false可禁止向consumer的“client.id”属性添加后缀 |
assignedPartitions | (只读) | 当前分配给该容器的子容器KafkaMessageListenerContainer的分区总数(显式或非显式) |
assignedPartitionsByClientId | (只读) | 当前分配给该容器的子容器KafkaMessageListenerContainer的分区(显式或非显式),由子容器的consumer的 client.id属性设置key |
concurrency | 1 | 要管理的子KafkaMessageListenerContainer的数量 |
containerPaused | n/a | 如果已请求暂停并且所有子容器的consumer都已实际暂停,则为True |
containers | n/a | 对所有子KafkaMessageListenerContainer的引用 |