Redis缓存穿透/击穿/雪崩以及数据一致性的解决方案

发布时间:2023年12月22日

之前其实在网上也会有很多关于这个的一些讲解或者分析,这里是自己对这方面的一些想法和解决方案的提供。这几个问题,其实不管是在面试或者实际的开发工作中都会有很大的用处,所以在这里对这方面进行一个系统性的分析。

有关redis相关的,难免会涉及到四个特殊场景:缓存穿透、缓存雪崩、缓存击穿以及数据一致性。如果在开发中不注意这些场景的话,在高并发场景下有可能会导致系统崩溃,数据错乱等情况

缓存穿透?

缓存穿透是指查询缓存和数据库中都不存在的数据,导致所有的查询压力全部给到了数据库?

?比如查询一篇文章信息并对其进行缓存,一般的逻辑是先查询缓存中是否存在该文章,如果存在则直接返回,否则再查询数据库并将查询结果进行缓存。但是现在的情况就是缓存和数据库如果都不存在,这会导致每一次的请求都会一直重复查询缓存和数据库,导致压力瞬间飙升。

@GetMapping("/doc/queryById")
public Result<DocumentInfo> queryById(@RequestParam(name = "docId") Integer docId) {
    return Result.success(documentInfoService.getDocumentDetail(docId));
}


@Slf4j
@Service
public class DocumentInfoServiceImpl extends ServiceImpl<DocumentInfoMapper, DocumentInfo> implements DocumentInfoService {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public DocumentInfo getDocumentDetail(int docId) {
        String redisKey = "doc::info::" + docId;
        String obj = stringRedisTemplate.opsForValue().get(redisKey);
        DocumentInfo documentInfo = null;
        if (StrUtil.isNotEmpty(obj)) { //缓存命中
            log.info("==== select from cache ====");
            documentInfo = JSONUtil.toBean(obj, DocumentInfo.class);
        } else {
            log.info("==== select from db ====");
            documentInfo = this.lambdaQuery().eq(DocumentInfo::getId, docId).one();
            if (ObjectUtil.isNotNull(documentInfo)) { // 缓存结果
                stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo), 5L, TimeUnit.SECONDS);
            }
        }
        return documentInfo;
    }
}

如果项目的并发量不大,这样写的话几乎没啥问题。如果项目的并发量很大,那么这就存在一个隐藏问题,如果在访问了一个不存在的文章(其实正常情况业务系统也不会出现这种问题,主要是是防止恶意攻击或者脚本调用接口),那么就会导致所有的请求全部需要到数据库中进行查询,从而给数据库造成压力,甚至造成宕机。

解决方案一:缓存空对象?

针对缓存穿透问题缓存空对象可以有效避免所产生的影响,当查询一条不存在的数据时,在缓存中存储一个空对象并设置一个过期时间(设置过期时间是为了避免出现数据库中存在了数据但是缓存中仍然是空数据现象),这样可以避免所有请求全部查询数据库的情况。这个方案也是很多人会进行选择的一种方式,用途其实也还是可以的,简单高效

// 查询对象不存在
        if(StrUtil.equals(obj,"")){
            log.info("==== select from cache , data not available ====");
            return null;
        }
        if (StrUtil.isNotEmpty(obj)) {
            log.info("==== select from cache ====");
            documentInfo = JSONUtil.toBean(obj, DocumentInfo.class);
        } else {
            log.info("==== select from db ====");
            documentInfo = this.lambdaQuery().eq(DocumentInfo::getId, docId).one();
            //如果数据不存在,则缓存一个空对象并设置过期时间
            stringRedisTemplate.opsForValue().set(redisKey, ObjectUtil.isNotNull(documentInfo)?JSONUtil.toJsonStr(documentInfo):"", 5L, TimeUnit.SECONDS);
//            if (ObjectUtil.isNotNull(documentInfo)) {
//                stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo), 5L, TimeUnit.SECONDS);
//            }
        }

??解决方案二:布隆过滤器

缓存空对象的缺点在于无论数据存不存在都需要查询一次数据库,并且redis中存储了大量的空数据,这个时候可以采用布隆过滤器来解决。

布隆过滤器可以简单的理解为由一个很长的二进制数组结合n个hash算法计算出n个数组下标,将这些数据下标置为1。在查找数据时,再次通过n个hash算法计算出数组下标,如果这些下标的值为1,表示该值可能存在(存在hash冲突的原因),如果为0,则表示该值一定不存在。?(关于布隆过滤器的原理可以自行百度一下)

  • 布隆过滤器的实现

