Redisson
是架设在Redis
基础上的一个Java
驻内存数据网格。在基于NIO
的Netty
框架上,充分的利用了Redis
键值数据库提供的一系列优势,在Java
实用工具包中常用接口的基础上,为使用者提供了一系列具有分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的工具包获得了协调分布式多机多线程并发系统的能力,大大降低了设计和研发大规模分布式系统的难度。同时结合各富特色的分布式服务,更进一步简化了分布式环境中程序相互之间的协作。
其中比较具体特色的就是 Redisson
对分布式锁的支持,不仅简化了分布式锁的应用过程还支持 Fair Lock、MultiLock、RedLock、ReadWriteLock
等锁的实现。本文就 Redisson
分布式锁的加锁和解锁过程的源码进行大致的解析。
下面是Redisson
源码地址:
如果对 Redisson
的使用还不了解的小伙伴可以先看下下面这篇文章:
Redisson
中的分布式锁在使用起来非常简便,例如:
public class TestLock {
@Resource
RedissonClient redissonClient;
@Test
public void test() {
RLock lock = null;
try {
// 获取可重入锁
lock = redissonClient.getLock("redislock");
// 获取锁,如果获取不到会等待
lock.lock();
Thread.sleep(30000000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
if (lock != null) {
// 释放锁
lock.unlock();
}
}
}
@Test
public void test1() {
RLock lock = null;
try {
// 获取可重入锁
lock = redissonClient.getLock("redislock");
// 尝试获取锁,返回获取锁的状态
Boolean isLock = lock.tryLock();
Thread.sleep(30000000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
if (lock != null) {
// 释放锁
lock.unlock();
}
}
}
}
下面分别从 lock
、tryLock
、unlock
、三个地方进行源码的解析。
先看下 redissonClient.getLock
方法,它默认创建了一个 RedissonLock
对象,并将锁的key
传递进来:
而 RedissonLock
对象又继承至RedissonBaseLock
类:
因此我们下面大多的源码分析都基于这两个类进行。
首先进到 RedissonLock
类下的 lock()
方法中:
这里主要又调用了 lock(long leaseTime, TimeUnit unit, boolean interruptibly)
方法,注意如果没有指定过期时间默认为 -1
,下面看到 lock(long leaseTime, TimeUnit unit, boolean interruptibly)
方法中:
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
// 当前线程ID
long threadId = Thread.currentThread().getId();
// 尝试获取锁,如果已经有锁的话返回锁的剩余时间
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// 获取锁成功
if (ttl == null) {
return;
}
// 如果获取锁失败,订阅当前线程,以便后续获取锁时得到通知。
CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
//设置超时处理,当订阅的future完成时,触发超时处理。
pubSub.timeout(future);
//定义一个RedissonLockEntry对象,用于表示当前线程在分布式锁中的状态。
RedissonLockEntry entry;
if (interruptibly) {
// 可中断
entry = commandExecutor.getInterrupted(future);
} else {
entry = commandExecutor.get(future);
}
try {
// 循环尝试获取锁
while (true) {
// 尝试获取锁
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// 获取锁成功
if (ttl == null) {
break;
}
// 如果已经存在锁的过期时间大于等于0,需要等待通知
if (ttl >= 0) {
try {
// 通过Semaphore 的 tryAcquire方法等待指定时间
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else { //如果剩余时间小于0,就一直等待。
if (interruptibly) {
entry.getLatch().acquire();
} else {
entry.getLatch().acquireUninterruptibly();
}
}
}
} finally {
// 无论加锁成功或失败,都取消订阅
unsubscribe(entry, threadId);
}
}
代码中加了注释,这里我总结下,首先调用 tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId)
方法尝试获取锁,如果锁存在的话则返回过期时间,为 null
的话表示获取锁成功。如果获取锁失败,则将自己加入到订阅中,然后开启一个死循环,在循环中再次尝试获取锁,如果还是没有获取到的话则使用 Semaphore
的 tryAcquire
方法阻塞当前线程,如果其他线程释放了锁,则这里继续循环再次尝试获取锁。
下面主要看下 tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId)
尝试获取锁的逻辑,看到该方法下:
tryAcquire
方法又调用了 tryAcquireAsync0
方法,然后又主要调用了 tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId)
方法,下面主要看到这个方法下:
private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime > 0) {
//如果指定了锁持有时间,则根据指定的时间设置 key 的过期时间
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
// 没指定,默认锁持有 30s
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
// 执行 lua 操作
CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);
ttlRemainingFuture = new CompletableFutureWrapper<>(s);
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
// 如果加锁成功
if (ttlRemaining == null) {
if (leaseTime > 0) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {// 没指定的话
// 启动看门狗,延长锁持有时间
scheduleExpirationRenewal(threadId);
}
}
// 返回锁的过期时间
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}
这里其中 tryLockInnerAsync
方法主要是指定了 Lua
脚本,主要注意的是如果没有指定了锁的过期则默认为 30s
的时间,然后在 Lua
脚本执行后,同样的判断,如果获取到锁的话并且没有指定锁的过期时间则开启看门狗机制,为锁延长时间续命的操作。
这里先看下核心操作 tryLockInnerAsync
方法中 Lua
脚本:
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
// lua 脚本
return evalWriteSyncedAsync(getRawName(), LongCodec.INSTANCE, command,
// 如果锁不存在,或者哈希表中锁对应的线程ID存在的话
"if ((redis.call('exists', KEYS[1]) == 0) " +
"or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
// 对hash中的内容值 +1
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
// 设置过期时间
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
//表示脚本执行成功,且不需要返回特定的值。
"return nil; " +
"end; " +
// 如果if条件不满足,返回剩余过期时间(以毫秒为单位)
"return redis.call('pttl', KEYS[1]);",
// 对应这 lua 脚本中的参数,第一个参数就是 KEYS[1],以此类推
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
这里主要利于 Lua
的原子性将整个判断操作过程给原子化了,其中这里锁的结构是以 hash
的形式存放的,key
为锁的名称,hash
中的key
为线程ID
(UUID
+线程ID
的形式),因为分布式情况下线程ID
也有可能重复,value
为数字表示锁重入的次数, lua
脚本如果执行加锁逻辑成功则返回 null
,否则返回锁的过期时间,也就对应前面获取锁的时候判断的依据。
下面回到上面的 tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId)
方法中,在 ttlRemainingFuture.thenApply
中如果获取锁成功,并且没有指定锁的过期时间则会开启看门狗机制为锁进行续命操作,主要调用的是 scheduleExpirationRenewal(long threadId)
方法,下面看到该方法下的逻辑:
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
// 加入看门狗记录中
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
// 如果已经存在
if (oldEntry != null) {
// 重新指定线程ID
oldEntry.addThreadId(threadId);
} else { // 如果不存在的话就开启看门狗
entry.addThreadId(threadId);
try {
// 启动看门狗
renewExpiration();
} finally {
// 如果线程已经终止,则关闭看门狗
if (Thread.currentThread().isInterrupted()) {
cancelExpirationRenewal(threadId);
}
}
}
}
主要的逻辑在 renewExpiration()
方法下,继续看到该方法中:
private void renewExpiration() {
// 获取当前信息
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
// 执行计时任务
Timeout task = getServiceManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
//再次获取信息
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
// 获取线程ID
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// 延长锁的过期时间
CompletionStage<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) { //如果有异常删除该任务
log.error("Can't update lock {} expiration", getRawName(), e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) { // 如果执行成功
// 递归继续执行
renewExpiration();
} else { // 执行失败
// 关闭看门狗
cancelExpirationRenewal(null);
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
这里主要通过递归延时任务的方式实现循环执行的效果,其中延时的时间为 internalLockLeaseTime
的三分之一,也就是默认 10s
触发一次,在任务中主要通过 renewExpirationAsync(long threadId)
方法,对锁进行了延时续命操作,看到该方法中:
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
// lua 脚本
return evalWriteSyncedAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 如果锁和线程ID存在
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
// 重置过期时间
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
// 成功返回 1
"return 1; " +
"end; " +
// 失败返回 0
"return 0;",
// lua 脚本中对应的参数
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}
这里还是依靠 Lua
脚本的方式,如果锁存在的话就重置过期时间,达到续命的效果。
tryLock
和lock
是两种获取分布式锁的方法,它们的主要区别在于获取锁的方式和阻塞行为。tryLock
默认是一种非阻塞的获取锁的方法,也可以通过设置 waitTime
变成阻塞的。而lock
默认就是一种阻塞的获取锁的方法。
他们俩的最终处理逻辑都是一样的,只不过默认的 tryLock
没有订阅阻塞的操作。
下面看下默认的 tryLock
的操作 ,进到 RedissonLock
下的 tryLock()
中:
再进入 tryLockAsync()
方法中:
这里调用了 tryLockAsync
方法,并将当前线程的ID
传递了进来,继续看到 tryLockAsync
方法中:
在看到 tryAcquireOnceAsync
方法中,注意这里的等待时间和上面 lock()
默认一样,是 -1
:
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
CompletionStage<Boolean> acquiredFuture;
if (leaseTime > 0) {
//如果指定了锁持有时间,则根据指定的时间设置 key 的过期时间
acquiredFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
} else {
// 没指定,默认锁持有 30s
acquiredFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
// 执行 lua 操作
acquiredFuture = handleNoSync(threadId, acquiredFuture);
CompletionStage<Boolean> f = acquiredFuture.thenApply(acquired -> {
// 如果加锁成功
if (acquired) {
// 如果指定了锁持有时间
if (leaseTime > 0) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else { // 没指定的话,
// 看门狗,延长锁持有时间
scheduleExpirationRenewal(threadId);
}
}
// 返回获取锁的状态
return acquired;
});
return new CompletableFutureWrapper<>(f);
}
这里的逻辑相比于前面 lock()
的逻辑就差不多了,只不过缺少了订阅和阻塞等待重试的操作,再下面的操作和lock()
的逻辑是一致的。
解锁的逻辑看到 RedissonBaseLock
下的 unlock()
方法中:
继续看到 unlockAsync
方法中:
主要逻辑在 unlockAsync0
方法中:
private RFuture<Void> unlockAsync0(long threadId) {
// 解锁
CompletionStage<Boolean> future = unlockInnerAsync(threadId);
CompletionStage<Void> f = future.handle((opStatus, e) -> {
// 关闭看门狗
cancelExpirationRenewal(threadId);
if (e != null) { // 如果执行有异常
if (e instanceof CompletionException) {
throw (CompletionException) e;
}
throw new CompletionException(e);
}
if (opStatus == null) { // 如果结果为空的话,表示锁不存在
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
throw new CompletionException(cause);
}
return null;
});
return new CompletableFutureWrapper<>(f);
}
主要做了两件事,解锁和关闭看门狗,先看下 unlockInnerAsync(long threadId)
方法解锁的过程:
protected final RFuture<Boolean> unlockInnerAsync(long threadId) {
String id = getServiceManager().generateId();
MasterSlaveServersConfig config = getServiceManager().getConfig();
int timeout = (config.getTimeout() + config.getRetryInterval()) * config.getRetryAttempts();
timeout = Math.max(timeout, 1);
// 解锁
RFuture<Boolean> r = unlockInnerAsync(threadId, id, timeout);
CompletionStage<Boolean> ff = r.thenApply(v -> {
CommandAsyncExecutor ce = commandExecutor;
if (ce instanceof CommandBatchService) {
ce = new CommandBatchService(commandExecutor);
}
ce.writeAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.DEL, getUnlockLatchName(id));
if (ce instanceof CommandBatchService) {
((CommandBatchService) ce).executeAsync();
}
// 释放锁的结果
return v;
});
return new CompletableFutureWrapper<>(ff);
}
这里的重点主要关注 unlockInnerAsync
方法,通过使用 Lua 脚本进行解锁的操作:
protected RFuture<Boolean> unlockInnerAsync(long threadId, String requestId, int timeout) {
// lua 脚本
return evalWriteSyncedAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 从Redis中获取锁的状态。
"local val = redis.call('get', KEYS[3]); " +
//如果不是false
"if val ~= false then " +
//将其转换为数字并返回,也就是 true 返回 1
"return tonumber(val);" +
"end; " +
// 如果哈希表锁中不存在线程ID,表示锁已经被释放,返回nil。
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
//对锁中的线程ID的值减1,并将结果存储在 counter 变量中。这是一个计数器的操作。
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
//如果计数器值大于0,表示锁仍然被持有。
"if (counter > 0) then " +
// 更新哈希表锁的过期时间。
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
// 设置键锁的状态值为0,并设置过期时间,表示锁仍然被持有。
"redis.call('set', KEYS[3], 0, 'px', ARGV[5]); " +
//返回0,表示锁仍然被持有
"return 0; " +
"else " + //如果计数器值不大于0,表示锁即将被释放。
//删除锁
"redis.call('del', KEYS[1]); " +
"redis.call(ARGV[4], KEYS[2], ARGV[1]); " +
// 设置键锁的状态值为1,并设置过期时间,表示锁已经被释放。
"redis.call('set', KEYS[3], 1, 'px', ARGV[5]); " +
//返回1,表示锁已经被释放
"return 1; " +
"end; ",
Arrays.asList(getRawName(), getChannelName(), getUnlockLatchName(requestId)),
LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime,
getLockName(threadId), getSubscribeService().getPublishCommand(), timeout);
}
需要注意的是,在 Lua
脚本中,如果锁还存在的话,就对 hash
中的 value
减一,如果此时 value
结果还大于 0
的话,则表示这是重入锁的场景,此时不能直接删除锁,而是对重入的次数进行减一,并且要重置过期时间。
下面再回到 unlockAsync0(long threadId)
方法中,释放锁通过 Lua
脚本实现了,下面看下 cancelExpirationRenewal(Long threadId)
关闭看门狗的操作:
protected void cancelExpirationRenewal(Long threadId) {
// 从记录中获取信息
ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (task == null) {
return;
}
if (threadId != null) {
// 移除线程ID
task.removeThreadId(threadId);
}
if (threadId == null || task.hasNoThreads()) {
// 关闭计时任务
Timeout timeout = task.getTimeout();
if (timeout != null) {
timeout.cancel();
}
// 从缓存记录中删除
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
}
}
这里就比较好理解了,停止计时任务,从缓存记录中移除。