前提:大部分服务模块都是用的一个redis实例
redsync库:https://github.com/go-redsync/redsync
加解锁使用的是通用的做法(如下)
加锁:setnx、value为锁持有者的唯一标识符、设置过期时间
// redsync/v4/mutex.go
func (m *Mutex) acquire(ctx context.Context, pool redis.Pool, value string) (bool, error) {
conn, err := pool.Get(ctx)
if err != nil {
return false, err
}
defer conn.Close()
reply, err := conn.SetNX(m.name, value, m.expiry)
if err != nil {
return false, err
}
return reply, nil
}
解锁:lua脚本(保证原子操作),只有删除锁为锁的持有者才能允许删除
// redsync/v4/mutex.go
var deleteScript = redis.NewScript(1, `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
`)
func (m *Mutex) release(ctx context.Context, pool redis.Pool, value string) (bool, error) {
conn, err := pool.Get(ctx)
if err != nil {
return false, err
}
defer conn.Close()
status, err := conn.Eval(deleteScript, m.name, value)
if err != nil {
return false, err
}
return status != int64(0), nil
}
项目中封装了下newMutex,就是加上了服务模块名作为锁名称的前缀
func (s *RedisSync) NewMutex(path string, expiration time.Duration) Mutex {
path = fmt.Sprintf("%s/%s", s.prefix, strings.TrimPrefix(path, "/"))
return &RedisMutex{
mutex: s.redsync.NewMutex(path, redsync.WithExpiry(expiration)),
}
}
func (s *RedisSync) NewMutexWithExpirationAndTry(path string, expiration time.Duration, try int) Mutex {
path = fmt.Sprintf("%s/%s", s.prefix, strings.TrimPrefix(path, "/"))
return &RedisMutex{
mutex: s.redsync.NewMutex(path, redsync.WithExpiry(expiration), redsync.WithTries(try)),
}
}
本项目中,分布式锁可以用来在代码层面锁住用户正在操作的一个功能(比如:编辑订单)。本来在数据库层已经通过事务保证了一致性了,我们在代码层再做的话,就是,防止出现用户a和用户b同时操作一个功能(修改数据)后,用户a发现他修改完数据、保存后,数据和自己设置的不一样。
背景:在编辑订单的时候,可能存在一个企业账号n,多个人正在使用这个账号去编辑同一条订单
我们使用了两个锁:外层分布式锁、内层锁(说是个锁,其实就是用redis的k-v来表示两次获取外层锁的是不是同一个人)
//加锁
mutex := impl.Sync.NewMutexWithExpirationAndTry(fmt.Sprintf("ORDER_ORDER_UPDATE_LOCK_%v_%v", userInfo.GroupId, request.SerialNo), 5*time.Second, 2) //过期时间 5 秒
err = mutex.Lock()// 这里已经阻止了部分人了
// 。。。
defer func(mutex grpcsync.Mutex) {
err = mutex.Unlock()
// 。。。
}(mutex)
// 去拿k-v锁
key := fmt.Sprintf("GM_ORDER_ORDER_UPDATE_%v_%v", userInfo.GroupId, request.SerialNo)
result, err := impl.Redis.Exists(ctx, key).Result()
if err != nil {
grpclog.Errorf("EditOrderLock,check exists key error,serialNo=%v,err=%v", request.SerialNo, err)
return nil, err
}
//
var lockKey string
nowTime := time2.UnixMilliNow()
//
if result > 0 { //锁已经存在,已经有人进入编辑状态了
if len(request.LockKey) <= 0 { //其他人正在编辑,想进入编辑状态,其他人正在编辑订单,无法进入编辑状态
return nil, errors.GRPCError(ordermodel.Status_CODE_ORDER_EDITING_ERROR)
}
lockKey, err = impl.Redis.Get(ctx, key).Result()
if err != nil {
grpclog.Errorf("EditOrderLock,get redis value,serialNo=%v,err=%v", request.SerialNo, err)
return nil, err
}
if strings.Compare(request.LockKey, lockKey) != 0 { //曾经进入过编辑状态,但是锁续期失败或者没有续期,被其他人抢到了锁
return nil, errors.GRPCError(ordermodel.Status_CODE_ORDER_EDITING_ERROR)
}
} else { //不存在,生成lockKey
lockKey = crypto.Md5String(fmt.Sprintf("%v_%v", key, nowTime))
}
//锁续期
err = impl.Redis.Set(ctx, key, lockKey, 5*time.Second).Err()
if err != nil {
grpclog.Errorf("EditOrderLock,set redis value,serialNo=%v,err=%v", request.SerialNo, err)
return nil, err
}
每个服务模块在启动时,会创建一个与redis的连接:
// 使用go-redis库创建redis cli之后,使用了redsync库去创建连接池
func NewRedisSync(client *redis.Client, prefix string) Sync {
pool := goredis.NewPool(client)// goredis是redsync库中的
return &RedisSync{
prefix: prefix,
redsync: redsync.New(pool),// 实现分布式系统中的互斥锁
// func New(pools ...Pool) *Redsync
// 说明支持多redis节点
}
}