模拟缓存击穿并提供解决方案

发布时间:2024年01月21日

在聊到这个问题的时候我们首先要知道什么是缓存击穿,缓存击穿也叫做热点key问题,一个热点key在高并发的场景下突然失效了,此时很多的请求会在瞬间给数据库带来巨大冲击。数据库的压力就会很大,有挂掉的可能性。

现在我们来模拟一下缓存击穿的场景,现在缓存中有缓存用户1,将在51s后过期

我们用key的过期来模拟热点key失效的问题。

我们在 22s的时候用jmerter进行压测,模拟高并发场景下的情况。

测试代码如下:

package com.qjc.interview.Cache.penetration.controller;

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import com.qjc.interview.Cache.penetration.pojo.User;
import com.qjc.interview.Cache.penetration.service.UserService;
import net.minidev.json.JSONValue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.TimeUnit;

/**
 * @Auther: QuJingChuan
 * @Date: 2024/1/14 08:33
 * @Description:
 */
@RestController
@RequestMapping("/test")
public class PenetrationController {

    @Autowired
    private UserService userService;

    @Autowired
    private RedisTemplate<String,String> redisTemplate;
    @GetMapping("/queryById/{id}")
    public User queryUserById(@PathVariable("id") Integer id){
       /* //这个布隆过滤器用来存储integer类型的数据,初始化大小为1000.误判率在百分之5
        BloomFilter<Integer> bloomFilter = BloomFilter.create(Funnels.integerFunnel(),1000,0.05);
        //将表中的主键初始化进入布隆过滤器(因为我这里只有一条数据,我就直接加入进去了)
        bloomFilter.put(1);
        //判断布隆过滤器中是否有数据,没有直接返回.
        if (!bloomFilter.mightContain(id)){
            System.out.println("布隆过滤器中无数据,直接返回");
            return null;
        }*/
        //先从redis查询有无缓存对象
        String userStr = redisTemplate.opsForValue().get("缓存用户" + id);
        if (userStr != null){
            //redis缓存中有数据
            System.out.println("从redis缓存中获取用户");
            return JSONValue.parse(userStr,User.class);
        }
        User user = userService.SelectUserById(id);
        System.out.println("从数据库中获取用户");
        if (user ==null){
            return null;
        }
       /* //缓存重建
        redisTemplate.opsForValue().set("缓存用户"+id, JSONValue.toJSONString(user),60, TimeUnit.SECONDS);*/
        return user;
    }

}

测试结果如下:

很明显可以看到,当缓存没有过期的时候高并发的场景下是从redis缓存中获取数据的,但是当redis中的key过期后,剩下的大量请求打入了数据库,造成数据库压力过大。

常见的解决方案如下所示

1.设置互斥锁(分布式锁)

Redis中的setnx方法

在Redis中,SETNX是一个用于设置指定键的值的命令。它会在键不存在时设置键的值,并且如果键已经存在,则不会进行任何操作。这个命令通常用于在设置某个键的值时,确保该键不存在,以避免覆盖已有的值。

简单的说就是当高并发场景下当有一个线程发现了redis缓存中取不到数据后,这个线程会进行缓存重建(获取该对象的锁),在缓存重建的过程中仅仅有一个线程进行缓存重建,其他的线程将尝试进行缓存重建(但是由于没有拿到这个对象的锁),因为该对象锁被其中一个线程占用,因此其他的线程将会不断地休眠 ,并且重新查询缓存,若缓存没有重建完成继续尝试并且获取锁。

具体图示如下

具体的代码实现如下

package com.qjc.interview.Cache.penetration.controller;

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import com.qjc.interview.Cache.penetration.pojo.User;
import com.qjc.interview.Cache.penetration.service.UserService;
import jakarta.annotation.Resource;
import net.minidev.json.JSONValue;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.convert.RedisData;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.TimeUnit;

/**
 * @Auther: QuJingChuan
 * @Date: 2024/1/14 08:33
 * @Description:
 */
@RestController
@RequestMapping("/test")
public class PenetrationController {

    @Autowired
    private UserService userService;

    @Autowired
    private RedisTemplate<String,String> redisTemplate;


    @Autowired
    private RedissonClient redissonClient;

    @GetMapping("/queryById/{id}")
    public User queryUserById(@PathVariable("id") Integer id){
    /*    //这个布隆过滤器用来存储integer类型的数据,初始化大小为1000.误判率在百分之5
        BloomFilter<Integer> bloomFilter = BloomFilter.create(Funnels.integerFunnel(),1000,0.05);
        //将表中的主键初始化进入布隆过滤器(因为我这里只有一条数据,我就直接加入进去了)
        bloomFilter.put(1);
        //判断布隆过滤器中是否有数据,没有直接返回.
        if (!bloomFilter.mightContain(id)){
            System.out.println("布隆过滤器中无数据,直接返回");
            return null;
        }*/
        //先从redis查询有无缓存对象
        String userStr = redisTemplate.opsForValue().get("缓存用户" + id);
        if (userStr != null){
            //redis缓存中有数据
            System.out.println("从redis缓存中获取用户");
            return JSONValue.parse(userStr,User.class);
        }
        //TODO 分布式锁解决缓存击穿
        //获取互斥锁
        String lockKey = "lock" + id;
        try {
            boolean isLock = tryLock(lockKey);
            //获取锁是否成功
            if (!isLock){
                //失败,休眠重试
                System.out.println("没拿到锁,休眠后重试");
                Thread.sleep(50);
                //递归
                return queryUserById(id);
            }
            //有一个线程拿到id并且查询数据库
            System.out.println(Thread.currentThread().getName() + "拿到锁");
            User user = userService.SelectUserById(id);
            //数据库中没有
            if (user == null){
                //将空值写入redis
                redisTemplate.opsForValue().set("缓存用户" + id,null,10,TimeUnit.SECONDS);
                return  null;
            }
            //重建缓存
            redisTemplate.opsForValue().set("缓存用户" + id,JSONValue.toJSONString(user),30,TimeUnit.SECONDS);
        }catch (Exception e){
            throw new RuntimeException();
        }
        //释放锁
        unlock(lockKey);
        return JSONValue.parse(redisTemplate.opsForValue().get("缓存用户"+id),User.class);
    }

