springCloud之Stream

发布时间:2024年01月03日
1、简介

Spring Cloud Stream是一个用来为微服务应用构建 消息驱动 能力的框架。通过使用 Spring Cloud Strea m ,可以有效简化开发人员对消息中间件的使用复杂度,降低代码与消息中间件间的耦合度,屏蔽消息中间件 之 间的差异性,让开发人员可以有更多的精力关注于核心业务逻辑的处理。

主要有以下几个组件:

1)、目的地绑定器(Destination Binders):负责提供与外部消息系统集成的组件。

2)、固定器(Bindings):介于外部消息系统与应用程序间的桥梁 ,这个应用程序提供了生产者和消费者的消息 (由 Destination Binders 创建)。

3)、输入管道(Input Bindings):消费者通过Input Bindings 连接 Binder ,而 Binder 与 MQ 连接,即消费者通过 Input Bindings 从 MQ 读取数据。

4)、输出管道(Output Bindings):生产者通过Output Bindings 连接 Binder ,而 Binder 与 MQ 连接,即生产者通过 Output Bindings 向 MQ 写入数据。

5)、消息(Message):生产者和消费者使用的规范数据结构,用于与 Binders 通信(从而通过外部消息系统与其他应用程序通信)。

2、具体应用示例1(MQ使用kafka)

引入依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
2.1、生产者

配置文件

server:
  port: 8090
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: 192.168.30.88:9092,192.168.30.89:9092
      
      bindings:
        producer-out-0:
          destination: topic1
          content-type: application/json

代码实现

@Autowired
private StreamBridge streamBridge;

@GetMapping("/test/send")
public String sendMsg(@RequestParam("msg") String msg){
    Map<String , Object> map = new HashMap<>();
    map.put("tag", "tags");
    MessageHeaders headers = new MessageHeaders(map);
    // 封装消息
    Message<String> message = MessageBuilder.createMessage(msg, headers);
    //发送消息
    streamBridge.send("producer-out-0", message);
    return msg;
}
2.2、消费者

配置文件

server:
  port: 8091
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: 192.168.30.88:9092,192.168.30.89:9092
      function:
        definition: consumer # 这个名称要和下面bindings的consumer-in-0第一个单词一样

      bindings:
        consumer-in-0:
          destination: topic1
          content-type: application/json

代码实现

// 向容器中添加Consumer<Message<String>>类型的bean即可
@Bean
public Consumer<Message<String>> consumer(){
    return msg -> {
        System.out.println("接收到消息:" + msg.getPayload());
    };
}
3、具体应用示例2(MQ使用Rocketmq)

引入依赖

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>

代码实现

@Autowired
private StreamBridge streamBridge;

@GetMapping("/test/send")
public String sendMsg(@RequestParam("msg") String msg){
    Map<String , Object> map = new HashMap<>();
    map.put(MessageConst.PROPERTY_TAGS, "tags");
    MessageHeaders headers = new MessageHeaders(map);
    // 封装消息
    Message<String> message = MessageBuilder.createMessage(msg, headers);
    //发送消息
    streamBridge.send("producer-out-0", message);
    return JSON.toJSONString(message);
}
3.2、消费者

配置文件:

server:
  port: 8091
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 192.168.30.88:9876
      function:
        definition: consumer # 这个名称要和下面bindings的consumer-in-0第一个单词一样

      bindings:
        consumer-in-0:
          destination: topic1
          content-type: application/json

代码实现:

// 向容器中添加Consumer<Message<String>>类型的bean即可
@Bean
public Consumer<Message<String>> consumer(){
    return msg -> {
        System.out.println("接收到消息:" + msg.getPayload());
    };
}

注:

1、在spring-cloud-stream 3.1.0之前的版本,还有采用定义Source、Sink等方式编写消息生产者和消费者,在3.1.0以后的版本中弃用@StreamListener的方式,而采用函数式编程的方式接入,使用StreamBrige来进行发送。

2、注意binding的名称命名规则

例如:上面的代码中定义的consumer。

# 输入:    <方法名> + -in- + <index>
# 输出:    <方法名> + -out- + <index>

总结:本文介绍Stream统一消息中间件的模型,给出基于kafka和Rocketmq两种消息中间件模型下的使用案例,以及给出废弃使用老版本的Source、Sink模式解释。帮助大家快速上手Stream的使用。

???????本人是一个从小白自学计算机技术,对运维、后端、各种中间件技术、大数据等有一定的学习心得,想获取自学总结资料(pdf版本)或者希望共同学习,关注微信公众号:上了年纪的小男孩。后台回复相应技术名称/技术点即可获得。(本人学习宗旨:学会了就要免费分享)

文章来源:https://blog.csdn.net/zwl2220943286/article/details/135372588
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。