按照文档导入项目,准备数据库表,并导入对应的实体类和nacos配置中心的配置。
成功集成,并通过测试。
①:拷贝mybatis-plus生成的文件,mapper
②:创建task类,用于接收添加任务的参数
package com.heima.model.schedule.dtos;
import lombok.Data;
import java.io.Serializable;
/**
* Data object carrying the parameters of a task that is to be added.
*/
@Data
public class Task implements Serializable {
/**
* Task id
*/
private Long taskId;
/**
* Task type
*/
private Integer taskType;
/**
* Priority
*/
private Integer priority;
/**
* Execution time as a millisecond timestamp
* (the previous comment called this an "execution id", which does not match the field)
*/
private long executeTime;
/**
* Task parameters as raw bytes
*/
private byte[] parameters;
}
③:创建TaskService
package com.heima.schedule.service;
import com.heima.model.schedule.dtos.Task;
/**
 * Externally accessible interface of the task service.
 */
public interface TaskService {

    /**
     * Adds a task.
     *
     * @param task the task object
     * @return the task id
     */
    long addTask(Task task);
}
实现:
import com.alibaba.fastjson.JSON;
import com.heima.common.constants.ScheduleConstants;
import com.heima.common.redis.CacheService;
import com.heima.model.schedule.dtos.Task;
import com.heima.model.schedule.pojos.Taskinfo;
import com.heima.model.schedule.pojos.TaskinfoLogs;
import com.heima.schedule.mapper.TaskinfoLogsMapper;
import com.heima.schedule.mapper.TaskinfoMapper;
import com.heima.schedule.service.TaskService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Calendar;
import java.util.Date;
@Service
@Transactional
@Slf4j
public class TaskServiceImpl implements TaskService {
/**
* 添加延迟任务
*
* @param task
* @return
*/
@Override
public long addTask(Task task) {
//1.添加任务到数据库中
boolean success = addTaskToDb(task);
if (success) {
//2.添加任务到redis
addTaskToCache(task);
}
return task.getTaskId();
}
@Autowired
private CacheService cacheService;
/**
* 把任务添加到redis中
*
* @param task
*/
private void addTaskToCache(Task task) {
String key = task.getTaskType() + "_" + task.getPriority();
//获取5分钟之后的时间 毫秒值
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.MINUTE, 5);
long nextScheduleTime = calendar.getTimeInMillis();
//2.1 如果任务的执行时间小于等于当前时间,存入list
if (task.getExecuteTime() <= System.currentTimeMillis()) {
cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task));
} else if (task.getExecuteTime() <= nextScheduleTime) {
//2.2 如果任务的执行时间大于当前时间 && 小于等于预设时间(未来5分钟) 存入zset中
cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime());
}
}
@Autowired
private TaskinfoMapper taskinfoMapper;
@Autowired
private TaskinfoLogsMapper taskinfoLogsMapper;
/**
* 添加任务到数据库中
*
* @param task
* @return
*/
private boolean addTaskToDb(Task task) {
boolean flag = false;
try {
//保存任务表
Taskinfo taskinfo = new Taskinfo();
BeanUtils.copyProperties(task, taskinfo);
taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
taskinfoMapper.insert(taskinfo);
//设置taskID
task.setTaskId(taskinfo.getTaskId());
//保存任务日志数据
TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
BeanUtils.copyProperties(taskinfo, taskinfoLogs);
taskinfoLogs.setVersion(1);
taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
taskinfoLogsMapper.insert(taskinfoLogs);
flag = true;
} catch (Exception e) {
e.printStackTrace();
}
return flag;
}
}
ScheduleConstants常量类
package com.heima.common.constants;
/**
 * Constants used by the task scheduling code: task states and redis key prefixes.
 */
public class ScheduleConstants {
    // task states
    public static final int SCHEDULED = 0; // initial state
    public static final int EXECUTED = 1;  // executed
    public static final int CANCELLED = 2; // cancelled

    // Key prefixes are final so that no code can reassign them at runtime.
    public static final String FUTURE = "future_"; // key prefix for future data
    public static final String TOPIC = "topic_";   // key prefix for current data
}
测试成功
在TaskService中添加方法
/**
* Cancels a task.
* @param taskId the task id
* @return the result of the cancellation
*/
public boolean cancelTask(long taskId);
/**
 * Cancels a task: updates the database with the CANCELLED status and, if that yields a
 * task, removes the task from redis.
 *
 * @param taskId the task id
 * @return {@code true} if the database update returned a task, {@code false} otherwise
 */
