Redisson分布式锁原理分析

发布时间:2023年12月18日

1.Redisson实现分布式锁

在分布式系统中,涉及到多个实例对同一资源加锁的情况,传统的synchronized、ReentrantLock等单进程加锁的API就不再适用,此时就需要使用分布式锁来保证多服务之间加锁的安全性。
常见的分布式锁的实现方式有:zookeeper、Redis。Redis分布式锁相对简单,Redis分布式锁常用于业务场景中,Redisson是Redis实现分布式锁常用方式

1.1.Redisson锁使用

Redis分布式锁中,setnx命令,可保证一个key同时只能有一个线程设置成功,这样可实现加锁的互斥性。但是Redisson并未通过setnx命令实现加锁。

  1. 引入依赖
 <dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.15.6</version>
</dependency>
  1. 配置类
package com.hong.springbootjwt.config.redission;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.redisson.config.TransportMode;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author: hong
 * @date: 2023/2/16 20:09
 * @description RedissonClient
 */
@Configuration
public class RedissonConfig {
    @Bean
    public RedissonClient redissonClient(){
        Config config = new Config();
        config.setTransportMode(TransportMode.NIO);
        SingleServerConfig singleServerConfig = config.useSingleServer();
        //可以用"rediss://"来启用SSL连接
        singleServerConfig.setAddress("redis://127.0.0.1:6379");
        singleServerConfig.setPassword("123456");
        return Redisson.create(config);
    }
}

Redisson加锁使用:

package com.hong.springbootjwt.service.impl;

import com.hong.springbootjwt.service.UserService;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Service;

/**
 * @author: hong
 * @date: 2023/2/16 20:08
 * @description
 */
@Service
public class UserServiceImpl implements UserService {
    private final RedissonClient redissonClient;

    public UserServiceImpl(RedissonClient redissonClient) {
        this.redissonClient = redissonClient;
    }
    
    @Override
    public void redissionLock() {
        // 获取锁对象
        RLock myLock = redissonClient.getLock("myLock");
        try {
            // 加锁
            myLock.lock();
            ...
        }catch (Exception e) {
            
        }finally {
            // 释放锁
            myLock.unlock();
        }
    }
}

1.1.1.Redisson加锁原理

  1. 通过RedissonClient,传入锁的名称,获取到RLock(获得RLock接口的实现是RedissonLock),然后通过RLock实现加锁和释放锁
public RLock getLock(String name) {
    return new RedissonLock(this.commandExecutor, name);
}
  1. RedissonLock对lock()方法的实现
public void lock() {
    try {
        this.lock(-1L, (TimeUnit)null, false);
    } catch (InterruptedException var2) {
        throw new IllegalStateException();
    }
}
  1. 重载lock方法,传入leaseTime为-1,之后调用tryAcquire实现加锁
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
    ...
}
  1. tryAcquire最后调用到tryAcquireAsync方法,传入leaseTime和当前加锁线程ID,tryAcquire和tryAcquireAsync的区别在于,tryAcquireAsync是异步执行,而tryAcquire是同步等待tryAcquireAsync的结果,也即异步转同步的过程
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture ttlRemainingFuture;
    if (leaseTime != -1L) {
        ttlRemainingFuture = this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    ...
}
  1. tryAcquireAsync方法会根据leaseTime是否为-1,判断使用哪个分支加锁,不论走哪个分支,最后都调用tryLockInnerAsync方法实现加锁,只是参数不同。此处leaseTime=-1,走下面分支
    虽然传入tryAcquireAsync的leaseTime是-1,但在调用tryLockInnerAsync方法传入的leaseTime参数是this.internalLockLeaseTime,也即默认的30s
  2. 进入到tryLockInnerAsync方法,最终加锁是通过一段LUA脚本来实现的,Redis在执行LUA脚本时,可保证加锁的原子性,所以Redisson实现加锁的原子性是依赖LUA脚本实现的。
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, command,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil;" + 
                    " end; " +
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); return nil;" +
                    " end;" +
                    " return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(this.getRawName()), new Object[]{unit.toMillis(leaseTime), this.getLockName(threadId)});
}
  1. 最后,对于RedissonLock这个实现类来说,最终实现加锁的逻辑都是通过tryLockInnerAsync来实现

