在分布式系统中,涉及到多个实例对同一资源加锁的情况,传统的synchronized、ReentrantLock等单进程加锁的API就不再适用,此时就需要使用分布式锁来保证多服务之间加锁的安全性。
常见的分布式锁的实现方式有:zookeeper、Redis。Redis分布式锁相对简单,Redis分布式锁常用于业务场景中,Redisson是Redis实现分布式锁常用方式
Redis分布式锁中,setnx命令,可保证一个key同时只能有一个线程设置成功,这样可实现加锁的互斥性。但是Redisson并未通过setnx命令实现加锁。
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.15.6</version>
</dependency>
package com.hong.springbootjwt.config.redission;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.redisson.config.TransportMode;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author: hong
* @date: 2023/2/16 20:09
* @description RedissonClient
*/
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
Config config = new Config();
config.setTransportMode(TransportMode.NIO);
SingleServerConfig singleServerConfig = config.useSingleServer();
//可以用"rediss://"来启用SSL连接
singleServerConfig.setAddress("redis://127.0.0.1:6379");
singleServerConfig.setPassword("123456");
return Redisson.create(config);
}
}
Redisson加锁使用:
package com.hong.springbootjwt.service.impl;
import com.hong.springbootjwt.service.UserService;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Service;
/**
* @author: hong
* @date: 2023/2/16 20:08
* @description
*/
@Service
public class UserServiceImpl implements UserService {
private final RedissonClient redissonClient;
public UserServiceImpl(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}
@Override
public void redissionLock() {
// 获取锁对象
RLock myLock = redissonClient.getLock("myLock");
try {
// 加锁
myLock.lock();
...
}catch (Exception e) {
}finally {
// 释放锁
myLock.unlock();
}
}
}
public RLock getLock(String name) {
return new RedissonLock(this.commandExecutor, name);
}
public void lock() {
try {
this.lock(-1L, (TimeUnit)null, false);
} catch (InterruptedException var2) {
throw new IllegalStateException();
}
}
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
...
}
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture ttlRemainingFuture;
if (leaseTime != -1L) {
ttlRemainingFuture = this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
...
}
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil;" +
" end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); return nil;" +
" end;" +
" return redis.call('pttl', KEYS[1]);",
Collections.singletonList(this.getRawName()), new Object[]{unit.toMillis(leaseTime), this.getLockName(threadId)});
}
Redis通过执行LUA脚本实现加锁,保证加锁的原子性,分析上述LUA脚本
这些参数是如何传过去的呢?其实就是在这里
这段LUA的加锁逻辑:
myLock:{
"b983c153-7421-469a-addb-44fb92259a1b:1":1
}
总结:第一个客户端线程加锁的逻辑还是挺简单的,就是判断有无加过锁,没有的话自己去加锁,设置加锁的key,保存加锁的线程和加锁次数,设置锁的过期时间为30s
问题:为何要设置加锁key的过期时间?
主要原因是为了防止死锁,当某客户端获取到锁,还未来得及释放锁,当客户端宕机了,或者释放失败了,一旦没设置过期时间,那么这个锁key会一直存在,当其他线程来加锁的话发生key已经被加锁了,那么其他线程会一直加锁失败,从而造成死锁问题。
在加锁过程中,没指定锁的过期时间,Redisson也会默认给锁设置30s的过期时间,来防止死锁
虽然设置默认过期时间可防止死锁,但若在30s内,任务还未结束,但是锁已经释放失效了,一旦其他线程加锁成功,就可能出现线程安全,数据错乱问题。所以Redisson针对这种未指定超时时间的加锁,实现了一个watchdog机制,即“看门狗机制”自动延长加锁时间
在客户端通过tryLockInnerAsync方法加锁后,如果没指定锁过期时间,那么客户端会起一个定时任务,来定时延长加锁时间,默认10s执行一次,所以watchdog的本质就是一个定时任务
最后定期执行一段LUA脚本,实现加锁时间的延长
脚本中参数解释(同加锁的参数):
这段LUA脚本意思是判断续约的线程和加锁的线程是否为同一个,若为同一个,将锁的过期时间延长30s,然后返回1,代表续约成功,不是的话就返回0,续约失败,下一次定时任务就不会执行
注意:因为有了看门狗机制,所以若没有设置过期时间,并且没有主动释放锁,那么这个锁就永远不会释放,因为定时任务会不断延长锁的过期时间,造成死锁问题。
但是如果发生宕机,是不会造成死锁的,因为宕机了,服务也就没有了,那么看门狗的定时任务就没了,自然不会续约,等锁自动过期了,也就自动释放锁了。
可重入锁的意思就是,同一个客户端同一个线程多次对同一个锁进行加锁。
在Redisson中,可以执行多次lock方法,流程都是一样的,最后调用到LUA脚本,所以可重入锁的逻辑也是通过加锁的LUA脚本实现
下半部分就是可重入锁的逻辑
下面这段if的意思是:判断当前已经加锁的key对应的加锁线程,跟要加锁的线程是否为同一个,如果是,则将该线程对应的加锁次数加1,也即实现了可重入加锁,同时返回nil
可重入锁加锁成功后,加锁key和对应值可能是这样:
myLock:{
"b983c153-7421-469a-addb-44fb92259a1b:1":2
}
当业务执行完毕后,需要主动释放锁,为什么需要主动释放锁呢?
Redisson如何主动释放锁以及避免其他线程释放自己加的锁呢?
主动释放锁是通过unlock方法实现,分析unlock方法的实现:
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise();
RFuture<Boolean> future = this.unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
this.cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
} else if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
result.tryFailure(cause);
} else {
result.trySuccess((Object)null);
}
});
return result;
}
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);" +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]);" +
"return 0;" +
"else redis.call('del', KEYS[1]);" +
"redis.call('publish', KEYS[2], ARGV[1]);" +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(this.getRawName(), this.getChannelName()), new Object[]{LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId)});
}
LUA脚本解释:
已知如果不指定超时时间的话,存在看门狗线程不断延长加锁时间,不会导致锁超时释放,自动过期,那么指定超时时间的话,是如何实现指定时间释放的呢?
能够设置超时时间的方法:
// 通过传入leaseTime参数可指定锁超时时间
void lock(long leaseTime, TimeUnit unit)
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit)
已知,无论是否设置锁超时时间,最终都会调用tryAcquireAsync方法进行加锁。只是不指定超时时间的话,传入的leaseTime值是-1,也即不指定超时时间,但是Redisson默认还是会设置30s过期时间;当指定超时时间,那么leaseTime就是自己指定的时间,最终也是通过一个LUA脚本进行加锁逻辑
是否指定超时时间的区别:
总结:
指定超时时间,达到超时释放锁的功能主要通过Redis自动过期来实现,因为指定了超时时间,加锁成功后就不会开启watchdog机制来延长加锁时间
实际项目中,若能比较准确预估代码执行时间,那么可以指定锁超时释放时间,来防止业务执行错误导致无法释放锁的问题;若不能预估代码执行时间,那么可以不指定超时时间,在finally代码块中采用unlock手动释放。
前面已经分析过,第一次加锁逻辑和可重入锁的逻辑,因为LUA脚本加锁的逻辑同时只有一个线程能够执行(Redis是单线程的原因),所以一旦有线程加锁成功,那么另一线程来加锁,前面两个if条件都不成立,最后通过调用Redis的pttl命令返回锁的剩余过期时间回去。
这样,客户端就可根据返回值判断是否加锁成功, 因为第一次加锁和可重入锁的返回值都是nil,而加锁失败就返回了锁的剩余过期时间
// 第一次加锁
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
// 可重入锁
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
// 无法加锁成功
return redis.call('pttl', KEYS[1]);
所以加锁的LUA脚本通过条件判断就可实现加锁的互斥操作,保证其他线程无法加锁成功。
总结:加锁的LUA脚本实现了第一次加锁、可重入锁、加锁互斥的逻辑
上面分析已知,加锁失败后,会走如下代码:
这里可看出,最终会执行死循环(自旋)的方式,不停的通过tryAcquire()方法来实现加锁,直到加锁成功后才会跳出死循环,如果一直没有加锁成功,那么就会一直旋转下去,所谓阻塞,就是自旋加锁的方式
但是这种阻塞可能产生问题,如果其他线程释放锁失败,那么这个阻塞加锁的线程会一直阻塞加锁,肯定会出问题的,所以需要设置超过一定时间还未加锁成功的话,就放弃加锁。
超时放弃加锁的方法:
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit)
boolean tryLock(long time, TimeUnit unit)
通过waitTime参数或time参数来指定超时时间,这两个方法的主要区别在于是否支持指定锁超时时间
do {
long currentTime = System.currentTimeMillis();
ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
var16 = true;
return var16;
}
// 超过尝试时间,加锁失败,返回false
time -= System.currentTimeMillis() - currentTime;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
var16 = false;
return var16;
}
currentTime = System.currentTimeMillis();
if (ttl >= 0L && ttl < time) {
((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
} while(time > 0L);
从源码可看出,实现一定时间内还未获取到锁,就放弃加锁的逻辑,其实相比于一直自旋获取锁,主要是加了超时判断,如果超时了,就退出循环,放弃加锁
什么是公平锁?
公平锁就是指线程成功加锁的顺序,跟线程请求加锁的顺序一样,实现了先来先成功加锁的特点,不插队才叫公平
前面所说的RedissonLock的实现都是非公平锁,但里面有些机制如watchdog机制是公平的
公平锁和非公平锁比较
公平锁
非公平锁:
如何使用公平锁
通过RedissonClient的getFairLock可获取到公平锁。Redisson对于公平锁的实现是RedissonFairLock类,通过RedissonFairLock来加锁,可实现公平锁的特性
public void redissionLock() {
// 获取锁对象
RLock myLock = redissonClient.getFairLock("myLock");
try {
// 加锁
myLock.lock(30,TimeUnit.SECONDS);
} catch (Exception e) {
} finally {
// 释放锁
myLock.unlock();
}
}
RedissonFairLock继承了RedissonLock,主要重写了tryLockInnerAsync方法,也就是加锁逻辑的方法。
概述这段LUA的作用:
在实际开发中,会有很多“读多写少”的场景,对于这种场景,使用独占锁加锁,在高并发情况下,会导致大量线程加锁失败,阻塞,对系统吞吐量有一定影响,为了适配这种“读多写少”的场景,Redisson也实现了读写锁的功能
读写锁的特点:
Redisson中使用读写锁:
public void redissionLock() {
// 获取读写锁
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("readWriteLock");
RLock readLock = readWriteLock.readLock();
try {
readLock.lock();
// 业务操作
...
} catch (Exception e) {
} finally {
// 释放锁
readLock.unlock();
}
RLock writeLock = readWriteLock.writeLock();
try {
writeLock.lock();
// 业务操作
...
} catch (Exception e) {
} finally {
// 释放锁
writeLock.unlock();
}
}
Redisson通过RedissonReadWriteLock类实现读写锁功能。通过这个类可以获取到读锁和写锁,所以真正的加锁逻辑是由读锁和写锁实现的
Redisson是如何具体实现读写锁的?
前面已知,加锁成功后会在Redis中维护一个hash的数据结构,存储加锁线程和加锁次数。在读写锁的实现中,会往hash数据结构中多维护一个mode字段,来表示当前加锁的模式。
所以能够实现读写锁,最主要是因为维护了一个加锁模式的字段mode,这样当线程来加锁的时候,就能根据当前加锁模式结合读写的特性来判断要不要让当前线程加锁成功
批量加锁的意思是同时加几个锁,只有这些锁都加成功了,才算真正的加锁成功!
比如:一个下单业务中,同时需要锁定订单、库存、商品,基于这种需要锁多种资源的场景中,Redisson提供了批量加锁的实现,对应的实现类是RedissonMultiLock
使用联锁:
public void redissionLock() {
// 获取读写锁
RLock myLock1 = redissonClient.getLock("myLock1");
RLock myLock2 = redissonClient.getLock("myLock2");
RLock myLock3 = redissonClient.getLock("myLock3");
RLock multiLock = redissonClient.getMultiLock(myLock1,myLock2,myLock3);
try {
multiLock.lock();
// 业务操作
...
} catch (Exception e) {
} finally {
// 释放锁
multiLock.unlock();
}
}
Redisson对于批量加锁的实现也很简单,源码如下:
就是根据顺序依次调用tryLock,传入myLock1,myLock2,myLock3加锁方法,如果都成功加锁了,那么multiLock就算加锁成功
对于单Redis实例来说,如果Redis宕机了,那么整个系统就无法运行,所以为了保证Redis的高可用,一般都会采用主从或哨兵模式,但是一旦使用了主从或哨兵模式,此时Redis的分布式锁就可能出现问题
例如,使用哨兵模式
基于这种模式,Redis客户端会在master节点上加锁,然后异步复制到slave节点上。但是一旦master节点宕机,那么哨兵感知到,就会从slave节点选择一个节点作为主节点。
假设客户端对原主节点加锁,加锁成功后还未来得及同步到从节点,主节点宕机了,从节点变为了主节点,此时从节点是没有加锁信息的,如果其他客户端来加锁,是能够加锁成功的!!
针对此问题,Redis官方提供一种RedLock算法,Redisson刚好实现了这种算法
RedLock算法
在Redis分布式环境中,假设有N个master节点,这些节点相互独立,不存在主从复制或其他集群协调机制。
前面描述过,在Redis单例下怎么安全获取和释放锁,需要确保将在N个实例上使用此方法获取和释放锁。为了获取锁,客户端应该执行以下操作:
Redisson对RedLock算法的实现:
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 红锁在大部分节点上加锁成功就算成功。
lock.lock();
...
lock.unlock();
RedissonRedLock加锁过程如下:
RedissonRedLock底层其实也是基于RedissonMultiLock实现的,RedissonMultiLock要求所有的加锁成功才算成功,RedissonRedLock要求只要有N/2+1个成功就算成功