定义抽象类AbstractLock实现Lock接口,所有分布式锁实现必须继承AbstractLock,去写具体的实现逻辑,方便后续替换锁。
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
 * Base class for every distributed lock implementation in this project.
 *
 * <p>It implements {@link Lock} with "not supported" defaults so that a concrete lock only has
 * to override the operations its backend can actually provide. Every default throws
 * {@link UnsupportedOperationException}, the standard JDK signal for an optional operation
 * that is not implemented. It is a {@link RuntimeException}, so callers that catch
 * {@code RuntimeException} or {@code Exception} keep working.
 */
public abstract class AbstractLock implements Lock {

    /** Message carried by every "operation not supported" exception. */
    private static final String UNSUPPORTED_MESSAGE = "该操作不支持";

    /**
     * @throws UnsupportedOperationException always, unless overridden
     */
    @Override
    public void lock() {
        throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
    }

    /**
     * @throws UnsupportedOperationException always, unless overridden
     */
    @Override
    public void lockInterruptibly() throws InterruptedException {
        throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
    }

    /**
     * @throws UnsupportedOperationException always, unless overridden
     */
    @Override
    public boolean tryLock() {
        throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
    }

    /**
     * @throws UnsupportedOperationException always, unless overridden
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
    }

    /**
     * @throws UnsupportedOperationException always, unless overridden
     */
    @Override
    public void unlock() {
        throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
    }

    /**
     * @throws UnsupportedOperationException always, unless overridden
     */
    @Override
    public Condition newCondition() {
        throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
    }
}
导入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.8.2</version>
</dependency>
import cn.forlan.lock.base.AbstractLock;
import org.redisson.api.RedissonClient;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
public class RedisLock extends AbstractLock {
private RedissonClient redissonClient;
private String key;
public RedisLock(RedissonClient redissonClient, String key) {
this.redissonClient = redissonClient;
this.key = key;
}
@Override
public void lock() {
redissonClient.getLock(key).lock();
}
@Override
public void lockInterruptibly() throws InterruptedException {
redissonClient.getLock(key).lockInterruptibly();
}
@Override
public boolean tryLock() {
return redissonClient.getLock(key).tryLock();
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return redissonClient.getLock(key).tryLock(time, unit);
}
@Override
public void unlock() {
redissonClient.getLock(key).unlock();
}
@Override
public Condition newCondition() {
return redissonClient.getLock(key).newCondition();
}
}
红锁(RedLock)需要在多个Redis节点上同时加锁,一般超过半数节点加锁成功,才算获取锁成功。
import cn.forlan.lock.base.AbstractLock;
import org.redisson.RedissonRedLock;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
/**
 * {@link AbstractLock} adapter that forwards every operation to a {@link RedissonRedLock}.
 */
public class RedisRedLock extends AbstractLock {

    /** Wrapped RedLock; stays {@code null} when the no-arg constructor is used. */
    private RedissonRedLock redLock;

    /** Creates an adapter without a delegate. */
    public RedisRedLock() {
    }

    /**
     * @param redLock the RedLock every operation is delegated to
     */
    public RedisRedLock(RedissonRedLock redLock) {
        this.redLock = redLock;
    }

    @Override
    public void lock() {
        this.redLock.lock();
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        this.redLock.lockInterruptibly();
    }

    @Override
    public boolean tryLock() {
        final boolean acquired = this.redLock.tryLock();
        return acquired;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        final boolean acquired = this.redLock.tryLock(time, unit);
        return acquired;
    }

    @Override
    public void unlock() {
        this.redLock.unlock();
    }

    @Override
    public Condition newCondition() {
        final Condition condition = this.redLock.newCondition();
        return condition;
    }
}
import cn.forlan.lock.redis.RedisLock;
import cn.forlan.lock.redis.RedisRedLock;
import org.junit.Test;
import org.redisson.Redisson;
import org.redisson.RedissonRedLock;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Manual integration tests for {@link RedisLock} and {@link RedisRedLock}.
 *
 * <p>Each test increments a plain (non-atomic) counter 1000 times from a thread pool while
 * holding the lock, then prints the counter. The tests need the Redis servers at the
 * hard-coded addresses to be reachable.
 */
public class RedisLockTest {

    private static final String LOCK_KEY = "lock_key";

    private final ExecutorService executorService = Executors.newCachedThreadPool();

    /** Creates a client for the local single-server Redis with a very long command timeout. */
    private RedissonClient getRedissonClient() {
        Config config = new Config();
        config.useSingleServer()
                .setTimeout(1000000)
                .setAddress("redis://127.0.0.1:6379");
        return Redisson.create(config);
    }

    /** Creates a single-server client for {@code url} using Redisson's default settings. */
    private static RedissonClient createClient(String url) {
        Config config = new Config();
        config.useSingleServer().setAddress(url);
        return Redisson.create(config);
    }

    /**
     * Creates a new client for {@code url} and returns its lock named {@code key}.
     *
     * <p>The client created here is not returned, so the caller cannot shut it down; the
     * tests in this class create their clients directly instead.
     *
     * @param url Redis address, e.g. {@code redis://127.0.0.1:6379}
     * @param key lock name
     * @return the lock named {@code key} on that server
     */
    public static RLock getRLock(String url, String key) {
        return createClient(url).getLock(key);
    }