1.1.2.LUA脚本实现加锁

Redis通过执行LUA脚本实现加锁,保证加锁的原子性,分析上述LUA脚本

  • KEY[1]:加锁的名称,此处demo,就是myLock
  • ARGV[1]:锁的过期时间,不指定的话默认是30s
  • ARGV[2]:代表加锁的唯一标识,由UUID和线程ID组成,一个Redisson客户端一个UUID(代表唯一的客户端),所以由UUID和线程ID组成加锁的唯一标识,可理解为某个客户端的某个线程加锁

这些参数是如何传过去的呢?其实就是在这里

  • getName:获取锁的名称
  • leaseTime:传入的锁的过期时间,没指定默认是30s
  • getLockName:就是获取加锁的客户端线程的唯一标识。

这段LUA的加锁逻辑:

  1. 调用Redis的exists命令,判断加锁的key是否存在,如果不存在,进入if。(不存在的话,就是没有某个客户端的线程来加锁,第一次加锁肯定没有加锁)于是第一次if条件成立
  2. 接着调用Redis的hincrby命令,设置加锁的key和加锁的某个客户端的某线程,加锁次数设置为1。(加锁次数很重要,是实现可重入锁特性的关键数据),用hash数据结构保存。hincrby命令完成后形成如下数据结构:
myLock:{
	"b983c153-7421-469a-addb-44fb92259a1b:1":1
}
  1. 最后,调用Redis的pexpire命令,将锁的过期时间设置为30s

总结:第一个客户端线程加锁的逻辑还是挺简单的,就是判断有无加过锁,没有的话自己去加锁,设置加锁的key,保存加锁的线程和加锁次数,设置锁的过期时间为30s

问题:为何要设置加锁key的过期时间?
主要原因是为了防止死锁,当某客户端获取到锁,还未来得及释放锁,当客户端宕机了,或者释放失败了,一旦没设置过期时间,那么这个锁key会一直存在,当其他线程来加锁的话发生key已经被加锁了,那么其他线程会一直加锁失败,从而造成死锁问题。

1.2.延长加锁时间

在加锁过程中,没指定锁的过期时间,Redisson也会默认给锁设置30s的过期时间,来防止死锁
虽然设置默认过期时间可防止死锁,但若在30s内,任务还未结束,但是锁已经释放失效了,一旦其他线程加锁成功,就可能出现线程安全,数据错乱问题。所以Redisson针对这种未指定超时时间的加锁,实现了一个watchdog机制,即“看门狗机制”自动延长加锁时间
在客户端通过tryLockInnerAsync方法加锁后,如果没指定锁过期时间,那么客户端会起一个定时任务,来定时延长加锁时间,默认10s执行一次,所以watchdog的本质就是一个定时任务

最后定期执行一段LUA脚本,实现加锁时间的延长

脚本中参数解释(同加锁的参数):

  • KEYS[1]:锁的名称,此demo为“myLock”
  • ARGV[1]:就是锁的过期时间
  • ARGV[2]:代表了加锁的唯一标识,b983c153-7421-469a-addb-44fb92259a1b:1

这段LUA脚本意思是判断续约的线程和加锁的线程是否为同一个,若为同一个,将锁的过期时间延长30s,然后返回1,代表续约成功,不是的话就返回0,续约失败,下一次定时任务就不会执行

注意:因为有了看门狗机制,所以若没有设置过期时间,并且没有主动释放锁,那么这个锁就永远不会释放,因为定时任务会不断延长锁的过期时间,造成死锁问题。
但是如果发生宕机,是不会造成死锁的,因为宕机了,服务也就没有了,那么看门狗的定时任务就没了,自然不会续约,等锁自动过期了,也就自动释放锁了。

