https://www.cnblogs.com/wangyingshuo/p/14510524.html
public interface DistributedLock extends Lock {
/**
* 尝试获取分布式锁
*
* @param lockKey 锁
* @param requestId 请求标识
* @param expireSeconds 锁有效时间
* @param overTime 超时时间
* @param timeUnit 时间单位
* @return 是否获取成功
* @throws InterruptedException 中断异常
*/
boolean tryLock(String lockKey, String requestId, int expireSeconds, int overTime, TimeUnit timeUnit) throws InterruptedException;
/**
* 尝试获取分布式分段锁
* @param lockKey
* @param segments 分段标识
* @param requestId
* @param expireSeconds
* @param overTime
* @param timeUnit
* @return 返回成功加锁的段
* @throws InterruptedException
*/
String tryLock(String lockKey, List<String> segments , String requestId, int expireSeconds, int overTime, TimeUnit timeUnit) throws InterruptedException ;
/**
* 释放分布式锁
*
* @param lockKey 锁
* @param requestId 请求标识
*/
boolean unlock(String lockKey, String requestId);
}
加锁本质是为了实现互斥,只让一个访问资源。redis分布式锁是根据lockKey
这个key去查是否有对应的value
,如果值不存在,说明并没有人访问资源,则加锁成功。如果值存在,说明已经有人在访问资源,则加锁失败。
@Component
public class RedisDistributedLock implements DistributedLock {
/**
* 成功
*/
private final static Long SUCCESS = 1L;
/**等待200ms*/
private final static Long WAIT_TIME = 200L;
/**分段锁间的时间间隔*/
private final static Long SEGMENT_WAIT_TIME = 5L;
private final static Long WAIT_TIME_1 = 50L;
/**
* 缓存
*/
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Override
public void lock() {
throw new UnsupportedOperationException();
}
@Override
public void lockInterruptibly() throws InterruptedException {
throw new InterruptedException();
}
@Override
public boolean tryLock() {
throw new UnsupportedOperationException();
}
@Override
public boolean tryLock(String lockKey, String requestId, int expireSeconds, int overTime, TimeUnit timeUnit) throws InterruptedException {
long lastTime = System.nanoTime();
// 获取锁的超时时间
long timeout = timeUnit.toNanos(overTime);
for (; ; ) {
boolean isGet = tryGetDistributedLock(lockKey, requestId, expireSeconds, timeUnit);
// 获取成功
if (isGet) {
return true;
}
long now = System.nanoTime();
// 超时
if (timeout <= 0) {
throw new DefinitelyRuntimeException("当前活动火热,稍后再试...");
}
//等待WAIT_TIME
Thread.sleep(WAIT_TIME);
timeout -= now - lastTime;
lastTime = now;
// 线程已被销毁
if (Thread.interrupted()) {
throw new InterruptedException();
}
}
}
public String tryLock(String lockKey, List<String> segments , String requestId, int expireSeconds, int overTime, TimeUnit timeUnit) throws InterruptedException {
long lastTime = System.nanoTime();
// 获取锁的超时时间
long timeout = timeUnit.toNanos(overTime);
for (; ; ) {
for (int i = 0; i < segments.size(); i++) {
boolean sucess = tryGetDistributedLock(lockKey + segments.get(i), requestId, expireSeconds, timeUnit);
if (sucess) {
return segments.get(i);
}
long now = System.nanoTime();
// 超时
if (timeout <= 0) {
throw new DefinitelyRuntimeException("加锁超时");
}
//等待WAIT_TIME
Thread.sleep(SEGMENT_WAIT_TIME);
timeout -= now - lastTime;
lastTime = now;
// 线程已被销毁
if (Thread.interrupted()) {
throw new InterruptedException();
}
}
//等待WAIT_TIME
Thread.sleep(WAIT_TIME_1);
}
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
throw new UnsupportedOperationException();
}
@Override
public boolean unlock(String lockKey, String requestId) {
return releaseDistributedLock(lockKey, requestId);
}
@Override
public void unlock() {
throw new UnsupportedOperationException();
}
@SuppressWarnings("NullableProblems")
@Override
public Condition newCondition() {
throw new UnsupportedOperationException();
}
/**
* 尝试获取分布式锁
*
* @param lockKey 锁
* @param requestId 请求标识
* @param expireSeconds 缓存有效时间
* @param timeUnit 时间单位
* @return 是否获取成功
*/
private boolean tryGetDistributedLock(String lockKey, String requestId, int expireSeconds, TimeUnit timeUnit) {
// 设置锁,不存在时才能设置成功,表示被锁住
if (redisTemplate == null) {
return false;
}
Boolean isLock = redisTemplate.opsForValue().setIfAbsent(lockKey, requestId, expireSeconds, timeUnit);
if (isLock != null && isLock) {
return true;
}
return false;
}
/**
* 释放分布式锁
*
* @param lockKey 锁
* @param requestId 请求标识
* @return 是否释放成功
*/
private boolean releaseDistributedLock(String lockKey, String requestId) {
// 用脚本来保证同步操作
String script =
"if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
RedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class);
Long result = redisTemplate.execute(redisScript, Collections.singletonList(lockKey), requestId);
return SUCCESS.equals(result);
}
}
/**
* 分布式锁注解
*
*/
@Documented
@Inherited
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Dl {
/**
* 锁定的缓存key名称,若为空,则为类完整名 + 方法名 + 后缀
*
* @return 锁定的缓存key名称
*/
String value() default "";
/**
* 缓存key后缀参数所在位置,与{@link #spEl()}属性冲突,优先级低于{@link #spEl()}
*
* @return 位置
*/
int suffixKeyIndex() default -1;
/**
* 同步锁定的时间,单位秒,默认60
*
* @return 同步锁定的时间
*/
int lockTimeSeconds() default 60;
/**
* 获取锁的超时时间
*
* @return 获取锁的超时时间
*/
int overTime() default 60;
/**
* 时间单位
*
* @return 时间单位
*/
TimeUnit timeUnit() default TimeUnit.SECONDS;
/**
* 缓存key后缀的spEl表达式,与{@link #suffixKeyIndex()}属性冲突,优先级高于{@link #suffixKeyIndex()}
*
* @return 缓存key后缀的spEl表达式
*/
String spEl() default "";
}
当每次使用Dl注解的时候,都会调用下面的distributedLock()
方法,即添加了注解的方法都会加分布式锁。
@Aspect
@Component
@Order(-1)
@ConditionalOnBean(DistributedLock.class)
public class DistributedAspect {
/**
* 分布式锁
*/
@Autowired
private DistributedLock distributedLock;
/**
* @param proceedingJoinPoint proceedingJoinPoint
*/
@Around("@annotation(com.nascent.ecrp.mall.springframework.distribute.Dl)")
public Object distributedLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
String requestId = UUID.randomUUID().toString();
MethodSignature methodSignature = (MethodSignature) proceedingJoinPoint.getSignature();
Method method = methodSignature.getMethod();
Dl distributedKey = method.getAnnotation(Dl.class);
String lockKey = distributedKey.value();
if (StringUtils.isBlank(lockKey)) {
// 若默认的key为空,则生成一个
lockKey = method.getDeclaringClass().getName() + ":" + method.getName();
}
// 解析缓存key的后缀
int suffixKeyIndex = distributedKey.suffixKeyIndex();
String suffixKey = "";
String spEl = distributedKey.spEl();
if (StringUtils.isNotBlank(spEl)) {
suffixKey = ":" + SpElUtils
.getValue(spEl, proceedingJoinPoint.getArgs(), methodSignature.getParameterNames());
} else if (suffixKeyIndex > -1) {
Object[] args = proceedingJoinPoint.getArgs();
suffixKey += ":" + args[suffixKeyIndex];
}
lockKey += suffixKey;
lockKey = "distributed_lock:" + lockKey;
try {
if (distributedLock.tryLock(lockKey, requestId, distributedKey.lockTimeSeconds(), distributedKey.overTime(), distributedKey.timeUnit())) {
return proceedingJoinPoint.proceed();
} else {
throw new DefinitelyRuntimeException("系统挤爆了,请稍后再试");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new DefinitelyRuntimeException(e);
} finally {
distributedLock.unlock(lockKey, requestId);
}
}
}
@Override
@Dl(value = "submit", spEl = "#customer.openId")
@Transactional
public BeanCommonResult cancelLike(FreeTrialReportLikeMobileRequest request, Customer customer) {
WmActivityFreeTrialLogDO freeTrialLog = WmActivityFreeTrialLogDao.dao().findLogByParticipateId(request.getTrialLogGuid());
if (freeTrialLog == null) {
throw new BaseRuntimeException("参数有误");
}
WmActivityFreeTrialReportLikeLog log = trialReportLikeLogMapper.selectByUserActLog(request.getTrialLogGuid(), customer.getOpenId());
Date now = new Date();
if (log == null) {
throw new BaseRuntimeException("点赞记录不存在");
} else {
if (Constants.Globals.GLOBALS_NO.equals(log.getState())) {
throw new BaseRuntimeException("请勿重复提交");
}
}
log.setState(0);
log.setUpdateTime(now);
trialReportLikeLogMapper.updateByPrimaryKeySelective(log);
//点赞数量 自减
WmActivityFreeTrialLogDao.dao().incrLikeCount(freeTrialLog.getGuid(), -1);
return new BeanCommonResult();
}