消息队列-RockMQ-Demo案例&&拓展输入输出渠道

发布时间:2024年01月09日

基于Spirng Cloud Alibaba基础搭建

下面为一个Demo 生产者和消费者是一起的。
父工程pom

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <com.alibaba.cloud.version>2.2.8.RELEASE</com.alibaba.cloud.version>
    <com.cloud.version>Hoxton.SR12</com.cloud.version>
    <com.dubbo.version>2.2.7.RELEASE</com.dubbo.version>
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-alibaba-dependencies</artifactId>
            <version>${com.alibaba.cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${com.cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-dubbo</artifactId>
            <version>${com.dubbo.version}</version>
        </dependency>
    </dependencies>
</dependencyManagement>

工程POM

<dependencies>
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>

配置文件

server:
  port: 9500
spring:
  application:
    name: rocket-demo
  cloud:
    stream:
      bindings:
        input:
          content-type: application/json
          destination: test-topic
          group: test-group
        output:
          content-type: application/json
          destination: test-topic
          group: test-group
      rocketmq:
        binder:
          name-server: ip:9876
          group: rocket-demo
@SpringBootApplication
// 绑定输入输出
@EnableBinding({Source.class, Sink.class})
public class RocketDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(RocketDemoApplication.class, args);
    }
    // 监听这个输入通道,收到消息直接在这里打印,消费者
    @StreamListener("input")
    public void receiveInput(String receiveMsg) {
        System.out.println("input receive: " + receiveMsg);
    }
}

生产者

public static void main(String[] args) throws Exception {
      DefaultMQProducer producer = new DefaultMQProducer("producer_group");
      producer.setNamesrvAddr("ip:9876");
      producer.start();
      for (int i = 0; i < 3; i++) {
          Message msg = new Message("test-topic", "tagStr", "message from rocketmq producer".getBytes());
          producer.send(msg);
      }

  }
拓展输入输出渠道

新建自定义Sink和Source 继承Sink 和 Source,原来的渠道也会保留。

public interface CustomSink extends Sink {
    /**
     * Input channel name.
     */
    String INPUT2 = "input2";
    /**
     * @return input channel.
     */
    @Input(CustomSink.INPUT2)
    SubscribableChannel input2();
}

public interface CustomSource extends Source {
    String OUTPUT2 = "output2";

    /**
     * @return output channel
     */
    @Output(CustomSource.OUTPUT2)
    MessageChannel output2();
}

启动类

@SpringBootApplication
@EnableBinding({CustomSource.class, CustomSink.class})
public class RocketDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(RocketDemoApplication.class, args);
    }
    @StreamListener("input")
    public void receiveInput(String receiveMsg) {
        System.out.println("input receive: " + receiveMsg);
    }
    @StreamListener("input2")
    public void receiveInputSecond(String receiveMsg) {
        System.out.println("input2 receive: " + receiveMsg);
    }
}

配置

server:
  port: 9500
spring:
  application:
    name: rocket-demo
  cloud:
    stream:
      bindings:
        input:
          content-type: application/json
          destination: test-topic
          group: test-group
        output:
          content-type: application/json
          destination: test-topic
          group: test-group
        input2:
          content-type: application/json
          destination: test-topic-second
          group: test-group-second
        output2:
          content-type: application/json
          destination: test-topic-second
          group: test-group-second
      rocketmq:
        binder:
          name-server: ip:9876
          group: rocket-demo

生产者

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producer_group_second");
        producer.setNamesrvAddr("ip:9876");
        producer.start();
        for (int i = 0; i < 3; i++) {
            Message msg = new Message("test-topic-second", "tagStr2", "message from rocketmq producer2".getBytes());
            producer.send(msg);
        }
    }
}

在这里插入图片描述

文章来源:https://blog.csdn.net/qq_43259860/article/details/135440714
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。