Redis面试篇

发布时间:2024年01月11日

redis面试题主要内容

面试官在面试时主要会问以下这些方面的问题

下面是一些问题示例:

redis-使用场景

缓存

缓存穿透

介绍

缓存穿透:查询一个不存在的数据,mysql查询不到数据也不会直接写入缓存,就会导致每次请求都会去查数据库,数据库的压力增大。

解决方案

解决方案一:缓存空数据,查询返回的数据为空,仍把这个空结果进行缓存。

优点:简单

缺点:消耗内存(每次查询不存在的数据时,都需要在缓存中存储一个空数据,这会占用一定的内存空间。如果缓存中出现大量的空数据,可能会导致内存消耗过高,影响系统的性能和可扩展性。)

可能会发生不一致的问题(在某些情况下,可能会发生缓存中的空数据与实际后端数据不一致的情况。例如,在某个时间点上,缓存中存储了一个空数据,但实际上后端数据已经发生了变化。这样,在后续查询中,缓存会返回错误的空数据,导致数据不一致的问题。)

解决方案二:布隆过滤器

优点:内存占用较少,没有多余key

缺点:实现复杂,存在误判(当哈希冲突较多或者位数组较小时,可能会导致误判率增加。)

流程图如下:

布隆过滤器

布隆过滤器(Bloom Filter)是一种用于判断一个元素是否属于一个集合的数据结构。它可以快速地判断一个元素是否可能在这个集合中,也可以用于过滤掉不符合条件的元素,从而减少访问磁盘或网络等IO操作的次数。

原理:

布隆过滤器通常是由一个位数组和多个哈希函数组成的。假设要判断一个元素是否在集合中,就将这个元素经过多个哈希函数的计算得到多个哈希值,然后将对应的位数组中的位置设置为1。当需要判断另一个元素是否在集合中时,同样将该元素经过多个哈希函数的计算得到多个哈希值,然后查看对应的位数组中的位置是否都为1。如果有任何一位为0,则这个元素不在集合中;否则这个元素可能在集合中。

由于哈希函数产生的哈希值范围可能比位数组的长度还大,所以哈希函数计算出的哈希值需要进行映射,将其映射到位数组的下标范围内。映射方法可以采用取模运算、乘法哈希等方式。

误判率:数组越小误判率就越大,数组越大误判率就越小,但是同时带来了更多的内存消耗。、

布隆过滤器实现方案

Redisson?

添加maven依赖

<!--redisson-->
<dependency>
	<groupId>org.redisson</groupId>
	<artifactId>redisson-spring-boot-starter</artifactId>
	<version>3.17.0</version>
</dependency>

配置yml

spring:
  datasource:
    username: xx
    password: xxxxxx
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&serverTimezone=CTT
  cache:
    type: redis
  redis:
    database: 0
    port: 6379               # Redis服务器连接端口
    host: localhost          # Redis服务器地址
    password: xxxxxx         # Redis服务器连接密码(默认为空)
    timeout: 5000            # 超时时间

配置RedissonConfig

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.time.Duration;
import java.util.Random;

@EnableCaching
@Configuration
public class RedissonConfig {

    @Value("${spring.redis.host}")
    private String host;

    @Value("${spring.redis.port}")
    private String port;

    @Value("${spring.redis.password}")
    private String password;


    @Bean(destroyMethod = "shutdown")  // bean销毁时关闭Redisson实例,但不关闭Redis服务
    public RedissonClient redisson() {
        //创建配置
        Config config = new Config();
        /**
         *  连接哨兵:config.useSentinelServers().setMasterName("myMaster").addSentinelAddress()
         *  连接集群: config.useClusterServers().addNodeAddress()
         */
        config.useSingleServer()
                .setAddress("redis://" + host + ":" + port)
                .setPassword(password)
                .setTimeout(5000);
        //根据config创建出RedissonClient实例
        return Redisson.create(config);
    }

    @Bean
    public CacheManager RedisCacheManager(RedisConnectionFactory factory) {
        RedisSerializer<String> redisSerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        // 解决查询缓存转换异常的问题
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        /**
         * 新版本中om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL)已经被废弃
         * 建议替换为om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL)
         */
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        // 配置序列化解决乱码的问题
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
                // 设置缓存过期时间  为解决缓存雪崩,所以将过期时间加随机值
                .entryTtl(Duration.ofSeconds(60 * 60 + new Random().nextInt(60 * 10)))
                // 设置key的序列化方式
                .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))
                // 设置value的序列化方式
                .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer));
        // .disableCachingNullValues(); //为防止缓存击穿,所以允许缓存null值
        RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
                .cacheDefaults(config)
                // 启用RedisCache以将缓存 put/evict 操作与正在进行的 Spring 管理的事务同步
                .transactionAware()
                .build();
        return cacheManager;
    }
}

工具类BloomFilterUtil

import org.redisson.api.RBloomFilter;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class BloomFilterUtil {

    @Resource
    private RedissonClient redissonClient;

    /**
     * 创建布隆过滤器
     *
     * @param filterName         过滤器名称
     * @param expectedInsertions 预测插入数量
     * @param falsePositiveRate  误判率
     */
    public <T> RBloomFilter<T> create(String filterName, long expectedInsertions, double falsePositiveRate) {
        RBloomFilter<T> bloomFilter = redissonClient.getBloomFilter(filterName);
        bloomFilter.tryInit(expectedInsertions, falsePositiveRate);
        return bloomFilter;
    }
}

编写service实现层

Guava

引入依赖

<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>28.1-jre</version>
</dependency>

测试

import com.google.common.base.Charsets;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

import java.text.NumberFormat;
import java.util.*;

/**
 * @author CSDN编程小猹
 * @data 2024/01/03
 * @description 测试布隆过滤器的正确判断和误判
 * 往布隆过滤器里面存放100万个元素,测试100个存在的元素和9900个不存在的元素
 */
public class BloomFilterDemo {
    //元素个数 100万
    private static final int insertions = 1000000;

