Spring Cloud Stream 是一个构建消息驱动微服务的框架,Spring Cloud Stream 提供了一个抽象层,屏蔽了不同消息中间件之间的差异,使得开发人员可以不再关注具体的消息中间件,只需关注Binder对应用程序提供的抽象概念来使用消息中间件实现业务。
程序模型
Spring Cloud Stream的核心与中间件实现无关。Stream应用通过输入输出通道(channel)来与外界交互。通道(channel)通过与外部中间件对应的绑定器(Binder)具体实现,来与外部的中间件产品进行通信。
Spring Cloud Stream提供了对 Kafka, RabbitMQ等中间件的绑定实现。
Channel(通道):Channel是消息的传输管道,用于在生产者和消费者之间传递消息。生产者通过输出通道将消息发送到Destination,消费者通过输入通道从Destination接收消息。
在Spring Cloud Stream中,有两种类型的通道:输入(input)和输出(output)。这两种通道分别用于消费者接收消息和生产者发送消息。
Input(输入):Input通道用于消费者从消息代理接收消息。消费者可以通过监听Input通道来实时接收传入的消息。在应用程序中,可以使用@StreamListener注解将方法标记为Input通道的监听器,并在方法参数中指定接收到的消息类型。
Output(输出):Output通道用于生产者向消息代理发送消息。生产者可以通过向Output通道发送消息来发布新的消息。在应用程序中,可以使用@Output注解定义一个Output通道,然后在需要发送消息的方法上使用MessageChannel或OutputStream参数来将消息发送到Output通道。
Destination(目标):Destination是消息的目的地,通常对应于消息代理中的Topic或Queue。生产者将消息发送到特定的Destination,消费者从其中接收消息。
Binder(绑定器):Binder是Spring Cloud Stream的核心组件之一。它作为消息代理与外部消息中间件进行交互,并负责将消息发送到消息总线或从消息总线接收消息。Binder负责处理消息传递、序列化、反序列化、消息路由等底层细节,使得开发者能够以统一的方式与不同的消息中间件进行交互。Spring Cloud Stream提供了多个可用的Binder实现,包括RabbitMQ、Kafka等。
消费者组:在Spring Cloud Stream中,消费组(Consumer Group)是一组具有相同功能的消费者实例。当多个消费者实例属于同一个消费组时,消息代理会将消息均匀地分发给消费者实例,以实现负载均衡。如果其中一个消费者实例失效,消息代理会自动将消息重新分配给其他可用的消费者实例,以实现高可用性。
注:对于一个消息来说,每一个消费者组只会有一个消费者消费消息
分区:Spring Cloud Stream支持在多个消费者实例之间创建分区,这样我们通过某些特征量做消息分发,保证相同标识的消息总是能被同一个消费者处理
1、添加依赖
根据我们使用的中间件来选择我们的依赖,因为我使用的是kafka,所以使用的是spring-cloud-stream-binder-kafka依赖,该依赖会帮我们引入Spring Cloud Stream 和kafka的相关依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>${binder.version}</version>
</dependency>
如果你使用的是RabbitMQ,那么请使用以下依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
<version>${binder.version}</version>
</dependency>
2、配置文件配置
下面是kafka的简单配置,关于配置的详细介绍,我会在下一篇文章进行介绍
spring:
cloud:
stream:
kafka:
binder:
brokers: <kafka_broker_address> # Kafka broker地址
bindings:
myInput:
destination: <input_topic> # 输入通道对应的Kafka主题名称
myOutput:
destination: <output_topic> # 输出通道对应的Kafka主题名称
3、创建绑定接口类,定义输入和输出通道
/**
* 定义输出和输入通道
*/
public interface MyProcessor {
String INPUT = "myInputChannel";
String OUTPUT = "myOutputChannel";
/**
*监听一个通道,通道名为TNPUT的值
*/
@Input(INPUT)
SubscribableChannel myInputChannel();
/**
* 发送消息到输出通道,通道门为OUTPUT的值
* @return
*/
@Output(OUTPUT)
MessageChannel myOutputChannel();
}
在上述示例中,MyProcessor是一个绑定接口,定义了一个名为myInputChannel的输入通道和一个名为myOutputChannel的输出通道。通过@Input和@Output注解来标识通道的名称。
输入和输出通道的定义我们可以定义在一个接口,也可以输入通道一个接口,输出通道一个接口
4、创建绑定接类
通过@EnableBinding注解将绑定接口绑定到应用程序的逻辑处理器
使用@EnableBinding时,需要指定一个或多个接口类作为参数,这些接口类包含表示可绑定组件(通常是消息通道)的方法。Spring Cloud Stream会自动扫描这些接口类,并根据配置创建相应的消息代理中间件和应用程序之间的连接。
然后我们可以使用@StreamListener注解,指定一个方法作为消息的监听器,当消息到达时会自动调用该方法进行处理。
/**
* 指定接口来绑定消息通道,多个接口使用逗号分隔
*/
@EnableBinding(MyProcessor.class)
public class MyProcessorHandler {
/**
* 处理输入通道的消息
* @param message
*/
@StreamListener(INPUT)
public void handleInputMessage(String message) {
System.out.println(message);
}
/**
* 处理输出通道的消息
* @param message
*/
@StreamListener(OUTPUT)
public void handleOutputMessage(String message) {
System.out.println(message);
}
}
在上述示例中,MyProcessor是一个绑定接口,INPUT和OUTPUT分别是输入和输出通道的名称。handleInputMessage和handleOutputMessage方法用于处理从输入通道和输出通道接收到的消息。
当使用了@EnableBinding注解,Spring Cloud Stream会自动创建与绑定接口中定义的通道相关的Bean,并将其添加到应用程序的上下文中。这些Bean就可以通过自动装配(@Autowired)来进行访问和使用了,下面我们就使用通道来发送消息
5、发送消息
@Component
public class SendUtil {
@Autowired
private MyProcessor myProcessor;
/**
* 发送消息
* @param msg
*/
public void sendMsg(String msg){
myProcessor.myOutputChannel().send(MessageBuilder.withPayload(msg).build());
}
}
上述例子是通过myOutputChannel通道来发送消息,因为@EnableBinding注解帮我们生成了MyProcessor 的bean,所以我们可以直接注入使用