@Override
public boolean cancelTask(long taskId) {
    // delete the task row and update its log row
    Task cancelled = updateDb(taskId, ScheduleConstants.CANCELLED);
    if (cancelled == null) {
        return false;
    }
    // drop the task from redis as well
    removeTaskFromCache(cancelled);
    return true;
}
/**
 * Removes the task data from redis.
 *
 * <p>The entry is removed from both the list and the zset. Choosing one of them by
 * comparing the execution time with the current time misses a task whose execution time
 * has already passed but that still sits in the zset; such a task would later be moved to
 * the list and executed despite having been cancelled.
 *
 * @param task the task to remove
 */
private void removeTaskFromCache(Task task) {
    String key = task.getTaskType() + "_" + task.getPriority();
    String taskJson = JSON.toJSONString(task);
    cacheService.lRemove(ScheduleConstants.TOPIC + key, 0, taskJson);
    cacheService.zRemove(ScheduleConstants.FUTURE + key, taskJson);
}
/**
 * Deletes the task row and sets the status of its log row.
 *
 * @param taskId the task id
 * @param status the status to write to the log row
 * @return a {@link Task} built from the updated log row, or {@code null} if the log row does
 *         not exist or any step failed
 */
private Task updateDb(long taskId, int status) {
    try {
        // delete the task row
        taskinfoMapper.deleteById(taskId);

        TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);
        if (taskinfoLogs == null) {
            // explicit check instead of relying on a caught NullPointerException
            log.warn("no task log found for taskid={}", taskId);
            return null;
        }
        taskinfoLogs.setStatus(status);
        taskinfoLogsMapper.updateById(taskinfoLogs);

        Task task = new Task();
        BeanUtils.copyProperties(taskinfoLogs, task);
        // the log row holds a Date, the Task a millisecond timestamp
        task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());
        return task;
    } catch (Exception e) {
        // pass the exception to the logger so the cause is not lost
        log.error("task cancel exception taskid={}", taskId, e);
        return null;
    }
}
在TaskService中添加方法
/**
* Pulls a task by type and priority.
* @param type the task type
* @param priority the task priority
* @return the pulled task
*/
public Task poll(int type,int priority);
实现
/**
 * Pulls a task by type and priority: pops one entry from the right end of the redis list
 * for that type and priority and, if there is one, updates the database with the EXECUTED
 * status.
 *
 * @param type     the task type
 * @param priority the task priority
 * @return the popped task, or {@code null} if the list had no entry or popping/parsing failed
 */
@Override
public Task poll(int type, int priority) {
    Task task = null;
    try {
        String key = type + "_" + priority;
        String taskJson = cacheService.lRightPop(ScheduleConstants.TOPIC + key);
        if (StringUtils.isNotBlank(taskJson)) {
            task = JSON.parseObject(taskJson, Task.class);
            // update the database records
            updateDb(task.getTaskId(), ScheduleConstants.EXECUTED);
        }
    } catch (Exception e) {
        // log through the logger with the exception attached instead of printStackTrace
        log.error("poll task exception", e);
    }
    return task;
}
/**
 * Prints the keys matching "future_*", first as returned by keys(...) and then by scan(...).
 */
@Test
public void testKeys() {
    final String pattern = "future_*";

    Set<String> fromKeys = cacheService.keys(pattern);
    System.out.println(fromKeys);

    Set<String> fromScan = cacheService.scan(pattern);
    System.out.println(fromScan);
}
测试代码此处不再展开。
在TaskServiceImpl中添加方法
/**
 * Runs at second 0 of every minute: for every "future" key, moves the entries whose score
 * is not greater than the current time to the matching "topic" key.
 */
