Spring Cloud Stream是一个用来为微服务应用构建 消息驱动 能力的框架。通过使用 Spring Cloud Strea m ,可以有效简化开发人员对消息中间件的使用复杂度,降低代码与消息中间件间的耦合度,屏蔽消息中间件 之 间的差异性,让开发人员可以有更多的精力关注于核心业务逻辑的处理。
主要有以下几个组件:
1)、目的地绑定器(Destination Binders):负责提供与外部消息系统集成的组件。
2)、固定器(Bindings):介于外部消息系统与应用程序间的桥梁 ,这个应用程序提供了生产者和消费者的消息 (由 Destination Binders 创建)。
3)、输入管道(Input Bindings):消费者通过Input Bindings 连接 Binder ,而 Binder 与 MQ 连接,即消费者通过 Input Bindings 从 MQ 读取数据。
4)、输出管道(Output Bindings):生产者通过Output Bindings 连接 Binder ,而 Binder 与 MQ 连接,即生产者通过 Output Bindings 向 MQ 写入数据。
5)、消息(Message):生产者和消费者使用的规范数据结构,用于与 Binders 通信(从而通过外部消息系统与其他应用程序通信)。
引入依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
配置文件
server:
port: 8090
spring:
cloud:
stream:
kafka:
binder:
brokers: 192.168.30.88:9092,192.168.30.89:9092
bindings:
producer-out-0:
destination: topic1
content-type: application/json
代码实现
@Autowired
private StreamBridge streamBridge;
@GetMapping("/test/send")
public String sendMsg(@RequestParam("msg") String msg){
Map<String , Object> map = new HashMap<>();
map.put("tag", "tags");
MessageHeaders headers = new MessageHeaders(map);
// 封装消息
Message<String> message = MessageBuilder.createMessage(msg, headers);
//发送消息
streamBridge.send("producer-out-0", message);
return msg;
}
配置文件
server:
port: 8091
spring:
cloud:
stream:
kafka:
binder:
brokers: 192.168.30.88:9092,192.168.30.89:9092
function:
definition: consumer # 这个名称要和下面bindings的consumer-in-0第一个单词一样
bindings:
consumer-in-0:
destination: topic1
content-type: application/json
代码实现
// 向容器中添加Consumer<Message<String>>类型的bean即可
@Bean
public Consumer<Message<String>> consumer(){
return msg -> {
System.out.println("接收到消息:" + msg.getPayload());
};
}
引入依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
代码实现
@Autowired
private StreamBridge streamBridge;
@GetMapping("/test/send")
public String sendMsg(@RequestParam("msg") String msg){
Map<String , Object> map = new HashMap<>();
map.put(MessageConst.PROPERTY_TAGS, "tags");
MessageHeaders headers = new MessageHeaders(map);
// 封装消息
Message<String> message = MessageBuilder.createMessage(msg, headers);
//发送消息
streamBridge.send("producer-out-0", message);
return JSON.toJSONString(message);
}
配置文件:
server:
port: 8091
spring:
cloud:
stream:
rocketmq:
binder:
name-server: 192.168.30.88:9876
function:
definition: consumer # 这个名称要和下面bindings的consumer-in-0第一个单词一样
bindings:
consumer-in-0:
destination: topic1
content-type: application/json
代码实现:
// 向容器中添加Consumer<Message<String>>类型的bean即可
@Bean
public Consumer<Message<String>> consumer(){
return msg -> {
System.out.println("接收到消息:" + msg.getPayload());
};
}
注:
1、在spring-cloud-stream 3.1.0之前的版本,还有采用定义Source、Sink等方式编写消息生产者和消费者,在3.1.0以后的版本中弃用@StreamListener的方式,而采用函数式编程的方式接入,使用StreamBrige来进行发送。
2、注意binding的名称命名规则
例如:上面的代码中定义的consumer。
# 输入: <方法名> + -in- + <index>
# 输出: <方法名> + -out- + <index>
总结:本文介绍Stream统一消息中间件的模型,给出基于kafka和Rocketmq两种消息中间件模型下的使用案例,以及给出废弃使用老版本的Source、Sink模式解释。帮助大家快速上手Stream的使用。
???????本人是一个从小白自学计算机技术,对运维、后端、各种中间件技术、大数据等有一定的学习心得,想获取自学总结资料(pdf版本)或者希望共同学习,关注微信公众号:上了年纪的小男孩。后台回复相应技术名称/技术点即可获得。(本人学习宗旨:学会了就要免费分享)