1.3.实现可重入锁

可重入锁的意思就是,同一个客户端同一个线程多次对同一个锁进行加锁。
在Redisson中,可以执行多次lock方法,流程都是一样的,最后调用到LUA脚本,所以可重入锁的逻辑也是通过加锁的LUA脚本实现
下半部分就是可重入锁的逻辑

下面这段if的意思是:判断当前已经加锁的key对应的加锁线程,跟要加锁的线程是否为同一个,如果是,则将该线程对应的加锁次数加1,也即实现了可重入加锁,同时返回nil
可重入锁加锁成功后,加锁key和对应值可能是这样:

myLock:{
	"b983c153-7421-469a-addb-44fb92259a1b:1":2
}

1.4.主动释放锁

当业务执行完毕后,需要主动释放锁,为什么需要主动释放锁呢?

  1. 当任务执行完,未手动释放锁,如果没有指定锁的超时时间,那么因为看门狗机制,会导致这个锁无法释放,可能造成死锁问题
  2. 如果指定了超时时间,虽然不会造成死锁问题,但会造成资源浪费。假设设置超时时间为30s,但任务只执行了2s就完成,那么这个锁会还会被占用28s,这28s内其他线程就无法成功加锁。

Redisson如何主动释放锁以及避免其他线程释放自己加的锁呢?
主动释放锁是通过unlock方法实现,分析unlock方法的实现:

  1. unlock调用unlockAsync()方法,传入当前释放线程的ID,代表当前线程来释放锁(unlock其实也是将unlockAsync的异步操作转为同步操作)
public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> result = new RedissonPromise();
    RFuture<Boolean> future = this.unlockInnerAsync(threadId);
    future.onComplete((opStatus, e) -> {
        this.cancelExpirationRenewal(threadId);
        if (e != null) {
            result.tryFailure(e);
        } else if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
            result.tryFailure(cause);
        } else {
            result.trySuccess((Object)null);
        }
    });
    return result;
}
  1. unlockAsync最后会调用RedissonLock的unlockInnerAsync()实现释放锁的逻辑(也是执行一段LUA脚本)
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);" +
                    "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]);" +
                    "return 0;" +
                    "else redis.call('del', KEYS[1]);" +
                    "redis.call('publish', KEYS[2], ARGV[1]);" +
                    "return 1; " +
                    "end; " +
                    "return nil;", 
            Arrays.asList(this.getRawName(), this.getChannelName()), new Object[]{LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId)});
}

LUA脚本解释:

  1. 先判断来释放锁的线程是不是加锁的线程,如果不是,直接返回nil;可看出,此处是通过一个if条件防止线程释放了其他线程加的锁
  2. 如果释放锁的线程是加锁的线程,那么将锁次数减1,然后拿到加锁次数counter变量
  3. 若counter大于0,说明有重入锁,锁还未完全释放完,那么设置一下过期时间,然后返回0
  4. 若counter未大于0,说明此锁已经释放完成,将锁对应的key删除,然后发布一个锁已经释放的消息,然后返回1

1.5.超时自动释放锁

已知如果不指定超时时间的话,存在看门狗线程不断延长加锁时间,不会导致锁超时释放,自动过期,那么指定超时时间的话,是如何实现指定时间释放的呢?
能够设置超时时间的方法:

// 通过传入leaseTime参数可指定锁超时时间
void lock(long leaseTime, TimeUnit unit)

boolean tryLock(long waitTime, long leaseTime, TimeUnit unit)

已知,无论是否设置锁超时时间,最终都会调用tryAcquireAsync方法进行加锁。只是不指定超时时间的话,传入的leaseTime值是-1,也即不指定超时时间,但是Redisson默认还是会设置30s过期时间;当指定超时时间,那么leaseTime就是自己指定的时间,最终也是通过一个LUA脚本进行加锁逻辑
是否指定超时时间的区别:

  • 不指定超时时间,会开启watchdog后台线程,不断续约加锁时间
  • 指定超时时间,就不会开启watchdog定时任务,这样就不会续约,加锁key到了过期时间就会自动删除,即达到释放锁的目的