@Scheduled(cron = "0 */1 * * * ?")
public void refresh() {
    System.out.println(System.currentTimeMillis() / 1000 + "执行了定时任务");
    // all keys of the future-data sets, i.e. future_*
    Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
    for (String futureKey : futureKeys) { // e.g. future_250_250
        // Strip the prefix with substring rather than split(...)[1]: split treats its argument
        // as a regex, truncates keys that contain the prefix again, and throws on a key that
        // consists of the prefix only.
        String topicKey = ScheduleConstants.TOPIC + futureKey.substring(ScheduleConstants.FUTURE.length());
        // entries under this key that are due by now
        Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
        if (tasks != null && !tasks.isEmpty()) {
            // move those entries to the consumer queue
            cacheService.refreshWithPipeline(futureKey, topicKey, tasks);
            System.out.println("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");
        }
    }
}
在启动类中添加开启任务调度注解:@EnableScheduling
测试成功
在工具类CacheService中添加方法
/**
 * Tries to acquire a lock by setting the key {@code name + "_lock"} to a random token,
 * with an expiration and only if the key is absent
 * (redis: {@code SET key value PX milliseconds NX}).
 *
 * @param name   the lock name; "_lock" is appended to form the key
 * @param expire the expiration of the key in milliseconds
 * @return the generated token if the key was set, otherwise {@code null}
 */
public String tryLock(String name, long expire) {
    String lockKey = name + "_lock";
    String token = UUID.randomUUID().toString();
    RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory();
    RedisConnection conn = factory.getConnection();
    try {
        // Encode explicitly as UTF-8 so the bytes do not depend on the platform default charset.
        Boolean result = conn.set(
                lockKey.getBytes(java.nio.charset.StandardCharsets.UTF_8),
                token.getBytes(java.nio.charset.StandardCharsets.UTF_8),
                Expiration.from(expire, TimeUnit.MILLISECONDS),
                RedisStringCommands.SetOption.SET_IF_ABSENT // NX
        );
        return Boolean.TRUE.equals(result) ? token : null;
    } finally {
        RedisConnectionUtils.releaseConnection(conn, factory, false);
    }
}
修改未来数据定时刷新的方法,如下:
/**
 * Scheduled refresh of future data, run at second 0 of every minute.
 *
 * <p>Only proceeds when the "FUTURE_TASK_SYNC" lock (30 second expiration) is obtained.
 * For every "future" key, entries whose score is not greater than the current time are
 * moved to the matching "topic" key.
 */
@Scheduled(cron = "0 */1 * * * ?")
public void refresh() {
    String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);
    if (StringUtils.isBlank(token)) {
        // lock not obtained: skip this run
        return;
    }
    log.info("未来数据定时刷新---定时任务");
    // all keys of the future-data sets
    Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
    for (String futureKey : futureKeys) { // e.g. future_100_50
        // Strip the prefix with substring rather than split(...)[1]: split treats its argument
        // as a regex, truncates keys that contain the prefix again, and throws on a key that
        // consists of the prefix only.
        String topicKey = ScheduleConstants.TOPIC + futureKey.substring(ScheduleConstants.FUTURE.length());
        // entries of this key with a score between 0 and now
        Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
        // move them over
        if (tasks != null && !tasks.isEmpty()) {
            cacheService.refreshWithPipeline(futureKey, topicKey, tasks);
            log.info("成功的将"+futureKey+"刷新到了"+topicKey);
        }
    }
}
在TaskServiceImpl中添加方法(数据库任务定时同步到缓存):
/**
 * Clears the cache and re-adds tasks from the database to it. Annotated to run after
 * construction and on the cron schedule "0 *&#47;5 * * * ?".
 */
@Scheduled(cron = "0 */5 * * * ?")
@PostConstruct
public void reloadData() {
    clearCache();
    log.info("数据库数据同步到缓存");

    // upper bound of the query: five minutes from now
    Calendar upperBound = Calendar.getInstance();
    upperBound.add(Calendar.MINUTE, 5);

    // all tasks whose execution time is before that bound
    List<Taskinfo> dueTasks = taskinfoMapper.selectList(
            Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime, upperBound.getTime()));
    if (dueTasks == null || dueTasks.isEmpty()) {
        return;
    }

    for (Taskinfo row : dueTasks) {
        Task task = new Task();
        BeanUtils.copyProperties(row, task);
        // the row holds a Date, the Task a millisecond timestamp
        task.setExecuteTime(row.getExecuteTime().getTime());
        addTaskToCache(task);
    }
}
/**
 * Deletes every key of the future-data sets (future_*) and of the current consumer
 * queues (topic_*) from the cache.
 */