    //尝试获取锁的方法
    private boolean tryLock(String key){
        //setnx
        boolean flag = redisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
        return flag;
    }

    //释放锁的代码
    private void unlock(String key){
        redisTemplate.delete(key);
    }
}

压力测试如下

我的缓存中现在没有数据,那就直接让线程新建数据目标key是 “缓存用户1”?
执行代码后结果如下
缓存中

等缓存的数据块过期时,我们此时发送大量的高并发数据进行测试

测试结果如下

可以清楚的看到,我们在1s中发了2000次请求的时候,仅仅有一个线程拿到了锁(http-nio-8888-exec-56线程),并且这个线程对缓存进行了重建。后续的所有线程都在redis缓存中取到了数据。

2.设置逻辑过期时间

在这里给各位小伙伴们解释一下什么事设置逻辑过期时间,我们在第一次将数据加入缓存中不给他设置过期时间,但是我们给他传入的时候设置一个逻辑过期时间。当我们每次取出这个缓存的时,我们用当前的时间和逻辑过期的时间来对比,看看这个缓存是否"过期"(实际是不会过期的)。

具体流程图如下:

这里和上述的分布式锁解决缓存击穿问题有明显的区别,很显然当一个线程发现时间过期后并且没有获取到锁进行缓存重建,但是这里却没有重试等待而是选择了将之前的值进行返回

代码如下

这里要单独封装一份redisData数据,因为redis中默认没有给逻辑过期,因此我们需要自行封装逻辑过期时间。
?

package com.qjc.interview.Cache.penetration.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.time.LocalDateTime;

/**
 * @Auther: QuJingChuan
 * @Date: 2024/1/21 11:03
 * @Description:
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RedisData {
    //封装逻辑过期
    private LocalDateTime expireTime; //逻辑过期时间
    private String data; //用于存放数据
}
package com.qjc.interview.Cache.penetration.controller;

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import com.qjc.interview.Cache.penetration.pojo.RedisData;
import com.qjc.interview.Cache.penetration.pojo.User;
import com.qjc.interview.Cache.penetration.service.UserService;
import jakarta.annotation.Resource;
import net.minidev.json.JSONValue;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @Auther: QuJingChuan
 * @Date: 2024/1/14 08:33
 * @Description:
 */
@RestController
@RequestMapping("/test")
public class PenetrationController {

    @Autowired
    private UserService userService;

    @Autowired
    private RedisTemplate<String, String> redisTemplate;


    @GetMapping("/queryById/{id}")
    public User queryUserById(@PathVariable("id") Integer id) {
    
        //TODO 逻辑过期解决缓存击穿问题
        //从缓存中获取对象
        String cacheKey = "缓存用户" + id;
        //创建一个固定大小的线程池,方便进行缓存重建
        ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(1);
        String userJson = redisTemplate.opsForValue().get(cacheKey);
        if (userJson == null || "".equals(userJson)) {
            System.out.println("用户不存在");
            return null;
        }
        //命中缓存
        RedisData redisData = JSONValue.parse(userJson, RedisData.class);
        User user = JSONValue.parse(redisData.getData(),User.class);//转换为java对象
        //获取过期时间
        LocalDateTime expireTime = redisData.getExpireTime();
        //判断逻辑是否过期
        if (expireTime.isAfter(LocalDateTime.now())) {
            // 未过期逻辑 直接返回数据
            return user;
        }
        // 过期了,进行缓存重建 获取锁
        boolean isLock = tryLock("lock" + id);
        if (isLock) {
            // 获取锁成功 新开启一个线程进行缓存重建
            CACHE_REBUILD_EXECUTOR.submit(() -> {
                try {
                    //缓存重建
                    this.saveRedis(id,20L);
                } catch (Exception e) {
                    throw new RuntimeException();
                } finally {
                    //释放锁
                    unlock("lock" + id);
                }
            });
        }
        //关闭线程池
        CACHE_REBUILD_EXECUTOR.shutdown();
        //返回商铺信息
        return user;

      
    }

    //设置逻辑过期的方法 并进行缓存预热
    public void saveRedis(Integer id, Long expireTime) {
        //查询用户信息
        User user = userService.SelectUserById(id);
        //封装逻辑过期时间和用户数据
        RedisData redisData = new RedisData();
        redisData.setData(JSONValue.toJSONString(user));
        redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireTime));
        //将数据写入缓存
        redisTemplate.opsForValue().set("缓存用户" + id,JSONValue.toJSONString(redisData));
    }


    //尝试获取锁的方法
    private boolean tryLock(String key) {
        //setnx
        boolean flag = redisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
        return flag;
    }

    //释放锁的代码
    private void unlock(String key) {
        redisTemplate.delete(key);
    }
}

总结

分布式锁和逻辑过期都是解决缓存击穿的常见解决方式,但是分布式锁保证了数据的高度一致性,但是由于不断地尝试获取锁,递归方法和休眠一段时间,会导致性能不及逻辑过期的性能好,逻辑过期保证了性能,但是没有保证数据的高度一致性(因为在缓存没有重建好及时逻辑过期了仍然返回之前的假数据)。大家根据具体业务进行甄别和使用即可。

文章来源:https://blog.csdn.net/qq_63945982/article/details/135594576
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。