SpringCloud Stream的配置以spring.cloud.stream为前缀,主要分为三部分
spring.cloud.stream.default-binder=rabbit
通用配置
适用于输入和输出通道,通过spring.cloud.stream.bindings..前缀配置,源码类BindingProperties中存有所有可配置属性。
下面是BindingProperties的属性介绍
destination:设置输入或输出通道的目标,如rabbitmq中为exchange名,kafka为topic名
例如,对于名为 myInput 的输入通道:
spring.cloud.stream.bindings.myInput.destination=myTopic
contentType:设置输入或输出通道的消息内存类型名,默认为application/json
group:设置消费者分组
binder:存在多个绑定器时,指定当前通道使用的绑定器
消费者配置
spring.cloud.stream.bindings..consumer为前缀,用于定义消费者的属性,只对输入通道有效,源码类为
ConsumerProperties
concurrency:设置消费者的并发数,默认为1,可以根据自己的需求进行修改,例如:
spring.cloud.stream.bindings.myInput.consumer.concurrency=5
partitioned:消费者是否从分区的生产者接收数据,默认为false
instanceCount 和 instanceIndex:用于在多个实例之间进行分区和负载均衡。instanceCount 指定实例的总数,instanceIndex 指定当前实例的索引,两个属性默认都为-1
spring.cloud.stream.instanceCount=3
spring.cloud.stream.instanceIndex=0
maxAttempts:设置消息消费失败时的最大重试次数,默认为3,可以根据自己的需求进行修改,例如:
spring.cloud.stream.bindings.myInput.consumer.max-attempts=5
backOffInitialInterval:重试消息处理的初始间隔时间,默认值为1000
backOffMaxInterval:重试消息处理的最大间隔时间,默认值为10000
backOffMultiplier:重试消息处理时间间隔的递增乘数,默认值为2.0
生产者属性
spring.cloud.stream.bindings..producer为前缀,用于定义生产者的属性,只对输出通道有效,配置源码类为ProducerProperties
spring.cloud.stream.bindings.myOutput.producer.partition-key-expression="'key'"
这将使用 ‘key’ 作为分区键。
spring.cloud.stream.bindings.myOutput.producer.partition-count=3
这将设置要发送到 3 个分区。
spring.cloud.stream.bindings.myOutput.producer.required-groups=myGroup1,myGroup2
这设置只有 myGroup1`和 myGroup2的消费者组接收来自该生产者的消息。
none
、embeddedHeaders
或 headers
。例如:spring.cloud.stream.bindings.myOutput.producer.header-mode=embeddedHeaders
headerMode 有以下几种取值:
none:不处理消息头部。此模式下,消息的头部将被忽略,只处理消息体。
embeddedHeaders:将消息头部嵌入到消息体中。在发送消息时,消息头部的键值对会被序列化并放置在消息体中的特定位置。在接收消息时,框架会从消息体中解析出消息头部的内容。
headers:在这种模式下,所有的原生消息头都会被直接映射为Spring Messaging MessageHeaders的一部分。
前缀:spring.cloud.stream.binders,源码类为BinderProperties,用于定义和配置消息中间件绑定器(Binder),这样应用可以同时与多个消息代理系统(如Kafka、RabbitMQ等)进行交互。每个binder都有一个名称,通过这个名称可以在其他地方引用它,并且需要指定其类型以及相关的环境属性。
1.type:指定 Binder 的类型,是一个字符串,例如 kafka或者rabbit
2.environment:指定 Binder 的环境属性,例如 kafka的主机地址和端口号,我们可以在environment中定义各个binder的属性,如kafka,我们可以定义spring.kafka的属性,
示例:
spring:
cloud:
stream:
binders:
myKafkaBinder: #Binder的名称,由我们定义
type: kafka # Binder的类型是 kafka
environment:
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
retries: 3
batch-size: 16384
buffer-memory: 33554432
acks: 1
consumer:
enable-auto-commit: false
auto-offset-reset: earliest
listener:
ack-mode: MANUAL_IMMEDIATE
该配置指定了一个名为 kafkaBinder1 的 Binder,其类型为 kafka,并使用了一些 Kafka 客户端属性来自定义 Kafka 生产者和消费者的行为。这样的方式适合我们有多个binder时进行配置,如果只有一个,那么直接使用spring.cloud.stream.kafka.binder这种方式进行配置即可,也可以直接使用spring.kafka进行配置,因为spring.cloud.stream.kafka.binder的配置会和spring.kafka的配置进行合并
public Map<String, Object> mergedConsumerConfiguration() {
Map<String, Object> consumerConfiguration = new HashMap<>();
consumerConfiguration.putAll(this.kafkaProperties.buildConsumerProperties());
// Copy configured binder properties that apply to consumers
for (Map.Entry<String, String> configurationEntry : this.configuration
.entrySet()) {
if (ConsumerConfig.configNames().contains(configurationEntry.getKey())) {
consumerConfiguration.put(configurationEntry.getKey(),
configurationEntry.getValue());
}
}
consumerConfiguration.putAll(this.consumerProperties);
filterStreamManagedConfiguration(consumerConfiguration);
// Override Spring Boot bootstrap server setting if left to default with the value
// configured in the binder
return getConfigurationWithBootstrapServer(consumerConfiguration,
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
}
以下是这些属性的含义:
spring.kafka.bootstrap-servers:指定 Kafka 的 Bootstrap 服务器地址。
spring.kafka.producer.retries:生产者发送失败后的重试次数。
spring.kafka.producer.batch-size:批量发送消息时的默认批次大小。
spring.kafka.producer.buffer-memory:生产者可用于缓冲等待发送到服务器的记录的总内存大小。
spring.kafka.producer.acks:指定生产者要求 leader 接收的确认数。
确认数表示在将消息视为成功发送之前,生产者需要等待的确认数量。这个属性可以设置为以下几个值:
0:生产者不会等待来自 broker 的任何确认,将立即认为消息发送成功。
1:生产者在收到来自 leader 分区的确认后,即可认为消息发送成功。
-1 或 all:生产者在收到来自 leader 和所有副本分区的确认后,才认为消息发送成功。
默认情况下,acks 的值是 1,即只要 leader 分区确认接收到消息,生产者就认为发送成功。这种配置提供了一定的容错性,因为只需要一个副本确认即可,但也可能导致消息丢失的风险。
spring.kafka.consumer.enable-auto-commit:是否启用自动提交偏移量。
spring.kafka.consumer.auto-offset-reset:当消费者没有初始偏移量或者当前偏移量不存在时,从何处开始消费。
该属性可以设置为以下几个值:
latest:当发现当前分区不存在或者当前偏移量无效时,将从最新的偏移量开始消费。也就是说,消费者会跳过已经提交的偏移量,从最新的消息开始消费。
earliest:当发现当前分区不存在或者当前偏移量无效时,将从最早的偏移量开始消费。也就是说,消费者会从最早可用的消息开始消费。
none:当发现当前分区不存在或者当前偏移量无效时,抛出异常。
spring.kafka.listener.ack-mode:当消费者从主题中读取消息时,应该何时确认消息的接收。MANUAL_IMMEDIATE为手动确认模式,消费者需要在消息处理逻辑完成后,调用 Acknowledgment.acknowledge() 方法来确认消息的接收
绑定器配置
因为我使用的是kafka,所以下面介绍Spring Cloud Stream 框架中与 Kafka 相关的配置属性,前缀为:spring.cloud.stream.kafka,这些配置会和spring.kafka配置进行合并
spring.cloud.stream.kafka.binder.brokers:指定 Kafka 服务的地址列表,多个地址使用逗号分隔。
spring.cloud.stream.kafka.binder.defaultBrokerPort:指定 Kafka 服务的默认端口号。
spring.cloud.stream.kafka.binder.autoCreateTopics:指定是否自动创建主题。如果设置为 true,则在使用不存在的主题时会自动创建主题;如果设置为 false,则必须手动创建主题后才能使用。
spring.cloud.stream.kafka.binder.zkNodes:指定 ZooKeeper 服务的地址列表,多个地址使用逗号分隔。在使用基于 ZooKeeper 的 Kafka 集群时需要配置该属性。
spring.cloud.stream.kafka.binder.consumerProperties:用于设置 Kafka 消费者属性的配置项。该属性可以被用来设置一些消费者相关的属性,例如消费者的 offset 起始值、心跳时间、session 超时时间等。
例如:
spring.cloud.stream.kafka.binder.consumerProperties.auto.offset.reset=earliest
spring.cloud.stream.kafka.binder.consumerProperties.session.timeout.ms=6000
auto.offset.reset=earliest 表示当消费者第一次订阅一个主题时,从最早的消息开始消费;而 session.timeout.ms=6000 表示消费者和 Kafka broker 之间的 session 超时时间为 6 秒。
spring.cloud.stream.kafka.binder.producerProperties:用于设置 Kafka 生产者属性的配置项。通过这个属性,您可以设置一些与生产者相关的属性,如acks、retries、compression.type 等。
例如:
spring.cloud.stream.kafka.binder.producerProperties.acks=all
spring.cloud.stream.kafka.binder.producerProperties.retries=3
spring.cloud.stream.kafka.binder.configuration.*:指定 Kafka 客户端的其他配置属性,可以根据实际业务需求进行配置。