Redisson分布式锁实现

发布时间:2023年12月27日

在分布式系统中,确保多个节点间协作执行时的数据一致性是至关重要的。分布式锁作为一种常见的技术手段,能够帮助实现多个节点对共享资源的安全访问。Redisson作为一个基于Redis的Java驱动库,提供了简单且强大的分布式锁功能。

Redisson使用

引入maven依赖

  <dependency>
      <groupId>org.redisson</groupId>
      <artifactId>redisson</artifactId>
      <version>3.14.1</version>
</dependency>

Redisson客户端注册为Spring Bean

@Configuration
public class RedissonConfig {
	// 配置redis信息
    @Value("${spring.redis.host}")
    private String host;

    @Value("${spring.redis.database}")
    private int database;

    @Value("${spring.redis.port}")
    private String port;

    @Value("${spring.redis.password}")
    private String password;

	//  Redisson客户端注册到Spring管理
    @Bean
    public RedissonClient redissonClient(){
        Config config = new Config();
        config.useSingleServer().setAddress("redis://"+host+":"+port).setDatabase(database);
        if (StringUtils.isNotBlank(password)){
            config.useSingleServer().setPassword(password);
        }
        config.setCodec(new StringCodec());
        return Redisson.create(config);
    }
}

使用

使用非常的简单

 public String redissonTest() {
 	// 业务key 
 	String key  = "myKey";
 	RLock lock = redissonClient.getLock(key);
        lock.lock();
        try {
        // 执行业务逻辑 
        ...
        } finally {
            lock.unlock();
        }
        return "done";
    }

底层原理

从上述可以发现,redisson的使用非常的简单,现在让我们来探究它的底层实现原理.

在这里插入图片描述

// 大致逻辑,如下图
1.尝试获取锁
2.若获取到锁,则去执行业务逻辑,最终释放锁unlock.
3.若没有获取到锁,则首先会订阅锁状态变更的消息.
4.接着遍历
5.首先会再次尝试获取锁,若得到锁,走2的逻辑.
6.若还是没有得到锁,此时会有锁剩余过期时间,然后对该线程进行阻塞(阻塞时间为ttl)
7.但是在阻塞期间,锁可能提前释放了,所以释放锁的逻辑中会发布一个消息,提醒订阅的线程可以去再次争抢锁,不用等到ttl过去.

在这里插入图片描述

