整体思路
    1. pom增加redis依赖;
    2. 消息监听器,实现StreamListener接口,处理消息到达逻辑;
    3. 将消息订阅bean及监听器注册到配置中;
1. pom
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven POM excerpt: only the parts relevant to the Redis Stream example are shown. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Inherit from the Spring Boot 2.7.6 parent POM. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.6</version>
</parent>
<dependencies>
<!-- Spring Data Redis starter; no version is given here, presumably so that it is managed by the parent POM. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- NOTE(review): the excerpt ends here; the closing dependencies/project tags and the project's own
     coordinates are not shown. The listener code below also imports com.alibaba.fastjson.JSON, whose
     dependency is not part of this excerpt. -->
2. 消息监听器实现代码
package cn.thuniwhir.fileserver.redis;
import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Redis Stream listener that hands every received record to a worker pool and
 * logs its id and JSON-serialized body.
 *
 * <p>{@link #onMessage(MapRecord)} returns as soon as the task has been queued,
 * so the actual handling runs asynchronously on a pool thread.
 * NOTE(review): if the subscription acknowledges records automatically, a record
 * whose asynchronous handling fails will not be redelivered - confirm this is acceptable.
 */
@Component
public class RedisMQListener implements StreamListener<String, MapRecord<String, String, Object>> {

    private static final Logger log = LoggerFactory.getLogger(RedisMQListener.class);

    // Worker pool for asynchronous message handling.
    // NOTE: the work queue is unbounded, so the pool never grows beyond its core size of 10;
    // the maximum size of 20 has no practical effect with this queue.
    private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            10, 20, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

    /**
     * Queues the received record for asynchronous handling.
     *
     * @param message the stream record delivered by the listener container
     */
    @Override
    public void onMessage(MapRecord<String, String, Object> message) {
        threadPoolExecutor.execute(() -> {
            try {
                log.info("{}:接收到的消息:{};{}",
                        Thread.currentThread().getName(),
                        message.getId(),
                        JSON.toJSONString(message.getValue()));
            } catch (RuntimeException e) {
                // Without this catch the failure would only surface through the worker thread's
                // uncaught-exception handler, with no indication of which record was affected.
                log.error("Failed to process stream record {}", message.getId(), e);
            }
        });
    }
}
3. redis订阅bean及监听器注册
package cn.thuniwhir.fileserver.redis;
import cn.thuniwhir.fileserver.context.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
import java.util.stream.Collectors;
/**
* @Description: TODO
**/
@Configuration
public class RedisMQConfig {
@Autowired
private RedisMQListener redisMQListener;
@Autowired
private RedisUtils redisUtils;
private static RedisTemplate<Object, Object> redisTemplate;
private static final Logger log = LoggerFactory.getLogger(RedisMQConfig.class);
public RedisMQConfig(RedisTemplate<Object, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
}
@Bean
public Subscription subscription(RedisConnectionFactory redisConnectionFactory) {
if (redisUtils.hasKey(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME)) {
StreamInfo.XInfoGroups xInfoGroups = redisTemplate.opsForStream().groups(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME);
if (xInfoGroups.isEmpty()) {
redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);
} else {
if (xInfoGroups.stream().filter(xInfoGroups1 -> xInfoGroups1.groupName().equals(Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME)).collect(Collectors.toList()).isEmpty()) {
redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);
}
}
} else {
redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);
}
StreamMessageListenerContainer.StreamMessageListenerContainerOptions options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
.pollTimeout(Duration.ofSeconds(1)).build();
StreamMessageListenerContainer streamMessageListenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);
Subscription subscription = streamMessageListenerContainer.receiveAutoAck(Consumer.from(Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME, Constants.FILE_MQ_DISK_THRESHOLD_CONSUMER), StreamOffset.create(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, ReadOffset.lastConsumed()), redisMQListener);
streamMessageListenerContainer.start();
return subscription;
}
}
4. 测试生产消息 消息监听成功
4.1 生产消息
/**
 * Test endpoint that appends one record to the stream named by
 * {@code Constants.FILE_MQ_DISK_THRESHOLD_QUENAME}.
 *
 * <p>The record body is a single-entry map whose field name and value are taken
 * from the {@code "key"} and {@code "value"} properties of the request body.
 *
 * @param jsonObject request body carrying the {@code "key"} and {@code "value"} properties
 * @return the result of {@code formatResult(null)}
 */
@RequestMapping("/produceMessage")
public JSONObject produceMessage(@RequestBody JSONObject jsonObject) {
    // Build the single-field record straight from the request properties.
    MapRecord<Object, String, Object> streamEntry = MapRecord.create(
            Constants.FILE_MQ_DISK_THRESHOLD_QUENAME,
            Collections.singletonMap(jsonObject.getString("key"), jsonObject.getString("value")));

    // Append the record to the stream, then report which thread produced it.
    redisTemplate.opsForStream().add(streamEntry);
    System.out.println("produceMessage Thread Name:" + Thread.currentThread().getName());

    return formatResult(null);
}
4.2 消息监听器监听消息到达 代码见第二节
4.3 测试结果