这张尽量结合上一章进行使用:上一章
这章主要是讲如何通过redis
实现分布式锁的
这里我用redis
去实现:
技术:golang
,redis
,数据结构
这里是有一个大体的实现思路:主要是使用redis
中这些语法
redis
命令说明:
setnx
命令:set if not exists
,当且仅当key
不存在时,将key
的值设为value
。若给定的key
已经存在,则 SETNX不做任何动作。
- 返回1,说明该进程获得锁,将密钥的值设为值
- 返回0,说明其他进程已经获得了锁,进程不能进入临界区命令格式:设置锁。
get
命令:获取键的值
- 如果存在,则返回
- 如果不存在,则返回
nil
命令格式:获取锁getset
命令:该方法是原子的,对键设置newvalue
这个值,并且返回键原来的旧值。
- 命令格式:设置锁并设置键新值
del
命令:删除redis
中指定的key
- 命令格式:
del lock.key
看了很多博客,这里总结一些比较常用的一些方法:
原理:基于set命令的分布式锁
使用:set命令
存在问题:可能产生死锁
诺是想要更好的体验可以通过我的飞书观看:飞升思维导图
这里的一些出现的方法是java
中的。诺是需要可以改成自己的所属语言,这张图较为清晰我也就不做多余的说名,详情可以看我的飞书:飞书思维导图
const (
//解锁,使用lua变成原子性
unLockScript = "if redis.call('get',KEYS[1])==ARGV[1]" +
"then redis.call('del',KEYS[1]) " +
"return 1 " +
"else " +
"return 0 " +
"end"
//续期(看门狗)
watchLogScript = "if redis.call('get',KEYS[1])==ARGV[1] then return redis.call('expire',KEYS[1],ARGV[2]) else return 0 end"
)
type DispersedLock struct {
key string //锁
value string //锁的值,随机值(可以用userId+requestId)
expire int //锁过期时间,单位毫秒
lockClient redis.Cmdable //启用锁的客户端,redis目前
unLockScript string //lua 脚本
watchLogScript string //看门狗 lua
unlockChan chan struct{} //通知通道
}
func (d DispersedLock) getScript(ctx context.Context, script string) string {
result, _ := d.lockClient.ScriptLoad(ctx, script).Result()
return result
}
var scriptMap sync.Map
func NewLockRedis(ctx context.Context, cmdable redis.Cmdable, key string, expire int, value string) *DispersedLock {
lock := &DispersedLock{
key: key,
value: value,
expire: expire,
}
lock.lockClient = cmdable
lockScrip, _ := scriptMap.LoadOrStore("dispersed_lock", lock.getScript(ctx, unLockScript))
lockWatch, _ := scriptMap.LoadOrStore("watch_log", lock.getScript(ctx, watchLogScript))
lock.unLockScript = lockScrip.(string)
lock.watchLogScript = lockWatch.(string)
lock.unlockChan = make(chan struct{}, 0)
return lock
}
func (d DispersedLock) Lock(ctx context.Context) bool {
ok, _ := d.lockClient.SetNX(ctx, d.key, d.value, time.Duration(d.expire)*time.Millisecond).Result()
if ok {
go d.watchDog(ctx)
}
return ok
}
func (d DispersedLock) watchDog(ctx context.Context) {
//创建一个定时器,每到工作时间的2/3就出发一次
duration := time.Duration(d.expire*1e3*2/3) * time.Millisecond
ticker := time.NewTicker(duration)
//打包成原子
for {
select {
case <-ticker.C:
//脚本参数
args := []interface{}{
d.value,
d.expire,
}
result, err := d.lockClient.Eval(ctx, d.watchLogScript, []string{d.key}, args...).Result()
if err != nil {
logS.LogM.ErrorF(ctx, "watchDog error %s", err)
return
}
res, ok := result.(int64)
if !ok {
return
}
if res == 0 {
return
}
case <-d.unlockChan:
return
}
}
}
func (d DispersedLock) unlock(ctx context.Context) bool {
//脚本参数
args := []interface{}{
d.value,
}
result, _ := d.lockClient.Eval(ctx, d.unLockScript, []string{d.key}, args...).Result()
close(d.unlockChan)
if result.(int64) > 0 {
return true
} else {
return false
}
}
const lockMaxLoopNum = 1000
// LoopLock 轮询等待
func (d DispersedLock) LoopLock(ctx context.Context, sleepTime int) bool {
cancel, cannel := context.WithCancel(context.Background())
ticker := time.NewTicker(time.Duration(sleepTime) * time.Millisecond)
count := 0
status := 0
loop:
for {
select {
case <-cancel.Done():
break loop
default:
}
if d.Lock(ctx) {
ticker.Stop()
cannel()
break
} else {
<-ticker.C
}
count++
//判断是否大于最大获取次数,达到最大直接退出循环
if count >= lockMaxLoopNum {
status = 1
break
}
}
cannel()
if status != 0 {
return false
}
return true
}
这些就是通过redis去实现一个分布式锁的具体步骤,很多实现,估计很多其他语言的朋友们可能会有些蒙圈。但是没有关系。go 关键字
你就当他是一个线程就可以了,select 关键字
,你可以理解成队列+if
的判断
golang
:redsync
import "github.com/go-redsync/redsync/v4"
这个包基本上满足了市面上分布式锁的所有需求,包括续租:(但是这里的续租需要一定的条件才能触发,这个条件要达到redis实例的最大值时才能触发)。所以为了,方便使用,建议可以自己续写一个续租的方法。
这里献上我的:
// NewLock 实例化一个分布式锁,用来实现幂等,降低重试成本
func NewLock(mutexName string) *redsync.Mutex {
pool := goredis.NewPool(configuration.RedisClient)
rs := redsync.New(pool)
newString := uuid.NewString()
lockName := "Lock:" + newString + ":" + mutexName
mutex := rs.NewMutex(lockName)
return mutex
}
// LockRelet 周期性续租,过去无可挽回,未来可以改变
// num定义时间:单位毫秒
// size定义续租的次数
func LockRelet(num int, size int, mutex *redsync.Mutex) chan bool {
done := make(chan bool)
if size <= 0 {
return nil
}
go func() {
ticker := time.NewTicker(time.Duration(num) * time.Millisecond)
defer ticker.Stop()
for size > 0 {
size--
select {
case <-ticker.C:
extend, err := mutex.Extend()
if err != nil {
logS.LogM.Panicf("Failed to extend lock:", err)
} else if !extend {
logS.LogM.Panicf("Failed to extend lock: not successes")
}
case <-done:
return
}
}
}()
return done
}