在使用布隆过滤器时有两个核心参数,分别是预估的数据量size以及期望的误判率fpp,这两个参数我们可以根据自己的业务场景和数据量进行自主设置。在实现布隆过滤器时,有两个核心问题,分别是hash函数的选取个数n以及确定bit数组的大小len。?

单机版布隆过滤器?

目前单机版的布隆过滤器实现方式有很多,比如Guava提供的BloomFilter,Hutool工具包中提供的BitMapBloomFilter等。以Guava为例,需要引入对应的依赖包,在BloomFilter类中提供了create方法来进行布隆过滤器的创建。?

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>21.0</version>
</dependency>

?这里的数量和误判率根据自己需要,0.01仅为参考

public static BloomFilter<Integer> localBloomFilter =  BloomFilter.create(Funnels.integerFunnel(),10000L,0.01);

创建容器完成后,将需要筛选的数据同步到过滤器中。(也可以称为数据预热)

/**
 * 单机版布隆过滤器数据初始化
 */
@PostConstruct
public void initDocumentDataLocal(){
    List<DocumentInfo> documentInfos = documentInfoService.lambdaQuery().select(DocumentInfo::getId).list();
    if(CollUtil.isNotEmpty(documentInfos)){
        documentInfos.stream().map(DocumentInfo::getId).forEach(e->{
            BloomFilterUtil.localBloomFilter.put(e);
        });
    }
}

在业务代码中,可以直接调用BloomFilter提供的mightContain方法,判断目标docId是否可能存在于过滤器中,如果可能存在,那么继续向下执行业务逻辑,否则直接中断执行。

@Override
public DocumentInfo getDocumentDetail(int docId) {
    //布隆过滤器拦截 
    boolean mightContain = BloomFilterUtil.localBloomFilter.mightContain(docId);
    if(!mightContain){ //是否有可能存在于布隆过滤器中
        log.info("==== select from bloomFilter , data not available ====");
        return null;
    }
    String redisKey = "doc::info::" + docId;
    String obj = stringRedisTemplate.opsForValue().get(redisKey);
    DocumentInfo documentInfo = null;
    if (StrUtil.isNotEmpty(obj)) {
        log.info("==== select from cache ====");
        documentInfo = JSONUtil.toBean(obj, DocumentInfo.class);
    } else {
        log.info("==== select from db ====");
        documentInfo = this.lambdaQuery().eq(DocumentInfo::getId, docId).one();
        if(ObjectUtil.isNotNull(documentInfo)){
                stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo), 5L, TimeUnit.SECONDS);   
            }
    }
    return documentInfo;
}

?自定义分布式版布隆过滤器

如果是机器会部署多个几点,或者几个服务之间需要共享缓存数据,那就需要借助redis来进行统一管理。分布式布隆过滤器的存储依赖于redis的bitmap数据结构来实现,另外还需要定义四个参数,分别为预估数据量size,误判率fpp,数组大小bitNum以及hash函数个数hashNum其中预估数据量和误判率需要配置在yml文件中?

 @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Value("${bloom.filter.size}")
    private long size; // 预估数据量

    @Value("${bloom.filter.fpp}")
    private double fpp;  // 误判率

    private long bitNum; //数组大小len

    private int hashNum; // hash函数个数size

   /*
    * 计算出数组长度,hash函数个数并初始化数组
    */
    @PostConstruct
    private void initBloom() {
        this.bitNum = getNumOfBits(size, fpp);
        this.hashNum = getNumOfHashFun(size, bitNum);
        //借助redis的bitmap来实现二进制数组
        stringRedisTemplate.opsForValue().setBit("bloom::filter", bitNum, false);
    }
    
    /**
     * 计算bit数组大小
     *
     * @param size
     * @param fpp
     * @return
     */
    private long getNumOfBits(long size, double fpp) {
        return (long) (-size * Math.log(fpp) / (Math.log(2) * Math.log(2)));
    }

    /**
     * 计算所需的hash个数
     *
     * @param size
     * @param numOfBits
     * @return
     */
    private int getNumOfHashFun(long size, long numOfBits) {
        return Math.max(1, (int) Math.round((double) numOfBits / size * Math.log(2)));
    }

?计算出相应的数组长度以及所需的hash函数个数,并在redis设置一个布隆过滤器的key。

另外,需要提供两个方法,分别为添加元素的putBloomFilterRedis方法和判断元素是否有可能存在的方法existBloomFilterRedis,其中的实现方式参考了guava。?

