Redis List:打造高效消息队列的秘密武器【redis实战 一】

发布时间:2023年12月20日

欢迎来到我的博客,代码的世界里,每一行都是一个故事


在这里插入图片描述

前言

在软件开发的世界里,消息队列就像是一场奇妙的表演,每个演员都有自己的任务,而消息则是剧本中的情节。而要打造一场精彩的表演,我们需要一个强大的舞台工具。在这篇文章中,我们将带你走进 Redis 的神奇世界,揭示它是如何通过 List 这个神奇的工具,帮助我们搭建起高效的消息队列,让消息传递变得更加有趣。

Redis List简介

Redis中的List是一种有序、可重复的数据结构,它实际上是一个字符串链表。每个节点都包含一个字符串值,而这些节点按照插入顺序排列。

以下是Redis List的一些关键特性和操作:

  1. 有序集合: List中的元素是有序的,每个元素都有一个索引位置,允许按索引进行访问和操作。

  2. 可重复元素: List中的元素可以重复,同样的值可以存在于不同的位置。

  3. 左右两端操作: 可以从List的左端(头部)或右端(尾部)进行元素的添加和移除。

    • LPUSH key value [value …]: 在List的左端添加一个或多个值。
    • RPUSH key value [value …]: 在List的右端添加一个或多个值。
    • LPOP key: 移除并返回List左端的元素。
    • RPOP key: 移除并返回List右端的元素。
  4. 范围操作: 可以获取List中的一定范围的元素。

    • LRANGE key start stop: 获取List中指定范围的元素,范围是从start到stop,包含两端的元素。
  5. 长度操作: 可以获取List的长度。

    • LLEN key: 获取List的长度。
  6. 其他操作: 还有其他一些操作,如插入、删除、获取指定索引位置的元素等。

总体而言,Redis List适用于需要维护有序数据集合的场景,例如实现消息队列、任务队列等。在进行代码实现时,可以使用各种编程语言的Redis客户端库,根据需求调用相应的List操作,并确保代码中有适当的注释来解释每个操作的目的和影响。

List实现消息队列的优势

使用Redis List实现消息队列具有一些优势,尤其在一些特定的场景下。以下是一些优势以及与其他专业消息队列系统的比较:

优势:

  1. 轻量级:

    • Redis是一个内存数据库,List是其数据结构之一,相较于专业的消息队列系统,Redis更轻量级。这使得它更容易部署和维护,尤其适用于小型项目或资源受限的环境。
  2. 简单易用:

    • Redis提供了简单而强大的List操作,对于基本的消息队列需求,这种简单性是一个巨大的优势。使用LPUSH和RPUSH添加消息,LPOP和RPOP移除消息,非常直观。
  3. 无需额外组件:

    • Redis本身就是一个数据存储系统,无需额外组件,就可以实现消息队列功能。这减少了系统的复杂性,减轻了维护负担。
  4. 高性能:

    • 由于Redis是内存数据库,List的操作都是原子的,因此具有高性能。对于快速入队和出队的需求,Redis List能够提供低延迟的处理。
  5. 支持持久化:

    • Redis支持持久化,可以将消息队列的数据保存到磁盘,确保在重启后数据不丢失。这使得Redis List在一些需要持久化消息的场景下更具优势。

与专业消息队列的比较:

  1. 适用范围:

    • Redis List更适合于简单的、轻量级的消息队列需求。对于大规模、高吞吐量、分布式的系统,专业消息队列系统(如RabbitMQ、Kafka)更为适用。
  2. 功能丰富性:

    • 专业消息队列系统通常提供更多高级特性,如发布/订阅模型、消息确认机制、消息过期等。在需要这些高级特性的场景下,选择专业消息队列更为合适。
  3. 可扩展性:

    • 对于需要横向扩展的大型系统,专业消息队列系统通常提供更好的可扩展性和分布式支持,可以跨多个节点进行消息传递。
  4. 管理和监控:

    • 专业消息队列系统通常具有更完善的管理和监控工具,能够提供更详细的队列状态、消息追踪等信息。