总结:
指定超时时间,达到超时释放锁的功能主要通过Redis自动过期来实现,因为指定了超时时间,加锁成功后就不会开启watchdog机制来延长加锁时间
实际项目中,若能比较准确预估代码执行时间,那么可以指定锁超时释放时间,来防止业务执行错误导致无法释放锁的问题;若不能预估代码执行时间,那么可以不指定超时时间,在finally代码块中采用unlock手动释放。

1.6.实现不同线程加锁的互斥

前面已经分析过,第一次加锁逻辑和可重入锁的逻辑,因为LUA脚本加锁的逻辑同时只有一个线程能够执行(Redis是单线程的原因),所以一旦有线程加锁成功,那么另一线程来加锁,前面两个if条件都不成立,最后通过调用Redis的pttl命令返回锁的剩余过期时间回去。
这样,客户端就可根据返回值判断是否加锁成功, 因为第一次加锁和可重入锁的返回值都是nil,而加锁失败就返回了锁的剩余过期时间

// 第一次加锁
if (redis.call('exists', KEYS[1]) == 0) then 
  redis.call('hincrby', KEYS[1], ARGV[2], 1);
  redis.call('pexpire', KEYS[1], ARGV[1]);
  return nil;
end;
// 可重入锁
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then 
  redis.call('hincrby', KEYS[1], ARGV[2], 1);
  redis.call('pexpire', KEYS[1], ARGV[1]);
  return nil;
end;
// 无法加锁成功
return redis.call('pttl', KEYS[1]);

所以加锁的LUA脚本通过条件判断就可实现加锁的互斥操作,保证其他线程无法加锁成功。

总结:加锁的LUA脚本实现了第一次加锁、可重入锁、加锁互斥的逻辑

1.7.加锁失败如何实现阻塞等待加锁

上面分析已知,加锁失败后,会走如下代码:

这里可看出,最终会执行死循环(自旋)的方式,不停的通过tryAcquire()方法来实现加锁,直到加锁成功后才会跳出死循环,如果一直没有加锁成功,那么就会一直旋转下去,所谓阻塞,就是自旋加锁的方式
但是这种阻塞可能产生问题,如果其他线程释放锁失败,那么这个阻塞加锁的线程会一直阻塞加锁,肯定会出问题的,所以需要设置超过一定时间还未加锁成功的话,就放弃加锁。

超时放弃加锁的方法:

boolean tryLock(long waitTime, long leaseTime, TimeUnit unit)
boolean tryLock(long time, TimeUnit unit)

通过waitTime参数或time参数来指定超时时间,这两个方法的主要区别在于是否支持指定锁超时时间

do {
    long currentTime = System.currentTimeMillis();
    ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
    if (ttl == null) {
        var16 = true;
        return var16;
    }
    // 超过尝试时间,加锁失败,返回false
    time -= System.currentTimeMillis() - currentTime;
    if (time <= 0L) {
        this.acquireFailed(waitTime, unit, threadId);
        var16 = false;
        return var16;
    }

    currentTime = System.currentTimeMillis();
    if (ttl >= 0L && ttl < time) {
        ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
    } else {
        ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
    }

    time -= System.currentTimeMillis() - currentTime;
} while(time > 0L);

从源码可看出,实现一定时间内还未获取到锁,就放弃加锁的逻辑,其实相比于一直自旋获取锁,主要是加了超时判断,如果超时了,就退出循环,放弃加锁

1.8.实现公平锁