    public static void main(String[] args) {
        //创建一个布隆过滤器,第二个值是元素的个数
        // 初始化一个存储string数据的布隆过滤器,初始化大小为100W
        // 默认误判率是0.03
        BloomFilter<String> bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8),
                insertions,0.03D);

        // 用于存放所有实际存在的key,判断key是否存在,这个可快速判断key是否存在
        Set<String> set = new HashSet<>(insertions);

        // 用于存放所有实际存在的key,可以取出使用,这个可供使用下标取出
        List<String> list = new ArrayList<>(insertions);

        //插入数据
        for (int i = 0;i<insertions;i++){
            String uuid = UUID.randomUUID().toString();
            bloomFilter.put(uuid);
            set.add(uuid);
            list.add(uuid);
        }

        int right = 0; // 正确判断的次数
        int wrong = 0; // 错误判断的次数

        for (int i = 0; i < 10000; i++) {
            // 可以被100整除的时候,取一个存在的数。否则随机生成一个UUID
            // 0-10000之间,可以被100整除的数有100个(100的倍数)
            //这里就是实现100个存在key,9900个不存在key。
            String data = i % 100 == 0 ? list.get(i / 100) : UUID.randomUUID().toString();

            //bloomFilter.mightContain(data)   布隆过滤器提供的方法用于判断数据是否命中
            if (bloomFilter.mightContain(data)) {
                if (set.contains(data)) {
                    // 判断存在实际存在的时候,命中
                    right++;
                    continue;
                }
                // 判断存在却不存在的时候,错误
                wrong++;
            }
        }



        //计算命中率和误判率
        NumberFormat percentFormat = NumberFormat.getPercentInstance();
        percentFormat.setMaximumFractionDigits(2); //最大小数位数
        float percent = (float) wrong / 9900;
        float bingo = (float) (9900 - wrong) / 9900;

        System.out.println("在100W个元素中,判断100个实际存在的元素,布隆过滤器认为存在的:"+right);
        System.out.println("在100W个元素中,判断9900个实际不存在的元素,误认为存在的:"+wrong+"" +
                ",命中率:" + percentFormat.format(bingo) + ",误判率:" + percentFormat.format(percent) );


        long numOfBits = optimalNumOfBits(insertions,0.03D);
        System.out.println("100w个元素,误判率为3%的情况下,位图容量为:"+(numOfBits/8.0/1024/1024)+"MB");
        System.out.println("100w个元素,误判率为3%的情况下,哈希函数个数为:"+(optimalNumOfHashFunctions(insertions,numOfBits))+"个");
    }


    //下面两个方法是BloomFilter的方法,只是在里面是包权限,这里就直接复制出来用了。
    /**
     * 计算出哈希函数个数
     * @param expectedInsertions  期望元素个数
     * @param numOfBits  位图容量
     * @return
     */
    static int optimalNumOfHashFunctions(long expectedInsertions, long numOfBits) {
        return Math.max(1, (int)Math.round((double)numOfBits / (double)expectedInsertions * Math.log(2.0D)));
    }

    /**
     * 计算出位图容量
     * @param expectedInsertions 期望元素个数
     * @param fpp  误判率
     * @return
     */
    static long optimalNumOfBits(long expectedInsertions, double fpp) {
        if (fpp == 0.0D) {
            fpp = 4.9E-324D;
        }

        return (long)((double)(-expectedInsertions) * Math.log(fpp) / (Math.log(2.0D) * Math.log(2.0D)));
    }
}

连续三次的执行结果,误判率都在3%作用,因为默认的误判率为3%。并且使用0.87MB的位图容量加5个哈希函数就可以达到100w数据的快速判断是否存在。只是存在3%的误判率。

可以指定误判率:

?
//最后一个参数就是误判率,这里设置的是0.1  10%。
BloomFilter<String> bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8),insertions,0.01D);

?

缓存击穿

介绍

缓存击穿:给某一个key设置了过期时间,当key过期的时候,恰好这时间点对这个key有大量的并发请求过来,这些并发的请求可能会瞬间把DB压垮。

解决方案

解决方案一

互斥锁

思路:

使用互斥锁来解决缓存击穿问题的思路是通过对关键代码块进行加锁,保证在同一时间只有一个线程能够执行这段代码。这样可以有效地避免多个线程同时访问数据库,减轻数据库的压力,提高系统的性能和可用性。

在解决缓存击穿问题时,通常会使用互斥锁锁住以下几个关键步骤:

检查缓存:首先检查缓存中是否存在所需数据。
缓存失效处理:如果缓存中不存在所需数据,即缓存失效,需要进行进一步处理。
加锁:在进行缓存失效处理之前,获取互斥锁,确保只有一个线程能够执行后续的数据库查询和缓存更新操作。
数据查询和缓存更新:在成功获得互斥锁之后,执行数据库查询操作,获取所需数据,并将数据更新到缓存中。
释放锁:缓存更新完成后,释放互斥锁,允许其他等待的线程获得锁并从缓存中获取数据。
通过加锁的方式,保证了同一时间只有一个线程能够执行关键代码块,避免了缓存击穿问题。其他线程在等待期间可以从缓存中获取旧数据,而不会直接访问数据库。这样可以减少数据库的并发访问压力,提升了系统的并发能力和性能。

需要注意的是,互斥锁的使用应该谨慎,避免持有锁的时间过长,否则可能会导致其他线程的延迟和性能下降。在设计时,要权衡锁的粒度和性能需求,确保互斥锁的使用场景合理,并根据具体情况选择合适的锁机制(如读写锁、分布式锁等)进行优化。
?

流程图如下

代码实现

//缓存击穿实现
    private String redisTest1(String userName) throws Exception {
        //首先请求进来,直接从redis查询查看是否存在
        String redisResult = stringRedisTemplate.opsForValue().get("stefentest");
        //判断redis中是否存在
        if(redisResult != null){
            //如果存在,就直接返回
            return "redis查询成功,result结果为"+redisResult;
        }
 // 定义互斥锁的键名
    String lockKey = "mutex_lock:" + key;
    
    // 尝试获取互斥锁
    Boolean acquired = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "locked");
    if (acquired != null && acquired) {
        try {
            // 获取到锁,进行缓存查询
            Object cachedData = stringRedisTemplate.opsForValue().get(key);
            if (cachedData == null) {
                // 从后端获取数据并存入缓存
                Object data = fetchDataFromBackend();
                stringRedisTemplate.opsForValue().set(key, data.toString());
            }
        } finally {
            // 释放锁
            stringRedisTemplate.delete(lockKey);
        }
    } else {
        // 未获取到锁,等待一段时间后重试
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        accessCacheWithMutexLock(key);
    }

        return "";
    }

解决方案二

逻辑过期

逻辑过期是一种指定缓存数据失效时间的方式,与物理过期不同。逻辑过期并不直接将缓存中的数据删除,而是在缓存中保留该数据,但标记其为过期,表示该数据已经不再可用。

在逻辑过期的情况下,当有请求查询该数据时,缓存会先检查该数据是否过期,如果过期,则缓存会认为该数据不存在,并重新从数据源获取最新的数据。如果数据没有过期,则直接返回缓存中的数据。需要注意的是,逻辑过期时间是相对较短的,通常设置在几分钟或者几十分钟之内。

思路

基于逻辑过期的方式解决缓存穿透问题的思路是通过在缓存中设置较短的逻辑过期时间来处理查询不存在的数据。这种方式的核心理论是将缓存和数据源之间的查询请求进行分流,减轻数据源的负担,并提高系统的响应速度。

具体来说,当一个请求到达时,先检查缓存中是否存在所需数据。如果缓存中不存在该数据,则说明可能发生了缓存穿透。为了避免直接向数据源发起查询请求,并且继续保持对数据的查询,我们通过设置逻辑过期时间来抑制该请求。也就是说,将该请求的结果设置为空,并设置一个较短的逻辑过期时间。

这样一来,在逻辑过期时间内,其他同样请求该数据的请求会继续从缓存中获取旧的空结果。这样可以避免大量请求直接访问数据源,减轻了数据源的压力。同时,在逻辑过期时间到期后,新的请求会再次触发查询数据源的操作,以更新缓存中的数据。这样可以保证缓存中的数据与数据源的一致性。