在选择是否使用Redis List实现消息队列时,需要权衡项目的规模、复杂性以及对消息队列功能的具体需求。对于简单的应用场景,Redis List提供了一种轻量级、简单易用的解决方案。然而,在面对复杂、大规模的系统需求时,专业消息队列系统可能更为适用。

实战

maven依赖

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
</dependency>

配置RedisConfiguration

package fun.bo.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author xiaobo
 */
@Configuration
@Slf4j
public class RedisConfiguration {

    @Bean
    public <K,V> RedisTemplate<K,V> redisTemplate(RedisConnectionFactory redisConnectionFactory) {

        log.info("开始创建redis模板对象");

        RedisTemplate<K, V> redisTemplate = new RedisTemplate<K, V>();

        // 设置redis连接工厂对象
        redisTemplate.setConnectionFactory(redisConnectionFactory);

        // 设置 redis key 的序列化器
        // redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setKeySerializer(new GenericJackson2JsonRedisSerializer());

        // 设置 redis 值的序列化器
        // redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<Object>(Object.class));
        redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());

        return redisTemplate;
    }

    @Bean
    public ExecutorService messageProcessorExecutor() {
        return Executors.newSingleThreadExecutor();
    }
}

解释一下上面的代码

嘿,这是一个有趣的 Redis 配置文件!这里就像是搭建一座小型 Redis 王国的蓝图。首先,我们有一个名为 RedisConfiguration 的大臣(类),他负责整个 Redis 的建设。

在这个王国里,最重要的是创建一个 Redis 模板对象。这就像是设计一张神奇的宝藏地图,告诉你在 Redis 中如何寻找和存放宝藏(数据)。当我们开始创建这个宝藏地图时,就打印了一条消息,告诉大家“开始创建redis模板对象”,这样大家就知道有人在设计新的宝藏地图了。

然后,我们看到了一个名为 redisTemplate 的特殊工具,它是一种用来挖掘和存储宝藏的神奇工具。这个工具有两个设置,一个是告诉大家如何处理宝藏地图上的坐标(Key),另一个是告诉大家如何保存宝藏的外表和内涵(Value)。我们使用了一种叫做 GenericJackson2JsonRedisSerializer 的神奇卷轴,它可以帮我们把宝藏地图上的信息转换成可读的 JSON 格式,这样大家就能轻松理解宝藏的内容了。

最后,还有一个名为 messageProcessorExecutor 的小使者,他负责处理消息的传递。他就像是王国里的信使,专门负责将重要的消息传递给其他领地。为了确保他能够高效地完成任务,我们给他提供了一个单线程的小驿站(ExecutorService),这样他就不会在处理消息的路上迷路了。

总的来说,这段代码就是在告诉大家如何建设一个充满幽默和智慧的 Redis 王国,确保我们的宝藏得以妥善保管和传递。希望你喜欢这个 Redis 王国的建设计划!

解释为何我们使用泛型,而不是使用确定的类型

首先,泛型就像是一把神奇的钥匙,可以打开各种类型的宝藏宝盒。如果我们硬性规定只能用 <String, Object> 这样的具体类型,那就好比在宝藏地图上写上:“只有字符串类型的宝藏坐标才能打开!” 这样一来,其他类型的宝藏就会感到被歧视了。

使用泛型,比如 <K, V>,就像是一种宽容的设计。这让我们的 Redis 王国更加灵活,不再关心宝藏的类型是什么,只要是合法的类型,都可以自如地在宝藏地图上标记和挖掘。

另外,泛型还能提供一种类型安全的感觉,就像在挖掘宝藏的时候,我们知道自己挖到的是什么类型的财富。这就避免了在宝藏使用过程中出现强制类型转换的尴尬场面,确保我们能够安心地享受宝藏带来的快乐。

总而言之,使用泛型就是为了让我们的 Redis 王国更加灵活、友好,让不同类型的宝藏都能够在这个王国里找到自己的位置。泛型就像是为宝藏地图设计的通用钥匙,打开了各种各样的珍宝之门!

生产者实现

/**
 * 推送消息到源List
 *
 * @param message 消息内容
 */