// redisson客户端加锁
lock.lock();
// ->
lock(-1, null, false);
// ->
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    // 获取当前线程的ID
    long threadId = Thread.currentThread().getId();
    // 尝试获取锁,并获取锁的剩余有效时间
    Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
    // 如果成功获取锁,则直接返回,否则进行下一步操作
    if (ttl == null) {
        return;
    }

    // 订阅锁的状态变更消息
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    // 根据是否支持可中断的方式,同步处理订阅操作
    if (interruptibly) {
        commandExecutor.syncSubscriptionInterrupted(future);
    } else {
        commandExecutor.syncSubscription(future);
    }

    try {
        // 循环尝试获取锁
        while (true) {
            // 再次尝试获取锁,并获取锁的剩余有效时间
            ttl = tryAcquire(-1, leaseTime, unit, threadId);
            // 如果成功获取锁,则跳出循环,否则继续等待
            if (ttl == null) {
                break;
            }

            // 根据锁的剩余有效时间,进行等待
            if (ttl >= 0) {
                try {
                    // 尝试等待锁状态变更消息
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    // 如果可中断,并发生中断异常,根据情况抛出或继续等待
                    if (interruptibly) {
                        throw e;
                    }
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
                // 锁超时等待,根据是否支持可中断的方式进行等待
                if (interruptibly) {
                    future.getNow().getLatch().acquire();
                } else {
                    future.getNow().getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        // 取消订阅锁的状态变更消息
        unsubscribe(future, threadId);
    }
    // 异步获取锁,该行代码可能是注释掉的备用代码
    // get(lockAsync(leaseTime, unit));
}

加锁逻辑

现在我们来看redisson是如何加锁的,实际是利用了redis的lua脚本实现的

    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);
        return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }
// 现在来解释这段lua脚本
// KEYS[1] = getName()就是我们的业务key;
// ARGV[1] = internalLockLeaseTime;锁过期时间,默认30s
// ARGV[2] = getLockName(threadId);  uuid + 线程id
// 第一段:如果锁不存在,则创建一个新的锁,并设置过期时间,返回空,表示获取锁成功
redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; "
// 第二段:如果锁存在,并且是当前线程持有则对锁进行更新(+1,这时可重入锁的实现),同时延长过期时间,返回空,表示获取锁成功
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil;  
// 第三段:锁存在,但不是当先线程持有,返回锁的剩余有效时间
return redis.call('pttl', KEYS[1]);,

这里使用lua脚本,并且redis操作数据是单线程的,所以保证了原子性.

过期时间自动延时

如果持有锁的线程执行业务时间非常久,超过了过期时间,这样久有问题了,所以redisson还为我们提供了自动延时的功能。
当线程加锁成功后,会异步起一个线程延长锁时间.
在这里插入图片描述

// 我们只看关键部分
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
  	...
  	// 尝试获取锁
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                                                            TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    // 当获取锁的异步操作完成后的回调处理
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
    // 如果出现异常,则直接返回,不执行后续操作
        if (e != null) { 
            return;
        }

        // 如果成功获取到锁,执行锁自动续期的调度
        if (ttlRemaining == null) {
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture; // 返回获取锁的异步操作
}

// 成功获取锁后则ttlRemaining=null,则会进入
// -> scheduleExpirationRenewal(threadId);
// -> renewExpiration();
private void renewExpiration() {
    ...
    // 创建一个定时任务,在一定时间后执行续期操作
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
			...
            // 异步执行锁的过期续期操作
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
  				...        
                if (res) {
                    // 如果续期成功,则再次调度续期操作
                    // res=true,代表锁还在,递归调用自己
                    // 这个方法是延时调用的,默认30/3,10秒调用一次
                    renewExpiration();
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); // 设置定时任务的执行时间
    
    // 将定时任务设置到过期续期信息中
    ee.setTimeout(task);
}

// -> 这里又是个lua脚本
 RFuture<Boolean> future = renewExpirationAsync(threadId);
 // -> 如果锁还在就延长锁的过期时间并返回true,锁不在了返回false
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return 0;",
                Collections.singletonList(getName()),
                internalLockLeaseTime, getLockName(threadId));

释放锁逻辑

持有锁的线程执行完业务逻辑后需要释放锁.

// 释放锁逻辑
RFuture<Boolean> future = unlockInnerAsync(threadId);
// -> 也是使用了lua脚本
  protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
        		// 如果锁不存在或者不属于当前线程,则返回空,表示无法解锁
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        // 锁存在,尝试将锁的计数器减一
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +		
                        // 如果计数器仍大于0,则说明锁未完全释放,设置新的过期时间并返回0
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        // 如果计数器减为0,说明锁已完全释放,删除锁并发布解锁消息,返回1
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        // 其他情况返回空
                        "return nil;",
                Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }

如果锁释放了,会发布一个事件,之前获取锁失败的线程都会订阅这个消息,然后会重新竞争锁.

锁释放消息处理

上一步知道锁释放的消息为:LockPubSub.UNLOCK_MESSAGE

 @Override
    protected void onMessage(RedissonLockEntry value, Long message) {
        if (message.equals(UNLOCK_MESSAGE)) {
            Runnable runnableToExecute = value.getListeners().poll();
            if (runnableToExecute != null) {
                runnableToExecute.run();
            }
			// 这里释放了阻塞的线程
            value.getLatch().release();
        } else if (message.equals(READ_UNLOCK_MESSAGE)) {
            while (true) {
                Runnable runnableToExecute = value.getListeners().poll();
                if (runnableToExecute == null) {
                    break;
                }
                runnableToExecute.run();
            }

            value.getLatch().release(value.getLatch().getQueueLength());
        }
    }

总结

Redisson提供了强大的分布式锁功能,强烈推荐!!!
上述只是讲述了Redisson的lock()的使用,以及基本的流程闭环.

文章来源:https://blog.csdn.net/2302_77659577/article/details/135223257
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。