private void clearCache() {
    final String futurePattern = ScheduleConstants.FUTURE + "*"; // future_
    final String topicPattern = ScheduleConstants.TOPIC + "*";   // topic_

    // collect both key sets first, then delete them
    Set<String> futureKeySet = cacheService.scan(futurePattern);
    Set<String> topicKeySet = cacheService.scan(topicPattern);

    cacheService.delete(futureKeySet);
    cacheService.delete(topicKeySet);
}
提供远程的feign接口,在heima-leadnews-feign-api中编写类如下:
/**
 * Feign client for the "leadnews-schedule" service.
 */
@FeignClient(value = "leadnews-schedule")
public interface IScheduleClient {

    /**
     * Adds a task.
     *
     * @param task the task object
     * @return response carrying the task id
     */
    @PostMapping("/api/v1/task/add")
    ResponseResult addTask(@RequestBody Task task);

    /**
     * Cancels a task.
     *
     * @param taskId the task id
     * @return response carrying the cancellation result
     */
    @GetMapping("/api/v1/task/cancel/{taskId}")
    ResponseResult cancelTask(@PathVariable("taskId") long taskId);

    /**
     * Pulls a task by type and priority.
     *
     * @param type     the task type
     * @param priority the task priority
     * @return response carrying the pulled task
     */
    @GetMapping("/api/v1/task/poll/{type}/{priority}")
    ResponseResult poll(@PathVariable("type") int type, @PathVariable("priority") int priority);
}
在heima-leadnews-schedule微服务下提供对应的实现
import com.heima.apis.schedule.IScheduleClient;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.schedule.dtos.Task;
import com.heima.schedule.service.TaskService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
/**
 * REST implementation of {@link IScheduleClient}; every endpoint delegates to
 * {@link TaskService} and wraps the result in an OK response.
 */
@RestController
public class ScheduleClient implements IScheduleClient {

    @Autowired
    private TaskService taskService;

    /**
     * Adds a task.
     *
     * @param task the task object
     * @return OK response carrying the task id
     */
    @PostMapping("/api/v1/task/add")
    @Override
    public ResponseResult addTask(@RequestBody Task task) {
        long taskId = taskService.addTask(task);
        return ResponseResult.okResult(taskId);
    }

    /**
     * Cancels a task.
     *
     * @param taskId the task id
     * @return OK response carrying the cancellation result
     */
    @GetMapping("/api/v1/task/cancel/{taskId}")
    @Override
    public ResponseResult cancelTask(@PathVariable("taskId") long taskId) {
        boolean cancelled = taskService.cancelTask(taskId);
        return ResponseResult.okResult(cancelled);
    }

    /**
     * Pulls a task by type and priority.
     *
     * @param type     the task type
     * @param priority the task priority
     * @return OK response carrying the pulled task
     */
    @GetMapping("/api/v1/task/poll/{type}/{priority}")
    @Override
    public ResponseResult poll(@PathVariable("type") int type, @PathVariable("priority") int priority) {
        Task polled = taskService.poll(type, priority);
        return ResponseResult.okResult(polled);
    }
}
创建WmNewsTaskService
/**
 * Service that hands articles over to the delayed task queue.
 */
public interface WmNewsTaskService {

    /**
     * Adds a task to the delay queue.
     *
     * @param id          the id of the article
     * @param publishTime the publish time; can serve as the execution time of the task
     */
    void addNewsToTask(Integer id, Date publishTime);
}
实现:
/**
 * {@link WmNewsTaskService} implementation that submits article tasks through
 * {@link IScheduleClient}.
 */
@Service
@Slf4j
public class WmNewsTaskServiceImpl implements WmNewsTaskService {

    @Autowired
    private IScheduleClient scheduleClient;

    /**
     * Adds a task to the delay queue.
     *
     * @param id          the id of the article
     * @param publishTime the publish time, used as the execution time of the task; when
     *                    {@code null}, the current time is used so the task is due immediately
     */
    @Override
    @Async
    public void addNewsToTask(Integer id, Date publishTime) {
        log.info("添加任务到延迟服务中----begin");

        // Without this guard a missing publish time throws a NullPointerException inside the
        // @Async call, and no task is ever created for the article.
        long executeTime = publishTime != null ? publishTime.getTime() : System.currentTimeMillis();

        Task task = new Task();
        task.setExecuteTime(executeTime);
        task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());
        task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());

        // the task parameters carry a WmNews holding only the article id
        WmNews wmNews = new WmNews();
        wmNews.setId(id);
        task.setParameters(ProtostuffUtil.serialize(wmNews));

        scheduleClient.addTask(task);
        log.info("添加任务到延迟服务中----end");
    }
}
枚举类
package com.heima.model.common.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* Task types; each constant carries a task type code, a priority and a description.
*/
@Getter
@AllArgsConstructor
public enum TaskTypeEnum {
NEWS_SCAN_TIME(1001, 1,"文章定时审核"),
REMOTEERROR(1002, 2,"第三方接口调用失败,重试");
private final int taskType; // corresponds to the concrete business
private final int priority; // level within the business
private final String desc; // description
}
修改发布文章代码:
把之前的异步调用修改为调用延迟任务
@Autowired
private WmNewsTaskService wmNewsTaskService;