public void pushMessage(String message) {
    redisTemplate.opsForList().rightPush(sourceList, message);
}

消费者实现

public void popMessagePersistence() {
    String rightPopAndLeftPush = null;
    try {
        rightPopAndLeftPush = redisTemplate.opsForList().leftPop(sourceList, 3, TimeUnit.SECONDS);
        if (rightPopAndLeftPush != null) {
            log.info("处理消息: {}" , rightPopAndLeftPush);
        }
    }catch (Exception e) {
        // 处理超时异常,进行持久化操作,消息没处理可进行后续处理
        if (rightPopAndLeftPush != null) {
            redisTemplate.opsForList().leftPush(backupList, rightPopAndLeftPush);
        }
        log.error(e.getMessage());
    }
}

重要代码详解

leftPop(sourceList, 3, TimeUnit.SECONDS):这是一个有趣的操作,它执行了两个动作:

  • leftPop:从名为 sourceList 的列表的左侧弹出一个元素,类似于拿走列表左侧的第一个宝藏。
  • 3, TimeUnit.SECONDS:这部分是一个超时设置,表示如果在3秒内没有宝藏可用,就放弃等待。实际上,它在这个操作中的作用是设定一个最长等待时间。
  • 整体来说,这行代码的作用是从一个名为 sourceList 的列表的左侧弹出一个元素,并将其推送(push)到另一个地方,可能是另一个列表。这种操作常用于实现队列或者消息队列的场景,其中一个地方产生元素,而另一个地方消费元素。

所以,这行代码就像是在 Redis 王国中进行一场左手右手的宝藏传递游戏,左手拿着一个宝藏,右手把它传递给另一个地方。如果在3秒内没有成功传递,就算了,毕竟宝藏也不能等太久。希望这个解释能帮助你更好地理解这段代码!

BRPOP命令也称为阻塞式读取,客户端在没有督导队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。和消费者程序自己不停地调用RPOP命令相比,这种方式能节省CPU开销

消费者调用

对于消费者的调用,可以使用while(true),或者ScheduledExecutorService

public void startProcessingMessages() {
    messageProcessorExecutor.execute(() -> {
        while (true) {
            popMessagePersistence();
        }
    });
}

public void startProcessingMessagesUseScheduled() {
    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    scheduler.scheduleAtFixedRate(this::popMessagePersistence, 0, 1, TimeUnit.SECONDS);
}

第一个方法:

这个方法启动了一个消息处理的执行器(messageProcessorExecutor),并在其中创建了一个无限循环。在每次循环中,它调用了 popMessagePersistence() 方法。这就像是一个永不停歇的消息处理机器,不断地从某个地方弹出消息并进行处理。由于使用了线程池(messageProcessorExecutor,可能是之前配置的单线程池),这样的设计可以在后台异步地处理消息,而不会阻塞主线程。

第二个方法:

这个方法使用了 ScheduledExecutorService,它是一个用于定时执行任务的执行器。在这个方法中,它创建了一个调度器(scheduler),并使用 scheduleAtFixedRate 方法来定期执行 popMessagePersistence() 方法。

具体来说,它的调度规则是:

  • this::popMessagePersistence:表示要执行的任务,即调用 popMessagePersistence() 方法。
  • 0:表示首次执行任务的延迟时间,这里是0,即立即执行。
  • 1:表示每次执行任务的时间间隔,这里是1秒。
  • TimeUnit.SECONDS:表示时间间隔的单位,这里是秒。

这个方法的设计更加精确,按照固定的时间间隔执行任务,比起无限循环的方式更加灵活。它适用于需要定期执行任务的场景,比如轮询某个资源或者定时处理一些事务。

效果图

在这里插入图片描述
在这里插入图片描述

结语

深深感谢你阅读完整篇文章,希望你从中获得了些许收获。如果觉得有价值,欢迎点赞、收藏,并关注我的更新,期待与你共同分享更多技术与思考。

在这里插入图片描述

文章来源:https://blog.csdn.net/Mrxiao_bo/article/details/135104665
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。