从理论上讲,基于逻辑过期的方式能够有效地处理缓存穿透问题。通过将不存在的数据也缓存起来,并设置较短的逻辑过期时间,可以在一段时间内屏蔽掉大量的查询请求,减轻了数据源的负担。而在逻辑过期时间到期后,通过更新缓存的方式保证了数据的一致性,使得后续的请求可以从缓存中获取到最新的数据。

需要注意的是,选择适当的逻辑过期时间非常重要。过长的逻辑过期时间可能导致缓存数据与实际数据不一致,而过短的逻辑过期时间则可能增加了缓存的更新频率,影响系统的性能。在实际应用中,需要根据具体业务场景和数据特点进行调整,找到一个合适的平衡点。

流程图如下

代码实现

我们先设定一个场景:假设这是一个电商平台,我们通过id去查询店铺信息。

代码实现流程图如下:

1)构建存储类

我们想要实现逻辑过期,首先得清楚redis中到底要存储什么样的数据?我们是不是要在每个类中都添加一个逻辑过期的字段?这是不对的,如果我们再每个类中都添加了一个逻辑过期时间字段,这样对原代码就有了 侵入性 ,我们应该使整个系统具有可拓展性,所以我们应该新建一个类来填充要存入redis的数据,代码如下:

@Data
public class RedisData {
    private LocalDateTime expireTime;
    private Object data;
}

2)创建线程池

由于我们需要开启独立线程去重建缓存,所以我们可以选择创建一个线程池。

private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);

3)编写缓存重建的代码

缓存重建就是直接查询数据库,将查询到的数据缓存到redis中。

public void saveShop2Redis(Long id, Long expireSeconds) throws InterruptedException {
    //1.查询店铺数据
    Shop shop = getById(id);
    //2.封装逻辑过期时间
    RedisData redisData = new RedisData();
    redisData.setData(shop);
    //设置逻辑过期时间
    redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
    stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(redisData));
}

4)编写业务方法并调用缓存击穿方法

@Override
public Result queryById(Long id) {

    //逻辑过期解决 缓存击穿
    Shop shop = queryWithLogicalExpire(id);
    if (shop == null) {
        return Result.fail("店铺不存在!");
    }
    return Result.ok(shop);
}

public Shop queryWithLogicalExpire(Long id) {
    //1.从redis查询商铺缓存
    String key = CACHE_SHOP_KEY + id;
    String shopJson = stringRedisTemplate.opsForValue().get(key);
    //2.判断是否存在
    if (StrUtil.isBlank(shopJson)) {
        //未命中,直接返回空
        return null;
    }
    //3.命中,判断是否过期
    RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
    Shop cacheShop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);
    if (redisData.getExpireTime().isAfter(LocalDateTime.now())) {
        //3.1未过期,直接返回店铺信息
        return cacheShop;
    }
    //3.2.已过期,缓存重建
    //3.3.获取锁
    String lockKey = LOCK_SHOP_KEY + id;
    boolean flag = tryLock(lockKey);
    if (flag) {
        //3.4.获取成功
        //4再次检查redis缓存是否过期,做double check
        shopJson = stringRedisTemplate.opsForValue().get(key);
        //4.1.判断是否存在
        if (StrUtil.isBlank(shopJson)) {
            //未命中,直接返回空
            return null;
        }
        //4.2.命中,判断是否过期
        redisData = JSONUtil.toBean(shopJson, RedisData.class);
        cacheShop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);
        if (redisData.getExpireTime().isAfter(LocalDateTime.now())) {
            //4.3.未过期,直接返回店铺信息
            return cacheShop;
        }
        CACHE_REBUILD_EXECUTOR.submit(() -> {
            //5.重建缓存
            try {
                this.saveShop2Redis(id, 20L);
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                //释放锁
                unLock(lockKey);
            }
        });
    }
    //7.获取失败,返回旧数据
    return cacheShop;
}

缓存雪崩

介绍

缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。

解决方案

解决方案一

给不同的Key的TTL添加随机值

在设置缓存的过期时间(TTL)时,可以为不同的缓存key添加一个随机值。这样做的目的是为了避免大量缓存同时失效,从而减少数据库的压力。通过给不同的缓存key设置不同的过期时间,可以使得缓存的失效时间分散在不同的时间段,降低了缓存同时失效的概率。

下面是一个示例代码,演示了如何给缓存key添加随机值:

// 设置缓存key的过期时间,并添加随机值
public void setCacheWithRandomTTL(String key, Object value) {
    int ttl = 3600; // 缓存过期时间为1小时
    Random random = new Random();
    int randomValue = random.nextInt(600); // 随机生成0-600的值
    int finalTTL = ttl + randomValue; // 最终的过期时间为ttl + 随机值
    redis.set(key, value, finalTTL);
}

解决方案二

利用Redis集群提高服务的可用性

当使用单个Redis服务器时,如果该服务器宕机,将导致缓存不可用。为了提高服务的可用性,可以使用Redis集群。Redis集群将数据分布在多个节点上,当其中一个节点宕机时,其他节点仍然可以提供缓存服务。这样可以降低单点故障的风险,提高系统的稳定性。

// 创建Redis集群连接
Set<HostAndPort> nodes = new HashSet<>();
nodes.add(new HostAndPort("127.0.0.1", 6379));
nodes.add(new HostAndPort("127.0.0.1", 6380));
JedisCluster jedisCluster = new JedisCluster(nodes);
 // 使用Redis集群进行缓存操作
jedisCluster.set("key", "value");
String result = jedisCluster.get("key");

解决方案三

给缓存业务添加降级限流策略

当缓存失效或Redis服务宕机时,为了避免大量请求直接访问数据库,可以给缓存业务添加降级限流策略。降级限流策略可以根据系统的负载情况,动态地限制请求的数量,从而保护数据库免受过多请求的冲击。
下面是一个示例代码,演示了如何给缓存业务添加降级限流策略:

RateLimiter rateLimiter = RateLimiter.create(100); // 每秒最多处理100个请求
if (rateLimiter.tryAcquire()) {
    // 缓存业务处理逻辑
    // ...
} else {
    // 降级处理逻辑
    // ...
}

解决方案四

给业务添加多级缓存

除了使用Redis作为缓存,我们还可以在业务中添加多级缓存。多级缓存可以将数据缓存在不同的缓存层中,从而提高缓存的命中率和效率。比如,可以将热门数据缓存在内存中的Redis中,将冷门数据缓存在分布式缓存中,将持久化数据缓存在数据库中,还可以使用本地缓存,例如使用内存缓存(如 Memcached 或 Caffeine Cache)。本地缓存可以快速响应请求,避免直接访问 Redis 或数据库。可以利用缓存失效时间的随机性,分散缓存的过期时间,从而避免大量的缓存同时失效。。
下面是一个示例代码,演示了如何通过redis和内存缓存给业务添加多级缓存:

public class CacheManager {
    private final LocalCache localCache;
    private final RedisCache redisCache;

    public CacheManager() {
        // 初始化本地缓存和 Redis 缓存
        this.localCache = new LocalCache();
        this.redisCache = new RedisCache();
    }

