在Redis延迟队列中,我们使用有序集合(Sorted Set)来存储消息,其中,消息的到期时间作为有序集合的分数,消息内容作为有序集合的成员。例如,我们向Redis中添加一个到期时间为2023年12月25日的消息,可以使用以下命令:
ZADD delay_queue 1640390400 "message content"
在这个命令中,delay_queue是有序集合的名称,1640390400是消息的到期时间(即2023年12月25日的时间戳),"message content"是消息的内容。
当我们需要消费这个延迟消息时,需要定期轮询有序集合,找到所有到期的消息进行消费。可以使用以下命令:
ZRANGEBYSCORE delay_queue 0 CURRENT_TIMESTAMP
其中,CURRENT_TIMESTAMP为当前时间戳,该命令的作用是获取当前时间之前的所有消息。然后,我们可以遍历返回结果,依次进行消息的处理。
对于已经处理完成的消息,可以选择从有序集合中删除,也可以将其留在有序集合中,等待下一次轮询。如果希望删除消息,可以使用以下命令:
ZREM delay_queue "message content"
通过这种方式,Redis延迟队列可以实现消息的延迟投递和消费。因为消息按照到期时间排序,所以可以保证消息的有序性。此外,由于Redis本身是一种高性能内存数据库,所以延迟队列的处理效率也非常高。
Sorted Set(有序集合)在Redis中也被称为 ZSet(有序集合)。在Redis中,ZSet提供了有序的、唯一的成员(member)和对应的分数(score)之间的映射关系。这种数据结构非常适合用来实现延迟队列,因为可以根据分数进行范围查询,从而轻松地获取到期的消息。
Sorted Set(有序集合)适合用于实现延迟队列的主要原因有以下几点:
有序性:Sorted Set中的成员按照分数进行排序,这使得我们可以方便地根据到期时间进行范围查询。通过设置分数为消息的到期时间,可以轻松地获取到期的消息,而无需遍历整个队列。
唯一性:Sorted Set中的成员是唯一的,这意味着我们可以确保消息不会重复。这对于一些需要确保消息处理幂等性的场景非常重要。
高效性:Redis是一种高性能的内存数据库,Sorted Set的操作效率很高。在使用Sorted Set实现延迟队列时,添加、删除和范围查询等操作都可以在O(logN)的时间复杂度内完成,具有良好的性能表现。
支持多种操作:除了基本的添加、删除和范围查询,Sorted Set还支持其他一些有用的操作,如获取成员的分数、修改成员的分数、按照分数范围删除成员等。这些操作可以为延迟队列的管理提供更多的灵活性和功能。
综上所述,Sorted Set具备有序性、唯一性、高效性和丰富的操作特性,使其成为实现延迟队列的理想选择。在Redis中利用Sorted Set来实现延迟队列,可以简单高效地处理延迟任务。
Sorted Set 能够按照分数进行排序的原因在于它内部使用了一种数据结构——跳跃表(Skip List)。跳跃表是一种有序链表的数据结构,它通过在不同层级上建立索引的方式,提高了基本有序链表的查找效率。在 Redis 中,Sorted Set 的实现就是基于跳跃表。
通过跳跃表,Sorted Set 在插入新成员时可以保持成员的有序性。当需要按照分数进行排序时,Redis 内部会利用跳跃表的特性,快速地定位到对应分数的成员,从而实现按照分数进行排序的功能。
跳跃表的查询时间复杂度为 O(logN),这意味着无论集合中有多少成员,按照分数进行范围查询的效率都很高。这也是为什么 Sorted Set 能够高效地支持按照分数进行排序的重要原因之一。
总的来说,跳跃表作为 Sorted Set 内部的数据结构,通过其高效的有序性和索引特性,使得 Sorted Set 能够轻松地按照分数进行排序。
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Tuple;
import java.util.Set;
public class RedisDelayQueue {
private static final String DELAY_QUEUE_KEY = "delay_queue";
public static void main(String[] args) {
// 连接 Redis
Jedis jedis = new Jedis("localhost");
// 添加延迟消息
addMessageToDelayQueue(jedis, "message1", 5000); // 延迟 5 秒发送
addMessageToDelayQueue(jedis, "message2", 10000); // 延迟 10 秒发送
// 处理延迟消息
processDelayQueue(jedis);
// 断开连接
jedis.close();
}
// 将消息添加到延迟队列
private static void addMessageToDelayQueue(Jedis jedis, String message, long delayMillis) {
long currentTime = System.currentTimeMillis();
long delayTime = currentTime + delayMillis;
jedis.zadd(DELAY_QUEUE_KEY, delayTime, message);
System.out.println("Message added to delay queue: " + message);
}
// 处理延迟队列中的消息
private static void processDelayQueue(Jedis jedis) {
while (true) {
long currentTime = System.currentTimeMillis();
// 获取当前时间之前的第一个消息及其分数
Set<Tuple> messages = jedis.zrangeByScoreWithScores(DELAY_QUEUE_KEY, 0, currentTime, 0, 1);
if (messages.isEmpty()) {
// 延迟队列为空,暂停一段时间再继续轮询
try {
Thread.sleep(1000); // 暂停 1 秒钟
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}
// 处理消息,这里简单打印消息内容
for (Tuple tuple : messages) {
String message = tuple.getElement();
double score = tuple.getScore();
System.out.println("Processing message: " + message);
System.out.println("Scheduled time: " + score);
// 从延迟队列中移除已处理的消息
jedis.zrem(DELAY_QUEUE_KEY, message);
}
}
}
}
Redisson 是一个基于 Redis 的 Java 客户端,提供了丰富的分布式数据结构和服务。其中就包括了延迟队列的实现。
下面是使用 Redisson 实现延迟队列的示例代码:
import org.redisson.Redisson;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.redisson.codec.StringCodec;
import java.util.concurrent.TimeUnit;
public class RedissonDelayQueueExample {
public static void main(String[] args) {
// 初始化 Redisson 客户端
Config config = new Config();
SingleServerConfig serverConfig = config.useSingleServer();
serverConfig.setAddress("redis://127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);
// 获取延迟队列
RBlockingDeque<String> delayQueue = redisson.getBlockingDeque("delayQueue", new StringCodec());
// 添加消息到延迟队列
delayQueue.offer("message1", 10, TimeUnit.SECONDS); // 延迟 10 秒钟
delayQueue.offer("message2", 30, TimeUnit.SECONDS); // 延迟 30 秒钟
// 处理延迟队列中的消息
while (true) {
try {
// 获取并移除队首元素,如果队列为空,则阻塞等待
String message = delayQueue.take();
System.out.println("Processing message: " + message);
// 执行相应的业务逻辑
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
在上述代码中,我们首先创建了一个 Redisson 客户端,并获取了一个 RBlockingDeque 对象来操作延迟队列。接着,我们使用 offer 方法向延迟队列中添加了两条消息,并指定了它们的延迟时间。
最后,我们通过一个死循环来处理延迟队列中的消息。在循环中,我们调用 take 方法从延迟队列中获取并移除队首元素,如果队列为空,则该方法会阻塞等待直到有消息可用。在获取到消息后,我们可以执行相应的业务逻辑。
需要注意的是,在实际使用中,你可能需要考虑如何优雅地关闭 Redisson 客户端,并且在生产环境中需要处理异常、添加日志记录等完善的逻辑。同时,对于高并发场景,建议使用多线程的方式处理延迟队列中的消息,以提高性能和吞吐量。
import com.alibaba.fastjson.JSON;
import com.jingdianjichi.redis.util.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.Date;
import java.util.Set;
import java.util.stream.Collectors;
/**
* @Description: 群发任务延时service
* @DateTime: 2023/12/8 23:24
*/
@Service
@Slf4j
public class MassMailTaskService {
@Resource
private RedisUtil redisUtil;
private static final String MASS_MAIL_TASK_KEY = "massMailTask";
public void pushMassMailTaskQueue(MassMailTask massMailTask) {
Date startTime = massMailTask.getStartTime();
if (startTime == null) {
return;
}
if (startTime.compareTo(new Date()) <= 0) {
return;
}
log.info("定时群发任务加入延时队列,massMailTask:{}", JSON.toJSON(massMailTask));
redisUtil.zAdd(MASS_MAIL_TASK_KEY, massMailTask.getTaskId().toString(), startTime.getTime());
}
public Set<Long> poolMassMailTaskQueue() {
Set<String> taskIdSet = redisUtil.rangeByScore(MASS_MAIL_TASK_KEY, 0, System.currentTimeMillis());
log.info("获取延迟群发任务,taskIdSet:{}", JSON.toJSON(taskIdSet));
if (CollectionUtils.isEmpty(taskIdSet)) {
return Collections.emptySet();
}
redisUtil.removeZsetList(MASS_MAIL_TASK_KEY, taskIdSet);
return taskIdSet.stream().map(n -> Long.parseLong(n)).collect(Collectors.toSet());
}
}
@Component
@Slf4j
public class RedisUtil {
@Resource
private RedisTemplate redisTemplate;
public Boolean zAdd(String key, String value, Long score) {
return redisTemplate.opsForZSet().add(key, value, Double.valueOf(String.valueOf(score)));
}
public void removeZsetList(String key, Set<String> value) {
value.stream().forEach((val) -> redisTemplate.opsForZSet().remove(key, val));
}
public Set<String> rangeByScore(String key, long start, long end) {
return redisTemplate.opsForZSet().rangeByScore(key, Double.valueOf(String.valueOf(start)), Double.valueOf(String.valueOf(end)));
}
}