环境:SpringBoot2.7.12
本篇文章将会为大家介绍有关spring integration提供的分布式锁功能。
Spring Integration?是一个框架,用于构建事件驱动的应用程序。在 Spring Integration 中,LockRegistry?是一个接口,用于管理分布式锁。分布式锁是一种同步机制,用于确保在分布式系统中的多个节点之间对共享资源的互斥访问。
LockRegistry及相关子接口(如:RenewableLockRegistry)?接口的主要功能:
常见的?LockRegistry?实现包括基于数据库、ZooKeeper 和 Redis 的实现。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jdbc</artifactId>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.30</version>
</dependency>
spring:
datasource:
driverClassName: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/spring_lock?serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true&useSSL=false
username: root
password: xxxooo
type: com.zaxxer.hikari.HikariDataSource
hikari:
minimumIdle: 10
maximumPoolSize: 200
---
spring:
integration:
jdbc:
initialize-schema: always
# 基于数据库需要执行初始化脚本
schema: classpath:schema-mysql.sql
@Bean
public DefaultLockRepository defaultLockRepository(DataSource dataSource) {
DefaultLockRepository lockRepository = new DefaultLockRepository(dataSource);
// 这里根据你的业务需要,配置表前缀,默认:IN_
lockRepository.setPrefix("T_") ;
return lockRepository ;
}
// 注册基于数据库的分布式锁
@Bean
public JdbcLockRegistry jdbcLockRegistry(DefaultLockRepository lockRepository) {
return new JdbcLockRegistry(lockRepository) ;
}
@Test
public void testLock() throws Exception
int len = 10 ;
CountDownLatch cdl = new CountDownLatch(len) ;
CountDownLatch waiter = new CountDownLatch(len) ;
Thread[] ts = new Thread[len] ;
for (int i = 0; i < len; i++) {
ts[i] = new Thread(() -> {
waiter.countDown() ;
System.out.println(Thread.currentThread().getName() + " - 准备获取锁") ;
try {
waiter.await() ;
} catch (InterruptedException e1) {
e1.printStackTrace();
}
// 获取锁
Lock lock = registry.obtain("drug_store_key_001") ;
lock.lock() ;
System.out.println(Thread.currentThread().getName() + " - 获取锁成功") ;
try {
try {
TimeUnit.SECONDS.sleep(2) ;
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
// 释放锁
lock.unlock() ;
cdl.countDown() ;
System.out.println(Thread.currentThread().getName() + " - 锁释放成功") ;
}
}, "T - " + i) ;
}
for (int i = 0; i < len; i++) {
ts[i].start() ;
}
cdl.await() ;
}
锁的实现JdbcLock,该对象实现了java.util.concurrent.locks.Lock,所以该锁是支持重入等操作的。
配置锁获取失败后的重试间隔,默认值100ms
JdbcLockRegistry jdbcLockRegistry = new JdbcLockRegistry(lockRepository);
// 定义锁对象时设置当获取锁失败后重试间隔时间。
jdbcLockRegistry.setIdleBetweenTries(Duration.ofMillis(200)) ;
jdbcLockRegistry.renewLock("drug_store_key_001");
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
spring:
redis:
host: localhost
port: 6379
password: xxxooo
database: 8
lettuce:
pool:
maxActive: 8
maxIdle: 100
minIdle: 10
maxWait: -1
测试代码与上面基于JDBC的一样,只需要修改调用加锁的代码即可
Lock lock = redisLockRegistry.obtain("001") ;
设置锁的有效期,默认是60s
// 第三个参数设置了key的有效期,这里改成10s
RedisLockRegistry redisLockRegistry = new RedisLockRegistry(connectionFactory, registryKey, 10000) ;
注意:redis key的有效期设置为10s,如果你的业务执行超过了10s,那么程序将会报错。并没有redission watch dog机制。
Exception in thread "T - 0" java.lang.IllegalStateException: Lock was released in the store due to expiration. The integrity of data protected by this lock may have been compromised.
at org.springframework.integration.redis.util.RedisLockRegistry$RedisLock.unlock(RedisLockRegistry.java:450)
at com.pack.SpringIntegrationDemoApplicationTests.lambda$1(SpringIntegrationDemoApplicationTests.java:83)
at java.lang.Thread.run(Thread.java:748)
如果10s过期后key自动删除后,其它线程是否能立马获取到锁呢?如果是单节点中其它现在也不能获取锁,必须等上一个线程结束后才可以,这是因为在内部还维护了一个ReentrantLock锁,在获取分布式锁前要先获取本地的一个锁。
private abstract class RedisLock implements Lock {
private final ReentrantLock localLock = new ReentrantLock();
public final void lock() {
this.localLock.lock();
while (true) {
try {
if (tryRedisLock(-1L)) {
return;
}
} catch (InterruptedException e) {
} catch (Exception e) {
this.localLock.unlock();
rethrowAsLockException(e);
}
}
}
}
注意:不管是基于数据库还是Redis都要先获取本地的锁
Spring Cloud Task就使用到了Spring Integration中的锁基于数据库的。
总结:Spring Integration 的分布式锁为开发者提供了一种在分布式系统中实现可靠同步的有效方法。通过合理选择和使用这些锁实现,可以确保对共享资源的访问在多个节点之间保持协调一致,从而提高系统的整体可靠性和性能。
完毕!!!