下面是一个 Demo，生产者和消费者放在同一个工程中。
父工程pom
<properties>
    <!-- Build encoding and Java language level -->
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <!-- Versions referenced by the dependencyManagement section of this parent POM -->
    <com.alibaba.cloud.version>2.2.8.RELEASE</com.alibaba.cloud.version>
    <com.cloud.version>Hoxton.SR12</com.cloud.version>
    <com.dubbo.version>2.2.7.RELEASE</com.dubbo.version>
</properties>
<dependencyManagement>
    <dependencies>
        <!-- Spring Cloud Alibaba BOM, imported so child modules can omit versions -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-alibaba-dependencies</artifactId>
            <version>${com.alibaba.cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <!-- Spring Cloud BOM -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${com.cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <!-- Dubbo starter is pinned with its own version property rather than the BOM version.
             NOTE(review): confirm this separate pin is intentional. -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-dubbo</artifactId>
            <version>${com.dubbo.version}</version>
        </dependency>
    </dependencies>
</dependencyManagement>
工程POM
<dependencies>
    <!-- RocketMQ binder for Spring Cloud Stream; no version is declared on this entry -->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
    </dependency>
    <!-- Spring Boot web starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>
配置文件
server:
  port: 9500
spring:
  application:
    name: rocket-demo
  cloud:
    stream:
      # Channel name -> topic mapping; "input" and "output" share one topic,
      # so this application consumes what it publishes.
      bindings:
        input:
          content-type: application/json
          destination: test-topic
          group: test-group
        output:
          content-type: application/json
          destination: test-topic
          # NOTE(review): a consumer group usually only matters for input bindings - confirm it is needed here.
          group: test-group
      rocketmq:
        binder:
          # "ip" is a placeholder; replace it with the real name server host.
          name-server: ip:9876
          group: rocket-demo
/**
 * Spring Boot entry point of the RocketMQ demo.
 *
 * <p>Binds both the output ({@code Source}) and input ({@code Sink}) channel interfaces.
 */
@SpringBootApplication
@EnableBinding({Source.class, Sink.class})
public class RocketDemoApplication {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments handed to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(RocketDemoApplication.class, args);
    }

    /**
     * Consumer side: listens on the input channel and prints every received message.
     *
     * @param receiveMsg the message payload as text
     */
    @StreamListener(Sink.INPUT)
    public void receiveInput(String receiveMsg) {
        final String line = "input receive: " + receiveMsg;
        System.out.println(line);
    }
}
生产者（使用 RocketMQ 原生客户端向 test-topic 发送消息）
/**
 * Sends three test messages to {@code test-topic} with the native RocketMQ producer client.
 *
 * @param args unused
 * @throws Exception if the producer cannot be started or a send fails
 */
public static void main(String[] args) throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("producer_group");
    // "ip" is a placeholder; replace it with the real name server host.
    producer.setNamesrvAddr("ip:9876");
    producer.start();
    try {
        // Encode explicitly as UTF-8 instead of relying on the platform default charset.
        byte[] body = "message from rocketmq producer".getBytes(java.nio.charset.StandardCharsets.UTF_8);
        for (int i = 0; i < 3; i++) {
            producer.send(new Message("test-topic", "tagStr", body));
        }
    } finally {
        // Always release the producer once sending is done, even if a send throws.
        producer.shutdown();
    }
}
新建自定义的 CustomSink 和 CustomSource，分别继承 Sink 和 Source，这样原有的 input / output 通道也会保留。
/**
 * Declares a second input channel, {@code input2}, in addition to whatever
 * {@code Sink} already declares.
 */
public interface CustomSink extends Sink {

    /** Name of the additional input channel. */
    String INPUT2 = "input2";

    /**
     * Accessor for the additional input channel.
     *
     * @return the subscribable channel named {@link #INPUT2}
     */
    @Input(INPUT2)
    SubscribableChannel input2();
}
/**
 * Declares a second output channel, {@code output2}, in addition to whatever
 * {@code Source} already declares.
 */
public interface CustomSource extends Source {

    /** Name of the additional output channel. */
    String OUTPUT2 = "output2";

    /**
     * Accessor for the additional output channel.
     *
     * @return the message channel named {@link #OUTPUT2}
     */
    @Output(OUTPUT2)
    MessageChannel output2();
}
启动类
/**
 * Spring Boot entry point of the demo, bound to the custom channel interfaces
 * and listening on both the {@code input} and {@code input2} channels.
 */
@SpringBootApplication
@EnableBinding({CustomSource.class, CustomSink.class})
public class RocketDemoApplication {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments handed to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(RocketDemoApplication.class, args);
    }

    /**
     * Prints every message received on the {@code input} channel.
     *
     * @param receiveMsg the message payload as text
     */
    @StreamListener(CustomSink.INPUT)
    public void receiveInput(String receiveMsg) {
        final String line = "input receive: " + receiveMsg;
        System.out.println(line);
    }

    /**
     * Prints every message received on the {@code input2} channel.
     *
     * @param receiveMsg the message payload as text
     */
    @StreamListener(CustomSink.INPUT2)
    public void receiveInputSecond(String receiveMsg) {
        final String line = "input2 receive: " + receiveMsg;
        System.out.println(line);
    }
}
配置
server:
  port: 9500
spring:
  application:
    name: rocket-demo
  cloud:
    stream:
      # Channel name -> topic mapping. input/output share test-topic;
      # input2/output2 share test-topic-second.
      bindings:
        input:
          content-type: application/json
          destination: test-topic
          group: test-group
        output:
          content-type: application/json
          destination: test-topic
          # NOTE(review): a consumer group usually only matters for input bindings - confirm it is needed here.
          group: test-group
        input2:
          content-type: application/json
          destination: test-topic-second
          group: test-group-second
        output2:
          content-type: application/json
          destination: test-topic-second
          # NOTE(review): same question as for "output" above.
          group: test-group-second
      rocketmq:
        binder:
          # "ip" is a placeholder; replace it with the real name server host.
          name-server: ip:9876
          group: rocket-demo
生产者
/**
 * Sends three test messages to {@code test-topic-second} with the native RocketMQ producer client.
 *
 * @param args unused
 * @throws Exception if the producer cannot be started or a send fails
 */
public static void main(String[] args) throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("producer_group_second");
    // "ip" is a placeholder; replace it with the real name server host.
    producer.setNamesrvAddr("ip:9876");
    producer.start();
    try {
        // Encode explicitly as UTF-8 instead of relying on the platform default charset.
        byte[] body = "message from rocketmq producer2".getBytes(java.nio.charset.StandardCharsets.UTF_8);
        for (int i = 0; i < 3; i++) {
            producer.send(new Message("test-topic-second", "tagStr2", body));
        }
    } finally {
        // Always release the producer once sending is done, even if a send throws.
        producer.shutdown();
    }
}
}