    public Object getData(String key) {
        // 先从本地缓存获取数据
        Object data = localCache.get(key);

        if (data == null) {
            // 如果本地缓存中没有数据,则尝试从 Redis 缓存获取数据
            data = redisCache.get(key);

            if (data != null) {
                // 将数据加入本地缓存,减轻对 Redis 的压力
                localCache.put(key, data);
            } else {
                // 如果 Redis 缓存中也没有数据,则从数据库获取数据并放入 Redis 和本地缓存
                data = fetchDataFromDatabase(key);
                redisCache.put(key, data);
                localCache.put(key, data);
            }
        }

        return data;
    }

    private Object fetchDataFromDatabase(String key) {
        // 从数据库获取数据的逻辑实现,此处省略
        return null;
    }
}

public class LocalCache {
    private Map<String, Object> cacheMap;

    public LocalCache() {
        this.cacheMap = new HashMap<>();
    }

    public void put(String key, Object value) {
        cacheMap.put(key, value);
    }

    public Object get(String key) {
        return cacheMap.get(key);
    }
}

public class RedisCache {
    public Object get(String key) {
        // 从 Redis 获取数据的逻辑实现,此处省略
        return null;
    }

    public void put(String key, Object value) {
        // 将数据放入 Redis 的逻辑实现,此处省略
    }
}

缓存与数据库的一致性问题

介绍

客户端对数据库中的数据主要有两类操作,读(select)与写(DML)。缓存由于其高并发和高性能的特性,已经在项目中被广泛使用。在读取缓存方面,大家没啥疑问,都是按照下图的流程来进行业务操作。

对于写操作(DML),缓存与数据库中的内容都需要被修改,但两者的执行必定存在一个先后顺序,这可能会导致缓冲与数据库中的数据不再一致,此时主要需要考虑两个问题:

  1. 更新缓存的策略问题:当缓存中的内容变化时,是选择修改缓存,还是直接淘汰缓存
  2. 执行顺序的问题:先更新缓存还是先更新数据库。针对这两点问题,一共可以分为四种方案:
  • 先更新缓存,再更新数据库;
  • 先更新数据库,再更新缓存;
  • 先淘汰缓存,再更新数据库;
  • 先更新数据库,再淘汰缓存。

解决方案

要求一致性强的解决方案

下面的方案适用于项目要求缓存和数据库的数据一致性强的情况。

双写一致

下面先来介绍一下双写一致

双写一致性:当修改了数据库的数据也要同时更新缓存的数据,缓存和数据库的数据要保持一致

如何保证双写一致性呢?

读操作:缓存命中,直接返回;缓存未命中查询数据库,写入缓存,设定超时时间

写操作:延迟双删(还是有脏数据的风险)

在写操作中就涉及到下列几个问题

1.先删除缓存,还是先修改数据库?

先删除缓存,再操作数据库

假设数据库中原来的v=20,需要更新为v=10,在高并发的场景下如果出现上图的情况,则会出现缓存和数据库的数据不一致的问题。

先操作数据库,再删除缓存

假设数据库中原来的v=10,需要更新为v=20,在高并发的场景下如果出现上图的情况,则会出现缓存和数据库的数据不一致的问题。

所以说以上述的两种方案都有数据不一致的风险

2.为什么要删除两次缓存?

为了降低第一个问题中提到的脏数据出现的风险,我们可以采用在数据库更新完了之后再删除一次缓存。

3.为什么要延时删除?

由于数据库是一个主从集群,数据同步需要一定的时间,所以删除缓存要延迟一段时间,但是这个延迟时间不好控制。所以最终得出结论:这种方案存在一定漏洞,所以我们如果要想保证强一致性,可以采用互斥锁的方案

互斥锁(读写锁)

为了提高性能可以采用读写锁。

共享锁:读锁readLock,加锁之后,其他线程可以共享读操作 ? ? ?

排他锁:独占锁writeLock也叫,加锁之后,阻塞其他线程读写操作

代码实现

加共享锁