什么是公平锁?
公平锁就是指线程成功加锁的顺序,跟线程请求加锁的顺序一样,实现了先来先成功加锁的特点,不插队才叫公平
前面所说的RedissonLock的实现都是非公平锁,但里面有些机制如watchdog机制是公平的
公平锁和非公平锁比较
公平锁

  • 优点:按顺序平均分配锁资源,不会出现线程饿死(即某一线程长时间未获得锁)的情况
  • 缺点:按顺序唤醒线程的开销大,执行性能不高

非公平锁:

  • 优点:执行效率高,谁先获得锁,谁就先执行,无需按顺序唤醒
  • 缺点:资源分配随机性强,可能出现线程饿死的情况

如何使用公平锁
通过RedissonClient的getFairLock可获取到公平锁。Redisson对于公平锁的实现是RedissonFairLock类,通过RedissonFairLock来加锁,可实现公平锁的特性

public void redissionLock() {
    // 获取锁对象
    RLock myLock = redissonClient.getFairLock("myLock");
    try {
        // 加锁
        myLock.lock(30,TimeUnit.SECONDS);

    } catch (Exception e) {

    } finally {
        // 释放锁
        myLock.unlock();
    }
}

RedissonFairLock继承了RedissonLock,主要重写了tryLockInnerAsync方法,也就是加锁逻辑的方法。

概述这段LUA的作用:

  1. 当线程来加锁的时候,如果加锁失败,将线程放置到一个set中,这样就按照加锁顺序给线程排队,set集合的头部的线程就代表接下来要加锁成功的线程
  2. 当有线程释放锁之后,其他加锁失败的线程就会继续来实现加锁
  3. 加锁前判断一下set集合的头部线程跟当前要加锁的线程是否同一个
  4. 如果是同一个,那么加锁成功
  5. 如果不是的话,就加锁失败,这样就实现了加锁的顺序性

1.9.实现读写锁

在实际开发中,会有很多“读多写少”的场景,对于这种场景,使用独占锁加锁,在高并发情况下,会导致大量线程加锁失败,阻塞,对系统吞吐量有一定影响,为了适配这种“读多写少”的场景,Redisson也实现了读写锁的功能
读写锁的特点:

  • 读与读是共享的,不互斥
  • 读与写互斥
  • 写写互斥

Redisson中使用读写锁:

public void redissionLock() {
    // 获取读写锁
    RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("readWriteLock");
    RLock readLock = readWriteLock.readLock();
    try {
        readLock.lock();
        // 业务操作
        ...
    } catch (Exception e) {

    } finally {
        // 释放锁
        readLock.unlock();
    }
    RLock writeLock = readWriteLock.writeLock();
    try {
        writeLock.lock();
        // 业务操作
        ...
    } catch (Exception e) {

    } finally {
        // 释放锁
        writeLock.unlock();
    }
}

Redisson通过RedissonReadWriteLock类实现读写锁功能。通过这个类可以获取到读锁和写锁,所以真正的加锁逻辑是由读锁和写锁实现的
Redisson是如何具体实现读写锁的?
前面已知,加锁成功后会在Redis中维护一个hash的数据结构,存储加锁线程和加锁次数。在读写锁的实现中,会往hash数据结构中多维护一个mode字段,来表示当前加锁的模式。
所以能够实现读写锁,最主要是因为维护了一个加锁模式的字段mode,这样当线程来加锁的时候,就能根据当前加锁模式结合读写的特性来判断要不要让当前线程加锁成功

  • 若没有加锁,那么不论读锁还是写锁都能加锁成功,成功后根据加锁类型维护mode字段
  • 若模式是读锁,加锁线程也是加读锁的,就让它加锁成功
  • 若模式是读锁,加锁线程是加写锁的,就让它加锁失败
  • 若模式是写锁,不论线程是加写锁还是读锁,都让它加锁失败(加锁线程自己除外,可重入特性)

1.10.实现批量加锁(联锁)

批量加锁的意思是同时加几个锁,只有这些锁都加成功了,才算真正的加锁成功!
比如:一个下单业务中,同时需要锁定订单、库存、商品,基于这种需要锁多种资源的场景中,Redisson提供了批量加锁的实现,对应的实现类是RedissonMultiLock
使用联锁:

