Redis分布式锁附带看门狗线程的实现
1.锁的实现
package com.msb.redis.lock.rdl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.params.SetParams;
import javax.annotation.PreDestroy;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@Component
public class RedisDistLockWithDog implements Lock {
private final static int LOCK_TIME = 1 * 1000;
private final static String LOCK_TIME_STR = String.valueOf(LOCK_TIME);
private final static String RS_DISTLOCK_NS = "tdln2:";
private final static String RELEASE_LOCK_LUA =
"if redis.call('get',KEYS[1])==ARGV[1] then\n" +
" return redis.call('del', KEYS[1])\n" +
" else return 0 end";
private ThreadLocal<String> lockerId = new ThreadLocal<>();
private Thread ownerThread;
private String lockName = "lock";
@Autowired
private JedisPool jedisPool;
public String getLockName() {
return lockName;
}
public void setLockName(String lockName) {
this.lockName = lockName;
}
public Thread getOwnerThread() {
return ownerThread;
}
public void setOwnerThread(Thread ownerThread) {
this.ownerThread = ownerThread;
}
@Override
public void lock() {
while (!tryLock()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
throw new UnsupportedOperationException("不支持可中断获取锁!");
}
@Override
public boolean tryLock() {
Thread t = Thread.currentThread();
if (ownerThread == t) {
return true;
} else if (ownerThread != null) {
return false;
}
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
String id = UUID.randomUUID().toString();
SetParams params = new SetParams();
params.px(LOCK_TIME);
params.nx();
synchronized (this) {
if ((ownerThread == null) &&
"OK".equals(jedis.set(RS_DISTLOCK_NS + lockName, id, params))) {
lockerId.set(id);
setOwnerThread(t);
if (expireThread == null) {
expireThread = new Thread(new ExpireTask(), "expireThread");
expireThread.setDaemon(true);
expireThread.start();
}
delayDog.add(new ItemVo<>((int) LOCK_TIME, new LockItem(lockName, id)));
System.out.println(Thread.currentThread().getName() + "已获得锁----");
return true;
} else {
System.out.println(Thread.currentThread().getName() + "无法获得锁----");
return false;
}
}
} catch (Exception e) {
throw new RuntimeException("分布式锁尝试加锁失败!", e);
} finally {
jedis.close();
}
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
throw new UnsupportedOperationException("不支持等待尝试获取锁!");
}
@Override
public void unlock() {
if (ownerThread != Thread.currentThread()) {
throw new RuntimeException("试图释放无所有权的锁!");
}
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
Long result = (Long) jedis.eval(RELEASE_LOCK_LUA,
Arrays.asList(RS_DISTLOCK_NS + lockName),
Arrays.asList(lockerId.get()));
System.out.println(result);
if (result.longValue() != 0L) {
System.out.println("Redis上的锁已释放!");
} else {
System.out.println("Redis上的锁释放失败!");
}
} catch (Exception e) {
throw new RuntimeException("释放锁失败!", e);
} finally {
if (jedis != null) jedis.close();
lockerId.remove();
setOwnerThread(null);
}
}
@Override
public Condition newCondition() {
throw new UnsupportedOperationException("不支持等待通知操作!");
}
private Thread expireThread;
private static DelayQueue<ItemVo<LockItem>> delayDog = new DelayQueue<>();
private final static String DELAY_LOCK_LUA =
"if redis.call('get',KEYS[1])==ARGV[1] then\n" +
" return redis.call('pexpire', KEYS[1],ARGV[2])\n" +
" else return 0 end";
private class ExpireTask implements Runnable {
@Override
public void run() {
System.out.println("看门狗线程已启动......");
while (!Thread.currentThread().isInterrupted()) {
try {
LockItem lockItem = delayDog.take().getData();
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
Long result = (Long) jedis.eval(DELAY_LOCK_LUA,
Arrays.asList(RS_DISTLOCK_NS + lockItem.getKey()),
Arrays.asList(lockItem.getValue(), LOCK_TIME_STR));
if (result.longValue() == 0L) {
System.out.println("Redis上的锁已释放,无需续期!");
} else {
delayDog.add(new ItemVo<>((int) LOCK_TIME,
new LockItem(lockItem.getKey(), lockItem.getValue())));
System.out.println("Redis上的锁已续期:" + LOCK_TIME);
}
} catch (Exception e) {
throw new RuntimeException("锁续期失败!", e);
} finally {
if (jedis != null) jedis.close();
}
} catch (InterruptedException e) {
System.out.println("看门狗线程被中断");
break;
}
}
System.out.println("看门狗线程准备关闭......");
}
}
@PreDestroy
public void closeExpireThread() {
if (null != expireThread) {
expireThread.interrupt();
}
}
}
2.延迟队列中的元素
package com.msb.redis.lock.rdl;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class ItemVo<T> implements Delayed {
private long activeTime;
private T data;
public ItemVo(long expirationTime, T data) {
super();
this.activeTime = expirationTime + System.currentTimeMillis() - 100;
this.data = data;
}
public long getActiveTime() {
return activeTime;
}
public T getData() {
return data;
}
@Override
public long getDelay(TimeUnit unit) {
long d = unit.convert(this.activeTime
- System.currentTimeMillis(), unit);
return d;
}
@Override
public int compareTo(Delayed o) {
long d = (getDelay(TimeUnit.MILLISECONDS)
- o.getDelay(TimeUnit.MILLISECONDS));
if (d == 0) {
return 0;
} else {
if (d < 0) {
return -1;
} else {
return 1;
}
}
}
}
3.测试类
package com.msb.redis.redisbase.adv;
import com.msb.redis.lock.rdl.RedisDistLockWithDog;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@SpringBootTest
public class TestRedisDistLockWithDog {
@Autowired
private RedisDistLockWithDog redisDistLockWithDog;
private int count = 0;
@Test
public void testLockWithDog() throws InterruptedException {
int clientCount = 3;
CountDownLatch countDownLatch = new CountDownLatch(clientCount);
ExecutorService executorService = Executors.newFixedThreadPool(clientCount);
for (int i = 0; i < clientCount; i++) {
executorService.execute(() -> {
try {
redisDistLockWithDog.lock();
System.out.println(Thread.currentThread().getName() + "准备进行累加。");
Thread.sleep(2000);
count++;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
redisDistLockWithDog.unlock();
}
countDownLatch.countDown();
});
}
countDownLatch.await();
System.out.println(count);
}
}