最近在做大项目性能测试的工作,又发现了几个比较有意思的Bug,本期分享其中的一个,涉及Redis在并发场景下的应用
有意思的是,这个Bug因为没有代码语法错误,并发量少的情况下,下游的监控也不会出现报警,所以光靠功能测试是没有办法发现,只能通过压测(性能测试)或者下游的监控报警才能发现
我们先来看一段Go语言实现的代码,没有代码基础也没关系,先解释一下这段代码的意思,就是先获取(Get) Redis Key 的值,这个值只有true 或者 false 两种情况 ,如果是true 则直接返回,不执行后续代码逻辑,如果是 false 则 先设置(Set) Redis Key 的值为true,再执行后续代码逻辑
var flag = false
// 获取锁
redisCache := redis.NewCache()
if err := redisCache.Get(SetUserSizeRedisKey, &flag); err != nil {
ctx.WarningF("request redis to get _user_size_flag fail, err: %s", err)
}
// flag = true说明已经有实例在请求了, 直接返回
// 否则 设置redis锁
if flag {
ctx.Notice("request_user_size_flag is true, return")
return
} else {
ctx.Notice("request_user_size_flag is false, request im to flush room user size")
if err := redisCache.Set(SetUserSizeRedisKey, true, ScriptMaxRunTime); err != nil {
ctx.WarningF("set request_user_size_flag to redis fail, err: %s", err)
}
}
// ... 后续具体的业务代码逻辑,可忽略
然而单纯利用从Redis 当中获取一个Bool值,以此来充当互斥锁,这种实现方案,在同一时刻只有一个用户请求能满足需求,但是在并发场景会出现无法锁住的情况,如下图,在初始条件下,即Redis Key 还从来没有被Set时(Key不存在时),当3个用户同时从Redis 读取到的值均为False ,就有3个用户同时去Set Redis Key,并且走到后续的代码逻辑
所以并发场景下,“锁”失效了
"锁"失效了有什么影响,继续给出完整代码逻辑,这段代码其实是定时任务的一部分,在执行期间,会请求下游服务获得相关数据
在并发场景下,“锁”失效了会导致下游的服务压力上涨,假设下游只能抗50QPS,现在QPS 已经到5000了,严重情况下还会出现IO打满,CPU和内存打满,服务宕机等风险
package main
import"time"
var (
CrontabTime = 20// 每20s执行一次脚本
ScriptMaxRunTime = 150// 脚本最长运行时间150s
)
func SetUserSize(ctx *gin.Context) {
var flag = false
// 获取锁
redisCache := redis.NewCache()
if err := redisCache.Get(SetUserSizeRedisKey, &flag); err != nil {
ctx.WarningF("request redis to get _user_size_flag fail, err: %s", err)
}
// flag = true说明已经有实例在请求了, 直接返回
// 否则 设置redis锁
if flag {
ctx.Notice("request_user_size_flag is true, return")
return
} else {
ctx.Notice("request_user_size_flag is false, request im to flush room user size")
if err := redisCache.Set(SetUserSizeRedisKey, true, ScriptMaxRunTime); err != nil {
ctx.WarningF("set request_user_size_flag to redis fail, err: %s", err)
}
}
// 以下是定时任务的具体逻辑
ctx.NoticeF("run time start:%d", util.GetMilliSecond())
// 执行定时任务前,前置获取相关必要信息
info, err := GetInfo()
if err != nil {
ctx.WarningF("request info fail, error: %s", err)
// 获取信息失败,提前释放锁
_ = redisCache.Del(SetUserSizeRedisKey)
return
}
ctx.NoticeF("run time start req im:%d", util.GetMilliSecond())
// 定时任务具体逻辑
for _, people := range info {
//请求下游
// ...
}
ctx.NoticeF("run time end:%d", util.GetMilliSecond())
}
如何解决请求下游数量超限这个问题呢,有两种解法:第一种是在请求下游前,增加判断当前QPS。第二种是使用Redis 分布式锁setnx
限定QPS
先看第一种方案是在请求下游前,判断是否超过最大的QPS,如何获取QPS呢,QPS是在做性能测试时,我们常用的性能指标,指每秒的查询数量,用来衡量系统每秒处理的请求数量
那么要获取QPS,自然要获得当前的秒数,如果是在同一秒请求,我们用当前秒数作为Redis Key ,值初始为0,同一秒内每有一次请求,就把Redis 的值加1,这样就拿到了QPS(见代码当中的GetLimitRequest方法)
now := util.GetSecond()
nowMilli := util.GetMilliSecond()
res := GetLimitRequest(ctx, now)
// GetLimitRequest 获取当前qps
func GetLimitRequest(now int64) int {
key := fmt.Sprintf("limit_key_request_service_%v", now)
redisCache := redis.NewCache(ctx)
res, _ := redisCache.Incr(key)
if res > 0 {
go redisCache.Expire(key, 60)
}
return res
}
那QPS超出限额了怎么办,得计算到下1秒还有多少时间,要精确计算的话,我们只能可以获取比秒更小的单位-毫秒进行计算,分别获取下1s的时间(毫秒为单位),以及当前这1s的时间(同样毫秒为单位),两者相减,这样就知道到下1秒还差多少毫秒(对应下面代码的变量gap),让系统sleep gap对应毫秒数,这样就可以使得请求的维持在最大的QPS范围内
完整的代码片段如下
package main
import"time"
var (
CrontabTime = 20// 每20s执行一次脚本
ScriptMaxRunTime = 150// 脚本最长运行时间150s
)
func SetUserSize() {
var flag = false
// 获取锁
redisCache := redis.NewCache()
if err := redisCache.Get(SetUserSizeRedisKey, &flag); err != nil {
ctx.WarningF("request redis to get _user_size_flag fail, err: %s", err)
}
// flag = true说明已经有实例在请求了, 直接返回
// 否则 设置redis锁
if flag {
ctx.Notice("request_user_size_flag is true, return")
return
} else {
ctx.Notice("request_user_size_flag is false, request im to flush room user size")
if err := redisCache.Set(SetUserSizeRedisKey, true, ScriptMaxRunTime); err != nil {
ctx.WarningF("set request_user_size_flag to redis fail, err: %s", err)
}
}
ctx.NoticeF("run time start:%d", util.GetMilliSecond())
// 执行定时任务前,前置获取相关必要信息
info, err := GetInfo()
if err != nil {
ctx.WarningF("request info fail, error: %s", err)
// 获取信息失败,提前释放锁
_ = redisCache.Del(SetUserSizeRedisKey)
return
}
ctx.NoticeF("run time start req im:%d", util.GetMilliSecond())
// 定时任务具体逻辑,可忽略
for _, people := range info {
now := util.GetSecond()
nowMilli := util.GetMilliSecond()
res := GetLimitRequest(ctx, now)
if res > MaxQps {
gap := (now+1)*1000 - nowMilli
if gap > 0 {
time.Sleep(time.Duration(gap) * time.Millisecond) // 在sleep 期间 不再请求下游
}
} elseif res == 0 { //异常
time.Sleep(time.Duration(40) * time.Millisecond)
}
}
// 请求下游具体代码逻辑,可忽略
// ...
// ...
// 定时任务执行完毕,主动释放锁
_ = redisCache.Del(SetUserSizeRedisKey)
ctx.NoticeF("run time end:%d", util.GetMilliSecond())
}
/ GetLimitRequest 获取当前qps
func GetLimitRequest(now int64) int {
key := fmt.Sprintf("limit_key_request_service_%v", now)
redisCache := redis.NewCache(ctx)
res, _ := redisCache.Incr(key)
if res > 0 {
go redisCache.Expire(key, 60)
}
return res
}
使用Redis分布式锁
setnx是Redis的一个命令,它代表"Set if Not eXists"。这个命令尝试在Redis中设置一个键值对,但仅当指定的键不存在时才会成功。如果键已经存在,setnx操作将失败
我们可以使用setnx命令来创建Redis分布式锁,分布式锁是一种机制,用于确保在分布式系统中的多个节点或线程不会同时访问或修改共享资源,以避免竞态条件(race conditions)
分布式锁的主要目的是确保在分布式系统中,只有一个客户端(或线程)能够成功获得锁,以执行关键任务,而其他客户端必须等待
setnx的使用方式是,客户端通常会使用setnx命令尝试创建一个带有唯一标识的锁,然后在锁上设置一个过期时间,以防止锁被永久占用。当客户端不再需要锁时,可以使用del命令来释放锁
对于上面的并发问题,我们还可以使用SetNX来解决
func SetUserSize(ctx *gin.Context) {
ExpireTime:= int64(3)
client := redis.NewCache(ctx)
key := fmt.Sprintf("set_locker_%s", "param_ex")
res, err := client.SetNX(ctx, key, time.Now().Unix(), ExpireTime) //创建redis 分布式锁,ExpireTime过期时间为3秒
if err != nil {
ctx.WarningF("SetQuestionStatus get lock fail, err: %v", err)
errno.ErrRet(ctx, errno.ErrCallCacheFail)
return
}
if res != true {
errno.ErrRet(ctx, errno.ErrSetInfo)
return
}
defer client.Del(ctx, key) //执行完删除Redis分布式锁,让其他线程能正常获取锁,避免永久等待
//... 执行后续逻辑
}
用一张图片再来对比一下两种实现方案的区别,使用Redis分布式锁能帮助解决高并发下互斥任务的问题,但需要注意设置过期时间,避免永久锁住资源
感谢每一个认真阅读我文章的人!!!
作为一位过来人也是希望大家少走一些弯路,如果你不想再体验一次学习时找不到资料,没人解答问题,坚持几天便放弃的感受的话,在这里我给大家分享一些自动化测试的学习资源,希望能给你前进的路上带来帮助
?视频文档获取方式:
这份文档和视频资料,对于想从事【软件测试】的朋友来说应该是最全面最完整的备战仓库,这个仓库也陪伴我走过了最艰难的路程,希望也能帮助到你!以上均可以分享,点下方小卡片即可自行领取。???