/**
     * 自定义布隆过滤器中添加元素
     * @param key
     */
    public void putBloomFilterRedis(String key) {
        long hash64 = HashUtil.metroHash64(key.getBytes());
        int hash1 = (int) hash64;
        int hash2 = (int) (hash64 >>> 32);
        for (int i = 1; i <= hashNum; i++) {
            int combinedHash = hash1 + i * hash2;
            if (combinedHash < 0) {
               //如果为负数,则取反(保证结果为正数)
                combinedHash = ~combinedHash;
            }
            // 计算出数组下标,并将下标值置为1
            int bitIdx = (int) (combinedHash % bitNum);
            stringRedisTemplate.opsForValue().setBit("bloom::filter", bitIdx, true);
        }
    }

    /**
     * 判断自定义布隆过滤器中元素是否有可能存在
     * @param key
     * @return
     */
    public boolean existBloomFilterRedis(String key) {
        long hash64 = HashUtil.metroHash64(key.getBytes());
        int hash1 = (int) hash64;
        int hash2 = (int) (hash64 >>> 32);
        for (int i = 1; i <= hashNum; i++) {
            int combinedHash = hash1 + i * hash2;
            if (combinedHash < 0) {
                combinedHash = ~combinedHash;
            }
            int bitIdx = (int) (combinedHash % bitNum);
            //判断下标值是否为1,如果不为1直接返回false
            Boolean bit = stringRedisTemplate.opsForValue().getBit("bloom::filter", bitIdx);
            if (!bit) {
                return false;
            }
        }
        return true;
    }

?方法实现后,将所有的key值数据同步到redis中。

@Component
public class BloomFilterInitData {

    @Resource
    private BloomFilterUtil bloomFilterUtil;

    @Resource
    private DocumentInfoService documentInfoService;

    @PostConstruct
    public void initDocumentData(){
        List<DocumentInfo> documentInfos = documentInfoService.lambdaQuery().select(DocumentInfo::getId).list();
        if(CollUtil.isNotEmpty(documentInfos)){
            documentInfos.stream().map(m -> {
                return "doc::info::" + m.getId().intValue();
            }).forEach(e->{
                bloomFilterUtil.putBloomFilterRedis(e);
            });
        }
    }
}

?

@Override
public DocumentInfo getDocumentDetail(int docId) {
    String redisKey = "doc::info::" + docId;
    // 布隆过滤器中是否有可能存在这个key
    boolean b = bloomFilterUtil.existBloomFilterRedis(redisKey);
    if(!b){
        // 如果不存在,直接返回空
        log.info("==== select from bloomFilter , data not available ====");
        return null;
    }
    String obj = stringRedisTemplate.opsForValue().get(redisKey);
    DocumentInfo documentInfo = null;
    if (StrUtil.isNotEmpty(obj)) {
        log.info("==== select from cache ====");
        documentInfo = JSONUtil.toBean(obj, DocumentInfo.class);
    } else {
        log.info("==== select from db ====");
        documentInfo = this.lambdaQuery().eq(DocumentInfo::getId, docId).one();
        if(ObjectUtil.isNotNull(documentInfo)){
                stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo), 5L, TimeUnit.SECONDS);   
            }
    }
    return documentInfo;
}

后续不存在的数据成功被拦截掉了,避免再去查询数据库,即使存在一定的误判率,也几乎不会有啥影响,最多就是查询一次数据库。

虽然布隆过滤器可以有效的解决缓存穿透问题,并且实现的算法查找效率也很快。但是,也存在一定的缺点,由于存在hash冲突的原因,一方面存在一定的误判率(某个在过滤器中并不存在的key,但是通过hash计算出来的下标值都为1)。另一方面,删除比较困难(如果将一个数组位置为0,那么这个位置有可能也代表其他key的值,会影响到其他的key)?

缓存击穿?

缓存击穿是指访问某个热点数据时,缓存中并不存在该数据或者缓存过期了,这个时候全部的请求压力给到了数据库。?可以理解为击穿就是针对某一个点然后一直在请求,无效的请求数太多导致了数据库压力瞬间增加。

进行数据模拟如下:

public static void main(String[] args) throws InterruptedException {
/**
* 短时间内并发请求接口,并访问同一个数据
*/
    ExecutorService executorService = Executors.newFixedThreadPool(1000);
    CountDownLatch countDownLatch = new CountDownLatch(1000);
    for (int i = 0; i < 1000; i++) {
        executorService.execute(() -> {
            HttpResponse response = HttpUtil.createGet("http://127.0.0.1:8081/doc/queryById?docId=1").execute();
            System.out.println(response.body());
            countDownLatch.countDown();
        });
    }
    countDownLatch.await();
    executorService.shutdown();
}

针对缓存击穿问题,有两种解决方案,一种是对热点数据不设置过期时间,另一种是采用互斥锁的方式。

