作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO
联系qq:184480602,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬
我们在不久前介绍了SpringBoot定时任务,最近又一起探究了如何使用Redis实现简单的消息队列,都是一些不错的小知识点。为了能跟前面的内容产生联动,这次我们打算把Redis分布式锁相关的介绍融合进定时任务的案例中,学起来更带劲~
在我看来,同样需要使用锁,动机可能完全相反:
前者适用于秒杀等场景。作为商家,当然希望在不发生线程安全问题的前提下,让每一个订单都生效,直到商品售罄。此时分布式锁的写法可以是“不断重试”或“阻塞等待”,即:递归或while true循环尝试获取、阻塞等待。
而后者适用于分布式系统或多节点项目的定时任务,比如同一份代码部署在A、B两台服务器上,而数据库共用同一个。如果不做限制,那么在同一时刻,两台服务器都会去拉取列表执行,会发生任务重复执行的情况。
此时可以考虑使用分布式锁,在cron触发的时刻只允许一个线程去往数据库拉取任务:
在实现Redis分布式锁控制定时任务唯一性的同时,我们引入之前的Redis消息队列。注意,这与Redis分布式锁本身无关,就是顺便复习一遍Redis消息队列而已,大家可以只实现Redis分布式锁+定时任务的部分。
整个Demo的结构大致如图:
当然,实际项目中一般是这样的:
首先,要和大家说一下,但凡牵涉到分布式的处理,没有一个是简单的,上面的Demo设计也不过是玩具,用来启发 大家的思路。
为什么要把Demo设计得这么复杂呢?哈哈,因为这是我在上一家公司自己设计的,遇到了很多坑...拿出来自嘲一番,与各位共勉。
我当时的设计思路是:
由于小公司没有用什么Elastic-Job啥的,就是很普通的多节点部署。为了避免任务重复执行,我想设计一个分布式锁。但因为当时根本不知道Redisson,所以就自己百度了Redis实现分布式锁的方式,然后依葫芦画瓢自己手写了一个 。
但我写完Redis分布式锁后,在实际测试过程中发现还需要考虑锁的失效时间...
这里有两个问题:
最简单的实现方案是这样的,一般没问题:
但极端的情况下(项目在任务进行时重启或意外宕机),可能当前任务来不及解锁就挂了(死锁),那么下一个任务就会一直被锁在方法外等待。就好比厕所里有人被熏晕了,没法开门,而外面的人又进不去...
此时需要装一个自动解锁的门,到时间自动开门,也就是要给锁设置一个过期时间。但紧接着又会有第二个问题:锁的失效时间设多长合适?
很难定。
因为随着项目的发展,定时任务的执行时间很可能是变化的。
如果设置时间过长,极端点,定为365天。假设任务正常执行,比如10分钟就结束,那么线程继续往下就会执行unLock()主动解锁。但万一和上面一样宕机了,那么这个锁就要等365天后才解开。注意,宕机可不像JVM异常,它压根不会去执行finally里的unLock()。这种情况好比有个人在厕所里上大号直接掉坑里了,而自动门默认365天打开…所以,锁过期时间设置过长的坏处,本质是一旦发生宕机来不及解锁,那么过期时间越长,影响面越广,会导致其他操作阻滞。
如果设置时间过短,上一个人还没拉完,门就“咔嚓”一声开了,尴尬不,重复执行了。
终上所述,我当时之所以设计得这么复杂,就是想尽量缩短任务执行的时间,让它尽可能短(拉取后直接丢给队列,自己不处理),这样锁的时间一般设置30分钟就没啥问题。另外,对于死锁问题,我当时没有考虑宕机的情况,只考虑了意外重启…问题还有很多,文末会再总结。
请大家阅读下面代码时思考两个问题:
新建一个空的SpringBoot项目。
拷贝下方代码,构建工程:
构建完以后,拷贝一份,修改端口号为8081,避免和原先的冲突
/**
* 统一管理Redis Key
*
* @author mx
*/
public final class RedisKeyConst {
/**
* 分布式锁的KEY
*/
public static final String RESUME_PULL_TASK_LOCK = "resume_pull_task_lock";
/**
* 简历异步解析任务队列
*/
public static final String RESUME_PARSE_TASK_QUEUE = "resume_parse_task_queue";
}
/**
* 消费者,异步获取简历解析结果并存入数据库
*
* @author mx
*/
@Slf4j
@Component
public class RedisMessageQueueConsumer implements ApplicationListener<ContextRefreshedEvent> {
@Autowired
private RedisService redisService;
@Autowired
private AsyncResumeParser asyncResumeParser;
@Autowired
private ObjectMapper objectMapper;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
log.info("开始监听RedisMessageQueue...");
CompletableFuture.runAsync(() -> {
// 大循环,不断监听队列任务(阻塞式)
while (true) {
// 阻塞监听
ResumeCollectionDTO resumeCollectionDTO = (ResumeCollectionDTO) redisService.popQueue(RedisKeyConst.RESUME_PARSE_TASK_QUEUE, 5, TimeUnit.SECONDS);
if (resumeCollectionDTO != null) {
int rePullCount = 0;
int retryCount = 0;
log.info("从队列中取出:{}", resumeCollectionDTO.getName());
log.info(">>>>>>>>>>>>>>>>>>>开始拉取简历:{}", resumeCollectionDTO.getName());
Long asyncPredictId = resumeCollectionDTO.getAsyncPredictId();
// 小循环,针对每一个任务多次调用第三方接口,直到获取最终结果或丢弃任务
while (true) {
try {
PredictResult result = asyncResumeParser.getResult(asyncPredictId);
rePullCount++;
// 如果已经解析完毕
if (result.getStatus() == 2) {
// 保存数据库
try {
log.info("简历:{}解析成功", resumeCollectionDTO.getName());
log.info("resultJson:{}", result.getResultJson());
ResumeCollectionDO resumeCollectionDO = objectMapper.readValue(result.getResultJson(), ResumeCollectionDO.class);
log.info("<<<<<<<<<<<<<<<<<<<保存简历:{}到数据库", resumeCollectionDO);
// 归零
rePullCount = 0;
retryCount = 0;
break;
} catch (Exception e) {
discardTask(resumeCollectionDTO);
log.info("<<<<<<<<<<<<<<<<<<<保存简历失败,丢弃任务");
rePullCount = 0;
retryCount = 0;
break;
}
}
// 远程服务还未解析完毕,重试
else {
try {
if (rePullCount <= 3) {
// 前3次重试,时间为1s间隔
TimeUnit.SECONDS.sleep(1);
log.info("简历:{}尚未解析完毕, 准备进行第{}次重试, 停顿1s后进行", resumeCollectionDTO.getName(), rePullCount);
} else if (rePullCount > 3 && rePullCount <= 6) {
// 说明任务比较耗时,加长等待时间
TimeUnit.SECONDS.sleep(2);
log.info("简历:{}尚未解析完毕, 准备进行第{}次重试, 停顿2s后进行", resumeCollectionDTO.getName(), rePullCount);
} else if (rePullCount > 6 && rePullCount <= 8) {
// 说明任务比较耗时,加长等待时间
TimeUnit.SECONDS.sleep(3);
log.info("简历:{}尚未解析完毕, 准备进行第{}次重试, 停顿3s后进行", resumeCollectionDTO.getName(), rePullCount);
} else {
discardTask(resumeCollectionDTO);
log.info("<<<<<<<<<<<<<<<<<<<多次拉取仍未得到结果, 丢弃简历:{}", resumeCollectionDTO.getName());
retryCount = 0;
rePullCount = 0;
break;
}
} catch (InterruptedException e) {
discardTask(resumeCollectionDTO);
log.info("<<<<<<<<<<<<<<<<<<<任务中断异常, 简历:{}", resumeCollectionDTO.getName());
rePullCount = 0;
retryCount = 0;
break;
}
}
} catch (Exception e) {
if (retryCount > 3) {
discardTask(resumeCollectionDTO);
log.info("<<<<<<<<<<<<<<<<<<<简历:{}重试{}次后放弃, rePullCount:{}, retryCount:{}", resumeCollectionDTO.getName(), retryCount, rePullCount, retryCount);
rePullCount = 0;
retryCount = 0;
break;
}
retryCount++;
log.info("简历:{}远程调用异常, 准备进行第{}次重试...", resumeCollectionDTO.getName(), retryCount);
}
}
log.info("break......");
}
}
});
}
private void discardTask(ResumeCollectionDTO task) {
// 根据asyncPredictId删除任务...
log.info("丢弃任务:{}...", task.getName());
}
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ResumeCollectionDO {
/**
* 简历id
*/
private Long id;
/**
* 简历名称
*/
private String name;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ResumeCollectionDTO implements Serializable {
/**
* 简历id
*/
private Long id;
/**
* 异步解析id,稍后根据id可获取最终解析结果
*/
private Long asyncPredictId;
/**
* 简历名称
*/
private String name;
}
public interface RedisService {
/**
* 向队列插入消息
*
* @param queue 自定义队列名称
* @param obj 要存入的消息
*/
void pushQueue(String queue, Object obj);
/**
* 从队列取出消息
*
* @param queue 自定义队列名称
* @param timeout 最长阻塞等待时间
* @param timeUnit 时间单位
* @return
*/
Object popQueue(String queue, long timeout, TimeUnit timeUnit);
/**
* 尝试上锁
*
* @param lockKey
* @param value
* @param expireTime
* @param timeUnit
* @return
*/
boolean tryLock(String lockKey, String value, long expireTime, TimeUnit timeUnit);
/**
* 根据MACHINE_ID解锁(只能解自己的)
*
* @param lockKey
* @param value
* @return
*/
boolean unLock(String lockKey, String value);
/**
* 释放锁,不管是不是自己的
*
* @param lockKey
* @param value
* @return
*/
boolean releaseLock(String lockKey, String value);
}
@Slf4j
@Component
public class RedisServiceImpl implements RedisService {
@Autowired
private RedisTemplate redisTemplate;
/**
* 向队列插入消息
*
* @param queue 自定义队列名称
* @param obj 要存入的消息
*/
@Override
public void pushQueue(String queue, Object obj) {
redisTemplate.opsForList().leftPush(queue, obj);
}
/**
* 从队列取出消息
*
* @param queue 自定义队列名称
* @param timeout 最长阻塞等待时间
* @param timeUnit 时间单位
* @return
*/
@Override
public Object popQueue(String queue, long timeout, TimeUnit timeUnit) {
return redisTemplate.opsForList().rightPop(queue, timeout, timeUnit);
}
/**
* 尝试上锁
*
* @param lockKey
* @param value
* @param expireTime
* @param timeUnit
* @return
*/
@Override
public boolean tryLock(String lockKey, String value, long expireTime, TimeUnit timeUnit) {
Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey, value);
if (Boolean.TRUE.equals(lock)) {
redisTemplate.expire(lockKey, expireTime, timeUnit);
return true;
} else {
return false;
}
}
/**
* 根据MACHINE_ID解锁(只能解自己的)
*
* @param lockKey
* @param value
* @return
*/
@Override
public boolean unLock(String lockKey, String value) {
String machineId = (String) redisTemplate.opsForValue().get(lockKey);
if (StringUtils.isNotEmpty(machineId) && machineId.equals(value)) {
redisTemplate.delete(lockKey);
return true;
}
return false;
}
/**
* 释放锁,不管是不是自己的
*
* @param lockKey
* @param value
* @return
*/
@Override
public boolean releaseLock(String lockKey, String value) {
Boolean delete = redisTemplate.delete(lockKey);
if (Boolean.TRUE.equals(delete)) {
log.info("Spring启动,节点:{}成功释放上次简历汇聚定时任务锁", value);
return true;
}
return false;
}
}
@Slf4j
@Component
@EnableScheduling
public class ResumeCollectionTask implements ApplicationListener<ContextRefreshedEvent> {
/**
* 当这份代码被部署到不同的服务器,启动时为每台机器分配一个唯一的机器ID
*/
private static final String MACHINE_ID = IdUtil.randomUUID();
@Autowired
private RedisService redisService;
@Autowired
private AsyncResumeParser asyncResumeParser;
@Scheduled(cron = "0 */1 * * * ?")
// @Scheduled(fixedDelay = 60 * 1000L)
public void resumeSchedule() {
// 尝试上锁,返回true或false,锁的过期时间设置为10分钟(实际要根据项目调整,这也是自己实现Redis分布式锁的难点之一)
boolean lock = redisService.tryLock(RedisKeyConst.RESUME_PULL_TASK_LOCK, MACHINE_ID, 10, TimeUnit.MINUTES);
// 如果当前节点成功获取锁,那么整个系统只允许当前程序去MySQL拉取待执行任务
if (lock) {
log.info("节点:{}获取锁成功,定时任务启动", MACHINE_ID);
try {
collectResume();
} catch (Exception e) {
log.info("定时任务异常:", e);
} finally {
redisService.unLock(RedisKeyConst.RESUME_PULL_TASK_LOCK, MACHINE_ID);
log.info("节点:{}释放锁,定时任务结束", MACHINE_ID);
}
} else {
log.info("节点:{}获取锁失败,放弃定时任务", MACHINE_ID);
}
}
/**
* 任务主体:
* 1.从数据库拉取符合条件的HR邮箱
* 2.从HR邮箱拉取附件简历
* 3.调用远程服务异步解析简历
* 4.插入待处理任务到数据库,作为记录留存
* 5.把待处理任务的id丢到Redis Message Queue,让Consumer去异步处理
*/
private void collectResume() throws InterruptedException {
// 跳过1、2两步,假设已经拉取到简历
log.info("节点:{}从数据库拉取任务简历", MACHINE_ID);
List<ResumeCollectionDO> resumeCollectionList = new ArrayList<>();
resumeCollectionList.add(new ResumeCollectionDO(1L, "张三的简历.pdf"));
resumeCollectionList.add(new ResumeCollectionDO(2L, "李四的简历.html"));
resumeCollectionList.add(new ResumeCollectionDO(3L, "王五的简历.doc"));
// 模拟数据库查询耗时
TimeUnit.SECONDS.sleep(3);
log.info("提交任务到消息队列:{}", resumeCollectionList.stream().map(ResumeCollectionDO::getName).collect(Collectors.joining(",")));
for (ResumeCollectionDO resumeCollectionDO : resumeCollectionList) {
// 上传简历异步解析,得到异步结果id
Long asyncPredictId = asyncResumeParser.asyncParse(resumeCollectionDO);
// 把任务插入数据库
// 略...
// 把任务丢到Redis Message Queue
ResumeCollectionDTO resumeCollectionDTO = new ResumeCollectionDTO();
BeanUtils.copyProperties(resumeCollectionDO, resumeCollectionDTO);
resumeCollectionDTO.setAsyncPredictId(asyncPredictId);
redisService.pushQueue(RedisKeyConst.RESUME_PARSE_TASK_QUEUE, resumeCollectionDTO);
}
}
/**
* 项目重启后先尝试删除之前的锁(如果存在),防止死锁等待
*
* @param event the event to respond to
*/
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
redisService.releaseLock(RedisKeyConst.RESUME_PULL_TASK_LOCK, MACHINE_ID);
}
}
/**
* 第三方提供给的简历解析服务
*
* @author mx
*/
@Service
public class AsyncResumeParser {
@Autowired
private ObjectMapper objectMapper;
/**
* 模拟分配异步任务结果id,不用深究,没啥意义,反正每个任务都会得到一个id,稍后根据id返回最终解析结果
*/
private static final AtomicLong ASYNC_RESULT_ID = new AtomicLong(1000);
/**
* 解析结果
*/
private static final Map<Long, String> results = new HashMap<>();
/**
* 模拟第三方服务异步解析,返回解析结果
*
* @param resumeCollectionDO
* @return
*/
public Long asyncParse(ResumeCollectionDO resumeCollectionDO) {
long asyncPredictId = ASYNC_RESULT_ID.getAndIncrement();
try {
String resultJson = objectMapper.writeValueAsString(resumeCollectionDO);
results.put(asyncPredictId, resultJson);
return asyncPredictId;
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return -1L;
}
/**
* 根据异步id返回解析结果,但此时未必已经解析成功
* <p>
* 解析状态
* 0 初始化
* 1 处理中
* 2 调用成功
* 3 调用失败
*
* @param asyncPredictId
* @return
*/
public PredictResult getResult(Long asyncPredictId) throws ParseErrorException, InterruptedException {
// 随机模拟异步解析的状态
int value = ThreadLocalRandom.current().nextInt(100);
if (value >= 85) {
// 模拟解析完成
TimeUnit.SECONDS.sleep(1);
String resultJson = results.get(asyncPredictId);
return new PredictResult(resultJson, 2);
} else if (value <= 5) {
// 模拟解析异常
TimeUnit.SECONDS.sleep(1);
throw new ParseErrorException("简历解析异常");
}
// 如果时间过短,返回status=1,表示解析中
TimeUnit.SECONDS.sleep(1);
return new PredictResult("", 1);
}
}
/**
* 解析异常
*
* @author mx
*/
public class ParseErrorException extends Exception {
/**
* Constructs a new exception with {@code null} as its detail message.
* The cause is not initialized, and may subsequently be initialized by a
* call to {@link #initCause}.
*/
public ParseErrorException() {
}
/**
* Constructs a new exception with the specified detail message. The
* cause is not initialized, and may subsequently be initialized by
* a call to {@link #initCause}.
*
* @param message the detail message. The detail message is saved for
* later retrieval by the {@link #getMessage()} method.
*/
public ParseErrorException(String message) {
super(message);
}
}
/**
* 第三方返回值
*
* @author mx
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PredictResult {
/**
* 解析结果
*/
private String resultJson;
/**
* 解析状态
* 0 初始化
* 1 处理中
* 2 调用成功
* 3 调用失败
*/
private Integer status;
}
在项目运行过程中,启动这个测试类的方法,即可观察不一样的现象。
@SpringBootTest
class RedisDistributedLockApplicationTests {
@Autowired
private RedisService redisService;
/**
* 作为失败案例(因为不存在777L这个解析任务,AsyncResumeParse.results会返回null)
* 观察RedisMessageQueueConsumer的处理方式
*/
@Test
void contextLoads() {
ResumeCollectionDTO resumeCollectionDTO = new ResumeCollectionDTO();
resumeCollectionDTO.setId(666L);
resumeCollectionDTO.setAsyncPredictId(777L);
resumeCollectionDTO.setName("测试1号");
redisService.pushQueue(RedisKeyConst.RESUME_PARSE_TASK_QUEUE, resumeCollectionDTO);
}
}
server:
port: 8080
spring:
redis:
host:
password:
database: 2
啥都不说了,都在jiu代码里了。大家自己拷贝到本地,动手玩一下,加深对Redis锁和Redis消息队列的理解。
?
只有一个定时任务能去数据库拉取任务,到时多节点部署大致是下面这样(redis一般是独立部署的,和节点代码无关):
上面展示的代码其实存在很多问题,我们会在下一篇指出并讨论解决方案。
本文仅提供思路,开阔大家的眼界,千万别在自己项目中使用!!!!我当年被这个坑惨了,花里胡哨的,尤其Consumer里一大堆的sleep(),是非常low的!!
对于异步调用的结果,不要循环等待,而应该分为几步:
分布式定时任务可以考虑xxl-job或elastic-job,分布式锁推荐使用redisson。
作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO
进群,大家一起学习,一起进步,一起对抗互联网寒冬