Distributed Lock Implementations: A Summary

Published: January 8, 2024

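I. Common Abstract Class

Define an abstract class, AbstractLock, that implements the Lock interface. Every distributed lock implementation must extend AbstractLock and override the methods it supports; anything it does not override throws by default. Because callers depend only on this type, one lock implementation can later be swapped for another.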

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public abstract class AbstractLock implements Lock {
	// Every method throws by default; subclasses override only what they support.
	@Override
	public void lock() {
		throw new UnsupportedOperationException("Operation not supported");
	}

	@Override
	public void lockInterruptibly() throws InterruptedException {
		throw new UnsupportedOperationException("Operation not supported");
	}

	@Override
	public boolean tryLock() {
		throw new UnsupportedOperationException("Operation not supported");
	}

	@Override
	public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
		throw new UnsupportedOperationException("Operation not supported");
	}

	@Override
	public void unlock() {
		throw new UnsupportedOperationException("Operation not supported");
	}

	@Override
	public Condition newCondition() {
		throw new UnsupportedOperationException("Operation not supported");
	}
}
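
Since every implementation is reached through the Lock interface, the calling code can stay the same whichever backend is in use. The sketch below illustrates that idea with a hypothetical helper: LockTemplate and withLock are names invented for this example and are not part of the article's code. A local ReentrantLock stands in for the distributed lock so the snippet runs without Redis, Zookeeper, or Etcd.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical helper for illustration; not part of the article's code.
final class LockTemplate {
	private LockTemplate() {
	}

	/** Runs the action while holding the lock and always releases the lock afterwards. */
	static <T> T withLock(Lock lock, Supplier<T> action) {
		// Acquire before the try block: if lock() throws, there is nothing to release.
		lock.lock();
		try {
			return action.get();
		} finally {
			lock.unlock();
		}
	}

	public static void main(String[] args) {
		// ReentrantLock is a local stand-in; any AbstractLock subclass fits the same parameter.
		Lock lock = new ReentrantLock();
		int[] stock = {10};
		int left = withLock(lock, () -> --stock[0]);
		System.out.println("stock left: " + left);
	}
}
```

Taking the lock outside the try block means unlock() is only ever called for a lock that was really acquired, which removes the need for a separate flag.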

II. Distributed Lock with Redis

Add the dependency

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.8.2</version>
</dependency>

1. Core code

1) Single-instance lock

import cn.forlan.lock.base.AbstractLock;
import org.redisson.api.RedissonClient;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;


public class RedisLock extends AbstractLock {

	private RedissonClient redissonClient;
	private String key;

	public RedisLock(RedissonClient redissonClient, String key) {
		this.redissonClient = redissonClient;
		this.key = key;
	}

	@Override
	public void lock() {
		redissonClient.getLock(key).lock();
	}

	@Override
	public void lockInterruptibly() throws InterruptedException {
		redissonClient.getLock(key).lockInterruptibly();
	}

	@Override
	public boolean tryLock() {
		return redissonClient.getLock(key).tryLock();
	}

	@Override
	public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
		return redissonClient.getLock(key).tryLock(time, unit);
	}

	@Override
	public void unlock() {
		redissonClient.getLock(key).unlock();
	}

	@Override
	public Condition newCondition() {
		return redissonClient.getLock(key).newCondition();
	}
}

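2) RedLock

The lock has to be taken on several independent Redis instances, and the acquisition only counts as successful when more than half of them grant it.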

import cn.forlan.lock.base.AbstractLock;
import org.redisson.RedissonRedLock;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;

public class RedisRedLock extends AbstractLock {

	private RedissonRedLock redLock;

	public RedisRedLock() {
	}

	public RedisRedLock(RedissonRedLock redLock) {
		this.redLock = redLock;
	}

	@Override
	public void lock() {
		redLock.lock();
	}

	@Override
	public void lockInterruptibly() throws InterruptedException {
		redLock.lockInterruptibly();
	}

	@Override
	public boolean tryLock() {
		return redLock.tryLock();
	}

	@Override
	public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
		return redLock.tryLock(time, unit);
	}

	@Override
	public void unlock() {
		redLock.unlock();
	}

	@Override
	public Condition newCondition() {
		return redLock.newCondition();
	}
}
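
The majority rule described above can be sketched with plain JDK locks. This is a simplified illustration and not Redisson's algorithm: it has no timeouts or lease handling, and MajorityLockSketch, quorum, and tryAcquireMajority are hypothetical names. Each ReentrantLock stands in for one Redis instance.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of "more than half must succeed"; each Lock stands in for one Redis instance.
final class MajorityLockSketch {
	/** Smallest number of instances that forms a majority. */
	static int quorum(int instances) {
		return instances / 2 + 1;
	}

	/**
	 * Tries each lock once. Returns the locks that were acquired when they form a majority;
	 * otherwise releases whatever was acquired and returns an empty list.
	 */
	static List<Lock> tryAcquireMajority(List<Lock> locks) {
		List<Lock> acquired = new ArrayList<>();
		for (Lock lock : locks) {
			if (lock.tryLock()) {
				acquired.add(lock);
			}
		}
		if (acquired.size() >= quorum(locks.size())) {
			return acquired;
		}
		// No majority: give back the partial holds so other clients can make progress.
		for (Lock lock : acquired) {
			lock.unlock();
		}
		return Collections.emptyList();
	}

	public static void main(String[] args) {
		List<Lock> instances = Arrays.asList(new ReentrantLock(), new ReentrantLock(), new ReentrantLock());
		List<Lock> held = tryAcquireMajority(instances);
		System.out.println("acquired " + held.size() + " of " + instances.size());
		for (Lock lock : held) {
			lock.unlock();
		}
	}
}
```

With three instances the quorum is two, so one unreachable instance can be tolerated; with only two instances the quorum is also two, and a single failure blocks the lock.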

2. Unit tests

import cn.forlan.lock.redis.RedisLock;
import cn.forlan.lock.redis.RedisRedLock;
import org.junit.Test;
import org.redisson.Redisson;
import org.redisson.RedissonRedLock;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;


public class RedisLockTest {

	private RedissonClient getRedissonClient() {
		Config config = new Config();
		config.useSingleServer()
				.setTimeout(1000000)
				.setAddress("redis://127.0.0.1:6379");
		RedissonClient redissonClient = Redisson.create(config);
		return redissonClient;
	}

	private ExecutorService executorService = Executors.newCachedThreadPool();

	@Test
	public void SingleLockTest() throws InterruptedException {
		int[] count = {0};
		RedissonClient client = getRedissonClient();
		RedisLock redisLock = new RedisLock(client, "lock_key");
		for (int i = 0; i < 1000; i++) {
			executorService.submit(() -> {
				try {
					redisLock.lock();
					count[0]++;
				} catch (Exception e) {
					e.printStackTrace();
				} finally {
					try {
						redisLock.unlock();
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			});
		}
		executorService.shutdown();
		executorService.awaitTermination(1, TimeUnit.HOURS);
		System.out.println(count[0]);
	}

	public static RLock getRLock(String url, String key) {
		Config config = new Config();
		config.useSingleServer().setAddress(url);
		RedissonClient redissonClient = Redisson.create(config);
		return redissonClient.getLock(key);
	}


	@Test
	public void RedLockTest() throws InterruptedException {
		int[] count = {0};
		RedissonRedLock redissonRedLock = new RedissonRedLock(
				getRLock("redis://192.168.56.100:6379", "lock_key"),
				getRLock("redis://127.0.0.1:6379", "lock_key"),
				getRLock("redis://192.168.56.101:6379", "lock_key")
		);

		RedisRedLock redLock = new RedisRedLock(redissonRedLock);

		for (int i = 0; i < 1000; i++) {
			executorService.submit(() -> {
				try {
					redLock.lock();
					count[0]++;
				} catch (Exception e) {
					e.printStackTrace();
				} finally {
					try {
						redLock.unlock();
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			});
		}
		executorService.shutdown();
		executorService.awaitTermination(1, TimeUnit.HOURS);
		System.out.println(count[0]);
	}
}
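
The tests above print the counter, and the value to look for is 1000: one increment per task, none lost. A minimal sketch of the same check follows, with a local ReentrantLock in place of the Redis-backed lock so that it runs without a server. CounterCheck and run are hypothetical names introduced for this example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: the same counter check as the tests, against any Lock.
final class CounterCheck {
	/** Increments a plain int from many tasks under the given lock and returns the final value. */
	static int run(Lock lock, int tasks) {
		int[] count = {0};
		ExecutorService pool = Executors.newFixedThreadPool(8);
		for (int i = 0; i < tasks; i++) {
			pool.submit(() -> {
				lock.lock();
				try {
					count[0]++;
				} finally {
					lock.unlock();
				}
			});
		}
		pool.shutdown();
		try {
			if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
				throw new IllegalStateException("tasks did not finish in time");
			}
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			throw new IllegalStateException(e);
		}
		// Read under the same lock so the final value is safely visible to this thread.
		lock.lock();
		try {
			return count[0];
		} finally {
			lock.unlock();
		}
	}

	public static void main(String[] args) {
		System.out.println(run(new ReentrantLock(), 1000));
	}
}
```

Returning the count instead of printing it lets a test assert on the value, which turns the visual check into an automatic one.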

3. Spring Boot usage

Configure application.properties. For the single-instance lock, one Redis entry is enough.

spring.redis.database=2
spring.redis.host=127.0.0.1
spring.redis.password=
spring.redis.port=6379
spring.redis.host1=127.0.0.1
spring.redis.password1=
spring.redis.port1=6379
spring.redis.host2=192.168.56.100
spring.redis.password2=
spring.redis.port2=6379
spring.redis.host3=192.168.56.101
spring.redis.password3=
spring.redis.port3=6379

Define a configuration class for the single-instance lock so that the client can be injected

import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

@Configuration
public class RedissonClientConfig {

	@Value("${spring.redis.host}")
	private String host;

	@Value("${spring.redis.port}")
	private String port;

	@Value("${spring.redis.password}")
	private String password;

	@Bean
	@Primary
	public RedissonClient getRedisson() {
		Config config = new Config();
		if (StringUtils.isBlank(password)) {
			config.useSingleServer().setAddress("redis://" + host + ":" + port);
		} else {
			config.useSingleServer().setAddress("redis://" + host + ":" + port).setPassword(password);
		}
		return Redisson.create(config);
	}

}

Define a configuration class for RedLock so that the clients can be injected

import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RedissonRedLockConfig {

	@Value("${spring.redis.host1}")
	private String host1;
	@Value("${spring.redis.port1}")
	private String port1;
	@Value("${spring.redis.password1}")
	private String password1;
	@Value("${spring.redis.host2}")
	private String host2;
	@Value("${spring.redis.port2}")
	private String port2;
	@Value("${spring.redis.password2}")
	private String password2;
	@Value("${spring.redis.host3}")
	private String host3;
	@Value("${spring.redis.port3}")
	private String port3;
	@Value("${spring.redis.password3}")
	private String password3;

	@Bean
	public RedissonClient redisson1() {
		Config config = new Config();
		if (StringUtils.isBlank(password1)) {
			config.useSingleServer().setAddress("redis://" + host1 + ":" + port1);
		} else {
			config.useSingleServer().setAddress("redis://" + host1 + ":" + port1).setPassword(password1);
		}
		return Redisson.create(config);
	}

	@Bean
	public RedissonClient redisson2() {
		Config config = new Config();
		if (StringUtils.isBlank(password2)) {
			config.useSingleServer().setAddress("redis://" + host2 + ":" + port2);
		} else {
			config.useSingleServer().setAddress("redis://" + host2 + ":" + port2).setPassword(password2);
		}
		return Redisson.create(config);
	}

	@Bean
	public RedissonClient redisson3() {
		Config config = new Config();
		if (StringUtils.isBlank(password3)) {
			config.useSingleServer().setAddress("redis://" + host3 + ":" + port3);
		} else {
			config.useSingleServer().setAddress("redis://" + host3 + ":" + port3).setPassword(password3);
		}
		return Redisson.create(config);
	}

}

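Single-instance lock usage example

@Autowired
private RedissonClient redissonClient;

public String seckill(String skuId) {
	// Lock key derived from the SKU; this naming scheme is only an example
	String key = "seckill:" + skuId;
	AbstractLock lock = new RedisLock(redissonClient, key);
	boolean lockFlag = false;
	try {
		lock.lock();
		lockFlag = true;
		// business logic...
		return "success"; // placeholder result
	} catch (Exception e) {
		e.printStackTrace();
		return "fail"; // placeholder result
	} finally {
		if (lockFlag) {
			lock.unlock();
		}
	}
}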

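RedLock usage example

@Resource(name = "redisson1")
private RedissonClient redissonClient1;
@Resource(name = "redisson2")
private RedissonClient redissonClient2;
@Resource(name = "redisson3")
private RedissonClient redissonClient3;

public String seckill(String skuId) {
	// Lock key derived from the SKU; this naming scheme is only an example
	String key = "seckill:" + skuId;
	// Pass one lock per Redis instance so that a majority can still be reached if one instance fails
	RedissonRedLock redissonRedLock = new RedissonRedLock(
			redissonClient1.getLock(key), redissonClient2.getLock(key), redissonClient3.getLock(key));
	AbstractLock lock = new RedisRedLock(redissonRedLock);
	boolean lockFlag = false;
	try {
		lock.lock();
		lockFlag = true;
		// business logic...
		return "success"; // placeholder result
	} catch (Exception e) {
		e.printStackTrace();
		return "fail"; // placeholder result
	} finally {
		if (lockFlag) {
			lock.unlock();
		}
	}
}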

III. Distributed Lock with Zookeeper

Add the dependency

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.7.0</version>
</dependency>

1. Core code

import cn.forlan.lock.base.AbstractLock;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.RetryNTimes;
import java.util.concurrent.TimeUnit;


public class ZKLock extends AbstractLock {

	private InterProcessLock lock;

	public ZKLock(String zkAddress, String lockPath) {
		CuratorFramework client = CuratorFrameworkFactory.newClient(
				zkAddress,
				new RetryNTimes(5, 5000)
		);
		client.start();
		if (client.getState() != CuratorFrameworkState.STARTED) {
			throw new RuntimeException("Failed to start the Curator client");
		}
		this.lock = defaultLock(client, lockPath);
	}

	public ZKLock(CuratorFramework client, String lockPath) {
		this.lock = defaultLock(client, lockPath);
	}

	private InterProcessLock defaultLock(CuratorFramework client, String lockPath) {
		return new InterProcessMutex(client, lockPath);
	}

	@Override
	public void lock() {
		try {
			lock.acquire();
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}

	@Override
	public boolean tryLock() {
		boolean flag = false;
		try {
			flag = lock.acquire(0, TimeUnit.SECONDS);
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
		return flag;
	}

	@Override
	public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
		boolean flag = false;
		try {
			flag = lock.acquire(time, unit);
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
		return flag;
	}

	@Override
	public void unlock() {
		try {
			lock.release();
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}

}

2. Unit tests

import cn.forlan.lock.zookeeper.ZKLock;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author weichenglin
 * @since 2023-12-19-下午 04:48:05
 */
@Slf4j
public class ZKLockTest {
	private ExecutorService executorService = Executors.newCachedThreadPool();


	@Test
	public void testLock() throws Exception {
		ZKLock zkLock = new ZKLock("127.0.0.1:2181", "/lockPath");
		int[] num = {0};
		long start = System.currentTimeMillis();
		for (int i = 0; i < 100; i++) {
			executorService.submit(() -> {
				try {
					zkLock.lock();
					num[0]++;
				} catch (Exception e) {
					throw new RuntimeException(e);
				} finally {
					if (zkLock != null) {
						zkLock.unlock();
					}
				}
			});

		}
		executorService.shutdown();
		executorService.awaitTermination(1, TimeUnit.HOURS);
		log.info("Elapsed: {} ms", System.currentTimeMillis() - start);
		System.out.println(num[0]);
	}

	@Test
	public void testNoLock() throws Exception {
		long start = System.currentTimeMillis();
		int[] num = {0};
		for (int i = 0; i < 100; i++) {
			executorService.submit(() -> {
				num[0]++;
			});

		}
		executorService.shutdown();
		executorService.awaitTermination(1, TimeUnit.HOURS);
		log.info("Elapsed: {} ms", System.currentTimeMillis() - start);
		System.out.println(num[0]);
	}

	/**
	 * Re-entrancy test
	 */
	@Test
	public void testRe() {
		ZKLock zkLock = new ZKLock("127.0.0.1:2181", "/lockPath");
		Thread thread1 = new Thread(() -> {
			zkLock.lock();
			try {
				System.out.println(Thread.currentThread().getName() + " acquired the lock (first entry)");
				Thread.sleep(1000);
				zkLock.lock();
				System.out.println(Thread.currentThread().getName() + " acquired the lock (second entry)");
			} catch (Exception e) {
			} finally {
				zkLock.unlock();
				zkLock.unlock();
			}
		}, "thread1");
		thread1.start();

		Thread thread2 = new Thread(() -> {
			zkLock.lock();
			try {
				System.out.println(Thread.currentThread().getName() + " acquired the lock (first entry)");
			} catch (Exception e) {
			} finally {
				zkLock.unlock();
			}
		}, "thread2");
		thread2.start();
		try {
			thread1.join();
			thread2.join();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testTrySuccess() {
		ZKLock zkLock = new ZKLock("127.0.0.1:2181", "/lockPath");
		boolean b = zkLock.tryLock();
		if (b) {
			try {
				log.info("{} acquired the lock", Thread.currentThread().getName());
			} finally {
				// Release only when the lock was actually acquired
				zkLock.unlock();
			}
		} else {
			log.info("{} failed to acquire the lock", Thread.currentThread().getName());
		}
	}

	@Test
	public void testTryFalse() {
		ZKLock zkLock = new ZKLock("127.0.0.1:2181", "/lockPath");
		Thread thread1 = new Thread(() -> {
			zkLock.lock();
			try {
				log.info("{} acquired the lock", Thread.currentThread().getName());
				Thread.sleep(10000);
			} catch (Exception e) {
			} finally {
				zkLock.unlock();
				log.info("{} released the lock", Thread.currentThread().getName());
			}
		}, "thread1");
		thread1.start();
		try {
			Thread.sleep(2000);
		} catch (Exception e) {
		}
		Thread thread2 = new Thread(() -> {
			log.info("{} is trying to acquire the lock", Thread.currentThread().getName());
			boolean b = zkLock.tryLock();
			if (b) {
				try {
					log.info("{} acquired the lock", Thread.currentThread().getName());
				} catch (Exception e) {
				} finally {
					zkLock.unlock();
				}
			} else {
				log.info("{} failed to acquire the lock", Thread.currentThread().getName());
			}
		}, "thread2");
		thread2.start();
		try {
			thread1.join();
			thread2.join();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testTryTimeOut() {
		ZKLock zkLock = new ZKLock("127.0.0.1:2181", "/lockPath");
		Thread thread1 = new Thread(() -> {
			zkLock.lock();
			try {
				log.info("{} acquired the lock", Thread.currentThread().getName());
				Thread.sleep(10000);
			} catch (Exception e) {
			} finally {
				zkLock.unlock();
				log.info("{} released the lock", Thread.currentThread().getName());
			}
		}, "thread1");
		thread1.start();
		try {
			Thread.sleep(2000);
		} catch (Exception e) {
		}
		Thread thread2 = new Thread(() -> {
			log.info("{} is trying to acquire the lock", Thread.currentThread().getName());
			boolean b = false;
			try {
				b = zkLock.tryLock(3, TimeUnit.SECONDS);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			if (b) {
				try {
					log.info("{} acquired the lock", Thread.currentThread().getName());
				} catch (Exception e) {
				} finally {
					zkLock.unlock();
				}
			} else {
				log.info("{} failed to acquire the lock", Thread.currentThread().getName());
			}
		}, "thread2");
		thread2.start();


		try {
			thread1.join();
			thread2.join();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

}

3. Spring Boot usage

Configure application.properties

zookeeper.connectionString=127.0.0.1:2181

Define a Zookeeper configuration class so that the client can be injected

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ZkConfig {

	@Value("${zookeeper.connectionString}")
	private String connectionString;

	@Bean(initMethod = "start", destroyMethod = "close")
	public CuratorFramework curatorFramework() {
		return CuratorFrameworkFactory.newClient(connectionString, new RetryNTimes(5, 5000));
	}
}

Usage example

@Autowired
private CuratorFramework client;

public String seckill(String skuId) {
	AbstractLock lock = new ZKLock(client, "/lockPath");
	boolean lockFlag = false;
	try {
		lock.lock();
		lockFlag = true;
		// business logic...
		return "success"; // placeholder result
	} catch (Exception e) {
		e.printStackTrace();
		return "fail"; // placeholder result
	} finally {
		if (lockFlag) {
			lock.unlock();
		}
	}
}

IV. Distributed Lock with Etcd

Etcd downloads: https://github.com/etcd-io/etcd/releases/

Add the dependency

<dependency>
    <groupId>io.etcd</groupId>
    <artifactId>jetcd-core</artifactId>
    <version>0.5.0</version>
</dependency>

1. Core code

A custom object that holds the lock's state

package cn.forlan.lock.etcd;

import lombok.Data;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

@Data
public class LockData {
	private String lockKey;
	private boolean lockSuccess;
	private long leaseId;
	private ScheduledExecutorService service;
	private Thread owningThread;
	private String lockPath;
	final AtomicInteger lockCount = new AtomicInteger(1);

	public LockData() {
	}

	public LockData(Thread owningThread, String lockPath) {
		this.owningThread = owningThread;
		this.lockPath = lockPath;
	}
	
}

Extend AbstractLock and override the methods that can be supported

package cn.forlan.lock.etcd;

import cn.forlan.lock.base.AbstractLock;
import com.google.common.collect.Maps;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.Lease;
import io.etcd.jetcd.Lock;
import io.etcd.jetcd.lock.LockResponse;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.Charset;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
public class EtcdDistributeLock extends AbstractLock {

	private Client client;
	private Lock lockClient;
	private Lease leaseClient;
	private String lockKey;
	private String lockPath;
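	/**
	 * Number of times the lock is held
	 */
	private AtomicInteger lockCount;
	/**
	 * Lease TTL, stored in nanoseconds. If the client crashes, the lock is released automatically once the lease expires; during normal execution the lease is renewed automatically.
	 */
	private Long leaseTTL;
	/**
	 * Initial delay before the lease-renewal task first runs. It is passed to the scheduler in nanoseconds and defaults to 0; adjust it to suit the workload.
	 */
	private Long initialDelay = 0L;
	/**
	 * Scheduler that runs the renewal task
	 */
	ScheduledExecutorService service = null;
	/**
	 * Maps each thread to its lock data. The lock data holds the re-entry count, which is capped at Integer.MAX_VALUE.
	 */
	private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();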

	public EtcdDistributeLock() {
	}

	public EtcdDistributeLock(Client client, String lockKey, long leaseTTL, TimeUnit unit) {
		this.client = client;
		lockClient = client.getLockClient();
		leaseClient = client.getLeaseClient();
		this.lockKey = lockKey;
		// Convert to nanoseconds
		this.leaseTTL = unit.toNanos(leaseTTL);
		service = Executors.newSingleThreadScheduledExecutor();
	}


	@Override
	public void lock() {
		// Check for re-entry
		Thread currentThread = Thread.currentThread();
		LockData oldLockData = threadData.get(currentThread);
		if (oldLockData != null && oldLockData.isLockSuccess()) {
			// re-entering
			int lockCount = oldLockData.lockCount.incrementAndGet();
			if (lockCount < 0) {
				throw new Error("Re-entry limit exceeded");
			}
			return;
		}

		// Remember the lease ID
		Long leaseId = 0L;
		try {
			leaseId = leaseClient.grant(TimeUnit.NANOSECONDS.toSeconds(leaseTTL)).get().getID();
			// Renewal heartbeat period: 80% of the lease TTL
			long period = leaseTTL - leaseTTL / 5;
			// Start the scheduled renewal task
			service.scheduleAtFixedRate(new EtcdDistributeLock.KeepAliveRunnable(leaseClient, leaseId),
					initialDelay, period, TimeUnit.NANOSECONDS);
			LockResponse lockResponse = lockClient.lock(ByteSequence.from(lockKey.getBytes()), leaseId).get();
			if (lockResponse != null) {
				lockPath = lockResponse.getKey().toString(Charset.forName("utf-8"));
				log.info("Lock acquired, lock path: {}, thread: {}", lockPath, currentThread.getName());
			}
		} catch (InterruptedException | ExecutionException e) {
			log.error("Failed to acquire the lock", e);
			if (e instanceof InterruptedException) {
				Thread.currentThread().interrupt();
			}
			// Fail loudly so the caller does not enter the critical section without holding the lock
			throw new IllegalStateException("Failed to acquire lock: " + lockKey, e);
		}
		// Lock acquired: record the lock data for this thread
		LockData newLockData = new LockData(currentThread, lockKey);
		newLockData.setLeaseId(leaseId);
		newLockData.setService(service);
		threadData.put(currentThread, newLockData);
		newLockData.setLockSuccess(true);
	}

	@Override
	public void unlock() {
		Thread currentThread = Thread.currentThread();
		LockData lockData = threadData.get(currentThread);
		if (lockData == null) {
			throw new IllegalMonitorStateException("You do not own the lock: " + lockKey);
		}
		int newLockCount = lockData.lockCount.decrementAndGet();
		if (newLockCount > 0) {
			return;
		}
		if (newLockCount < 0) {
			throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + lockKey);
		}
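		try {
			// Release the lock
			if (lockPath != null) {
				lockClient.unlock(ByteSequence.from(lockPath.getBytes())).get();
			}
			if (lockData != null) {
				// Stop the renewal task. This shuts down the scheduler owned by this lock instance,
				// so the same instance cannot be locked again afterwards
				lockData.getService().shutdown();
				// Revoke the lease
				if (lockData.getLeaseId() != 0L) {
					leaseClient.revoke(lockData.getLeaseId());
				}
			}
		} catch (InterruptedException | ExecutionException e) {
			log.error("Failed to release the lock", e);
		} finally {
			// Drop the current thread's lock data
			threadData.remove(currentThread);
		}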
	}

	/**
	 * Runnable that sends the lease keep-alive heartbeat
	 */
	public static class KeepAliveRunnable implements Runnable {
		private Lease leaseClient;
		private long leaseId;

		public KeepAliveRunnable(Lease leaseClient, long leaseId) {
			this.leaseClient = leaseClient;
			this.leaseId = leaseId;
		}

		@Override
		public void run() {
			// Renew this lease once
			leaseClient.keepAliveOnce(leaseId);
		}
	}

}
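
The two pieces of bookkeeping in the class above, the per-thread re-entry count and the renewal period, can be isolated in a few lines of plain Java. This is a sketch under stated assumptions rather than the class itself: ReentrySketch, enter, exit, and renewPeriodNanos are hypothetical names, and no Etcd call is made.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the bookkeeping only; the remote lock and lease calls are left out.
final class ReentrySketch {
	private final ConcurrentMap<Thread, AtomicInteger> holds = new ConcurrentHashMap<>();

	/** Returns true on the current thread's first entry, i.e. when the remote lock would have to be taken. */
	boolean enter() {
		Thread current = Thread.currentThread();
		AtomicInteger count = holds.get(current);
		if (count != null) {
			count.incrementAndGet();
			return false;
		}
		holds.put(current, new AtomicInteger(1));
		return true;
	}

	/** Returns true when the last hold is given up, i.e. when the remote lock would have to be released. */
	boolean exit() {
		Thread current = Thread.currentThread();
		AtomicInteger count = holds.get(current);
		if (count == null) {
			throw new IllegalMonitorStateException("Lock is not held by " + current.getName());
		}
		if (count.decrementAndGet() > 0) {
			return false;
		}
		holds.remove(current);
		return true;
	}

	/** Renewal period as computed above: the TTL minus one fifth of it, in nanoseconds. */
	static long renewPeriodNanos(long ttl, TimeUnit unit) {
		long ttlNanos = unit.toNanos(ttl);
		return ttlNanos - ttlNanos / 5;
	}

	public static void main(String[] args) {
		ReentrySketch sketch = new ReentrySketch();
		System.out.println(sketch.enter() + " " + sketch.enter() + " " + sketch.exit() + " " + sketch.exit());
		System.out.println(TimeUnit.NANOSECONDS.toSeconds(renewPeriodNanos(20, TimeUnit.SECONDS)) + " s");
	}
}
```

For a 20-second lease the renewal fires every 16 seconds, which leaves a 4-second margin for a heartbeat to arrive before the lease would expire.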

2. Unit tests

import cn.forlan.lock.etcd.EtcdDistributeLock;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.kv.GetResponse;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.*;

public class EtcdLockTest {
	private Client client;
	private String key = "/etcd/lock";
	private static final String server = "http://127.0.0.1:2379";
	private ExecutorService executorService = Executors.newFixedThreadPool(10000);

	@Before
	public void before() throws Exception {
		initEtcdClient();
	}

	private void initEtcdClient() {
		client = Client.builder().endpoints(server).build();
	}

	/**
	 * Checks that etcd is working
	 */
	public static void main(String[] args) throws ExecutionException, InterruptedException {
		Client client = Client.builder().endpoints("http://127.0.0.1:2379").build();
		KV kvClient = client.getKVClient();
		ByteSequence key = ByteSequence.from("test_key".getBytes());
		ByteSequence value = ByteSequence.from("test_value".getBytes());

		// put the key-value
		kvClient.put(key, value).get();

		// get the CompletableFuture
		CompletableFuture<GetResponse> getFuture = kvClient.get(key);

		// get the value from CompletableFuture
		GetResponse response = getFuture.get();

		// delete the key
		kvClient.delete(key).get();
	}

	@Test
	public void testEtcdDistributeLock() throws InterruptedException {
		int[] count = {0};
		for (int i = 0; i < 100; i++) {
			executorService.submit(() -> {
				final EtcdDistributeLock lock = new EtcdDistributeLock(client, key, 20, TimeUnit.SECONDS);
				try {
					lock.lock();
					count[0]++;
				} catch (Exception e) {
					e.printStackTrace();
				} finally {
					try {
						lock.unlock();
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			});
		}
		executorService.shutdown();
		executorService.awaitTermination(1, TimeUnit.HOURS);
		System.err.println("Result: " + count[0]);
	}

	@After
	public void after() {
		client.close();
	}
}

3. Spring Boot usage

Configure application.properties

etcd.url=http://127.0.0.1:2379

Define an Etcd configuration class so that the client can be injected

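import io.etcd.jetcd.Client;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class EtcdConfig {

	@Value("${etcd.url}")
	private String etcdUrl;

	@Bean
	public Client getClient(){
		return Client.builder().endpoints(etcdUrl).build();
	}
}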

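It is used as follows:

@Autowired
private Client client;

public String seckill(String skuId) {
	// Lock key derived from the SKU; this naming scheme is only an example
	String key = "/seckill/" + skuId;
	AbstractLock lock = new EtcdDistributeLock(client, key, 20, TimeUnit.SECONDS);
	boolean lockFlag = false;
	try {
		lock.lock();
		lockFlag = true;
		// business logic...
		return "success"; // placeholder result
	} catch (Exception e) {
		e.printStackTrace();
		return "fail"; // placeholder result
	} finally {
		if (lockFlag) {
			lock.unlock();
		}
	}
}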

V. Distributed Lock with MySQL

Add the dependency

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.41</version>
</dependency>

application.properties

#datasource
spring.datasource.url=jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.jdbc.Driver

1. Core code

1) Based on FOR UPDATE

Create the table

CREATE TABLE `fu_distribute_lock` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`lock_name` varchar(100) NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `lock_name` (`lock_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

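Core code. It is meant to be re-entrant and relies on row locks (SELECT ... FOR UPDATE), so it does not work on a storage engine without row-level locking.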

import cn.forlan.lock.base.AbstractLock;
import lombok.extern.slf4j.Slf4j;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

@Slf4j
public class MySQLForUpdateDistributeLock extends AbstractLock {

	public static final String SELECT_SQL = "select * from fu_distribute_lock where `lock_name`=? for update";
	public static final String INSERT_SQL = "insert into fu_distribute_lock(lock_name) values(?)";

	private final DataSource dataSource;
	private ExecutorService threadPoolExecutor = Executors.newSingleThreadExecutor();
	private Connection connection;
	private String lockName;

	public MySQLForUpdateDistributeLock(DataSource dataSource, String lockName) {
		this.lockName = lockName;
		this.dataSource = dataSource;
	}

	@Override
	public void lock() {
		PreparedStatement statement = null;
		ResultSet resultSet = null;
		try {
			while (true) {
				connection = dataSource.getConnection();
				connection.setAutoCommit(false);
				statement = connection.prepareStatement(SELECT_SQL);
				statement.setString(1, lockName);
				resultSet = statement.executeQuery();
				if (resultSet.next()) {
					return;
				}
				this.gracefulClose(resultSet, statement, connection);
				log.info("Lock record does not exist, creating it");
				Connection insertConnection = dataSource.getConnection();
				PreparedStatement insertStatement = null;
				try {
					insertStatement = insertConnection.prepareStatement(INSERT_SQL);
					insertStatement.setString(1, lockName);
					if (insertStatement.executeUpdate() == 1) {
						log.info("Lock record created");
					}
				} catch (Exception e) {
					log.error("Failed to create the lock record", e);
				} finally {
					this.gracefulClose(insertStatement, insertConnection);
				}
			}
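		} catch (Exception e) {
			log.error("Failed to acquire the lock", e);
			// Propagate so that callers (and tryLock) do not treat a failed acquisition as success
			throw new RuntimeException(e);
		} finally {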
			this.gracefulClose(resultSet, statement);
		}
	}

	@Override
	public boolean tryLock() {
		try {
			lock();
			return true;
		} catch (Exception e) {
			return false;
		}
	}

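	@Override
	public boolean tryLock(long time, TimeUnit unit) {
		final Future<?> future = threadPoolExecutor.submit(() -> {
			try {
				lock();
				return 1;
			} catch (Exception e) {
				throw new RuntimeException(e);
			}
		});
		try {
			// get() returns only once lock() has completed within the time limit
			future.get(time, unit);
			return true;
		} catch (Exception e) {
			// Timed out, interrupted, or failed: cancel the pending attempt so it does not take the lock later.
			// A JDBC call that is already blocked may not react to the interrupt.
			future.cancel(true);
			return false;
		}
	}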

	@Override
	public void unlock() {
		try {
			connection.commit();
			connection.close();
		} catch (SQLException throwables) {
			throwables.printStackTrace();
		}
	}

	public void gracefulClose(AutoCloseable... closeables) {
		Arrays.asList(closeables).forEach(closeable -> {
			if (closeable != null) {
				try {
					closeable.close();
				} catch (Exception e) {
				}
			}
		});
	}
}

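2) Based on ID

This lock is not re-entrant. MyISAM is currently the better choice of engine: with InnoDB, gap locks and insert intention locks can cause deadlocks, and the resulting rollbacks degrade performance. In addition, if the machine holding the lock goes down, its row is left behind and other clients stay blocked, which is why the table carries an expire_time column.

Create the table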

CREATE TABLE `id_distribute_lock` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `lock_name` varchar(100) NOT NULL,
  `expire_time` bigint(20) NOT NULL,
  `thread_id` varchar(100) NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `lock_name` (`lock_name`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8;
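
The unique key on lock_name is what turns an INSERT into a lock attempt: only one row per name can exist, so only one client can succeed. The in-memory sketch below mimics that behavior with a ConcurrentHashMap in place of the table. All names in it (UniqueKeyLockSketch, tryInsert, deleteIfExpired, release) are hypothetical, and the owner check in release is an assumption added for this illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory analogy of the lock table; the map key plays the role of the unique lock_name.
final class UniqueKeyLockSketch {
	/** One row: the holder (thread_id) and the expiry time in epoch milliseconds (expire_time). */
	static final class Row {
		final String owner;
		final long expireTime;

		Row(String owner, long expireTime) {
			this.owner = owner;
			this.expireTime = expireTime;
		}
	}

	private final ConcurrentMap<String, Row> table = new ConcurrentHashMap<>();

	/** Mirrors the INSERT: succeeds only when no row with this lock name exists yet. */
	boolean tryInsert(String lockName, String owner, long now, long ttlMillis) {
		return table.putIfAbsent(lockName, new Row(owner, now + ttlMillis)) == null;
	}

	/** Mirrors the expiry check: removes the row once its expiry time has passed. */
	boolean deleteIfExpired(String lockName, long now) {
		Row row = table.get(lockName);
		return row != null && now > row.expireTime && table.remove(lockName, row);
	}

	/** Mirrors unlock, with an added owner check so one client cannot delete another client's row. */
	boolean release(String lockName, String owner) {
		Row row = table.get(lockName);
		return row != null && row.owner.equals(owner) && table.remove(lockName, row);
	}

	public static void main(String[] args) {
		UniqueKeyLockSketch locks = new UniqueKeyLockSketch();
		long now = System.currentTimeMillis();
		System.out.println("A inserts: " + locks.tryInsert("order", "A", now, 500));
		System.out.println("B inserts: " + locks.tryInsert("order", "B", now, 500));
		System.out.println("A releases: " + locks.release("order", "A"));
	}
}
```

Passing the current time in as a parameter keeps the expiry logic easy to test without sleeping.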

Core implementation

package cn.forlan.lock.mysql;

import cn.forlan.lock.base.AbstractLock;
import lombok.extern.slf4j.Slf4j;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLIntegrityConstraintViolationException;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

@Slf4j
public class MySQLIDDistributeLock extends AbstractLock {

	private static final String SELECT_SQL_FORMAT = "select * from id_distribute_lock where lock_name=?";
	private static final String UPDATE_SQL_FORMAT = "update id_distribute_lock set expire_time=? where lock_name=?";
	private static final String INSERT_SQL_FORMAT = "insert into id_distribute_lock(lock_name,expire_time,thread_id) values(?,?,?)";
	private static final String DELETE_SQL_FORMAT = "delete from id_distribute_lock where lock_name=?";
	private final DataSource dataSource;
	private ExecutorService threadPoolExecutor = Executors.newSingleThreadExecutor();
	private int expireTime;
	private long reletTime;
	private WatchDog watchDog;
	private String threadId;
	private String lockName;

	public MySQLIDDistributeLock(DataSource dataSource, int expireTime, String lockName) {
		this.dataSource = dataSource;
		this.expireTime = expireTime;
		this.reletTime = this.expireTime / 2;
		this.lockName = lockName;
		threadId = Thread.currentThread().getId() + "-" + UUID.randomUUID().toString();
		watchDog = new WatchDog(() -> reletAndCheck(), reletTime, TimeUnit.MILLISECONDS);
	}

	@Override
	public void lock() {
		watchDog.start();
		while (true) {
			try (Connection connection = dataSource.getConnection();
				 PreparedStatement statement = connection.prepareStatement(INSERT_SQL_FORMAT)) {
				statement.setString(1, this.lockName);
				statement.setLong(2, System.currentTimeMillis() + expireTime);
				statement.setString(3, this.threadId);
				try {
					// The insert succeeded, so the lock is ours
					if (statement.executeUpdate() > 0) {
						log.info("Lock acquired");
						break;
					}
				} catch (SQLIntegrityConstraintViolationException e) {
					// Duplicate key: another client holds the lock, so fall through and retry
				} catch (Exception e) {
					e.printStackTrace();
				}
				// Sleep 100 ms before retrying; adjust the interval if the lock is coarse-grained
				Thread.sleep(100);
			} catch (Exception e) {
				throw new RuntimeException(e);
			}
		}
	}

	private void reletAndCheck() {
		ResultSet resultSet = null;
		PreparedStatement preparedStatement = null;
		Connection connection = null;
		try {
			log.info("Querying the lock");
			connection = dataSource.getConnection();
			preparedStatement = connection.prepareStatement(SELECT_SQL_FORMAT);
			preparedStatement.setString(1, this.lockName);
			resultSet = preparedStatement.executeQuery();

			if (resultSet.next()) {
				final String threadId = resultSet.getString("thread_id");
				final long expireTime = resultSet.getLong("expire_time");
				log.debug("thread_id:{}", threadId);
				if (this.threadId.equals(threadId)) {
					// We own the lock: renew the lease
					log.info("Renewing lease");
					try (PreparedStatement updatePreparedStatement = connection.prepareStatement(UPDATE_SQL_FORMAT)) {
						updatePreparedStatement.setLong(1, expireTime + reletTime);
						updatePreparedStatement.setString(2, this.lockName);
						updatePreparedStatement.executeUpdate();
					} catch (Exception e) {
						throw new RuntimeException(e);
					}
				} else {
					// Held by another client: remove the lock if its lease has expired
					if (System.currentTimeMillis() > expireTime) {
						log.info("Lock {} has expired, deleting it", lockName);
						delete();
					}
				}
			}
		} catch (Exception e) {
			throw new RuntimeException(e);
		} finally {
			this.gracefulClose(resultSet, preparedStatement, connection);
		}
	}

	private void delete() {
		log.info("Deleting lock");
		Connection connection = null;
		PreparedStatement preparedStatement = null;
		try {
			connection = this.dataSource.getConnection();
			preparedStatement = connection.prepareStatement(DELETE_SQL_FORMAT);
			preparedStatement.setString(1, this.lockName);
			if (preparedStatement.executeUpdate() == 1) {
				log.info("Lock deleted");
			}
		} catch (Exception e) {
			throw new RuntimeException(e);
		} finally {
			this.gracefulClose(preparedStatement, connection);
		}
	}

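	@Override
	public boolean tryLock() {
		// Non-blocking: try the insert exactly once instead of looping in lock()
		watchDog.start();
		try (Connection connection = dataSource.getConnection();
			 PreparedStatement statement = connection.prepareStatement(INSERT_SQL_FORMAT)) {
			statement.setString(1, this.lockName);
			statement.setLong(2, System.currentTimeMillis() + expireTime);
			statement.setString(3, this.threadId);
			if (statement.executeUpdate() > 0) {
				return true;
			}
		} catch (Exception e) {
			// A duplicate-key error means another client holds the lock
		}
		watchDog.stop();
		return false;
	}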

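	@Override
	public boolean tryLock(long time, TimeUnit unit) {
		final Future<?> future = threadPoolExecutor.submit(() -> {
			try {
				lock();
				return 1;
			} catch (Exception e) {
				throw new RuntimeException(e);
			}
		});
		try {
			// get() returns normally only after lock() has acquired the lock
			future.get(time, unit);
			return true;
		} catch (java.util.concurrent.TimeoutException e) {
			// Timed out: interrupt the pending lock() so it stops retrying in the background
			future.cancel(true);
			watchDog.stop();
			return false;
		} catch (Exception e) {
			return false;
		}
	}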

	@Override
	public void unlock() {
		watchDog.stop();
		delete();
	}

	public void gracefulClose(AutoCloseable... closeables) {
		Arrays.asList(closeables).forEach(closeable -> {
			if (closeable != null) {
				try {
					closeable.close();
				} catch (Exception e) {
					// Ignore failures while closing; there is nothing useful to do with them here
				}
			}
		});
	}

}
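
The lease rules used by the class above (insert to acquire, renew only your own row, take over only after expiry) can be checked without a database. The following is a minimal in-memory sketch of those rules, not the article's implementation: `LeaseTableSketch`, `Lease` and every method name are hypothetical, and a `ConcurrentHashMap` stands in for the lock table. It also shows two variations that are assumptions on my part: release is conditional on the owner, and the timed acquire polls until a deadline.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory stand-in for the lock table: one entry per lock name
class LeaseTableSketch {
	// Mirrors one row of the table: who holds the lock and until when
	static final class Lease {
		final String owner;
		final long expireAt;

		Lease(String owner, long expireAt) {
			this.owner = owner;
			this.expireAt = expireAt;
		}
	}

	private final ConcurrentHashMap<String, Lease> table = new ConcurrentHashMap<>();

	// Single non-blocking attempt: succeeds if the name is free or the old lease has expired
	boolean tryAcquire(String name, String owner, long ttlMillis, long now) {
		Lease fresh = new Lease(owner, now + ttlMillis);
		Lease result = table.compute(name, (k, old) -> (old == null || old.expireAt <= now) ? fresh : old);
		return result == fresh;
	}

	// Timed variant: poll the single attempt until the deadline passes
	boolean tryAcquire(String name, String owner, long ttlMillis, long time, TimeUnit unit) {
		long deadline = System.nanoTime() + unit.toNanos(time);
		while (true) {
			if (tryAcquire(name, owner, ttlMillis, System.currentTimeMillis())) {
				return true;
			}
			if (deadline - System.nanoTime() <= 0) {
				return false;
			}
			try {
				Thread.sleep(10);
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				return false;
			}
		}
	}

	// Renewal only touches a lease that is still held by the caller
	boolean renew(String name, String owner, long ttlMillis, long now) {
		Lease renewed = new Lease(owner, now + ttlMillis);
		Lease result = table.computeIfPresent(name, (k, old) -> old.owner.equals(owner) ? renewed : old);
		return result == renewed;
	}

	// Release is conditional on ownership, so one client cannot remove another client's lease
	boolean release(String name, String owner) {
		Lease current = table.get(name);
		return current != null && current.owner.equals(owner) && table.remove(name, current);
	}
}
```

In SQL terms, the conditional release would correspond to adding the owner id to the WHERE clause of the delete statement. Whether that fits the table used here depends on the SQL constants defined earlier in the class.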

Watchdog mechanism

package cn.forlan.lock.mysql;

import java.util.concurrent.TimeUnit;

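public class WatchDog {
	private volatile boolean isStop;
	private long watchTime;
	private Runnable target;
	private Thread thread;

	public WatchDog(Runnable target, long watchTime, TimeUnit timeUnit) {
		this.watchTime = timeUnit.toMillis(watchTime);
		this.target = target;
		this.thread = new Thread(() -> {
			while (!isStop) {
				try {
					Thread.sleep(this.watchTime);
				} catch (InterruptedException e) {
					// stop() interrupts the sleep; the check below then ends the loop
				}
				// Re-check after sleeping so the target does not run once stop() has been called
				if (isStop) {
					break;
				}
				this.target.run();
			}
		}, "lock-watch-dog");
	}

	public void start() {
		thread.start();
	}

	public void stop() {
		this.isStop = true;
		// Wake the thread right away instead of waiting for the current sleep to finish
		thread.interrupt();
	}
}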
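
As an alternative to a hand-written thread loop, the same periodic renewal can be expressed with a `ScheduledExecutorService`. The sketch below is only an illustration under my own assumptions: the class name `ScheduledWatchDog` is hypothetical, the worker is made a daemon thread, and exceptions from the target are swallowed so that one failed renewal does not cancel the later runs.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical scheduler-based variant of the watchdog (not the article's class)
class ScheduledWatchDog {
	private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
		Thread t = new Thread(r, "lock-watch-dog");
		t.setDaemon(true); // do not keep the JVM alive just for renewals
		return t;
	});
	private final Runnable target;
	private final long periodMillis;

	ScheduledWatchDog(Runnable target, long period, TimeUnit unit) {
		this.target = target;
		this.periodMillis = unit.toMillis(period);
	}

	// Runs the target repeatedly, waiting periodMillis between the end of one run and the start of the next
	void start() {
		scheduler.scheduleWithFixedDelay(() -> {
			try {
				target.run();
			} catch (RuntimeException e) {
				// Swallowed on purpose: an uncaught exception would cancel all later runs
			}
		}, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
	}

	// Cancels pending runs and interrupts a run in progress; returns true once the worker has finished
	boolean stop() {
		scheduler.shutdownNow();
		try {
			return scheduler.awaitTermination(1, TimeUnit.SECONDS);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			return false;
		}
	}
}
```

Because `stop()` waits for the worker to finish, the caller can delete the lock row afterwards without a renewal running concurrently.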

2. Unit tests

import cn.forlan.lock.mysql.MySQLForUpdateDistributeLock;
import cn.forlan.lock.mysql.MySQLIDDistributeLock;
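import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.sql.DataSource;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * JUnit 4 needs the Spring runner; without it, @Autowired fields such as dataSource stay null.
 *
 * @author weichenglin
 * @since 2023-12-21 11:09:12 AM
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class MySQLLockTest {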

	private ExecutorService executorService = Executors.newCachedThreadPool();

	@Autowired
	private DataSource dataSource;

	@Test
	public void testMySQLFUDistributeLock() throws Exception {
		int[] count = {0};
		for (int i = 0; i < 100; i++) {
			executorService.submit(() -> {
				final MySQLForUpdateDistributeLock lock = new MySQLForUpdateDistributeLock(dataSource, "lock_key");
				boolean lockFlag = false;
				try {
					lock.lock();
					lockFlag = true;
					count[0]++;
				} catch (Exception e) {
					e.printStackTrace();
				} finally {
					if (lockFlag) {
						lock.unlock();
					}
				}
			});
		}
		executorService.shutdown();
		executorService.awaitTermination(1, TimeUnit.HOURS);
		System.out.println(count[0]);
	}

	@Test
	public void testMySQLIDDistributeLock() throws Exception {
		int[] count = {0};
		for (int i = 0; i < 100; i++) {
			executorService.submit(() -> {
				final MySQLIDDistributeLock lock = new MySQLIDDistributeLock(dataSource, 3000, "lock_key");
				boolean lockFlag = false;
				try {
					lock.lock();
					lockFlag = true;
					count[0]++;
				} catch (Exception e) {
					e.printStackTrace();
				} finally {
					if (lockFlag) {
						lock.unlock();
					}
				}
			});
		}

		executorService.shutdown();
		executorService.awaitTermination(1, TimeUnit.HOURS);
		System.out.println(count[0]);
	}
}

3. Spring Boot usage

@Autowired
private DataSource dataSource;

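public String seckill(String skuId) {
	// Lock key derived from the SKU; the "seckill:lock:" prefix is an assumption, the original leaves key undefined
	String key = "seckill:lock:" + skuId;
	AbstractLock lock = new MySQLForUpdateDistributeLock(dataSource, key);
	// AbstractLock lock = new MySQLIDDistributeLock(dataSource, 30000, key);
	boolean lockFlag = false;
	try {
		lock.lock();
		lockFlag = true;
		// Business logic...
	} catch (Exception e) {
		e.printStackTrace();
	} finally {
		if (lockFlag) {
			lock.unlock();
		}
	}
	// Placeholder return value so the snippet compiles
	return "ok";
}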
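
The flag-and-finally pattern above repeats at every call site. One way to avoid that repetition is a small helper that takes any `java.util.concurrent.locks.Lock`. This is a sketch only: `LockTemplate` and `withLock` are hypothetical names that do not appear in the article, and in real use the `Lock` argument would be one of the `AbstractLock` implementations.

```java
import java.util.concurrent.locks.Lock;
import java.util.function.Supplier;

// Hypothetical helper that wraps the acquire / run / release sequence
class LockTemplate {
	// unlock() is only reached after lock() has returned normally, so no flag is needed
	static <T> T withLock(Lock lock, Supplier<T> action) {
		lock.lock();
		try {
			return action.get();
		} finally {
			lock.unlock();
		}
	}
}
```

A call site would then read `LockTemplate.withLock(lock, () -> doBusiness(skuId))`, where `doBusiness` stands for whatever business method is being guarded.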

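VI. Flash-sale (seckill) example

First configure a RedisTemplate. We use a flash-sale scenario to verify the distributed locks, with the stock kept in Redis.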

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class CacheConfig {
	@Bean
	public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
		RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
		redisTemplate.setConnectionFactory(redisConnectionFactory);

		ObjectMapper objectMapper = new ObjectMapper();
		objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);// ignore unknown fields
		objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);// include class information
		objectMapper.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);// skip null and empty properties
		GenericJackson2JsonRedisSerializer serializer = new GenericJackson2JsonRedisSerializer(objectMapper);

		// Set the serializers for values and keys
		redisTemplate.setValueSerializer(serializer);// the REST side stores sessions in Redis and the server reads them back, so keep this consistent with the REST consumer
		redisTemplate.setKeySerializer(new StringRedisSerializer());
		redisTemplate.setHashKeySerializer(new StringRedisSerializer());
		redisTemplate.setHashValueSerializer(serializer);
		redisTemplate.afterPropertiesSet();
		return redisTemplate;
	}
}

Stock setup and flash-sale code. To use it, replace ForlanxxxLock with the lock implementation you want to test.

import cn.forlan.lock.base.AbstractLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.TimeUnit;

@RestController
public class SeckillController {

	@Autowired
	private RedisTemplate<String, Object> redisTemplate;

	private static final String SKU_STOCK_KEY = "sku:stock:%s";

	// Set the stock
	@GetMapping("/seckill/{skuId}/{stock}")
	public String seckillSkuIdStock(@PathVariable("skuId") String skuId, @PathVariable("stock") Integer stock) {
		String key = String.format(SKU_STOCK_KEY, skuId);
		redisTemplate.opsForValue().set(key, stock, 8, TimeUnit.HOURS);
		return "Stock set";
	}

	// Flash-sale endpoint
	@GetMapping("/seckill/{skuId}")
	public String seckill(@PathVariable("skuId") String skuId) throws Exception {
		// Redis key that holds the stock of this SKU
		String key = String.format(SKU_STOCK_KEY, skuId);

		// Distributed lock: replace ForlanxxxLock with the concrete implementation
		AbstractLock lock = new ForlanxxxLock("lockKey or lockPath");

		boolean lockFlag = false;
		try {
			lock.lock();
			lockFlag = true;
			Object stock = redisTemplate.opsForValue().get(key);
			if (stock == null || (Integer) stock <= 0) {
				// Report the failure instead of falling through to the success message
				return "Sold out";
			}
			// Deduct one unit of stock
			redisTemplate.opsForValue().decrement(key);
			return "Seckill succeeded";
		} catch (Exception e) {
			e.printStackTrace();
			return "Seckill failed";
		} finally {
			if (lockFlag) {
				lock.unlock();
			}
		}
	}
}
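
To see what the lock protects, the check-then-decrement sequence can be simulated inside one JVM. The sketch below is not the article's code and involves neither Redis nor a distributed lock: `SeckillSimulation` is a hypothetical class, a plain `int` plays the role of the stock value, and a `ReentrantLock` stands in for the `AbstractLock` implementation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical in-process simulation of the flash-sale endpoint
class SeckillSimulation {
	private int stock;                              // plays the role of the stock value in Redis
	private final Lock lock = new ReentrantLock();  // stand-in for an AbstractLock implementation

	SeckillSimulation(int initialStock) {
		this.stock = initialStock;
	}

	// Same check-then-decrement sequence as the controller, guarded by the lock
	boolean seckill() {
		lock.lock();
		try {
			if (stock <= 0) {
				return false; // sold out
			}
			stock--;
			return true;
		} finally {
			lock.unlock();
		}
	}

	int remainingStock() {
		lock.lock();
		try {
			return stock;
		} finally {
			lock.unlock();
		}
	}

	// Fires the given number of concurrent requests and returns how many of them succeeded
	static int run(SeckillSimulation shop, int requests) {
		ExecutorService pool = Executors.newFixedThreadPool(16);
		AtomicInteger successes = new AtomicInteger();
		CountDownLatch done = new CountDownLatch(requests);
		for (int i = 0; i < requests; i++) {
			pool.submit(() -> {
				if (shop.seckill()) {
					successes.incrementAndGet();
				}
				done.countDown();
			});
		}
		try {
			done.await(10, TimeUnit.SECONDS);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		} finally {
			pool.shutdown();
		}
		return successes.get();
	}
}
```

With a stock of 10 and 100 requests, the number of successes equals the initial stock and the remaining stock is 0. Removing the lock from `seckill()` makes the check and the decrement race, which is the oversell problem the distributed lock is meant to prevent across processes.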

VII. Troubleshooting

1. java.lang.IllegalMonitorStateException: You do not own the lock: /lockPath

Cause: the client failed to connect or was misconfigured. In my case a value was never assigned.

2. In the unit test class, the DataSource object cannot be obtained through dependency injection

Fix: add the @RunWith(SpringRunner.class) annotation so that Spring's test runner is used, and specify the correct application class on the @SpringBootTest annotation.

3. Etcd connection error

Exception in thread "main" java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;CLjava/lang/Object;)V
	at io.grpc.Metadata$Key.validateName(Metadata.java:742)
	at io.grpc.Metadata$Key.<init>(Metadata.java:750)
	at io.grpc.Metadata$Key.<init>(Metadata.java:668)
	at io.grpc.Metadata$AsciiKey.<init>(Metadata.java:959)
	at io.grpc.Metadata$AsciiKey.<init>(Metadata.java:954)
	at io.grpc.Metadata$Key.of(Metadata.java:705)
	at io.grpc.Metadata$Key.of(Metadata.java:701)
	at io.etcd.jetcd.ClientConnectionManager.<clinit>(ClientConnectionManager.java:69)
	at io.etcd.jetcd.ClientImpl.<init>(ClientImpl.java:49)
	at io.etcd.jetcd.ClientBuilder.build(ClientBuilder.java:387)
	at EtcdLockTest.main(EtcdLockTest.java:31)

This is a Guava version conflict. Excluding the older Guava version that our earlier Curator dependency pulled in resolves it.
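
When a NoSuchMethodError like the one above appears, it helps to confirm which jar a class was actually loaded from before editing the dependency exclusions. The helper below is a small diagnostic sketch that uses only the JDK. `ClassOrigin` and `locate` are hypothetical names and are not part of jetcd, gRPC or Guava.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper: reports where a class on the classpath comes from
class ClassOrigin {
	// Returns the jar or directory a class was loaded from, or a short note if that is unavailable
	static String locate(String className) {
		try {
			// Load without initializing the class, so static initializers do not run
			Class<?> type = Class.forName(className, false, ClassOrigin.class.getClassLoader());
			CodeSource source = type.getProtectionDomain().getCodeSource();
			if (source == null || source.getLocation() == null) {
				return "bootstrap or unknown location";
			}
			return source.getLocation().toString();
		} catch (ClassNotFoundException | LinkageError e) {
			return "not found: " + className;
		}
	}
}
```

Printing `ClassOrigin.locate("com.google.common.base.Preconditions")` from the failing application shows which Guava jar won on the classpath.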

Source: https://blog.csdn.net/qq_36433289/article/details/135114734