    /**
     * Submits 1000 tasks that each increment {@code count[0]} under {@code lock}, then waits
     * for all of them to finish.
     *
     * <p>{@code unlock()} is only called when {@code lock()} returned normally, so a failed
     * acquisition is reported once instead of being followed by a second, misleading
     * "not the lock owner" failure.
     */
    private void incrementUnderLock(java.util.concurrent.locks.Lock lock, int[] count)
            throws InterruptedException {
        for (int i = 0; i < 1000; i++) {
            executorService.submit(() -> {
                boolean locked = false;
                try {
                    lock.lock();
                    locked = true;
                    count[0]++;
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if (locked) {
                        try {
                            lock.unlock();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.HOURS);
    }

    @Test
    public void SingleLockTest() throws InterruptedException {
        int[] count = {0};
        RedissonClient client = getRedissonClient();
        try {
            incrementUnderLock(new RedisLock(client, LOCK_KEY), count);
            System.out.println(count[0]);
        } finally {
            // Release the client's connections and threads once the test is done.
            client.shutdown();
        }
    }

    @Test
    public void RedLockTest() throws InterruptedException {
        int[] count = {0};
        String[] urls = {
                "redis://192.168.56.100:6379",
                "redis://127.0.0.1:6379",
                "redis://192.168.56.101:6379"
        };
        RedissonClient[] clients = new RedissonClient[urls.length];
        try {
            RLock[] locks = new RLock[urls.length];
            for (int i = 0; i < urls.length; i++) {
                clients[i] = createClient(urls[i]);
                locks[i] = clients[i].getLock(LOCK_KEY);
            }
            RedisRedLock redLock = new RedisRedLock(new RedissonRedLock(locks));
            incrementUnderLock(redLock, count);
            System.out.println(count[0]);
        } finally {
            // Shut down whichever clients were created, even if a later one failed to connect.
            for (RedissonClient client : clients) {
                if (client != null) {
                    client.shutdown();
                }
            }
        }
    }
}
配置application.properties,单锁的话,配置一个即可
spring.redis.database=2
spring.redis.host=127.0.0.1
spring.redis.password=
spring.redis.port=6379
spring.redis.host1=127.0.0.1
spring.redis.password1=
spring.redis.port1=6379
spring.redis.host2=192.168.56.100
spring.redis.password2=
spring.redis.port2=6379
spring.redis.host3=192.168.56.101
spring.redis.password3=
spring.redis.port3=6379
定义单锁配置类,方便我们依赖注入使用
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
/**
 * Spring configuration that exposes the primary single-server {@link RedissonClient},
 * built from the {@code spring.redis.*} properties.
 */
@Configuration
public class RedissonClientConfig {

    @Value("${spring.redis.host}")
    private String host;

    @Value("${spring.redis.port}")
    private String port;

    @Value("${spring.redis.password}")
    private String password;

    /**
     * Builds the client. The password is only applied when it is not blank.
     *
     * @return the primary Redisson client
     */
    @Bean
    @Primary
    public RedissonClient getRedisson() {
        final String address = "redis://" + host + ":" + port;
        final Config config = new Config();
        if (!StringUtils.isBlank(password)) {
            config.useSingleServer().setAddress(address).setPassword(password);
        } else {
            config.useSingleServer().setAddress(address);
        }
        return Redisson.create(config);
    }
}
定义红锁配置类,方便我们依赖注入使用
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that exposes three independent single-server {@link RedissonClient}
 * beans ({@code redisson1}, {@code redisson2}, {@code redisson3}) for building a RedLock.
 * Each bean is configured from its own {@code spring.redis.hostN/portN/passwordN} properties.
 */
@Configuration
public class RedissonRedLockConfig {

    @Value("${spring.redis.host1}")
    private String host1;
    @Value("${spring.redis.port1}")
    private String port1;
    @Value("${spring.redis.password1}")
    private String password1;

    @Value("${spring.redis.host2}")
    private String host2;
    @Value("${spring.redis.port2}")
    private String port2;
    @Value("${spring.redis.password2}")
    private String password2;

    @Value("${spring.redis.host3}")
    private String host3;
    @Value("${spring.redis.port3}")
    private String port3;
    @Value("${spring.redis.password3}")
    private String password3;

    /** Client for the first Redis node. */
    @Bean
    public RedissonClient redisson1() {
        return singleServerClient(host1, port1, password1);
    }

    /** Client for the second Redis node. */
    @Bean
    public RedissonClient redisson2() {
        return singleServerClient(host2, port2, password2);
    }

    /** Client for the third Redis node. */
    @Bean
    public RedissonClient redisson3() {
        return singleServerClient(host3, port3, password3);
    }

    /** Builds a single-server client; the password is only applied when it is not blank. */
    private static RedissonClient singleServerClient(String host, String port, String password) {
        final String address = "redis://" + host + ":" + port;
        final Config config = new Config();
        if (!StringUtils.isBlank(password)) {
            config.useSingleServer().setAddress(address).setPassword(password);
        } else {
            config.useSingleServer().setAddress(address);
        }
        return Redisson.create(config);
    }
}
单锁使用案例
@Autowired
private RedissonClient redissonClient;
/**
 * Usage example: runs the business logic while holding a single-node Redis lock.
 * The lock is only released when it was actually acquired.
 */
public String seckill(String skuId){
    final AbstractLock lock = new RedisLock(redissonClient, key);
    boolean locked = false;
    try {
        lock.lock();
        locked = true;
        // business logic...
    } catch (Exception ex) {
        ex.printStackTrace();
    } finally {
        if (locked) {
            lock.unlock();
        }
    }
}
红锁使用案例
@Resource(name = "redisson1")
private RedissonClient redissonClient1;
@Resource(name = "redisson2")
private RedissonClient redissonClient2;
@Resource(name = "redisson3")
private RedissonClient redissonClient3;
/**
 * Usage example: runs the business logic while holding a RedLock spanning all three
 * injected Redis nodes.
 *
 * <p>All three clients contribute a lock. With only two locks, both would have to succeed,
 * so a single unavailable node would block every acquisition; with three, a majority (two)
 * is enough.
 */
public String seckill(String skuId){
    RedissonRedLock redissonRedLock = new RedissonRedLock(
            redissonClient1.getLock(key),
            redissonClient2.getLock(key),
            redissonClient3.getLock(key));
    AbstractLock lock = new RedisRedLock(redissonRedLock);
    boolean locked = false;
    try {
        lock.lock();
        locked = true;
        // business logic...
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // Only release what was actually acquired.
        if (locked) {
            lock.unlock();
        }
    }
}
导入依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.7.0</version>
</dependency>
import cn.forlan.lock.base.AbstractLock;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.RetryNTimes;
import java.util.concurrent.TimeUnit;
/**
 * ZooKeeper-backed lock built on Curator's {@link InterProcessMutex}.
 *
 * <p>Supports {@link #lock()}, {@link #tryLock()}, {@link #tryLock(long, TimeUnit)} and
 * {@link #unlock()}; the remaining {@code Lock} operations keep the "not supported" default
 * inherited from {@link AbstractLock}.
 */
public class ZKLock extends AbstractLock {

    private final InterProcessLock lock;

    /**
     * Creates and starts a dedicated Curator client for {@code zkAddress}.
     *
     * <p>NOTE(review): the client created here is owned by this instance, but the class offers
     * no way to close it. Prefer {@link #ZKLock(CuratorFramework, String)} with a shared,
     * externally managed client.
     *
     * @param zkAddress ZooKeeper connection string
     * @param lockPath  ZooKeeper path used for the lock
     * @throws RuntimeException if the client is not in the STARTED state after {@code start()}
     */
    public ZKLock(String zkAddress, String lockPath) {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                zkAddress,
                new RetryNTimes(5, 5000)
        );
        client.start();
        if (client.getState() != CuratorFrameworkState.STARTED) {
            throw new RuntimeException("客户端启动失败。。。");
        }
        this.lock = defaultLock(client, lockPath);
    }

    /**
     * @param client   Curator client to use; this constructor does not start it
     * @param lockPath ZooKeeper path used for the lock
     */
    public ZKLock(CuratorFramework client, String lockPath) {
        this.lock = defaultLock(client, lockPath);
    }

    private InterProcessLock defaultLock(CuratorFramework client, String lockPath) {
        return new InterProcessMutex(client, lockPath);
    }

    /**
     * Blocks until the lock is acquired.
     *
     * @throws RuntimeException wrapping any failure; if the thread was interrupted, its
     *                          interrupt flag is restored first
     */
    @Override
    public void lock() {
        try {
            lock.acquire();
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Attempts to acquire the lock with a zero-length wait.
     *
     * @return {@code true} if the lock was acquired
     * @throws RuntimeException wrapping any failure; if the thread was interrupted, its
     *                          interrupt flag is restored first
     */
    @Override
    public boolean tryLock() {
        try {
            return lock.acquire(0, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Attempts to acquire the lock, waiting up to the given time.
     *
     * @return {@code true} if the lock was acquired within the wait time
     * @throws InterruptedException if the thread is interrupted while waiting; it is
     *                              propagated as declared rather than wrapped
     * @throws RuntimeException     wrapping any other failure
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        try {
            return lock.acquire(time, unit);
        } catch (InterruptedException e) {
            throw e;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Releases one hold on the lock.
     *
     * @throws RuntimeException wrapping any failure reported by the underlying lock
     */
    @Override
    public void unlock() {
        try {
            lock.release();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
import cn.forlan.lock.zookeeper.ZKLock;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @author weichenglin
* @since 2023-12-19-下午 04:48:05
*/
@Slf4j
public class ZKLockTest {
private ExecutorService executorService = Executors.newCachedThreadPool();
@Test
public void testLock() throws Exception {
ZKLock zkLock = new ZKLock("127.0.0.1:2181", "/lockPath");
int[] num = {0};
long start = System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
executorService.submit(() -> {
try {
zkLock.lock();
num[0]++;
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (zkLock != null) {
zkLock.unlock();
}
}
});
}
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.HOURS);
log.info("耗时:{}", System.currentTimeMillis() - start);
System.out.println(num[0]);
}
@Test
public void testNoLock() throws Exception {
long start = System.currentTimeMillis();
int[] num = {0};
for (int i = 0; i < 100; i++) {
executorService.submit(() -> {
num[0]++;
});
}
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.HOURS);
log.info("耗时:{}", System.currentTimeMillis() - start);
System.out.println(num[0]);
}
/**
* 锁的可重入测试
*/
@Test
public void testRe() {
ZKLock zkLock = new ZKLock("127.0.0.1:2181", "/lockPath");
Thread thread1 = new Thread(() -> {
zkLock.lock();
try {
System.out.println(Thread.currentThread().getName() + "第一次入锁");
Thread.sleep(1000);
zkLock.lock();
System.out.println(Thread.currentThread().getName() + "第二次入锁");
} catch (Exception e) {
} finally {
zkLock.unlock();
zkLock.unlock();
}
}, "thread1");
thread1.start();
Thread thread2 = new Thread(() -> {
zkLock.lock();
try {
System.out.println(Thread.currentThread().getName() + "第一次入锁");
} catch (Exception e) {
} finally {
zkLock.unlock();
}
}, "thread2");
thread2.start();
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Test
public void testTrySuccess() {
ZKLock zkLock = new ZKLock("127.0.0.1:2181", "/lockPath");
boolean b = zkLock.tryLock();
try {
if (b) {
log.info("{}获取锁成功", Thread.currentThread().getName());
} else {
log.info("{}获取锁失败", Thread.currentThread().getName());
}
} finally {
zkLock.unlock();
}
}
@Test
public void testTryFalse() {
ZKLock zkLock = new ZKLock("127.0.0.1:2181", "/lockPath");
Thread thread1 = new Thread(() -> {
zkLock.lock();
try {
log.info("{}获得锁", Thread.currentThread().getName());
Thread.sleep(10000);
} catch (Exception e) {
} finally {
zkLock.unlock();
log.info("{}释放锁", Thread.currentThread().getName());
}
}, "thread1");
thread1.start();
try {
Thread.sleep(2000);
} catch (Exception e) {
}
Thread thread2 = new Thread(() -> {
log.info("{}尝试获得锁", Thread.currentThread().getName());
boolean b = zkLock.tryLock();
if (b) {
try {
log.info("{}获取锁成功", Thread.currentThread().getName());
} catch (Exception e) {
} finally {
zkLock.unlock();
}
} else {
log.info("{}获取锁失败", Thread.currentThread().getName());
}
}, "thread2");
thread2.start();
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Test
public void testTryTimeOut() {
ZKLock zkLock = new ZKLock("127.0.0.1:2181", "/lockPath");
Thread thread1 = new Thread(() -> {
zkLock.lock();
try {
log.info("{}获得锁", Thread.currentThread().getName());
Thread.sleep(10000);
} catch (Exception e) {
} finally {
zkLock.unlock();
log.info("{}释放锁", Thread.currentThread().getName());
}
}, "thread1");
thread1.start();
try {
Thread.sleep(2000);
} catch (Exception e) {
}
Thread thread2 = new Thread(() -> {
log.info("{}尝试获得锁", Thread.currentThread().getName());
boolean b = false;
try {
b = zkLock.tryLock(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (b) {
try {
log.info("{}获取锁成功", Thread.currentThread().getName());
} catch (Exception e) {
} finally {
zkLock.unlock();
}
} else {
log.info("{}获取锁失败", Thread.currentThread().getName());
}
}, "thread2");
thread2.start();
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
配置application.properties
zookeeper.connectionString=127.0.0.1:2181
定义Zookeeper配置类,方便我们依赖注入使用
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that exposes a shared {@link CuratorFramework} client for the
 * address in {@code zookeeper.connectionString}.
 */
@Configuration
public class ZkConfig {

    @Value("${zookeeper.connectionString}")
    private String connectionString;

    /**
     * Builds the client with a retry policy of 5 retries, 5000 ms apart. The bean definition
     * names {@code start} as init method and {@code close} as destroy method.
     *
     * @return the Curator client
     */
    @Bean(initMethod = "start", destroyMethod = "close")
    public CuratorFramework curatorFramework() {
        final RetryNTimes retryPolicy = new RetryNTimes(5, 5000);
        final CuratorFramework curator = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
        return curator;
    }
}
使用案例
@Autowired
private CuratorFramework client;
/**
 * Usage example: runs the business logic while holding the ZooKeeper lock at
 * {@code /lockPath}. The lock is only released when it was actually acquired.
 */
public String seckill(String skuId){
    final AbstractLock lock = new ZKLock(client, "/lockPath");
    boolean locked = false;
    try {
        lock.lock();
        locked = true;
        // business logic...
    } catch (Exception ex) {
        ex.printStackTrace();
    } finally {
        if (locked) {
            lock.unlock();
        }
    }
}
Etcd下载地址:https://github.com/etcd-io/etcd/releases/
导入依赖
<dependency>
<groupId>io.etcd</groupId>
<artifactId>jetcd-core</artifactId>
<version>0.5.0</version>
</dependency>
自定义锁对象信息
package cn.forlan.lock.etcd;
import lombok.Data;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Per-thread bookkeeping for one acquisition of an etcd lock.
 * Accessors, equals/hashCode and toString are generated by Lombok's {@code @Data}.
 */
@Data
public class LockData {
// Key the lock was requested under.
private String lockKey;
// Set to true by the lock implementation once the acquisition has completed.
private boolean lockSuccess;
// ID of the etcd lease attached to this acquisition.
private long leaseId;
// Scheduler that runs the lease keep-alive task; shut down on unlock.
private ScheduledExecutorService service;
// Thread that owns this acquisition.
private Thread owningThread;
// Lock path recorded for this acquisition (second constructor argument).
private String lockPath;
// Re-entrancy counter: starts at 1, incremented on re-entry and decremented on unlock.
final AtomicInteger lockCount = new AtomicInteger(1);
public LockData() {
}
/**
 * @param owningThread thread that owns the acquisition
 * @param lockPath     path recorded for the acquisition
 */
public LockData(Thread owningThread, String lockPath) {
this.owningThread = owningThread;
this.lockPath = lockPath;
}
}
继承AbstractLock,重写可以实现的方法
package cn.forlan.lock.etcd;
import cn.forlan.lock.base.AbstractLock;
import com.google.common.collect.Maps;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.Lease;
import io.etcd.jetcd.Lock;
import io.etcd.jetcd.lock.LockResponse;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.Charset;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Re-entrant distributed lock backed by etcd's lock and lease services.
 *
 * <p>Each acquisition grants a lease, starts a keep-alive task for it and then asks etcd
 * for the lock under that lease. The per-thread state (lease, keep-alive scheduler, lock
 * path, re-entrancy count) lives in a {@link LockData} entry keyed by the owning thread.
 *
 * <p>Only {@link #lock()} and {@link #unlock()} are implemented; the remaining {@code Lock}
 * operations keep the "not supported" default inherited from {@link AbstractLock}.
 */
@Slf4j
public class EtcdDistributeLock extends AbstractLock {

    /** Charset used both to encode the lock key and to decode the lock path. */
    private static final Charset UTF_8 = Charset.forName("utf-8");

    private Client client;
    private Lock lockClient;
    private Lease leaseClient;
    private String lockKey;
    /** Lock path of the most recent successful acquisition on this instance. */
    private volatile String lockPath;
    /**
     * Lock count. Not used by this class; the effective re-entrancy counter is
     * {@code LockData.lockCount}.
     */
    private AtomicInteger lockCount;
    /**
     * Lease time-to-live in nanoseconds. The lease lets etcd release the lock automatically
     * if the client crashes; while the lock is held the lease is renewed periodically.
     */
    private Long leaseTTL;
    /**
     * Delay before the first keep-alive is sent, in nanoseconds (the unit passed to the
     * scheduler). The default of 0 sends the first keep-alive immediately.
     */
    private Long initialDelay = 0L;
    /**
     * Keep-alive scheduler of the most recent successful acquisition. Each acquisition gets
     * its own scheduler, which is shut down when that acquisition is released.
     */
    volatile ScheduledExecutorService service = null;
    /**
     * Maps each owning thread to its lock state, including the re-entrancy count
     * (bounded by {@code Integer.MAX_VALUE}).
     */
    private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();

    public EtcdDistributeLock() {
    }

    /**
     * @param client   etcd client providing the lock and lease services
     * @param lockKey  key to lock
     * @param leaseTTL lease time-to-live
     * @param unit     unit of {@code leaseTTL}
     */
    public EtcdDistributeLock(Client client, String lockKey, long leaseTTL, TimeUnit unit) {
        this.client = client;
        lockClient = client.getLockClient();
        leaseClient = client.getLeaseClient();
        this.lockKey = lockKey;
        // Stored in nanoseconds.
        this.leaseTTL = unit.toNanos(leaseTTL);
    }

    /**
     * Acquires the lock, blocking until etcd grants it. A thread that already holds the lock
     * only has its re-entrancy count incremented.
     *
     * @throws IllegalStateException if the lock could not be acquired. The keep-alive task
     *                               and the lease of the failed attempt are cleaned up, and
     *                               the interrupt flag is restored if the thread was
     *                               interrupted. Returning normally would let the caller run
     *                               its critical section without holding the lock.
     */
    @Override
    public void lock() {
        Thread currentThread = Thread.currentThread();
        LockData oldLockData = threadData.get(currentThread);
        if (oldLockData != null && oldLockData.isLockSuccess()) {
            // Re-entering.
            int reentrantCount = oldLockData.lockCount.incrementAndGet();
            if (reentrantCount < 0) {
                throw new Error("超出可重入次数限制");
            }
            return;
        }

        long leaseId = 0L;
        ScheduledExecutorService keepAliveService = null;
        String acquiredPath = null;
        try {
            leaseId = leaseClient.grant(TimeUnit.NANOSECONDS.toSeconds(leaseTTL)).get().getID();
            // Renew after 80% of the TTL has elapsed.
            long period = leaseTTL - leaseTTL / 5;
            // A scheduler per acquisition: releasing one acquisition must not stop the
            // keep-alive of another, and the instance stays usable after unlock().
            keepAliveService = Executors.newSingleThreadScheduledExecutor();
            keepAliveService.scheduleAtFixedRate(new EtcdDistributeLock.KeepAliveRunnable(leaseClient, leaseId),
                    initialDelay, period, TimeUnit.NANOSECONDS);
            LockResponse lockResponse = lockClient.lock(ByteSequence.from(lockKey.getBytes(UTF_8)), leaseId).get();
            if (lockResponse != null) {
                acquiredPath = lockResponse.getKey().toString(UTF_8);
                log.info("获取锁成功,锁路径:{},线程:{}", acquiredPath, currentThread.getName());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("获取锁失败", e);
            abandonAcquisition(keepAliveService, leaseId);
            throw new IllegalStateException("Interrupted while acquiring etcd lock: " + lockKey, e);
        } catch (ExecutionException | RuntimeException e) {
            log.error("获取锁失败", e);
            abandonAcquisition(keepAliveService, leaseId);
            throw new IllegalStateException("Failed to acquire etcd lock: " + lockKey, e);
        }

        // Acquired: record the per-thread state.
        LockData newLockData = new LockData(currentThread, acquiredPath);
        newLockData.setLockKey(lockKey);
        newLockData.setLeaseId(leaseId);
        newLockData.setService(keepAliveService);
        newLockData.setLockSuccess(true);
        threadData.put(currentThread, newLockData);
        this.lockPath = acquiredPath;
        this.service = keepAliveService;
    }

    /** Stops the keep-alive task and revokes the lease of an acquisition that failed. */
    private void abandonAcquisition(ScheduledExecutorService keepAliveService, long leaseId) {
        if (keepAliveService != null) {
            keepAliveService.shutdownNow();
        }
        if (leaseId != 0L) {
            try {
                leaseClient.revoke(leaseId);
            } catch (RuntimeException e) {
                log.warn("Failed to revoke lease {} of an abandoned lock attempt", leaseId, e);
            }
        }
    }

    /**
     * Releases one hold on the lock. When the last hold is released, the etcd lock is
     * unlocked, the keep-alive scheduler is shut down and the lease is revoked. The cleanup
     * runs even if the unlock call itself fails, so a failed unlock cannot leave a
     * keep-alive task renewing the lease forever.
     *
     * @throws IllegalMonitorStateException if the current thread does not hold the lock
     */
    @Override
    public void unlock() {
        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
        if (lockData == null) {
            throw new IllegalMonitorStateException("You do not own the lock: " + lockKey);
        }
        int newLockCount = lockData.lockCount.decrementAndGet();
        if (newLockCount > 0) {
            return;
        }
        if (newLockCount < 0) {
            throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + lockKey);
        }
        try {
            // Release the lock using the path recorded for this thread's acquisition.
            String heldPath = lockData.getLockPath();
            if (heldPath != null) {
                lockClient.unlock(ByteSequence.from(heldPath.getBytes(UTF_8))).get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("解锁失败", e);
        } catch (ExecutionException e) {
            log.error("解锁失败", e);
        } finally {
            try {
                // Stop the keep-alive task.
                ScheduledExecutorService keepAliveService = lockData.getService();
                if (keepAliveService != null) {
                    keepAliveService.shutdown();
                }
                // Revoke the lease.
                if (lockData.getLeaseId() != 0L) {
                    leaseClient.revoke(lockData.getLeaseId());
                }
            } finally {
                // Drop the per-thread state.
                threadData.remove(currentThread);
            }
        }
    }

    /**
     * Keep-alive task: renews one lease once per run.
     */
    public static class KeepAliveRunnable implements Runnable {
        private final Lease leaseClient;
        private final long leaseId;

        public KeepAliveRunnable(Lease leaseClient, long leaseId) {
            this.leaseClient = leaseClient;
            this.leaseId = leaseId;
        }

        @Override
        public void run() {
            // Renew the lease once.
            leaseClient.keepAliveOnce(leaseId);
        }
    }
}
import cn.forlan.lock.etcd.EtcdDistributeLock;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.kv.GetResponse;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.*;
/**
 * Manual integration tests for {@link EtcdDistributeLock}; they need an etcd server at
 * {@code http://127.0.0.1:2379}.
 */
public class EtcdLockTest {

    private Client client;
    private String key = "/etcd/lock";
    private static final String server = "http://127.0.0.1:2379";
    private ExecutorService executorService = Executors.newFixedThreadPool(10000);

    @Before
    public void before() throws Exception {
        initEtcdClient();
    }

    private void initEtcdClient() {
        client = Client.builder().endpoints(server).build();
    }

    /**
     * Smoke test for the etcd server: puts a key, reads it back and deletes it.
     * The client is closed when done, including on failure.
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Client client = Client.builder().endpoints(server).build();
        try {
            KV kvClient = client.getKVClient();
            ByteSequence key = ByteSequence.from("test_key".getBytes());
            ByteSequence value = ByteSequence.from("test_value".getBytes());
            // Put the key-value pair.
            kvClient.put(key, value).get();
            // Read it back; get() throws if the request failed.
            CompletableFuture<GetResponse> getFuture = kvClient.get(key);
            GetResponse response = getFuture.get();
            // Delete the key.
            kvClient.delete(key).get();
        } finally {
            client.close();
        }
    }

    /**
     * Increments a plain counter from 100 tasks, each using its own lock instance on the
     * same key. {@code unlock()} is only called when {@code lock()} returned normally.
     */
    @Test
    public void testEtcdDistributeLock() throws InterruptedException {
        int[] count = {0};
        for (int i = 0; i < 100; i++) {
            executorService.submit(() -> {
                final EtcdDistributeLock lock = new EtcdDistributeLock(client, key, 20, TimeUnit.SECONDS);
                boolean locked = false;
                try {
                    lock.lock();
                    locked = true;
                    count[0]++;
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if (locked) {
                        try {
                            lock.unlock();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.HOURS);
        System.err.println("执行结果: " + count[0]);
    }

    @After
    public void after() {
        client.close();
    }
}
配置application.properties
etcd.url=http://127.0.0.1:2379
定义Etcd配置类,方便我们依赖注入使用
/**
 * Spring configuration that exposes an etcd {@code Client} for the endpoint configured
 * in {@code etcd.url}.
 */
@Configuration
public class EtcdConfig {

    @Value("${etcd.url}")
    private String etcdUrl;

    /**
     * @return a client connected to the configured endpoint
     */
    @Bean
    public Client getClient(){
        final Client etcdClient = Client.builder()
                .endpoints(this.etcdUrl)
                .build();
        return etcdClient;
    }
}
具体使用如下:
@Autowired
private Client client;
/**
 * Usage example: runs the business logic while holding an etcd lock with a 20-second
 * lease. The lock is only released when it was actually acquired.
 */
public String seckill(String skuId){
    final AbstractLock lock = new EtcdDistributeLock(client, key, 20, TimeUnit.SECONDS);
    boolean locked = false;
    try {
        lock.lock();
        locked = true;
        // business logic...
    } catch (Exception ex) {
        ex.printStackTrace();
    } finally {
        if (locked) {
            lock.unlock();
        }
    }
}
导入依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.41</version>
</dependency>
application.properties
#datasource
spring.datasource.url=jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
创建表
-- Lock table for the SELECT ... FOR UPDATE based lock: one row per lock name.
-- The unique key on lock_name guarantees a single row per lock.
CREATE TABLE `fu_distribute_lock` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`lock_name` varchar(100) NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `lock_name` (`lock_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
核心代码:可重入依赖数据库的行锁实现;如果存储引擎不支持行锁,该方案无效
import cn.forlan.lock.base.AbstractLock;
import lombok.extern.slf4j.Slf4j;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
 * Distributed lock built on a MySQL row lock: the holder keeps a transaction open in which
 * it has executed {@code SELECT ... FOR UPDATE} on the row for the lock name. Committing
 * and closing that connection releases the lock. The row is created on first use.
 */
@Slf4j
public class MySQLForUpdateDistributeLock extends AbstractLock {
    public static final String SELECT_SQL = "select * from fu_distribute_lock where `lock_name`=? for update";
    public static final String INSERT_SQL = "insert into fu_distribute_lock(lock_name) values(?)";

    // States of a timed tryLock attempt, shared between the caller and the worker thread.
    private static final int PENDING = 0;
    private static final int ACQUIRED = 1;
    private static final int ABANDONED = 2;

    private final DataSource dataSource;
    private ExecutorService threadPoolExecutor = Executors.newSingleThreadExecutor();
    /** Connection whose open transaction holds the row lock; written by lock()/tryLock(). */
    private volatile Connection connection;
    private String lockName;

    /**
     * @param dataSource source of JDBC connections
     * @param lockName   value of {@code lock_name} identifying the lock row
     */
    public MySQLForUpdateDistributeLock(DataSource dataSource, String lockName) {
        this.lockName = lockName;
        this.dataSource = dataSource;
    }

    /**
     * Blocks until the row lock is held.
     *
     * @throws IllegalStateException if the lock could not be acquired. Returning normally
     *                               here would let the caller run without the lock and later
     *                               commit a connection it never obtained.
     */
    @Override
    public void lock() {
        try {
            connection = acquireRowLock();
        } catch (Exception e) {
            log.error("获取锁异常", e);
            throw new IllegalStateException("Failed to acquire MySQL row lock: " + lockName, e);
        }
    }

    /**
     * Opens a transaction and locks the row for {@link #lockName}, creating the row first
     * when it does not exist yet.
     *
     * @return the connection whose open transaction holds the row lock
     * @throws SQLException if a connection cannot be obtained or the query fails; no
     *                      connection is left open in that case
     */
    private Connection acquireRowLock() throws SQLException {
        while (true) {
            Connection candidate = dataSource.getConnection();
            boolean handedOver = false;
            try {
                candidate.setAutoCommit(false);
                boolean rowLocked = false;
                try (PreparedStatement statement = candidate.prepareStatement(SELECT_SQL)) {
                    statement.setString(1, lockName);
                    try (ResultSet resultSet = statement.executeQuery()) {
                        rowLocked = resultSet.next();
                    }
                }
                if (rowLocked) {
                    handedOver = true;
                    return candidate;
                }
            } finally {
                if (!handedOver) {
                    releaseQuietly(candidate);
                }
            }
            log.info("锁记录不存在,正在创建");
            createLockRow();
        }
    }

    /** Inserts the lock row on a separate connection; a failure is logged and the caller retries. */
    private void createLockRow() {
        try (Connection insertConnection = dataSource.getConnection();
             PreparedStatement insertStatement = insertConnection.prepareStatement(INSERT_SQL)) {
            insertStatement.setString(1, lockName);
            if (insertStatement.executeUpdate() == 1) {
                log.info("创建锁记录成功");
            }
        } catch (Exception e) {
            log.error("创建锁记录异常", e);
        }
    }

    /** Ends the transaction without committing and closes the connection, ignoring failures. */
    private void releaseQuietly(Connection held) {
        if (held == null) {
            return;
        }
        try {
            held.rollback();
        } catch (Exception ignored) {
            // Best effort: the connection is closed right below.
        }
        gracefulClose(held);
    }

    /**
     * Acquires the lock via {@link #lock()}.
     *
     * <p>NOTE(review): this delegates to the blocking {@code lock()}, so it waits for the row
     * lock instead of returning immediately; {@code false} is only returned on failure.
     *
     * @return {@code true} once the lock is held, {@code false} if acquisition failed
     */
    @Override
    public boolean tryLock() {
        try {
            lock();
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    /**
     * Tries to acquire the lock within the given time by running the acquisition on a worker
     * thread.
     *
     * <p>When the wait expires the attempt is abandoned: the worker is cancelled, and if it
     * still obtains the row lock afterwards it releases it immediately. This prevents a
     * timed-out attempt from holding the lock forever.
     *
     * @return {@code true} if the lock is held on return, {@code false} on timeout, failure
     *         or interruption (the interrupt flag is restored)
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) {
        final java.util.concurrent.atomic.AtomicInteger state =
                new java.util.concurrent.atomic.AtomicInteger(PENDING);
        final Future<Connection> future = threadPoolExecutor.submit(() -> {
            Connection acquired = acquireRowLock();
            if (state.compareAndSet(PENDING, ACQUIRED)) {
                return acquired;
            }
            // The caller gave up before the lock was obtained: release it right away.
            releaseQuietly(acquired);
            return null;
        });
        try {
            Connection acquired = future.get(time, unit);
            if (acquired == null) {
                return false;
            }
            connection = acquired;
            return true;
        } catch (java.util.concurrent.TimeoutException e) {
            return resolveAfterGivingUp(state, future);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return resolveAfterGivingUp(state, future);
        } catch (Exception e) {
            log.error("获取锁异常", e);
            return false;
        }
    }

    /**
     * Settles a timed attempt whose wait ended. If the worker has not acquired the lock yet,
     * the attempt is marked abandoned and cancelled. Otherwise the worker won the race and
     * its connection is adopted.
     */
    private boolean resolveAfterGivingUp(java.util.concurrent.atomic.AtomicInteger state,
                                         Future<Connection> future) {
        if (state.compareAndSet(PENDING, ABANDONED)) {
            future.cancel(true);
            return false;
        }
        // The worker already holds the lock and is about to return its connection.
        boolean interrupted = Thread.interrupted();
        try {
            while (true) {
                try {
                    Connection acquired = future.get();
                    if (acquired == null) {
                        return false;
                    }
                    connection = acquired;
                    return true;
                } catch (InterruptedException e) {
                    // Keep waiting: the result is imminent and must not be lost.
                    interrupted = true;
                } catch (Exception e) {
                    log.error("获取锁异常", e);
                    return false;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Releases the lock by committing and closing the holding connection. The connection is
     * closed even when the commit fails.
     *
     * @throws IllegalMonitorStateException if this instance never acquired the lock
     */
    @Override
    public void unlock() {
        Connection held = connection;
        if (held == null) {
            throw new IllegalMonitorStateException("Lock is not held: " + lockName);
        }
        try {
            held.commit();
        } catch (SQLException e) {
            log.error("释放锁异常", e);
        } finally {
            gracefulClose(held);
        }
    }

    /**
     * Closes every non-null resource, ignoring failures.
     *
     * @param closeables resources to close; {@code null} elements are skipped
     */
    public void gracefulClose(AutoCloseable... closeables) {
        for (AutoCloseable closeable : closeables) {
            if (closeable != null) {
                try {
                    closeable.close();
                } catch (Exception ignored) {
                    // Best-effort cleanup: nothing useful can be done if close() fails.
                }
            }
        }
    }
}
该方案不可重入。存储引擎建议使用MyISAM:InnoDB的间隙锁(gap lock)和意向锁可能导致死锁,事务回滚也会带来性能下降;另外,如果持有锁的机器宕机,锁记录无法被主动删除,其他客户端会一直获取不到锁(因此下面通过过期时间和看门狗机制来处理)。
创建表
-- Lock table for the insert-based lock: holding the lock means owning the row for lock_name.
-- The unique key on lock_name makes a second insert for the same lock fail.
-- expire_time stores an epoch-millisecond deadline; thread_id identifies the owner.
CREATE TABLE `id_distribute_lock` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`lock_name` varchar(100) NOT NULL,
`expire_time` bigint(20) NOT NULL,
`thread_id` varchar(100) NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `lock_name` (`lock_name`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8;
核心代码实现
package cn.forlan.lock.mysql;
import cn.forlan.lock.base.AbstractLock;
import lombok.extern.slf4j.Slf4j;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLIntegrityConstraintViolationException;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@Slf4j
public class MySQLIDDistributeLock extends AbstractLock {
private static final String SELECT_SQL_FORMAT = "select * from id_distribute_lock where lock_name=?";
private static final String UPDATE_SQL_FORMAT = "update id_distribute_lock set expire_time=? where lock_name=?";
private static final String INSERT_SQL_FORMAT = "insert into id_distribute_lock(lock_name,expire_time,thread_id) values(?,?,?)";
private static final String DELETE_SQL_FORMAT = "delete from id_distribute_lock where lock_name=?";
private final DataSource dataSource;
private ExecutorService threadPoolExecutor = Executors.newSingleThreadExecutor();
private int expireTime;
private long reletTime;
private WatchDog watchDog;
private String threadId;
private String lockName;
public MySQLIDDistributeLock(DataSource dataSource, int expireTime, String lockName) {
this.dataSource = dataSource;
this.expireTime = expireTime;
this.reletTime = this.expireTime / 2;
this.lockName = lockName;
threadId = Thread.currentThread().getId() + "-" + UUID.randomUUID().toString();
watchDog = new WatchDog(() -> reletAndCheck(), reletTime, TimeUnit.MILLISECONDS);
}
@Override
public void lock() {
watchDog.start();
while (true) {
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(INSERT_SQL_FORMAT)) {
statement.setString(1, this.lockName);
statement.setLong(2, System.currentTimeMillis() + expireTime);
statement.setString(3, this.threadId);
try {
//获取锁成功
if (statement.executeUpdate() > 0) {
log.info("获取锁成功");
break;
}
} catch (SQLIntegrityConstraintViolationException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
// 睡眠100ms后继续获取锁,锁力度较大可修改这个时间
Thread.sleep(100);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
private void reletAndCheck() {
ResultSet resultSet = null;
PreparedStatement preparedStatement = null;
Connection connection = null;
try {
log.info("开始查询锁");
connection = dataSource.getConnection();
preparedStatement = connection.prepareStatement(SELECT_SQL_FORMAT);
preparedStatement.setString(1, this.lockName);
resultSet = preparedStatement.executeQuery();
if (resultSet.next()) {
final String threadId = resultSet.getString("thread_id");
final long expireTime = resultSet.getLong("expire_time");
log.debug("thread_id:{}", threadId);
if (this.threadId.equals(threadId)) {
// 续租
log.info("续租中");
try (PreparedStatement updatePreparedStatement = connection.prepareStatement(UPDATE_SQL_FORMAT)) {
updatePreparedStatement.setLong(1, expireTime + reletTime);
updatePreparedStatement.setString(2, this.lockName);
updatePreparedStatement.executeUpdate();
} catch (Exception e) {
throw new RuntimeException(e);
}
} else {
// 检查时间是否过期
if (System.currentTimeMillis() > expireTime) {
log.info("锁:{}过期删除中", lockName);
delete();
}
}
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
this.gracefulClose(resultSet, preparedStatement, connection);
}
}
private void delete() {
log.info("删除锁");
Connection connection = null;
PreparedStatement preparedStatement = null;
try {
connection = this.dataSource.getConnection();
preparedStatement = connection.prepareStatement(DELETE_SQL_FORMAT);
preparedStatement.setString(1, this.lockName);
if (preparedStatement.executeUpdate() == 1) {
log.info("刪除成功");
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
this.gracefulClose(preparedStatement, connection);
}
}
@Override
public boolean tryLock() {
try {
lock();
return true;
} catch (Exception e) {
return false;
}
}
@Override
public boolean tryLock(long time, TimeUnit unit) {
final Future<?> future = threadPoolExecutor.submit(() -> {
try {
lock();
return 1;
} catch (Exception e) {
throw new RuntimeException(e);
}
});
try {
if (future.get(time, unit) == null) {
future.cancel(true);
return false;
}
return true;
} catch (Exception e) {
return false;
}
}
@Override
public void unlock() {
watchDog.stop();
delete();
}
public void gracefulClose(AutoCloseable... closeables) {
Arrays.asList(closeables).forEach(closeable -> {
if (closeable != null) {
try {
closeable.close();
} catch (Exception e) {
}
}
});
}
}
看门狗机制
package cn.forlan.lock.mysql;
import java.util.concurrent.TimeUnit;
/**
 * Runs a task repeatedly on a dedicated thread, sleeping for a fixed time before each run,
 * until {@link #stop()} is called.
 *
 * <p>An exception thrown by the task is reported through the thread's uncaught-exception
 * handler and the loop continues, so a single failed run does not silently end all future
 * runs. The thread is a daemon thread and therefore never keeps the JVM alive.
 */
public class WatchDog {
    private volatile boolean isStop;
    private long watchTime;
    private Runnable target;
    private Thread thread;

    /**
     * @param target    task to run after each sleep
     * @param watchTime time to sleep before each run
     * @param timeUnit  unit of {@code watchTime}
     */
    public WatchDog(Runnable target, long watchTime, TimeUnit timeUnit) {
        this.watchTime = timeUnit.toMillis(watchTime);
        this.target = target;
        this.thread = new Thread(this::watchLoop, "lock-watch-dog");
        this.thread.setDaemon(true);
    }

    /** Sleep-then-run loop executed by the watchdog thread. */
    private void watchLoop() {
        while (!isStop) {
            try {
                Thread.sleep(this.watchTime);
            } catch (InterruptedException ignored) {
                // An interrupt only cuts the sleep short; the stop flag decides whether to go on.
            }
            if (isStop) {
                // Stopped while sleeping: do not run the task once more.
                break;
            }
            try {
                this.target.run();
            } catch (RuntimeException e) {
                // Report the failure the same way an uncaught exception would be reported,
                // but keep the loop alive for the next run.
                Thread current = Thread.currentThread();
                current.getUncaughtExceptionHandler().uncaughtException(current, e);
            }
        }
    }

    /**
     * Starts the watchdog thread.
     *
     * @throws IllegalThreadStateException if called more than once
     */
    public void start() {
        thread.start();
    }

    /** Asks the loop to end; it exits before the next run of the task. */
    public void stop() {
        this.isStop = true;
    }
}
import cn.forlan.lock.mysql.MySQLForUpdateDistributeLock;
import cn.forlan.lock.mysql.MySQLIDDistributeLock;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import javax.sql.DataSource;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Concurrency tests for the MySQL-backed distributed locks: each test submits 100 tasks that
 * increment a shared counter while holding the lock, then prints the final counter value.
 *
 * @author weichenglin
 * @since 2023-12-21 11:09:12 AM
 */
@SpringBootTest
public class MySQLLockTest {
    private ExecutorService executorService = Executors.newCachedThreadPool();

    @Autowired
    private DataSource dataSource;

    /** Exercises {@code MySQLForUpdateDistributeLock} with 100 contending tasks. */
    @Test
    public void testMySQLFUDistributeLock() throws Exception {
        final int[] counter = new int[1];
        // Every execution creates its own lock instance for the same lock name.
        final Runnable contender = () -> {
            final MySQLForUpdateDistributeLock lock =
                    new MySQLForUpdateDistributeLock(dataSource, "lock_key");
            boolean acquired = false;
            try {
                lock.lock();
                acquired = true;
                counter[0] += 1;
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Only release a lock that was actually obtained.
                if (acquired) {
                    lock.unlock();
                }
            }
        };
        for (int submitted = 0; submitted < 100; submitted++) {
            executorService.submit(contender);
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.HOURS);
        System.out.println(counter[0]);
    }

    /** Exercises {@code MySQLIDDistributeLock} with 100 contending tasks. */
    @Test
    public void testMySQLIDDistributeLock() throws Exception {
        final int[] counter = new int[1];
        // Every execution creates its own lock instance for the same lock name.
        final Runnable contender = () -> {
            final MySQLIDDistributeLock lock =
                    new MySQLIDDistributeLock(dataSource, 3000, "lock_key");
            boolean acquired = false;
            try {
                lock.lock();
                acquired = true;
                counter[0] += 1;
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Only release a lock that was actually obtained.
                if (acquired) {
                    lock.unlock();
                }
            }
        };
        for (int submitted = 0; submitted < 100; submitted++) {
            executorService.submit(contender);
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.HOURS);
        System.out.println(counter[0]);
    }
}
@Autowired
private DataSource dataSource;

/**
 * Usage example: runs the flash-sale business logic while holding a MySQL-backed
 * distributed lock.
 *
 * @param skuId identifier of the product being purchased
 * @return a success message when the guarded section completed, a failure message when
 *     acquiring the lock or the business logic threw
 */
public String seckill(String skuId) {
    // NOTE(review): `key` (the lock name) is not declared in this snippet; presumably it is
    // defined by the surrounding class - confirm.
    AbstractLock lock = new MySQLForUpdateDistributeLock(dataSource, key);
    // Alternative: AbstractLock lock = new MySQLIDDistributeLock(dataSource, 30000, key);
    boolean locked = false;
    try {
        lock.lock();
        locked = true;
        // Business logic goes here...
        return "秒杀成功";
    } catch (Exception e) {
        e.printStackTrace();
        return "秒杀失败";
    } finally {
        // Only release a lock that was actually obtained.
        if (locked) {
            lock.unlock();
        }
    }
}
配置RedisTemplate,我们通过秒杀场景来验证分布式锁,库存放到Redis中
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
 * Spring configuration that registers a {@code RedisTemplate<String, Object>} using String
 * serialization for keys and hash keys, and Jackson JSON serialization for values and hash
 * values.
 */
@Configuration
public class CacheConfig {

    /**
     * Builds the template on top of the supplied connection factory.
     *
     * @param redisConnectionFactory connection factory the template uses
     * @return the initialized template
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        final GenericJackson2JsonRedisSerializer jsonSerializer =
                new GenericJackson2JsonRedisSerializer(buildObjectMapper());

        final RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);
        // Keys (plain and hash) are stored as strings.
        template.setKeySerializer(new StringRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        // The REST side stores sessions in Redis and the server side reads them back, so value
        // serialization is kept consistent with the REST consumer.
        template.setValueSerializer(jsonSerializer);
        template.setHashValueSerializer(jsonSerializer);
        template.afterPropertiesSet();
        return template;
    }

    /** Creates the Jackson mapper backing the JSON value serializer. */
    private static ObjectMapper buildObjectMapper() {
        final ObjectMapper mapper = new ObjectMapper();
        // Null and empty properties are not serialized.
        mapper.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);
        // Embed class information as a property of the JSON.
        mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);
        // Ignore fields that cannot be recognized when deserializing.
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        return mapper;
    }
}
库存以及秒杀代码,使用的话,把ForlanxxxLock换为对应的锁即可
import cn.forlan.lock.base.AbstractLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.TimeUnit;
/**
 * Flash-sale demo controller: stock is kept in Redis and the purchase path is guarded by a
 * distributed lock.
 */
@RestController
public class SeckillController {
    @Autowired
    private RedisTemplate redisTemplate;

    private static final String SKU_STOCK_KEY = "sku:stock:%s";

    /**
     * Sets the stock of a product; the Redis entry expires after 8 hours.
     *
     * @param skuId product identifier
     * @param stock stock value to store
     * @return a confirmation message
     */
    @GetMapping("/seckill/{skuId}/{stock}")
    public String seckillSkuIdStock(@PathVariable("skuId") String skuId, @PathVariable("stock") Integer stock) {
        String key = String.format(SKU_STOCK_KEY, skuId);
        redisTemplate.opsForValue().set(key, stock, 8, TimeUnit.HOURS);
        return "设置成功";
    }

    /**
     * Attempts to buy one unit of the product while holding the distributed lock.
     *
     * @param skuId product identifier
     * @return a success message only when the stock was actually decremented; otherwise a
     *     sold-out or failure message
     * @throws Exception if creating the lock fails
     */
    @GetMapping("/seckill/{skuId}")
    public String seckill(@PathVariable("skuId") String skuId) throws Exception {
        // Redis key holding the product's stock.
        String key = String.format(SKU_STOCK_KEY, skuId);
        // Distributed lock; replace ForlanxxxLock with the concrete lock implementation to use.
        AbstractLock lock = new ForlanxxxLock("lockKey or lockPath");
        boolean locked = false;
        try {
            lock.lock();
            locked = true;
            Object stock = redisTemplate.opsForValue().get(key);
            if (stock == null || ((Number) stock).intValue() <= 0) {
                // Report sold-out to the caller instead of falling through to the success message.
                return "商品已售罄";
            }
            // Deduct one unit of stock.
            redisTemplate.opsForValue().decrement(key);
            return "秒杀成功";
        } catch (Exception e) {
            e.printStackTrace();
            return "秒杀失败";
        } finally {
            // Only release a lock that was actually obtained.
            if (locked) {
                lock.unlock();
            }
        }
    }
}
原因:客户端没有连接上,或者配置有误;我这里的问题是相关配置项没有赋值。
解决:添加 @RunWith(SpringRunner.class) 注解,指定使用Spring的测试运行器,在@SpringBootTest注解上指定正确的启动类
Exception in thread "main" java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;CLjava/lang/Object;)V
at io.grpc.Metadata$Key.validateName(Metadata.java:742)
at io.grpc.Metadata$Key.<init>(Metadata.java:750)
at io.grpc.Metadata$Key.<init>(Metadata.java:668)
at io.grpc.Metadata$AsciiKey.<init>(Metadata.java:959)
at io.grpc.Metadata$AsciiKey.<init>(Metadata.java:954)
at io.grpc.Metadata$Key.of(Metadata.java:705)
at io.grpc.Metadata$Key.of(Metadata.java:701)
at io.etcd.jetcd.ClientConnectionManager.<clinit>(ClientConnectionManager.java:69)
at io.etcd.jetcd.ClientImpl.<init>(ClientImpl.java:49)
at io.etcd.jetcd.ClientBuilder.build(ClientBuilder.java:387)
at EtcdLockTest.main(EtcdLockTest.java:31)
原因:guava 版本冲突。把之前 curator 引入的低版本 guava 排除掉后,就正常了。