/**
 * Publishes or modifies an article, or saves it as a draft.
 *
 * @param dto the submitted article data
 * @return PARAM_INVALID if the dto or its content is missing, otherwise SUCCESS
 */
@Override
public ResponseResult submitNews(WmNewsDto dto) {
    // 0. validate the input
    if (dto == null || dto.getContent() == null) {
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    }

    // 1. save or update the article
    WmNews wmNews = buildWmNews(dto);
    saveOrUpdateWmNews(wmNews);

    // 2. a draft ends here
    boolean isDraft = dto.getStatus().equals(WmNews.Status.NORMAL.getCode());
    if (isDraft) {
        return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
    }

    // 3. not a draft: save the relation between the content images and the materials
    List<String> materials = ectractUrlInfo(dto.getContent());
    saveRelativeInfoForContent(materials, wmNews.getId());

    // 4. not a draft: save the relation between the cover images and the materials;
    //    with the automatic layout the cover images have to be matched
    saveRelativeInfoForCover(dto, wmNews, materials);

    // review the article through the delayed task instead of the former direct call
    // wmNewsAutoScanService.autoScanWmNews(wmNews.getId())
    wmNewsTaskService.addNewsToTask(wmNews.getId(), wmNews.getPublishTime());

    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}

/**
 * Builds the article entity from the dto: copies the properties, joins the cover images
 * into one comma-separated string and clears the type for the automatic cover type.
 */
private WmNews buildWmNews(WmNewsDto dto) {
    WmNews wmNews = new WmNews();
    // property copy: only properties with the same name and type are copied
    BeanUtils.copyProperties(dto, wmNews);

    // cover images: list -> string, e.g. [1dddfsd.jpg,sdlfjldk.jpg] -> 1dddfsd.jpg,sdlfjldk.jpg
    if (dto.getImages() != null && !dto.getImages().isEmpty()) {
        wmNews.setImages(StringUtils.join(dto.getImages(), ","));
    }

    // automatic cover type (-1): leave the type unset
    if (dto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)) {
        wmNews.setType(null);
    }
    return wmNews;
}
测试
提交一个之后一直在审核?
任务表里也多了条数据
WmNewsTaskService中添加方法
/**
* Consumes data from the delay queue.
*/
public void scanNewsByTask();
实现
@Autowired
private WmNewsAutoScanServiceImpl wmNewsAutoScanService;

/**
 * Consumes data from the delay queue: polls one NEWS_SCAN_TIME task at a fixed rate of
 * 1000 ms and, if a task comes back, runs the automatic review for the article it carries.
 */
@Scheduled(fixedRate = 1000)
@Override
@SneakyThrows
public void scanNewsByTask() {
    log.info("文章审核---消费任务执行---begin---");
    ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
    // guard against a missing response or code before dereferencing them
    boolean hasTask = responseResult != null
            && responseResult.getCode() != null
            && responseResult.getCode().equals(200)
            && responseResult.getData() != null;
    if (hasTask) {
        // the payload is converted back to a Task through a JSON round trip
        String jsonStr = JSON.toJSONString(responseResult.getData());
        Task task = JSON.parseObject(jsonStr, Task.class);
        byte[] parameters = task == null ? null : task.getParameters();
        if (parameters == null) {
            log.warn("polled task carries no parameters, skipped");
        } else {
            WmNews wmNews = ProtostuffUtil.deserialize(parameters, WmNews.class);
            System.out.println(wmNews.getId()+"-----------");
            wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
        }
    }
    log.info("文章审核---消费任务执行---end---");
}
在WemediaApplication自媒体的引导类中添加开启任务调度注解@EnableScheduling