这个三主要是对前面的代码进行一些补充说明。
首先补充二里面的一些重要方法的说明,便于理解。
//未来数据定时刷新
@Scheduled(cron = "0 */1 * * * ?")//一分钟调用一次
public void refresh() {
//setnx实现分布式锁
String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);
if(StringUtils.isNotBlank(token)){
log.info("未来数据定时刷新");
System.out.println(System.currentTimeMillis() / 1000 + "执行了定时任务");
// 获取所有未来数据集合的key值
Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_*
for (String futureKey : futureKeys) { // future_250_250
String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];
//获取该组key下当前需要消费的任务数据
//参数:0:为从0开始查 0~当前时间的毫秒值
Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
if (!tasks.isEmpty()) {
//将这些任务数据添加到消费者队列中
cacheService.refreshWithPipeline(futureKey, topicKey, tasks);
System.out.println("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");
log.info("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");
}
}
}
}
????????scan是基于游标搜索的方法,?使用keys的模糊匹配却发现redis的CPU使用率极高,redis是单线程,会被堵塞。scan是一个基于游标的迭代器,命令每次被调用之后, 都会向返回一个新的游标, 在下次迭代时需要使用这个新游标作为scan命令的游标参数, 以此来延续之前的迭代过程。就是累加的。
这个具体的方法在文章redis(1)里有具体的代码实现。
connection那些方法主要作用是获取一个Redis连接(RedisConnection
)对象。
这个方法是同步操作,但是是将一堆命令同时传递过去处理。具体地个人里接在代码注释里,仍然是一个阻塞方法,我也想过采用异步调用listenfuture去实现,不过觉得好像没什么必要。
//同步操作,允许一次性发送多个命令给Redis服务器,doInRedis方法中执行了一系列Redis命令,包括rPush和zRem,
// 这些命令会一起被打包发送到Redis服务器,而不是在每个命令之间等待返回。
//减少网络延迟,但它仍然是同步的
public List<Object> refreshWithPipeline(String future_key,String topic_key,Collection<String> values){
List<Object> objects = stringRedisTemplate.executePipelined(new RedisCallback<Object>() {
@Nullable
@Override
public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
StringRedisConnection stringRedisConnection = (StringRedisConnection)redisConnection;
//将任务转成一个字符串数组
String[] strings = values.toArray(new String[values.size()]);
//把当前的任务放到connection中
stringRedisConnection.rPush(topic_key,strings);
//把当前的任务从zset中删除
stringRedisConnection.zRem(future_key,strings);
return null;
}
});
return objects;
}
代码在之前防止redis缓存三件套的时候采用过redisson分布式锁方式实现过,这次采用setnx来实现一下。
个人理解,setnx其实也是一种乐观锁的感觉,他会给你生成一个不存在的key,然后看服务实例是否持有该锁进而允不允许他执行,也是比悲观锁效率更高。
?一些具体的注释也在代码里给出了,锁设置成功了就会返回token。
//基于setnx实现分布式锁
/**
* 加锁
*
* @param name
* @param expire
* @return
*/
public String tryLock(String name, long expire) {
//expire过期时间为毫秒值
name = name + "_lock";
String token = UUID.randomUUID().toString();
RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory();
RedisConnection conn = factory.getConnection();
try {
//参考redis命令:
//set key value [EX seconds] [PX milliseconds] [NX|XX]
//NX:表示key不存在才设置成功。 EX:设置过期时间
Boolean result = conn.set(
name.getBytes(),//key的名称
token.getBytes(),//值为uuid
Expiration.from(expire, TimeUnit.MILLISECONDS),//过期时间
RedisStringCommands.SetOption.SET_IF_ABSENT //NX
);
if (result != null && result)
return token;
} finally {
RedisConnectionUtils.releaseConnection(conn, factory,false);
}
return null;
}
?@PostConstruct//初始化方法,微服务启动了,就会做同步操作
值得说明的是spring注解@Scheduled,存在这一些问题 :
做集群任务的重复执行问题
cron表达式定义在代码之中,修改不方便
定时任务失败了,无法重试也没有统计
如果任务量过大,不能有效的分片执行
这里综合实际任务并且任务量,选择这个注解实现。
所有的task在数据库中都能找到,同步数据前记得清理一下缓存数据,防止重复,但要执行的肯定不会丢失。
//数据库任务定时同步到redis中,每五分钟执行一次
@Scheduled(cron = "0 */5 * * * ?")
@PostConstruct//初始化方法,微服务启动了,就会做同步操作
public void reloadData() {
//清理缓存中的数据 list,zset
clearCache();
log.info("数据库数据同步到缓存");
//获取五分钟之后的时间,毫秒值
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.MINUTE, 5);
//查看小于未来5分钟的所有任务
List<Taskinfo> allTasks = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime,calendar.getTime()));
if(allTasks != null && allTasks.size() > 0){
for (Taskinfo taskinfo : allTasks) {
Task task = new Task();
BeanUtils.copyProperties(taskinfo,task);
task.setExecuteTime(taskinfo.getExecuteTime().getTime());
addTaskToCache(task);
}
}
log.info("数据库任务同步到redis");
}