public void redissionLock() {
    // 获取读写锁
    RLock myLock1 = redissonClient.getLock("myLock1");
    RLock myLock2 = redissonClient.getLock("myLock2");
    RLock myLock3 = redissonClient.getLock("myLock3");
    RLock multiLock = redissonClient.getMultiLock(myLock1,myLock2,myLock3);
    try {
        multiLock.lock();
        // 业务操作
        ...
    } catch (Exception e) {

    } finally {
        // 释放锁
        multiLock.unlock();
    }
}

Redisson对于批量加锁的实现也很简单,源码如下:

就是根据顺序依次调用tryLock,传入myLock1,myLock2,myLock3加锁方法,如果都成功加锁了,那么multiLock就算加锁成功

1.11.RedLock算法

对于单Redis实例来说,如果Redis宕机了,那么整个系统就无法运行,所以为了保证Redis的高可用,一般都会采用主从或哨兵模式,但是一旦使用了主从或哨兵模式,此时Redis的分布式锁就可能出现问题
例如,使用哨兵模式

基于这种模式,Redis客户端会在master节点上加锁,然后异步复制到slave节点上。但是一旦master节点宕机,那么哨兵感知到,就会从slave节点选择一个节点作为主节点。
假设客户端对原主节点加锁,加锁成功后还未来得及同步到从节点,主节点宕机了,从节点变为了主节点,此时从节点是没有加锁信息的,如果其他客户端来加锁,是能够加锁成功的!!
针对此问题,Redis官方提供一种RedLock算法,Redisson刚好实现了这种算法

RedLock算法
在Redis分布式环境中,假设有N个master节点,这些节点相互独立,不存在主从复制或其他集群协调机制。
前面描述过,在Redis单例下怎么安全获取和释放锁,需要确保将在N个实例上使用此方法获取和释放锁。为了获取锁,客户端应该执行以下操作:

  1. 获取当前Unix时间,以ms为单位
  2. 依次尝试从N个实例,使用相同key和随机值获取锁,当向Redis设置锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间小于锁的失效时间。这样可避免服务器端Redis已经挂掉情况下,客户端还在等待响应结果,如果服务器端没有在规定时间内响应,客户端应尽快尝试其他Redis实例
  3. 客户端使用当前时间减去开始获取锁的时间(步骤1记录的时间),就得到获取锁使用的时间,并且仅当从大多数(3个节点,共5个)的Redis节点中获取到锁,并且使用时间小于锁失效时间时,锁才算获取成功
  4. 如果获取到锁,key的真正有效时间等于有效时间减去获取锁所使用时间(步骤3计算所得结果)
  5. 若因为某些原因,获取锁失败(没有在至少N/2+1个Redis实例上获取到锁,或取锁时间已经超过有效时间),客户端应该在所有Redis实例上进行解锁(即使某些Redis实例根本没有加锁成功)

Redisson对RedLock算法的实现:


RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");
 
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 红锁在大部分节点上加锁成功就算成功。
lock.lock();
...
lock.unlock();

RedissonRedLock加锁过程如下:

  1. 获取所有Redisson Node节点信息,循环向所有Node节点加锁,假设节点数为N,一个Redisson Node代表一个主从节点
  2. 若在N个节点中,有 N/2 + 1个节点加锁成功,那么整个RedissonRedLock加锁成功
  3. 若在N个节点中,小于N/2 + 1个节点加锁成功,那么整个RedissonRedLock加锁失败
  4. 若中途发现各节点加锁总耗时,大于等于设置的最大等待时间,则直接返回失败

RedissonRedLock底层其实也是基于RedissonMultiLock实现的,RedissonMultiLock要求所有的加锁成功才算成功,RedissonRedLock要求只要有N/2+1个成功就算成功

文章来源:https://blog.csdn.net/Yu_Mariam/article/details/135005477
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。