?解决方案一:热点数据不设置过期时间

热点数据不设置过期时间,当后台更新热点数据数需要同步更新缓存中的数据,这种解决方式适用于不严格要求缓存一致性的场景。?(也可以参考数据库和redis的一致性相关性分析)

?解决方案二:使用互斥锁

如果是单机部署的环境下可以使用synchronized或lock来处理,保证同时只能有一个线程来查询数据库,其他线程可以等待数据缓存成功后在被唤醒,从而直接查询缓存即可。如果是分布式部署,可以采用分布式锁来实现互斥。?

@Component
public class RedisLockUtil {


    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 模拟互斥锁
     * @param key
     * @param value
     * @param exp
     * @return
     */
    public boolean tryLock(String key, String value, long exp) {
        Boolean absent = stringRedisTemplate.opsForValue().setIfAbsent(key, value, exp, TimeUnit.SECONDS);
        if (absent) {
            return true;
        }
        return tryLock(key, value, exp); //如果线程没有获取锁,则在此处循环获取
    }

    /**
     * 释放锁
     * @param key
     * @param value
     */
    public void unLock(String key, String value) {
        String s = stringRedisTemplate.opsForValue().get(key);
        if (StrUtil.equals(s, value)) { //避免锁被其他线程误删
            stringRedisTemplate.delete(key);
        }
    }
}

?有了上面的两个方法,可以对业务代码进行改造,在查询数据库前进行加锁,读取完成后在释放锁。

@Override
public DocumentInfo getDocumentDetail(int docId) {
    String redisKey = "doc::info::" + docId;
    boolean b = bloomFilterUtil.existBloomFilterRedis(redisKey);
    if (!b) {
        log.info("==== select from bloomFilter , data not available ====");
        return null;
    }
    String obj = stringRedisTemplate.opsForValue().get(redisKey);
    DocumentInfo documentInfo = null;
    if (StrUtil.isNotEmpty(obj)) {
        log.info("==== select from cache ====");
        documentInfo = JSONUtil.toBean(obj, DocumentInfo.class);
    } else {
        String s = UUID.randomUUID().toString(); //给锁加个标识,避免误删
        String lockKey = redisKey+"::lock";
        boolean lock = redisLockUtil.tryLock(lockKey, s, 60); //尝试加锁
        if (lock) { 
            try {
               //如果加锁成功,先再次查询缓存,有可能上一个线程查询并添加到缓存了
                obj = stringRedisTemplate.opsForValue().get(redisKey);
                if (StrUtil.isNotEmpty(obj)) {
                    log.info("==== select from cache ====");
                    documentInfo = JSONUtil.toBean(obj, DocumentInfo.class);
                } else {
                    log.info("==== select from db ====");
                    documentInfo = this.lambdaQuery().eq(DocumentInfo::getId, docId).one();
                    if (ObjectUtil.isNotNull(documentInfo)) {
                        stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo), 5L, TimeUnit.SECONDS);
                    }
                }
            } finally {
                redisLockUtil.unLock(lockKey, s); //释放锁
            }
        }
    }
    return documentInfo;
}

?再次模拟并发查询,看看最终效果,理想的结果状态应该是查询一次数据库,后面的查询直接通过缓存获取

注意:但是这里也会有一个问题,就是假如在锁删除之前业务还未执行完毕,同样后续的请求来了之后也会出现问题。但是已经99%解决了这种情景。如果为了更加方便的情况下,可以使用redission的lock和unlock的方式更加方便且性能更好。

缓存雪崩

?缓存雪崩是指对热点数据设置了相同的过期时间,在同一时间这些热点数据key大批量发生过期,请求全部转发到数据库,从而导致数据库压力骤增,甚至宕机。与缓存击穿不同的是,缓存击穿是单个热点数据过期,而缓存雪崩是大批量热点数据过期。

针对缓存雪崩问题,常见的解决方案有多种,比如设置随机的过期时间或者不设置过期时间,搭建高可用的缓存架构避免redis服务宕机,服务降级等?

解决方案一:设置随机的过期时间?

if (ObjectUtil.isNotNull(documentInfo)) {
    //生成一个随机数
    int randomInt = RandomUtil.randomInt(2, 10); 
    //过期时间+随机数
    stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo), 5L+randomInt, TimeUnit.SECONDS);
}

?解决方案二:不设置过期时间

不设置过期时间时,需要注意的是,在更新数据库数据时,同时也需要更新缓存数据,否则数据会出现不一致的情况。这种方式比较适用于不严格要求缓存一致性的场景。?

文章来源:https://blog.csdn.net/jsl18328504657/article/details/135147607
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。