public Item getById(Integer id){
    RReadWriteLock readWriteLock = redissonClient. getReadWriteLock( s: "ITEM_ _READ_ WRITE_ LOCK") ;
//读之前加读锁,读锁的作用就是等待该Lockkey释放写锁以后再读
RLock readLock = readWriteLock. readLock();
   try {
//开锁
readLock.lock();
System . out . printLn("readLock...");
Item item = (Item) redisTemplate. opsForValue() .get("item:"+id);
if(item != nuLl){
return item ;|
//查询业务数据
item = new Item(id, name: "华为手机",desc: "华为手机",price: 5999.00);
//写入缓存.
redisTemplate. opsForValue() .set("item:"+id, item) ;
//返回数据
return item;
} finally {
readLock. unLock() ;
}
}

加排他锁

public void updateById(Integer id){
RReadWriteLock readWriteLock = redissonClient. getReadWriteLock( s: "ITEM_ READ. _WRITE_ LOCK");
//写之前加写锁,写锁加锁成功,读锁只能等待
RLock writeLock = readWriteLock.writeLock();
try {
//开锁
writeLock.lock();
System.out.printLn("writeLock...");
//.更新业务数据
Item item = new Item(id, name: "华为手机", desc: "华为手机",price: 5299 .00);
try {
Thread.sleep( millis: 10000);
}catch(InterruptedException e) {
e.printStackTrace() ;
}
//删除缓存
redisTemplate.delete( key: "item:"+id) ;
} finally {
writeLock. unLock();
}
}

缺点:虽然加锁后能保证缓存和数据库的强一致性,但加锁后也会导致性能降低,影响访问速度。因此该方案适合于读多写少的业务场景

要求性能高(允许延迟一致)解决方案
异步通知

思路如下图

以下是通过消息队列进行消息的异步通知到redis中进行更新

以下是基于Canal的异步通知

Canal通过监听MySQL数据库的数据变化,然后将数据变更情况异步传递给缓存服务。

以上的方案各有利弊,强一致性和性能高(允许延迟一致)只能从中权衡,要根据自身的业务场景进行合理选择。没有十全十美的解决方法,只有适合自己的方法。

redis持久化

RDB

介绍

RDB全称Redis Database Backup file(Redis数据备份文件),也被叫做Redis数据快照。简单来说就是把内存中的所有数据都记录到磁盘中。当Redis实例故障重启后,从磁盘读取快照文件,恢复数据。

Redis内部有触发RDB的机制,可以在redis.conf文件中找到,格式如下:

RDB的执行原理

bgsave开始时会fork主进程得到子进程,子进程共享主进程的内存数据。完成fork后读取内存数据并写入 RDB 文件。 fork采用的是copy-on-write技术:

  • 当主进程执行读操作时,访问共享内存;
  • 当主进程执行写操作时,则会拷贝一份数据,执行写操作。

AOF

介绍

AOF全称为Append Only File(追加文件)。Redis处理的每一个写命令都会记录在AOF文件,可以看做是命令日志文件。

开启AOF

AOF默认是关闭的,需要修改redis.conf配置文件来开启AOF:

AOF的命令记录的频率也可以通过redis.conf文件来配:

不同的命令记录频率性能不一样

因为是记录命令,AOF文件会比RDB文件大的多。而且AOF会记录对同一个key的多次写操作,但只有最后一次写操作才有意义。通过执行bgrewriteaof命令,可以让AOF文件执行重写功能,用最少的命令达到相同效果。

Redis也会在触发阈值时自动去重写AOF文件。阈值也可以在redis.conf中配置:

RDB与AOF对比

RDB和AOF各有自己的优缺点,如果对数据安全性要求较高,在实际开发中往往会结合两者来使用。

数据删除策略

惰性删除

设置该key过期时间后,我们不去管它,当需要该key时,我们在检查其是否过期,如果过期,我们就删掉它,反之返回该key。

优点 :对CPU友好,只会在使用该key时才会进行过期检查,对于很多用不到的key不用浪费时间进行过期检查

缺点 :对内存不友好,如果一个key已经过期,但是一直没有使用,那么该key就会一直存在内存中,内存永远不会释放

定期删除

每隔一段时间,我们就对一些key进行检查,删除里面过期的key(从一定数量的数据库中取出一定数量的随机key进行检查,并删除其中的过期key)。

定期清理的模式

  • SLOW模式是定时任务,执行频率默认为10hz,每次不超过25ms,以通过修改配置文件redis.conf 的hz 选项来调整这个次数
  • FAST模式执行频率不固定,但两次间隔不低于2ms,每次耗时不超过1ms

优点:可以通过限制删除操作执行的时长和频率来减少删除操作对 CPU 的影响。另外定期删除,也能有效释放过期键占用的内存。

缺点:难以确定删除操作执行的时长和频率。

Redis的过期删除策略:惰性删除 + 定期删除两种策略进行配合使用

数据淘汰策略

当Redis中的内存不够用时,此时在向Redis中添加新的key,那么Redis就会按照某一种规则将内存中的数据删除掉,这种数据的删除规则被称之为内存的淘汰策略。

Redis支持8种不同策略来选择要删除的key:

  • noeviction: 不淘汰任何key,但是内存满时不允许写入新数据,默认就是这种策略
  • volatile-ttl: 对设置了TTL的key,比较key的剩余TTL值,TTL越小越先被淘汰。
  • allkeys-random:对全体key ,随机进行淘汰。
  • volatile-random:对设置了TTL的key ,随机进行淘汰。
  • allkeys-lru: 对全体key,基于LRU算法进行淘汰
  • volatile-lru: 对设置了TTL的key,基于LRU算法进行淘汰
  • allkeys-lfu: 对全体key,基于LFU算法进行淘汰
  • volatile-lfu: 对设置了TTL的key,基于LFU算法进行淘汰
使用策略

要结合业务场景进行使用

  • 优先使用 allkeys-lru 策略。充分利用 LRU 算法的优势,把最近最常访问的数据留在缓存中。如果业务有明显的冷热数据区分,建议使用
  • 如果业务中数据访问频率差别不大,没有明显冷热数据区分,建议使用 allkeys-random,随机选择淘汰。
  • 如果业务中有置顶的需求,可以使用 volatile-lru 策略,同时置顶数据不设置过期时间,这些数据就一直不被删除,会淘汰其他设置过期时间的数据。
  • 如果业务中有短时高频访问的数据,可以使用 allkeys-lfu 或 volatile-lfu 策略。

redis分布式锁

使用场景

例子:抢购优惠卷

流程图

为了保证不出现优惠卷超卖的情况,我们想到可以用加锁来确保优惠卷不超卖。如果后端是一个单体服务,只部署在一个端口上,也就是只有一个tomcat服务,在这种情况下只需要加本地锁就可以了。但是如果后端是集群服务,那么就要用分布式锁了。redis就可以用来实现分布式锁。

?实现分布式锁的原理

Redis实现分布式锁主要利用Redis的setnx命令。setnx是SET if not exists(如果不存在,则 SET)的简写。

  • 获取锁

???????

  • 释放锁

实现方案:redisson

执行流程:

public void redisLock() throws InterruptedException {
//获取锁(重入锁),执行锁的书称
RLock Lock = redissonClient.getLock( s: "KJZLock");
try {
//尝试获取锁,参数分别是:获软锁的最大等待时间(期间会重试),锁白动释放时间,时间单位
//boolean isLock = Zock. tryLock(10, 3日,TimeUnit. SECONDS);
boolean isLock = Lock. tryLock( time: 10,TimeUnit . SECONDS);
//判湖是否获软成功
if(isLock){
System. out . printLn("执行业务...");
} finally {
//籽放馈
Lock. unLock();
      }
}
可重入性

redission实现的分布式锁是可重入

public void add1(){
 RLock lock = redissonClient.getLock(“kjzlock");
//重入次数加一
 boolean isLock = lock.tryLock();
 //执行业务
add2();
//重入次数减一
//释放锁
lock.unlock();
}
public void add2(){
 RLock lock = redissonClient.getLock(“kjzlock");
//重入次数加一
boolean isLock = lock.tryLock();
//执行业务  
//释放锁
//重入次数减一
lock.unlock();
}

redis利用hash结构记录线程id和重入次数

主从一致性

当redis是一个集群时,主节点用来增删改,从节点用来查,当一个用户在主节点上获取到一个分布式锁时,这时候主节点挂了。

这时候redis集群会在从节点中挑选一个节点成为主节点,这时候又有一个用户发来相同请求时则会获取到同一个分布式锁,当原来的主节点恢复了之后,就会出现两个线程同时持有一把锁,这就不符合锁的互斥性了。

redission中提供了RedLock(红锁)来解决上述问题

RedLock(红锁):不能只在一个redis实例上创建锁,应该是在多个redis实例上创建锁(n / 2 + 1),避免在一个redis实例上加锁。

但是这么做就使性能就大打折扣了,如果业务非要保证数据的强一致性,建议采用zookeeper实现的分布式锁

redis集群

集群方案

主从复制

单节点Redis的并发能力是有上限的,要进一步提高Redis的并发能力,就需要搭建主从集群,实现读写分离

主从数据同步原理

主从全量同步

Replication Id:简称replid,是数据集的标记,id一致则说明是同一数据集。每一个master都有唯一的replid,slave则会继承master节点的replid。

offset:偏移量,随着记录在repl_baklog中的数据增多而逐渐增大。slave完成同步时也会记录当前同步的offset。如果slave的offset小于master的offset,说明slave数据落后于master,需要更新。

主从增量同步(slave重启或后期数据变化)

???????

全量同步流程

1.从节点请求主节点同步数据(replication id、 offset )

2.主节点判断是否是第一次请求,是第一次就与从节点同步版本信息(replication id和offset)

3.主节点执行bgsave,生成rdb文件后,发送给从节点去执行

4.在rdb生成执行期间,主节点会以命令的方式记录到缓冲区(一个日志文件)

5.把生成之后的命令日志文件发送给从节点进行同步

增量同步流程

1.从节点请求主节点同步数据,主节点判断不是第一次请求,不是第一次就获取从节点的offset值

2.主节点从命令日志中获取offset值之后的数据,发送给从节点进行数据同步

采用主从模式部署redis集群,还是无法保证不了redis的高可用。如果主节点宕机了,那么就丧失了写数据的能力,因此我们还需要引入一种新的集群模式:哨兵模式

Sentinel(哨兵)模式
哨兵的基本作用

Redis提供了哨兵(Sentinel)机制来实现主从集群的自动故障恢复。哨兵的结构和作用如下:

  • 监控:Sentinel 会不断检查您的master和slave是否按预期工作
  • 自动故障恢复:如果master故障,Sentinel会将一个slave提升为master。当故障实例恢复后也以新的master为主
  • 通知:Sentinel充当Redis客户端的服务发现来源,当集群发生故障转移时,会将最新信息推送给Redis的客户端

下面详细说明一下这些作用的细节:

服务状态监控

Sentinel基于心跳机制监测服务状态,每隔1秒向集群的每个实例发送ping命令:

  • 主观下线:如果某sentinel节点发现某实例未在规定时间响应,则认为该实例主观下线。
  • 客观下线:若超过指定数量(quorum)的sentinel都认为该实例主观下线,则该实例客观下线。quorum值最好超过Sentinel实例数量的一半。
哨兵选主规则
  • 首先判断主与从节点断开时间长短,如超过指定值就排该从节点。
  • 然后判断从节点的slave-priority值,越小优先级越高。
  • 如果slave-prority一样,则判断slave节点的offset值,越大优先级越高。
  • 最后是判断slave节点的运行id大小,越小优先级越高。
可能出现的问题:脑裂

什么是脑裂:

由于redis master节点和redis salve节点和sentinel处于不同的网络分区,使得sentinel没有能够心跳感知到master,所以通过选举的方式提升了一个salve为master,这样就存在了两个master,就像大脑分裂了一样,这样会导致客户端还在old master那里写入数据,新节点无法同步数据,当网络恢复后,sentinel会将old master降为salve,这时再从新master同步数据,这会导致old master中的大量数据丢失。

解决方案:

设置redis的配置:

  • 第一可以设置最少的salve节点个数,比如设置至少要有一个从节点才能同步数据(这样当发生脑裂时,由于旧主节点没有从节点,就不会往主节点中写入数据)。
  • 第二个可以设置主从数据复制和同步的延迟时间,达不到要求就拒绝请求,就可以避免大量的数据丢失。

主从和哨兵可以解决高可用???????、高并发读的问题。但是依然有两个问题没有解决: 海量数据存储问题和高并发写的问题,接下来引入分片集群来解决这些问题。

分片集群

分片集群结构
  • 集群中有多个master。
  • 每个master保存不同数据。
  • 每个master都可以有多个slave节点。
  • master之间通过ping监测彼此健康状态。
  • 客户端请求可以访问集群任意节点,最终都会被转发到正确节点。

数据读写

Redis 分片集群引入了哈希槽的概念,Redis 集群有 16384 个哈希槽,每个 key通过 CRC16 校验后对 16384 取模来决定放置哪个槽,集群的每个节点负责一部分 hash 槽。

redis是单线程的,为什么读写速度还这么快

1、完全基于内存的,C语言编写

2、采用单线程,避免不必要的上下文切换可竞争条件

3、使用多路I/O复用模型,非阻塞IO

知识准备


用户空间和内核空间

Linux系统中一个进程使用的内存情况划分两部分:内核空间、用户空间。用户空间只能执行受限的命令(Ring3),而且不能直接调用系统资源,必须通过内核提供的接口来访问。 内核空间可以执行特权命令(Ring0),调用一切系统资源。

Linux系统为了提高IO效率,会在用户空间和内核空间都加入缓冲区: 写数据时,要把用户缓冲数据拷贝到内核缓冲区,然后写入设备。读数据时,要从设备读取数据到内核缓冲区,然后拷贝到用户缓冲区。

阻塞IO

顾名思义,阻塞IO就是两个阶段都必须阻塞等待:

阶段一:

  • 用户进程尝试读取数据(比如网卡数据)
  • 此时数据尚未到达,内核需要等待数据
  • 此时用户进程也处于阻塞状态

阶段二:

  • 数据到达并拷贝到内核缓冲区,代表已就绪
  • 将内核数据拷贝到用户缓冲区
  • 拷贝过程中,用户进程依然阻塞等待
  • 拷贝完成,用户进程解除阻塞,处理数据

可以看到,阻塞IO模型中,用户进程在两个阶段都是阻塞状态。

非阻塞IO

可以看到,阻塞IO模型中,用户进程在两个阶段都是阻塞状态。

阶段一:

  • 用户进程尝试读取数据(比如网卡数据)
  • 此时数据尚未到达,内核需要等待数据
  • 返回异常给用户进程
  • 用户进程拿到error后,再次尝试读取
  • 循环往复,直到数据就绪

阶段二:

  • 将内核数据拷贝到用户缓冲区
  • 拷贝过程中,用户进程依然阻塞等待
  • 拷贝完成,用户进程解除阻塞,处理数据

可以看到,非阻塞IO模型中,用户进程在第一个阶段是非阻塞,第二个阶段是阻塞状态。虽然是非阻塞,但性能并没有得到提高。而且忙等机制会导致CPU空转,CPU使用率暴增。


IO多路复用

IO多路复用是利用单个线程来同时监听多个Socket ,并在某个Socket可读、可写时得到通知,从而避免无效的等待,充分利用CPU资源。不过监听Socket的方式、通知的方式又有多种实现,常见的有:

  • select
  • poll
  • epoll

差异:

  • select和poll只会通知用户进程有Socket就绪,但不确定具体是哪个Socket ,需要用户进程逐个遍历Socket来确认
  • epoll则会在通知用户进程Socket就绪的同时,把已就绪的Socket写入用户空间

Redis网络模型

Redis通过IO多路复用来提高网络性能,并且支持各种不同的多路复用实现,并且将这些实现进行封装, 提供了统一的高性能事件库

  • 命令回复处理器,在Redis6.0之后,为了提升更好的性能,使用了多线程来处理回复事件
  • 命令请求处理器,在Redis6.0之后,将命令的转换使用了多线程,增加命令转换速度,在命令执行的时候,依然是单线程

面试问题示例

面试官:什么是缓存穿透 ? 怎么解决 ?

候选人

缓存穿透是指查询一个一定不存在的数据,如果从存储层查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到 DB 去查询,可能导致 DB 挂掉。这种情况大概率是遭到了攻击。

解决方案的话,我们通常都会用布隆过滤器来解决它

面试官:好的,你能介绍一下布隆过滤器吗?

候选人

布隆过滤器主要是用于检索一个元素是否在一个集合中。我们当时使用的是redisson实现的布隆过滤器。

它的底层主要是先去初始化一个比较大数组,里面存放的二进制0或1。在一开始都是0,当一个key来了之后经过3次hash计算,模于数组长度找到数据的下标然后把数组中原来的0改为1,这样的话,三个数组的位置就能标明一个key的存在。查找的过程也是一样的。

当然是有缺点的,布隆过滤器有可能会产生一定的误判,我们一般可以设置这个误判率,大概不会超过5%,其实这个误判是必然存在的,要不就得增加数组的长度,其实已经算是很划分了,5%以内的误判率一般的项目也能接受,不至于高并发下压倒数据库。

面试官:什么是缓存击穿 ? 怎么解决 ?

候选人

缓存击穿的意思是对于设置了过期时间的key,缓存在某个时间点过期的时候,恰好这时间点对这个Key有大量的并发请求过来,这些请求发现缓存过期一般都会从后端 DB 加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把 DB 压垮。

解决方案有两种方式:

第一可以使用互斥锁:当缓存失效时,不立即去load db,先使用如 Redis 的 setnx 去设置一个互斥锁,当操作成功返回时再进行 load db的操作并回设缓存,否则重试get缓存的方法

第二种方案可以设置当前key逻辑过期,大概是思路如下:

①:在设置key的时候,设置一个过期时间字段一块存入缓存中,不给当前key设置过期时间

②:当查询的时候,从redis取出数据后判断时间是否过期

③:如果过期则开通另外一个线程进行数据同步,当前线程正常返回数据,这个数据不是最新

当然两种方案各有利弊:

如果选择数据的强一致性,建议使用分布式锁的方案,性能上可能没那么高,锁需要等,也有可能产生死锁的问题

如果选择key的逻辑删除,则优先考虑的高可用性,性能比较高,但是数据同步这块做不到强一致。

面试官:什么是缓存雪崩 ? 怎么解决 ?

候选人

缓存雪崩意思是设置缓存时采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到DB,DB 瞬时压力过重雪崩。与缓存击穿的区别:雪崩是很多key,击穿是某一个key缓存。

解决方案主要是可以将缓存失效时间分散开,比如可以在原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。

面试官:redis做为缓存,mysql的数据如何与redis进行同步呢?(双写一致性)

候选人:就说我最近做的这个项目,里面有xxxx(根据自己的简历上写)的功能,需要让数据库与redis高度保持一致,因为要求时效性比较高,我们当时采用的读写锁保证的强一致性。

我们采用的是redisson实现的读写锁,在读的时候添加共享锁,可以保证读读不互斥,读写互斥。当我们更新数据的时候,添加排他锁,它是读写,读读都互斥,这样就能保证在写数据的同时是不会让其他线程读数据的,避免了脏数据。这里面需要注意的是读方法和写方法上需要使用同一把锁才行。

面试官:那这个排他锁是如何保证读写、读读互斥的呢?

候选人:其实排他锁底层使用也是setnx,保证了同时只能有一个线程操作锁住的方法

面试官:你听说过延时双删吗?为什么不用它呢?

候选人:延迟双删,如果是写操作,我们先把缓存中的数据删除,然后更新数据库,最后再延时删除缓存中的数据,其中这个延时多久不太好确定,在延时的过程中可能会出现脏数据,并不能保证强一致性,所以没有采用它。

面试官:redis做为缓存,mysql的数据如何与redis进行同步呢?(双写一致性)

候选人:就说我最近做的这个项目,里面有xxxx(根据自己的简历上写)的功能,数据同步可以有一定的延时(符合大部分业务)

我们当时采用的阿里的canal组件实现数据同步:不需要更改业务代码,部署一个canal服务。canal服务把自己伪装成mysql的一个从节点,当mysql数据更新以后,canal会读取binlog数据,然后在通过canal的客户端获取到数据,更新缓存即可。

面试官:redis做为缓存,数据的持久化是怎么做的?

候选人:在Redis中提供了两种数据持久化的方式:1、RDB 2、AOF

面试官:这两种持久化方式有什么区别呢?

候选人:RDB是一个快照文件,它是把redis内存存储的数据写到磁盘上,当redis实例宕机恢复数据的时候,方便从RDB的快照文件中恢复数据。

AOF的含义是追加文件,当redis操作写命令的时候,都会存储这个文件中,当redis实例宕机恢复数据的时候,会从这个文件中再次执行一遍命令来恢复数据

面试官:这两种方式,哪种恢复的比较快呢?

候选人:RDB因为是二进制文件,在保存的时候体积也是比较小的,它恢复的比较快,但是它有可能会丢数据,我们通常在项目中也会使用AOF来恢复数据,虽然AOF恢复的速度慢一些,但是它丢数据的风险要小很多,在AOF文件中可以设置刷盘策略,我们当时设置的就是每秒批量写入一次命令

面试官:Redis的数据过期策略有哪些 ?

候选人

在redis中提供了两种数据过期删除策略

第一种是惰性删除,在设置该key过期时间后,我们不去管它,当需要该key时,我们在检查其是否过期,如果过期,我们就删掉它,反之返回该key。

第二种是 定期删除,就是说每隔一段时间,我们就对一些key进行检查,删除里面过期的key

定期清理的两种模式:

  • SLOW模式是定时任务,执行频率默认为10hz,每次不超过25ms,以通过修改配置文件redis.conf 的 hz 选项来调整这个次数

  • FAST模式执行频率不固定,每次事件循环会尝试执行,但两次间隔不低于2ms,每次耗时不超过1ms

Redis的过期删除策略:惰性删除 + 定期删除两种策略进行配合使用。

面试官:Redis的数据淘汰策略有哪些 ?

候选人

这个在redis中提供了很多种,默认是noeviction,不删除任何数据,内部不足直接报错

是可以在redis的配置文件中进行设置的,里面有两个非常重要的概念,一个是LRU,另外一个是LFU

LRU的意思就是最少最近使用,用当前时间减去最后一次访问时间,这个值越大则淘汰优先级越高。

LFU的意思是最少频率使用。会统计每个key的访问频率,值越小淘汰优先级越高

我们在项目设置的allkeys-lru,挑选最近最少使用的数据淘汰,把一些经常访问的key留在redis中

面试官:数据库有1000万数据 ,Redis只能缓存20w数据, 如何保证Redis中的数据都是热点数据 ?

候选人

可以使用 allkeys-lru (挑选最近最少使用的数据淘汰)淘汰策略,那留下来的都是经常访问的热点数据

面试官:Redis的内存用完了会发生什么?

候选人

这个要看redis的数据淘汰策略是什么,如果是默认的配置,redis内存用完以后则直接报错。我们当时设置的 allkeys-lru 策略。把最近最常访问的数据留在缓存中。

面试官:Redis分布式锁如何实现 ?

候选人:在redis中提供了一个命令setnx(SET if not exists)

由于redis的单线程的,用了命令之后,只能有一个客户端对某一个key设置值,在没有过期或删除key的时候是其他客户端是不能设置这个key的

面试官:好的,那你如何控制Redis实现分布式锁有效时长呢?

候选人:嗯,的确,redis的setnx指令不好控制这个问题,我们当时采用的redis的一个框架redisson实现的。

在redisson中需要手动加锁,并且可以控制锁的失效时间和等待时间,当锁住的一个业务还没有执行完成的时候,在redisson中引入了一个看门狗机制,就是说每隔一段时间就检查当前业务是否还持有锁,如果持有就增加加锁的持有时间,当业务执行完成之后需要使用释放锁就可以了

还有一个好处就是,在高并发下,一个业务有可能会执行很快,先客户1持有锁的时候,客户2来了以后并不会马上拒绝,它会自旋不断尝试获取锁,如果客户1释放之后,客户2就可以马上持有锁,性能也得到了提升。

面试官:好的,redisson实现的分布式锁是可重入的吗?

候选人:是可以重入的。这样做是为了避免死锁的产生。这个重入其实在内部就是判断是否是当前线程持有的锁,如果是当前线程持有的锁就会计数,如果释放锁就会在计算上减一。在存储数据的时候采用的hash结构,大key可以按照自己的业务进行定制,其中小key是当前线程的唯一标识,value是当前线程重入的次数

面试官:redisson实现的分布式锁能解决主从一致性的问题吗

候选人:这个是不能的,比如,当线程1加锁成功后,master节点数据会异步复制到slave节点,此时当前持有Redis锁的master节点宕机,slave节点被提升为新的master节点,假如现在来了一个线程2,再次加锁,会在新的master节点上加锁成功,这个时候就会出现两个节点同时持有一把锁的问题。

我们可以利用redisson提供的红锁来解决这个问题,它的主要作用是,不能只在一个redis实例上创建锁,应该是在多个redis实例上创建锁,并且要求在大多数redis节点上都成功创建锁,红锁中要求是redis的节点数量要过半。这样就能避免线程1加锁成功后master节点宕机导致线程2成功加锁到新的master节点上的问题了。

但是,如果使用了红锁,因为需要同时在多个节点上都添加锁,性能就变的很低了,并且运维维护成本也非常高,所以,我们一般在项目中也不会直接使用红锁,并且官方也暂时废弃了这个红锁

面试官:好的,如果业务非要保证数据的强一致性,这个该怎么解决呢?

候选人:redis本身就是支持高可用的,做到强一致性,就非常影响性能,所以,如果有强一致性要求高的业务,建议使用zookeeper实现的分布式锁,它是可以保证强一致性的。

面试官:Redis集群有哪些方案, 知道嘛 ?

候选人:在Redis中提供的集群方案总共有三种:主从复制、哨兵模式、Redis分片集群

面试官:那你来介绍一下主从同步

候选人:单节点Redis的并发能力是有上限的,要进一步提高Redis的并发能力,可以搭建主从集群,实现读写分离。一般都是一主多从,主节点负责写数据,从节点负责读数据,主节点写入数据之后,需要把数据同步到从节点中

面试官:能说一下,主从同步数据的流程

候选人:主从同步分为了两个阶段,一个是全量同步,一个是增量同步

全量同步是指从节点第一次与主节点建立连接的时候使用全量同步,流程是这样的:

第一:从节点请求主节点同步数据,其中从节点会携带自己的replication id和offset偏移量。

第二:主节点判断是否是第一次请求,主要判断的依据就是,主节点与从节点是否是同一个replication id,如果不是,就说明是第一次同步,那主节点就会把自己的replication id和offset发送给从节点,让从节点与主节点的信息保持一致。

第三:在同时主节点会执行bgsave,生成rdb文件后,发送给从节点去执行,从节点先把自己的数据清空,然后执行主节点发送过来的rdb文件,这样就保持了一致

当然,如果在rdb生成执行期间,依然有请求到了主节点,而主节点会以命令的方式记录到缓冲区,缓冲区是一个日志文件,最后把这个日志文件发送给从节点,这样就能保证主节点与从节点完全一致了,后期再同步数据的时候,都是依赖于这个日志文件,这个就是全量同步

增量同步指的是,当从节点服务重启之后,数据就不一致了,所以这个时候,从节点会请求主节点同步数据,主节点还是判断不是第一次请求,不是第一次就获取从节点的offset值,然后主节点从命令日志中获取offset值之后的数据,发送给从节点进行数据同步

面试官:怎么保证Redis的高并发高可用

候选人:首先可以搭建主从集群,再加上使用redis中的哨兵模式,哨兵模式可以实现主从集群的自动故障恢复,里面就包含了对主从服务的监控、自动故障恢复、通知;如果master故障,Sentinel会将一个slave提升为master。当故障实例恢复后也以新的master为主;同时Sentinel也充当Redis客户端的服务发现来源,当集群发生故障转移时,会将最新信息推送给Redis的客户端,所以一般项目都会采用哨兵的模式来保证redis的高并发高可用

面试官:你们使用redis是单点还是集群,哪种集群

候选人:我们当时使用的是主从(1主1从)加哨兵。一般单节点不超过10G内存,如果Redis内存不足则可以给不同服务分配独立的Redis主从节点。尽量不做分片集群。因为集群维护起来比较麻烦,并且集群之间的心跳检测和数据通信会消耗大量的网络带宽,也没有办法使用lua脚本和事务

面试官:redis集群脑裂,该怎么解决呢?

候选人:这个在项目很少见,不过脑裂的问题是这样的,我们现在用的是redis的哨兵模式集群的

有的时候由于网络等原因可能会出现脑裂的情况,就是说,由于redis master节点和redis salve节点和sentinel处于不同的网络分区,使得sentinel没有能够心跳感知到master,所以通过选举的方式提升了一个salve为master,这样就存在了两个master,就像大脑分裂了一样,这样会导致客户端还在old master那里写入数据,新节点无法同步数据,当网络恢复后,sentinel会将old master降为salve,这时再从新master同步数据,这会导致old master中的大量数据丢失。

关于解决的话,我记得在redis的配置中可以设置:第一可以设置最少的salve节点个数,比如设置至少要有一个从节点才能同步数据,第二个可以设置主从数据复制和同步的延迟时间,达不到要求就拒绝请求,就可以避免大量的数据丢失

面试官:redis的分片集群有什么作用

候选人:分片集群主要解决的是,海量数据存储的问题,集群中有多个master,每个master保存不同数据,并且还可以给每个master设置多个slave节点,就可以继续增大集群的高并发能力。同时每个master之间通过ping监测彼此健康状态,就类似于哨兵模式了。当客户端请求可以访问集群任意节点,最终都会被转发到正确节点

面试官:Redis分片集群中数据是怎么存储和读取的?

候选人

在redis集群中是这样的

Redis 集群引入了哈希槽的概念,有 16384 个哈希槽,集群中每个主节点绑定了一定范围的哈希槽范围, key通过 CRC16 校验后对 16384 取模来决定放置哪个槽,通过槽找到对应的节点进行存储。

取值的逻辑是一样的

面试官:Redis是单线程的,但是为什么还那么快?

候选人

这个有几个原因吧

1、完全基于内存的,C语言编写

2、采用单线程,避免不必要的上下文切换可竞争条件

3、使用多路I/O复用模型,非阻塞IO

例如:bgsave 和 bgrewriteaof 都是在后台执行操作,不影响主线程的正常使用,不会产生阻塞

面试官:能解释一下I/O多路复用模型?

候选人:嗯~~,I/O多路复用是指利用单个线程来同时监听多个Socket ,并在某个Socket可读、可写时得到通知,从而避免无效的等待,充分利用CPU资源。目前的I/O多路复用都是采用的epoll模式实现,它会在通知用户进程Socket就绪的同时,把已就绪的Socket写入用户空间,不需要挨个遍历Socket来判断是否就绪,提升了性能。

其中Redis的网络模型就是使用I/O多路复用结合事件的处理器来应对多个Socket请求,比如,提供了连接应答处理器、命令回复处理器,命令请求处理器;

在Redis6.0之后,为了提升更好的性能,在命令回复处理器使用了多线程来处理回复事件,在命令请求处理器中,将命令的转换使用了多线程,增加命令转换速度,在命令执行的时候,依然是单线程

文章来源:https://blog.csdn.net/m0_74229735/article/details/135354287
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。