🥚今日鸡汤🥚
????????见过一些人,他们朝九晚五😭,有时也要加班,却能把生活过得很😎有趣。他们有自己的爱好,不怕独处。他们有自己的坚持,哪怕没人在乎。🤦?♂?
? ? ? ? ? ? ? ? ? ? ? ? ????????????????????????????????? ? ? ?开心一点😁
? ? ? ? ? ? ? ? ? ? ? ? ? ????????????????????????????????? ? ?认真一点🤔
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?努力一点🫡
目录
😶?🌫?1.为什么引入Stream
Stream解决了开发人员无感知的使用消息中间件的问题,因为Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知。
Spring Cloud Stream进行了配置隔离,只需要调整配置,开发中可以动态的切换中间件(如rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。
- 官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。
- 应用程序通过inputs或者outputs来与Spring Cloud Stream中binder对象交互。
- 通过我们配置来binding(绑定),而Spring Cloud Stream的 binder对象负责与消息中间件交互。
所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式。
- 通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。
- 通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。
- 通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot </groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<!--基础依赖-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--eureka客户端-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<!--消息驱动-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
server:
port: 8801
spring:
application:
name: cloud-stream-provider
rabbitmq:
host: 192.168.20.129
port: 5672
username: root
password: 123456
cloud:
stream:
binders:
defaultRabbit:
type: rabbit
bindings:
output:
destination: studyExchange
content-type: application/json
binder: defaultRabbit
eureka:
client: # 客户端进行Eureka注册的配置
service-url:
defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka,http://eureka7003.com:7003/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
instance-id: send-8801.com # 在信息列表时显示主机名称
prefer-ip-address: true # 访问的路径变为IP地址
@SpringBootApplication
public class StreamMqMain8801 {
public static void main(String[] args) {
SpringApplication.run(StreamMqMain8801.class);
}
}
- @EnableBinding:Spring Cloud Stream中用来启用消息传递功能的注释。
- 它用于将应用程序绑定到消息传递系统(例如,Apache Kafka, RabbitMQ),并声明用于发送和接收消息的输入和输出通道。
- 通过使用@EnableBinding,您可以定义应用程序所需的通道和消息处理程序
@EnableBinding(Source.class)//定义消息的推送管道
public class IMessageProviderImpl implements IMessageProvider {
@Autowired
private MessageChannel output; //消息发送管道
@Override
public String send() {
String serial= UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("======serial:"+serial);
return null;
}
}
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot </groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<!--基础依赖-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--eureka客户端-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<!--消息驱动-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
rabbitmq:
host: 192.168.20.129
port: 5672
username: root
password: 123456
cloud:
stream:
binders:
defaultRabbit:
type: rabbit
bindings:
input:
destination: studyExchange
content-type: application/json
binder: defaultRabbit
eureka:
client: # 客户端进行Eureka注册的配置
service-url:
defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka,http://eureka7003.com:7003/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
instance-id: receive-8802.com # 在信息列表时显示主机名称
prefer-ip-address: true # 访问的路径变为IP地址
@SpringBootApplication
public class StreamMqMain8802 {
public static void main(String[] args) {
SpringApplication.run(StreamMqMain8802.class);
}
}
- 1.@StreamListener注解是Spring Cloud Stream框架提供的一个注解,用于定义一个消息监听器。
- 2.通过使用@StreamListener注解,可以将一个方法标记为消息的消费者,并指定该方法要监听的消息通道。
- 3.当有消息到达指定的通道时,该方法会被自动触发执行,从而处理这个消息。
- 4.@StreamListener注解通常与@EnableBinding注解一起使用,用于指定所要绑定的消息通道。
- 5.@EnableBinding注解用于绑定消息通道与应用程序中的输入输出接口,@StreamListener注解则用于标记一个方法作为消息的消费者。
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message<String> message){
System.out.println("消费者1---接受消息:"+message.getPayload()+",port:"+serverPort);
}
}
问题描述:
- 1.根据8802,重新创建cloud-stream-rabbitmq-consumer8803,作为消息接收模块
- 2.8801生产者发送消息
- 3.8802,8803都可以接收到
?如果一个订单同时被两个服务获取到,就会造成数据错误
注意:在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。不同组是可以全面消费的(重复消费)。
在消费者端添加group配置:分为xzA,xzB
8802/8803实现了轮询分组,每次只有一个消费者8801模块的发的消息只能被8802或8803其中一个接收到,这样避免了重复消费。
8802,8803的group配置相同名称,重新启动 ,使用8801发送两条消息,8802接受一条,8803接收一条