在Spring Cloud中使用RabbitMQ完成一个消息驱动的微服务

发布时间:2023年12月20日

Spring Cloud系列目前已经有了Spring Cloud五大核心组件:分别是,Eureka注册中心,Zuul网关,Hystrix熔断降级,openFeign声明式远程调用,ribbon负载均衡。这五个模块,对了,有没有发现,其实我这五个模块中ribbon好像还没有案例例举,目前只有一个Ribbon模块的搭建,后边我会完善的。

今天我们不主要围绕Spring Cloud的五大组件,本篇会以新的模块进行,完成一个以Rabbit MQ消息队列为核心的模块功能设计。在模块进行之前,我们先了解Spring Cloud 的Stream,这个很重要。

Spring Cloud Steam 是一个可以用来作为微服务应用构建消息驱动能力的框架,他可以基于Spring Boot来进行创建一个独立的,并且可以用来生产的Spring 应用程序。他通过使用Spring Integration(一体化)来链接消息嗲了中间件用以实现消息事件驱动的微服务应用。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动装配的实现,并且熟悉Spring的都知道他有个发布-订阅模式,消费组,以及消息分区核心部分。

Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框架。通过使用Spring Cloud Stream,可以有效地简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。由于Spring Cloud Stream基于Spring Boot实现,所以它秉承了Spring Boot的优点。

  • Binder:Binder是Spring Cloud Stream的核心组件之一,它提供了与消息中间件的连接和交互。通过Binder,开发者可以将应用程序与特定的消息中间件进行集成,而无需关注底层的细节。

  • 消息通道:Spring Cloud Stream使用消息通道作为应用程序中消息的传输媒介。消息通道可以是发布-订阅模式或点对点模式,开发者可以根据需求选择合适的通道类型。

  • 绑定注解:通过使用绑定注解,开发者可以将消息通道与应用程序中的方法进行绑定。例如,@Input注解用于将方法绑定到输入通道,@Output注解用于将方法绑定到输出通道。

  • 消息转换:Spring Cloud Stream支持自动的消息转换,使得开发者可以使用不同的数据格式和协议进行消息的传输。它提供了一些内置的消息转换器,同时也支持自定义的消息转换器。

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

在应用程序中配置Rabbit MQ的链接信息,在application.yml或者application.properties中添加配置。

spring.cloud.stream.bindings.input.destination=myInputQueue
spring.cloud.stream.bindings.output.destination=myOutputQueue

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

这里的myInputQueue和myOutputQueue是你自定义的队列名称,你可以根据自己的需求进行命名。

接下来,创建一个消息处理器来处理输入和输出的消息。你可以使用@StreamListener注解来监听输入消息,并使用@EnableBinding注解来绑定输入和输出通道。

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@EnableBinding(Sink.class)
public class MessageProcessor {

    @StreamListener(Sink.INPUT)
    public void processMessage(String message) {
        // 处理输入消息
        System.out.println("Received message: " + message);
        
        // 处理完后可以发送输出消息
        // output.send(MessageBuilder.withPayload("Output message").build());
    }
}

@EnableBinding(Sink.class)将输入通道绑定到Sink,@StreamListener(Sink.INPUT)注解用于监听输入消息。你可以在processMessage方法中处理输入消息。

如果你想发送输出消息,你可以注入MessageChannel并使用它发送消息。例如,你可以在MessageProcessor类中注入MessageChannel

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

@EnableBinding(Source.class)
public class MessageProcessor {

    private final MessageChannel output;

    @Autowired
    public MessageProcessor(Source source) {
        this.output = source.output();
    }

    @StreamListener(Sink.INPUT)
    public void processMessage(String message) {
        // 处理输入消息
        System.out.println("Received message: " + message);
        
        // 处理完后发送输出消息
        output.send(MessageBuilder.withPayload("Output message").build());
    }
}

这样,当有消息发送到输入通道时,processMessage方法将被调用,并处理输入消息。在处理完输入消息后,它将发送一个输出消息到输出通道。

文章来源:https://blog.csdn.net/qq_45922256/article/details/135107334
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。