在分布式系统中,确保多个节点间协作执行时的数据一致性是至关重要的。分布式锁作为一种常见的技术手段,能够帮助实现多个节点对共享资源的安全访问。Redisson作为一个基于Redis的Java驱动库,提供了简单且强大的分布式锁功能。
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.14.1</version>
</dependency>
@Configuration
public class RedissonConfig {
// 配置redis信息
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.database}")
private int database;
@Value("${spring.redis.port}")
private String port;
@Value("${spring.redis.password}")
private String password;
// Redisson客户端注册到Spring管理
@Bean
public RedissonClient redissonClient(){
Config config = new Config();
config.useSingleServer().setAddress("redis://"+host+":"+port).setDatabase(database);
if (StringUtils.isNotBlank(password)){
config.useSingleServer().setPassword(password);
}
config.setCodec(new StringCodec());
return Redisson.create(config);
}
}
使用非常的简单
public String redissonTest() {
// 业务key
String key = "myKey";
RLock lock = redissonClient.getLock(key);
lock.lock();
try {
// 执行业务逻辑
...
} finally {
lock.unlock();
}
return "done";
}
从上述可以发现,redisson的使用非常的简单,现在让我们来探究它的底层实现原理.
// 大致逻辑,如下图
1.尝试获取锁
2.若获取到锁,则去执行业务逻辑,最终释放锁unlock.
3.若没有获取到锁,则首先会订阅锁状态变更的消息.
4.接着遍历
5.首先会再次尝试获取锁,若得到锁,走2的逻辑.
6.若还是没有得到锁,此时会有锁剩余过期时间,然后对该线程进行阻塞(阻塞时间为ttl)
7.但是在阻塞期间,锁可能提前释放了,所以释放锁的逻辑中会发布一个消息,提醒订阅的线程可以去再次争抢锁,不用等到ttl过去.
// redisson客户端加锁
lock.lock();
// ->
lock(-1, null, false);
// ->
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
// 获取当前线程的ID
long threadId = Thread.currentThread().getId();
// 尝试获取锁,并获取锁的剩余有效时间
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// 如果成功获取锁,则直接返回,否则进行下一步操作
if (ttl == null) {
return;
}
// 订阅锁的状态变更消息
RFuture<RedissonLockEntry> future = subscribe(threadId);
// 根据是否支持可中断的方式,同步处理订阅操作
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}
try {
// 循环尝试获取锁
while (true) {
// 再次尝试获取锁,并获取锁的剩余有效时间
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// 如果成功获取锁,则跳出循环,否则继续等待
if (ttl == null) {
break;
}
// 根据锁的剩余有效时间,进行等待
if (ttl >= 0) {
try {
// 尝试等待锁状态变更消息
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// 如果可中断,并发生中断异常,根据情况抛出或继续等待
if (interruptibly) {
throw e;
}
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
// 锁超时等待,根据是否支持可中断的方式进行等待
if (interruptibly) {
future.getNow().getLatch().acquire();
} else {
future.getNow().getLatch().acquireUninterruptibly();
}
}
}
} finally {
// 取消订阅锁的状态变更消息
unsubscribe(future, threadId);
}
// 异步获取锁,该行代码可能是注释掉的备用代码
// get(lockAsync(leaseTime, unit));
}
现在我们来看redisson是如何加锁的,实际是利用了redis的lua脚本实现的
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
// 现在来解释这段lua脚本
// KEYS[1] = getName()就是我们的业务key;
// ARGV[1] = internalLockLeaseTime;锁过期时间,默认30s
// ARGV[2] = getLockName(threadId); uuid + 线程id
// 第一段:如果锁不存在,则创建一个新的锁,并设置过期时间,返回空,表示获取锁成功
redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; "
// 第二段:如果锁存在,并且是当前线程持有则对锁进行更新(+1,这时可重入锁的实现),同时延长过期时间,返回空,表示获取锁成功
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil;
// 第三段:锁存在,但不是当先线程持有,返回锁的剩余有效时间
return redis.call('pttl', KEYS[1]);,
这里使用lua脚本,并且redis操作数据是单线程的,所以保证了原子性.
如果持有锁的线程执行业务时间非常久,超过了过期时间,这样久有问题了,所以redisson还为我们提供了自动延时的功能。
当线程加锁成功后,会异步起一个线程延长锁时间.
// 我们只看关键部分
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
...
// 尝试获取锁
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
// 当获取锁的异步操作完成后的回调处理
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
// 如果出现异常,则直接返回,不执行后续操作
if (e != null) {
return;
}
// 如果成功获取到锁,执行锁自动续期的调度
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture; // 返回获取锁的异步操作
}
// 成功获取锁后则ttlRemaining=null,则会进入
// -> scheduleExpirationRenewal(threadId);
// -> renewExpiration();
private void renewExpiration() {
...
// 创建一个定时任务,在一定时间后执行续期操作
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
...
// 异步执行锁的过期续期操作
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
...
if (res) {
// 如果续期成功,则再次调度续期操作
// res=true,代表锁还在,递归调用自己
// 这个方法是延时调用的,默认30/3,10秒调用一次
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); // 设置定时任务的执行时间
// 将定时任务设置到过期续期信息中
ee.setTimeout(task);
}
// -> 这里又是个lua脚本
RFuture<Boolean> future = renewExpirationAsync(threadId);
// -> 如果锁还在就延长锁的过期时间并返回true,锁不在了返回false
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
持有锁的线程执行完业务逻辑后需要释放锁.
// 释放锁逻辑
RFuture<Boolean> future = unlockInnerAsync(threadId);
// -> 也是使用了lua脚本
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 如果锁不存在或者不属于当前线程,则返回空,表示无法解锁
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// 锁存在,尝试将锁的计数器减一
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
// 如果计数器仍大于0,则说明锁未完全释放,设置新的过期时间并返回0
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
// 如果计数器减为0,说明锁已完全释放,删除锁并发布解锁消息,返回1
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
// 其他情况返回空
"return nil;",
Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
如果锁释放了,会发布一个事件,之前获取锁失败的线程都会订阅这个消息,然后会重新竞争锁.
上一步知道锁释放的消息为:LockPubSub.UNLOCK_MESSAGE
@Override
protected void onMessage(RedissonLockEntry value, Long message) {
if (message.equals(UNLOCK_MESSAGE)) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute != null) {
runnableToExecute.run();
}
// 这里释放了阻塞的线程
value.getLatch().release();
} else if (message.equals(READ_UNLOCK_MESSAGE)) {
while (true) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute == null) {
break;
}
runnableToExecute.run();
}
value.getLatch().release(value.getLatch().getQueueLength());
}
}
Redisson提供了强大的分布式锁功能,强烈推荐!!!
上述只是讲述了Redisson的lock()的使用,以及基本的流程闭环.