This article takes a look at PowerJob's LockService.
tech/powerjob/server/extension/LockService.java
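public interface LockService {

    /**
     * Acquire the lock. Returns immediately and never blocks waiting for the lock.
     * @param name lock name
     * @param maxLockTime maximum time the lock may be held, in milliseconds (ms)
     * @return true -> lock acquired, false -> lock not acquired
     */
    boolean tryLock(String name, long maxLockTime);

    /**
     * Release the lock.
     * @param name lock name
     */
    void unlock(String name);
}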
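The LockService interface defines two methods, tryLock and unlock. tryLock is non-blocking: it returns false right away if the lock is already held.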
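From the caller's side, the contract above suggests a "try, work, release in finally" pattern. The following is a minimal sketch of that pattern, not code from PowerJob: the interface is copied from the excerpt, while InMemoryLockService, LockUsageDemo, runExclusively, the lock name and the 30-second hold time are hypothetical names and values introduced only so the example runs without a database.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Same two-method contract as in the excerpt above.
interface LockService {
    boolean tryLock(String name, long maxLockTime);
    void unlock(String name);
}

// Hypothetical in-memory stand-in (not part of PowerJob).
// It ignores maxLockTime entirely and only tracks which names are held.
class InMemoryLockService implements LockService {
    private final Set<String> held = ConcurrentHashMap.newKeySet();

    @Override
    public boolean tryLock(String name, long maxLockTime) {
        // Set.add returns true only for the first caller of a given name.
        return held.add(name);
    }

    @Override
    public void unlock(String name) {
        held.remove(name);
    }
}

class LockUsageDemo {
    // Runs the task only if the lock is free; never waits for it.
    // Returns true if the task ran, false if the lock was held by someone else.
    static boolean runExclusively(LockService lockService, String lockName, Runnable task) {
        if (!lockService.tryLock(lockName, 30_000L)) {
            return false; // lock is taken, skip this round
        }
        try {
            task.run();
            return true;
        } finally {
            // Always release, even if the task throws.
            lockService.unlock(lockName);
        }
    }

    public static void main(String[] args) {
        LockService lockService = new InMemoryLockService();
        boolean ran = runExclusively(lockService, "demo_lock",
                () -> System.out.println("doing exclusive work"));
        System.out.println("ran = " + ran);
    }
}
```

Because tryLock never blocks, the caller has to decide what a false return means: skip, retry later, or report a conflict.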
tech/powerjob/server/extension/defaultimpl/DatabaseLockService.java
@Slf4j
@Service
public class DatabaseLockService implements LockService {

    private final String ownerIp;

    private final OmsLockRepository omsLockRepository;

    @Autowired
    public DatabaseLockService(OmsLockRepository omsLockRepository) {
        this.ownerIp = NetUtils.getLocalHost();
        this.omsLockRepository = omsLockRepository;

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            int num = omsLockRepository.deleteByOwnerIP(ownerIp);
            log.info("[DatabaseLockService] execute shutdown hook, release all lock(owner={},num={})", ownerIp, num);
        }));
    }

    @Override
    public boolean tryLock(String name, long maxLockTime) {
        OmsLockDO newLock = new OmsLockDO(name, ownerIp, maxLockTime);
        try {
            omsLockRepository.saveAndFlush(newLock);
            return true;
        } catch (DataIntegrityViolationException ignore) {
        } catch (Exception e) {
            log.warn("[DatabaseLockService] write lock to database failed, lockName = {}.", name, e);
        }

        OmsLockDO omsLockDO = omsLockRepository.findByLockName(name);
        long lockedMillions = System.currentTimeMillis() - omsLockDO.getGmtCreate().getTime();

        // The lock has timed out: force-release it and try to acquire it again
        if (lockedMillions > omsLockDO.getMaxLockTime()) {
            log.warn("[DatabaseLockService] The lock[{}] already timeout, will be unlocked now.", omsLockDO);
            unlock(name);
            return tryLock(name, maxLockTime);
        }
        return false;
    }

    @Override
    public void unlock(String name) {
        try {
            CommonUtils.executeWithRetry0(() -> omsLockRepository.deleteByLockName(name));
        } catch (Exception e) {
            log.error("[DatabaseLockService] unlock {} failed.", name, e);
        }
    }
}
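DatabaseLockService implements LockService on top of the database. Its constructor takes an OmsLockRepository and registers a shutdown hook that runs omsLockRepository.deleteByOwnerIP(ownerIp) when the JVM exits. tryLock creates an OmsLockDO and calls omsLockRepository.saveAndFlush; if the insert succeeds it returns true. If the insert fails (a DataIntegrityViolationException is swallowed silently, any other exception is logged), it loads the existing row with omsLockRepository.findByLockName and computes how long the lock has been held. If that exceeds the row's maxLockTime it calls unlock and then tryLock again; otherwise it returns false. unlock runs omsLockRepository.deleteByLockName through CommonUtils.executeWithRetry0 and only logs an error if that fails.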
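The insert-or-take-over flow described above can be tried out without a database. The sketch below is an in-memory imitation under stated assumptions, not PowerJob code: a ConcurrentHashMap stands in for the lock table, putIfAbsent stands in for the unique constraint on lockName, and the class name TimeoutLockSketch, the Row type and the injectable clock are all hypothetical. It also deviates from the quoted code in one place: it removes only the exact stale row it observed, rather than deleting by name.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.LongSupplier;

// Hypothetical in-memory imitation of the "insert, else check timeout" flow.
class TimeoutLockSketch {

    // One "row" of the imagined lock table.
    private static final class Row {
        final long createdAt;   // plays the role of gmtCreate
        final long maxLockTime; // maximum hold time in milliseconds

        Row(long createdAt, long maxLockTime) {
            this.createdAt = createdAt;
            this.maxLockTime = maxLockTime;
        }
    }

    private final ConcurrentMap<String, Row> rows = new ConcurrentHashMap<>();
    // Injectable clock so the timeout path can be exercised without sleeping.
    private final LongSupplier clock;

    TimeoutLockSketch(LongSupplier clock) {
        this.clock = clock;
    }

    boolean tryLock(String name, long maxLockTime) {
        Row fresh = new Row(clock.getAsLong(), maxLockTime);
        // putIfAbsent imitates an insert guarded by a unique constraint.
        Row existing = rows.putIfAbsent(name, fresh);
        if (existing == null) {
            return true; // "insert" succeeded, lock acquired
        }
        long lockedMillis = clock.getAsLong() - existing.createdAt;
        if (lockedMillis > existing.maxLockTime) {
            // Holder exceeded its own maxLockTime: drop that stale row and retry.
            rows.remove(name, existing);
            return tryLock(name, maxLockTime);
        }
        return false; // lock is held and still valid
    }

    void unlock(String name) {
        rows.remove(name);
    }

    public static void main(String[] args) {
        TimeoutLockSketch lock = new TimeoutLockSketch(System::currentTimeMillis);
        System.out.println("first  tryLock: " + lock.tryLock("job_1", 5_000L));
        System.out.println("second tryLock: " + lock.tryLock("job_1", 5_000L));
        lock.unlock("job_1");
        System.out.println("after  unlock : " + lock.tryLock("job_1", 5_000L));
    }
}
```

Note that the timeout is judged against the maxLockTime stored by the current holder, not the value passed by the caller who is trying to take over, which matches the comparison in the quoted tryLock.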
tech/powerjob/common/utils/NetUtils.java
public static String getLocalHost() {
    if (HOST_ADDRESS != null) {
        return HOST_ADDRESS;
    }

    String addressFromJVM = System.getProperty(PowerJobDKey.BIND_LOCAL_ADDRESS);
    if (StringUtils.isNotEmpty(addressFromJVM)) {
        log.info("[Net] use address from[{}]: {}", PowerJobDKey.BIND_LOCAL_ADDRESS, addressFromJVM);
        return HOST_ADDRESS = addressFromJVM;
    }

    InetAddress address = getLocalAddress();
    if (address != null) {
        return HOST_ADDRESS = address.getHostAddress();
    }
    return LOCALHOST_VALUE;
}

public static InetAddress getLocalAddress() {
    if (LOCAL_ADDRESS != null) {
        return LOCAL_ADDRESS;
    }
    InetAddress localAddress = getLocalAddress0();
    LOCAL_ADDRESS = localAddress;
    return localAddress;
}

private static InetAddress getLocalAddress0() {
    // @since 2.7.6, choose the {@link NetworkInterface} first
    try {
        InetAddress addressOp = getFirstReachableInetAddress(findNetworkInterface());
        if (addressOp != null) {
            return addressOp;
        }
    } catch (Throwable e) {
        log.warn("[Net] getLocalAddress0 failed.", e);
    }

    InetAddress localAddress = null;
    try {
        localAddress = InetAddress.getLocalHost();
        Optional<InetAddress> addressOp = toValidAddress(localAddress);
        if (addressOp.isPresent()) {
            return addressOp.get();
        }
    } catch (Throwable e) {
        log.warn("[Net] getLocalAddress0 failed.", e);
    }
    return localAddress;
}
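NetUtils.getLocalHost first checks whether HOST_ADDRESS is already set and returns it if so. Otherwise it reads the system property powerjob.network.local.address; if that is empty it falls back to getLocalAddress, which returns the cached LOCAL_ADDRESS or, when that is null, resolves one through getLocalAddress0. If no address can be found at all, it returns LOCALHOST_VALUE.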
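The lookup order (cache, then JVM property, then a resolved address, then a loopback fallback) can be sketched with the JDK alone. This is a simplified illustration, not the PowerJob implementation: the class LocalHostSketch and resetCacheForDemo are hypothetical, the network-interface scan of getLocalAddress0 is replaced by a plain InetAddress.getLocalHost() call, and the fallback value "127.0.0.1" is an assumption about LOCALHOST_VALUE.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical, simplified sketch of the lookup order described above.
class LocalHostSketch {
    // Property name as quoted in the article.
    static final String BIND_LOCAL_ADDRESS = "powerjob.network.local.address";
    // Assumed fallback value; the quoted code only shows the constant's name.
    static final String LOCALHOST_VALUE = "127.0.0.1";

    // Cached result, filled on the first successful lookup.
    private static volatile String hostAddress;

    static String getLocalHost() {
        String cached = hostAddress;
        if (cached != null) {
            return cached; // 1. cache wins
        }
        String fromJvm = System.getProperty(BIND_LOCAL_ADDRESS);
        if (fromJvm != null && !fromJvm.isEmpty()) {
            return hostAddress = fromJvm; // 2. explicit -D override
        }
        try {
            // 3. simplified stand-in for the network-interface scan
            return hostAddress = InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            return LOCALHOST_VALUE; // 4. last resort, not cached
        }
    }

    // Demo-only helper so the lookup can be repeated in one JVM.
    static void resetCacheForDemo() {
        hostAddress = null;
    }

    public static void main(String[] args) {
        System.setProperty(BIND_LOCAL_ADDRESS, "10.1.2.3");
        System.out.println("with override: " + getLocalHost());
        System.clearProperty(BIND_LOCAL_ADDRESS);
        System.out.println("still cached : " + getLocalHost());
    }
}
```

Since DatabaseLockService uses this value as ownerIp, pinning it with the JVM property is one way to keep the lock owner stable on hosts with several network interfaces.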
tech/powerjob/server/persistence/remote/model/OmsLockDO.java
@Data
@Entity
@NoArgsConstructor
@Table(uniqueConstraints = {@UniqueConstraint(name = "uidx01_oms_lock", columnNames = {"lockName"})})
public class OmsLockDO {

    @Id
    @GeneratedValue(strategy = GenerationType.AUTO, generator = "native")
    @GenericGenerator(name = "native", strategy = "native")
    private Long id;

    private String lockName;

    private String ownerIP;

    /**
     * Maximum time the lock may be held
     */
    private Long maxLockTime;

    private Date gmtCreate;

    private Date gmtModified;

    public OmsLockDO(String lockName, String ownerIP, Long maxLockTime) {
        this.lockName = lockName;
        this.ownerIP = ownerIP;
        this.maxLockTime = maxLockTime;
        this.gmtCreate = new Date();
        this.gmtModified = this.gmtCreate;
    }
}
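OmsLockDO declares a unique constraint (uidx01_oms_lock) on lockName, which is what makes a second insert for the same lock name fail. It also carries ownerIP and maxLockTime, plus gmtCreate and gmtModified, which the constructor sets to the current time.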
tech/powerjob/server/persistence/remote/repository/OmsLockRepository.java
public interface OmsLockRepository extends JpaRepository<OmsLockDO, Long> {

    @Modifying
    @Transactional(rollbackOn = Exception.class)
    @Query(value = "delete from OmsLockDO where lockName = ?1")
    int deleteByLockName(String lockName);

    OmsLockDO findByLockName(String lockName);

    @Modifying
    @Transactional(rollbackOn = Exception.class)
    int deleteByOwnerIP(String ip);
}
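OmsLockRepository extends JpaRepository and declares deleteByLockName, findByLockName and deleteByOwnerIP. deleteByLockName is backed by an explicit @Query, while the other two are derived from their method names.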
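To sum up: the LockService interface defines tryLock and unlock, and DatabaseLockService implements it on top of the database. Its constructor takes an OmsLockRepository and registers a shutdown hook that runs omsLockRepository.deleteByOwnerIP(ownerIp) on exit. tryLock creates an OmsLockDO and calls omsLockRepository.saveAndFlush, returning true on success; on failure it looks up the existing row with omsLockRepository.findByLockName, computes how long the lock has been held, and if that exceeds maxLockTime it calls unlock and retries tryLock. unlock calls omsLockRepository.deleteByLockName.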