目录
XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。
参考链接:分布式任务调度平台XXL-JOB —— XXL-JOB是一个轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。https://www.xuxueli.com/xxl-job/
一般我们都是在XXL-JOB提供的管理页面进行任务调度的配置。如下图:
但也有在自己的项目中管理任务调度配置的需求。本文将介绍如何通过API配置XXL-JOB的任务。
| 工具 | 版本 |
| --- | --- |
| XXL-JOB | 2.4.1-SNAPSHOT |
| SpringBoot | 2.5.6.RELEASE |
主要实现依然是xxl-job底层的增删改查,需要在controller层去除框架权限校验的功能
package com.xxl.job.admin.controller;
import com.xxl.job.admin.controller.annotation.PermissionLimit;
import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.service.XxlJobService;
import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.model.HandleCallbackParam;
import com.xxl.job.core.biz.model.RegistryParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.util.GsonTool;
import com.xxl.job.core.util.XxlJobRemotingUtil;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import java.util.List;
/**
 * Open API endpoint of the xxl-job admin.
 *
 * <p>Serves the executor-facing operations ({@code callback}, {@code registry},
 * {@code registryRemove}) and the job-management operations ({@code addJob},
 * {@code updateJob}, {@code jobInfo}, {@code removeJob}, {@code startJob},
 * {@code stopJob}) under {@code /api/{uri}}.
 *
 * <p>The handler is annotated with {@code @PermissionLimit(limit = false)}; the only
 * check performed here is the access-token header comparison, and only when a
 * non-blank token is configured.
 *
 * Created by meng
 */
@Controller
@RequestMapping("/api")
public class JobApiController {

    @Resource
    private AdminBiz adminBiz;

    @Resource
    private XxlJobService xxlJobService;

    /**
     * Dispatches an API request to the matching admin or job operation.
     *
     * @param request the current HTTP request; only POST is accepted
     * @param uri     the operation name taken from the path
     * @param data    the raw JSON request body, may be {@code null}
     * @return the operation result, or a failure result when the request is invalid
     */
    @RequestMapping("/{uri}")
    @ResponseBody
    @PermissionLimit(limit = false)
    public ReturnT<String> api(HttpServletRequest request, @PathVariable("uri") String uri, @RequestBody(required = false) String data) {
        // valid
        if (!"POST".equalsIgnoreCase(request.getMethod())) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
        }
        if (uri == null || uri.trim().length() == 0) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
        }
        // Read the configured token once instead of three separate lookups.
        String accessToken = XxlJobAdminConfig.getAdminConfig().getAccessToken();
        if (accessToken != null
                && accessToken.trim().length() > 0
                && !accessToken.equals(request.getHeader(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN))) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
        }

        // services mapping
        switch (uri) {
            case "callback": {
                List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);
                return adminBiz.callback(callbackParamList);
            }
            case "registry": {
                RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
                return adminBiz.registry(registryParam);
            }
            case "registryRemove": {
                RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
                return adminBiz.registryRemove(registryParam);
            }
            case "addJob":
            case "updateJob":
            case "jobInfo":
            case "removeJob":
            case "startJob":
            case "stopJob":
                return handleJobOperation(uri, data);
            default:
                return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
        }
    }

    /**
     * Parses the body as a job description and runs the requested job operation.
     *
     * <p>A missing body is rejected here: the parser yields {@code null} for it, and
     * every job operation below dereferences the parsed object.
     */
    private ReturnT<String> handleJobOperation(String uri, String data) {
        XxlJobInfo xxlJobInfo = GsonTool.fromJson(data, XxlJobInfo.class);
        if (xxlJobInfo == null) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, request body empty.");
        }
        switch (uri) {
            case "addJob":
                return xxlJobService.add(xxlJobInfo);
            case "updateJob":
                return xxlJobService.update(xxlJobInfo);
            case "jobInfo":
                return xxlJobService.info(xxlJobInfo.getId());
            case "removeJob":
                return xxlJobService.remove(xxlJobInfo.getId());
            case "startJob":
                return xxlJobService.start(xxlJobInfo.getId());
            case "stopJob":
                return xxlJobService.stop(xxlJobInfo.getId());
            default:
                return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
        }
    }
}
XxlJobService:
package com.xxl.job.admin.service;
import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.core.biz.model.ReturnT;
import java.util.Date;
import java.util.Map;
/**
 * Contract for the core job operations of the xxl-job admin.
 *
 * @author meng
 */
public interface XxlJobService {

    /**
     * Queries one page of jobs together with the matching total.
     *
     * @param start           paging start value forwarded to the data layer
     * @param length          page size forwarded to the data layer
     * @param jobGroup        job group (executor) id to filter by
     * @param triggerStatus   trigger status to filter by
     * @param jobDesc         job description filter
     * @param executorHandler executor handler filter
     * @param author          author filter
     * @return a map holding the page data and the record counts
     */
    Map<String, Object> pageList(int start, int length, int jobGroup, int triggerStatus, String jobDesc, String executorHandler, String author);

    /**
     * Creates a job.
     *
     * @param jobInfo the job to create
     * @return the outcome of the operation
     */
    ReturnT<String> add(XxlJobInfo jobInfo);

    /**
     * Updates an existing job.
     *
     * @param jobInfo the new job data, identified by its id
     * @return the outcome of the operation
     */
    ReturnT<String> update(XxlJobInfo jobInfo);

    /**
     * Removes a job.
     *
     * @param id the job id
     * @return the outcome of the operation
     */
    ReturnT<String> remove(int id);

    /**
     * Starts scheduling a job.
     *
     * @param id the job id
     * @return the outcome of the operation
     */
    ReturnT<String> start(int id);

    /**
     * Stops scheduling a job.
     *
     * @param id the job id
     * @return the outcome of the operation
     */
    ReturnT<String> stop(int id);

    /**
     * Collects the figures shown on the dashboard.
     *
     * @return a map of dashboard values
     */
    Map<String, Object> dashboardInfo();

    /**
     * Collects the chart data for a date range.
     *
     * @param startDate first day of the range
     * @param endDate   last day of the range
     * @return the chart data wrapped in a result object
     */
    ReturnT<Map<String, Object>> chartInfo(Date startDate, Date endDate);

    /**
     * Looks up a single job.
     *
     * @param id the job id
     * @return the job information wrapped in a result object
     */
    ReturnT<String> info(int id);
}
XxlJobServiceImpl:
package com.xxl.job.admin.service.impl;
import com.xxl.job.admin.core.cron.CronExpression;
import com.xxl.job.admin.core.model.XxlJobGroup;
import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.model.XxlJobLogReport;
import com.xxl.job.admin.core.route.ExecutorRouteStrategyEnum;
import com.xxl.job.admin.core.scheduler.MisfireStrategyEnum;
import com.xxl.job.admin.core.scheduler.ScheduleTypeEnum;
import com.xxl.job.admin.core.thread.JobScheduleHelper;
import com.xxl.job.admin.core.util.I18nUtil;
import com.xxl.job.admin.dao.*;
import com.xxl.job.admin.service.XxlJobService;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
import com.xxl.job.core.glue.GlueTypeEnum;
import com.xxl.job.core.util.DateUtil;
import com.xxl.job.core.util.GsonTool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.text.MessageFormat;
import java.util.*;
/**
 * Core job operations for xxl-job, implemented on top of the admin DAOs:
 * paging, create / update / remove, start / stop, single-job lookup and the
 * dashboard / chart statistics.
 *
 * <p>Validation helpers in this class return {@code null} when the input is valid
 * and a ready-to-return failure result otherwise.
 *
 * @author meng
 */
@Service
public class XxlJobServiceImpl implements XxlJobService {

    private static final Logger logger = LoggerFactory.getLogger(XxlJobServiceImpl.class);

    @Resource
    private XxlJobGroupDao xxlJobGroupDao;
    @Resource
    private XxlJobInfoDao xxlJobInfoDao;
    // Visibility left public exactly as originally declared, in case other classes read it.
    @Resource
    public XxlJobLogDao xxlJobLogDao;
    @Resource
    private XxlJobLogGlueDao xxlJobLogGlueDao;
    @Resource
    private XxlJobLogReportDao xxlJobLogReportDao;

    /**
     * Loads one page of jobs and the matching total count.
     *
     * @return a map with {@code recordsTotal}, {@code recordsFiltered} (both the same
     *         count) and {@code data} (the page list)
     */
    @Override
    public Map<String, Object> pageList(int start, int length, int jobGroup, int triggerStatus, String jobDesc, String executorHandler, String author) {
        // page list
        List<XxlJobInfo> list = xxlJobInfoDao.pageList(start, length, jobGroup, triggerStatus, jobDesc, executorHandler, author);
        int listCount = xxlJobInfoDao.pageListCount(start, length, jobGroup, triggerStatus, jobDesc, executorHandler, author);

        // package result
        Map<String, Object> maps = new HashMap<String, Object>();
        maps.put("recordsTotal", listCount);      // total number of records
        maps.put("recordsFiltered", listCount);   // total number of records after filtering
        maps.put("data", list);                   // page list
        return maps;
    }

    /**
     * Validates and stores a new job.
     *
     * @param jobInfo the job to create; its child job ids are normalized in place
     * @return a result carrying the new job id as content, or a failure result
     *         describing the first validation problem
     */
    @Override
    public ReturnT<String> add(XxlJobInfo jobInfo) {
        // valid base
        XxlJobGroup group = xxlJobGroupDao.load(jobInfo.getJobGroup());
        if (group == null) {
            return fail(I18nUtil.getString("system_please_choose") + I18nUtil.getString("jobinfo_field_jobgroup"));
        }
        ReturnT<String> invalid = validateDescAndAuthor(jobInfo);
        if (invalid != null) {
            return invalid;
        }

        // valid trigger
        invalid = validateSchedule(jobInfo);
        if (invalid != null) {
            return invalid;
        }

        // valid job
        GlueTypeEnum glueType = GlueTypeEnum.match(jobInfo.getGlueType());
        if (glueType == null) {
            return fail(I18nUtil.getString("jobinfo_field_gluetype") + I18nUtil.getString("system_unvalid"));
        }
        if (GlueTypeEnum.BEAN == glueType && isBlank(jobInfo.getExecutorHandler())) {
            return fail(I18nUtil.getString("system_please_input") + "JobHandler");
        }
        // fix "\r" in shell
        if (GlueTypeEnum.GLUE_SHELL == glueType && jobInfo.getGlueSource() != null) {
            jobInfo.setGlueSource(jobInfo.getGlueSource().replaceAll("\r", ""));
        }

        // valid advanced
        invalid = validateAdvanced(jobInfo);
        if (invalid != null) {
            return invalid;
        }

        // ChildJobId valid
        invalid = validateChildJobIds(jobInfo);
        if (invalid != null) {
            return invalid;
        }

        // add in db
        jobInfo.setAddTime(new Date());
        jobInfo.setUpdateTime(new Date());
        jobInfo.setGlueUpdatetime(new Date());
        xxlJobInfoDao.save(jobInfo);
        if (jobInfo.getId() < 1) {
            return fail(I18nUtil.getString("jobinfo_field_add") + I18nUtil.getString("system_fail"));
        }
        return new ReturnT<String>(String.valueOf(jobInfo.getId()));
    }

    /**
     * Validates the new data and applies it to the stored job.
     *
     * <p>When the stored job is running and its schedule type or configuration
     * changed, the next trigger time is recomputed.
     *
     * @param jobInfo the new job data, identified by its id
     * @return {@link ReturnT#SUCCESS}, or a failure result describing the first problem
     */
    @Override
    public ReturnT<String> update(XxlJobInfo jobInfo) {
        // valid base
        ReturnT<String> invalid = validateDescAndAuthor(jobInfo);
        if (invalid != null) {
            return invalid;
        }

        // valid trigger
        invalid = validateSchedule(jobInfo);
        if (invalid != null) {
            return invalid;
        }

        // valid advanced
        invalid = validateAdvanced(jobInfo);
        if (invalid != null) {
            return invalid;
        }

        // ChildJobId valid
        invalid = validateChildJobIds(jobInfo);
        if (invalid != null) {
            return invalid;
        }

        // group valid
        XxlJobGroup jobGroup = xxlJobGroupDao.load(jobInfo.getJobGroup());
        if (jobGroup == null) {
            return fail(I18nUtil.getString("jobinfo_field_jobgroup") + I18nUtil.getString("system_unvalid"));
        }

        // stage job info
        XxlJobInfo existsJobInfo = xxlJobInfoDao.loadById(jobInfo.getId());
        if (existsJobInfo == null) {
            return jobNotFound();
        }

        // next trigger time (takes effect after the pre-read window, to stay clear of the pre-read cycle)
        long nextTriggerTime = existsJobInfo.getTriggerNextTime();
        // Null-safe comparison: the schedule configuration may legitimately be absent
        // for schedule types that are neither CRON nor FIX_RATE.
        boolean scheduleDataNotChanged = Objects.equals(jobInfo.getScheduleType(), existsJobInfo.getScheduleType())
                && Objects.equals(jobInfo.getScheduleConf(), existsJobInfo.getScheduleConf());
        if (existsJobInfo.getTriggerStatus() == 1 && !scheduleDataNotChanged) {
            Long recomputed = resolveNextTriggerTime(jobInfo);
            if (recomputed == null) {
                return scheduleInvalid();
            }
            nextTriggerTime = recomputed;
        }

        existsJobInfo.setJobGroup(jobInfo.getJobGroup());
        existsJobInfo.setJobDesc(jobInfo.getJobDesc());
        existsJobInfo.setAuthor(jobInfo.getAuthor());
        existsJobInfo.setAlarmEmail(jobInfo.getAlarmEmail());
        existsJobInfo.setScheduleType(jobInfo.getScheduleType());
        existsJobInfo.setScheduleConf(jobInfo.getScheduleConf());
        existsJobInfo.setMisfireStrategy(jobInfo.getMisfireStrategy());
        existsJobInfo.setExecutorRouteStrategy(jobInfo.getExecutorRouteStrategy());
        existsJobInfo.setExecutorHandler(jobInfo.getExecutorHandler());
        existsJobInfo.setExecutorParam(jobInfo.getExecutorParam());
        existsJobInfo.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
        existsJobInfo.setExecutorTimeout(jobInfo.getExecutorTimeout());
        existsJobInfo.setExecutorFailRetryCount(jobInfo.getExecutorFailRetryCount());
        existsJobInfo.setChildJobId(jobInfo.getChildJobId());
        existsJobInfo.setTriggerNextTime(nextTriggerTime);
        existsJobInfo.setUpdateTime(new Date());
        xxlJobInfoDao.update(existsJobInfo);
        return ReturnT.SUCCESS;
    }

    /**
     * Deletes a job together with its logs and glue history.
     *
     * @param id the job id
     * @return always {@link ReturnT#SUCCESS}; an unknown id is treated as already removed
     */
    @Override
    public ReturnT<String> remove(int id) {
        XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(id);
        if (xxlJobInfo == null) {
            return ReturnT.SUCCESS;
        }
        xxlJobInfoDao.delete(id);
        xxlJobLogDao.delete(id);
        xxlJobLogGlueDao.deleteByJobId(id);
        return ReturnT.SUCCESS;
    }

    /**
     * Marks a job as running and computes its next trigger time.
     *
     * @param id the job id
     * @return {@link ReturnT#SUCCESS}, or a failure result when the job does not exist,
     *         has schedule type NONE, or no next trigger time can be computed
     */
    @Override
    public ReturnT<String> start(int id) {
        XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(id);
        if (xxlJobInfo == null) {
            // Previously an unknown id ended in a NullPointerException.
            return jobNotFound();
        }

        // valid
        ScheduleTypeEnum scheduleTypeEnum = ScheduleTypeEnum.match(xxlJobInfo.getScheduleType(), ScheduleTypeEnum.NONE);
        if (ScheduleTypeEnum.NONE == scheduleTypeEnum) {
            return fail(I18nUtil.getString("schedule_type_none_limit_start"));
        }

        // next trigger time (takes effect after the pre-read window, to stay clear of the pre-read cycle)
        Long nextTriggerTime = resolveNextTriggerTime(xxlJobInfo);
        if (nextTriggerTime == null) {
            return scheduleInvalid();
        }

        xxlJobInfo.setTriggerStatus(1);
        xxlJobInfo.setTriggerLastTime(0);
        xxlJobInfo.setTriggerNextTime(nextTriggerTime);
        xxlJobInfo.setUpdateTime(new Date());
        xxlJobInfoDao.update(xxlJobInfo);
        return ReturnT.SUCCESS;
    }

    /**
     * Marks a job as stopped and clears its trigger times.
     *
     * @param id the job id
     * @return {@link ReturnT#SUCCESS}, or a failure result when the job does not exist
     */
    @Override
    public ReturnT<String> stop(int id) {
        XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(id);
        if (xxlJobInfo == null) {
            // Previously an unknown id ended in a NullPointerException.
            return jobNotFound();
        }
        xxlJobInfo.setTriggerStatus(0);
        xxlJobInfo.setTriggerLastTime(0);
        xxlJobInfo.setTriggerNextTime(0);
        xxlJobInfo.setUpdateTime(new Date());
        xxlJobInfoDao.update(xxlJobInfo);
        return ReturnT.SUCCESS;
    }

    /**
     * Collects the dashboard figures.
     *
     * @return a map with {@code jobInfoCount}, {@code jobLogCount},
     *         {@code jobLogSuccessCount} and {@code executorCount} (number of distinct
     *         registry addresses over all groups)
     */
    @Override
    public Map<String, Object> dashboardInfo() {
        int jobInfoCount = xxlJobInfoDao.findAllCount();
        int jobLogCount = 0;
        int jobLogSuccessCount = 0;
        XxlJobLogReport xxlJobLogReport = xxlJobLogReportDao.queryLogReportTotal();
        if (xxlJobLogReport != null) {
            jobLogCount = xxlJobLogReport.getRunningCount() + xxlJobLogReport.getSucCount() + xxlJobLogReport.getFailCount();
            jobLogSuccessCount = xxlJobLogReport.getSucCount();
        }

        // executor count
        Set<String> executorAddressSet = new HashSet<String>();
        List<XxlJobGroup> groupList = xxlJobGroupDao.findAll();
        if (groupList != null && !groupList.isEmpty()) {
            for (XxlJobGroup group : groupList) {
                if (group.getRegistryList() != null && !group.getRegistryList().isEmpty()) {
                    executorAddressSet.addAll(group.getRegistryList());
                }
            }
        }
        int executorCount = executorAddressSet.size();

        Map<String, Object> dashboardMap = new HashMap<String, Object>();
        dashboardMap.put("jobInfoCount", jobInfoCount);
        dashboardMap.put("jobLogCount", jobLogCount);
        dashboardMap.put("jobLogSuccessCount", jobLogSuccessCount);
        dashboardMap.put("executorCount", executorCount);
        return dashboardMap;
    }

    /**
     * Builds the per-day trigger statistics for the given range.
     *
     * <p>When no report rows exist, seven zero-filled days ending today are returned.
     *
     * @param startDate first day of the range
     * @param endDate   last day of the range
     * @return the day labels, per-day running / success / fail counts and their totals
     */
    @Override
    public ReturnT<Map<String, Object>> chartInfo(Date startDate, Date endDate) {
        // process
        List<String> triggerDayList = new ArrayList<String>();
        List<Integer> triggerDayCountRunningList = new ArrayList<Integer>();
        List<Integer> triggerDayCountSucList = new ArrayList<Integer>();
        List<Integer> triggerDayCountFailList = new ArrayList<Integer>();
        int triggerCountRunningTotal = 0;
        int triggerCountSucTotal = 0;
        int triggerCountFailTotal = 0;

        List<XxlJobLogReport> logReportList = xxlJobLogReportDao.queryLogReport(startDate, endDate);
        if (logReportList != null && logReportList.size() > 0) {
            for (XxlJobLogReport item : logReportList) {
                String day = DateUtil.formatDate(item.getTriggerDay());
                int triggerDayCountRunning = item.getRunningCount();
                int triggerDayCountSuc = item.getSucCount();
                int triggerDayCountFail = item.getFailCount();

                triggerDayList.add(day);
                triggerDayCountRunningList.add(triggerDayCountRunning);
                triggerDayCountSucList.add(triggerDayCountSuc);
                triggerDayCountFailList.add(triggerDayCountFail);

                triggerCountRunningTotal += triggerDayCountRunning;
                triggerCountSucTotal += triggerDayCountSuc;
                triggerCountFailTotal += triggerDayCountFail;
            }
        } else {
            for (int i = -6; i <= 0; i++) {
                triggerDayList.add(DateUtil.formatDate(DateUtil.addDays(new Date(), i)));
                triggerDayCountRunningList.add(0);
                triggerDayCountSucList.add(0);
                triggerDayCountFailList.add(0);
            }
        }

        Map<String, Object> result = new HashMap<String, Object>();
        result.put("triggerDayList", triggerDayList);
        result.put("triggerDayCountRunningList", triggerDayCountRunningList);
        result.put("triggerDayCountSucList", triggerDayCountSucList);
        result.put("triggerDayCountFailList", triggerDayCountFailList);

        result.put("triggerCountRunningTotal", triggerCountRunningTotal);
        result.put("triggerCountSucTotal", triggerCountSucTotal);
        result.put("triggerCountFailTotal", triggerCountFailTotal);

        return new ReturnT<Map<String, Object>>(result);
    }

    /**
     * Looks up a job and returns it serialized as JSON.
     *
     * @param id the job id
     * @return a result whose content is the job as JSON, or a failure result when the
     *         job does not exist (instead of a success carrying the text {@code null})
     */
    @Override
    public ReturnT<String> info(int id) {
        XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(id);
        if (xxlJobInfo == null) {
            return jobNotFound();
        }
        return new ReturnT<String>(GsonTool.toJson(xxlJobInfo));
    }

    /** Checks that the job description and the author are both present. */
    private ReturnT<String> validateDescAndAuthor(XxlJobInfo jobInfo) {
        if (isBlank(jobInfo.getJobDesc())) {
            return fail(I18nUtil.getString("system_please_input") + I18nUtil.getString("jobinfo_field_jobdesc"));
        }
        if (isBlank(jobInfo.getAuthor())) {
            return fail(I18nUtil.getString("system_please_input") + I18nUtil.getString("jobinfo_field_author"));
        }
        return null;
    }

    /** Checks the schedule type and, for CRON and FIX_RATE, the schedule configuration. */
    private ReturnT<String> validateSchedule(XxlJobInfo jobInfo) {
        ScheduleTypeEnum scheduleTypeEnum = ScheduleTypeEnum.match(jobInfo.getScheduleType(), null);
        if (scheduleTypeEnum == null) {
            return scheduleInvalid();
        }
        String scheduleConf = jobInfo.getScheduleConf();
        if (scheduleTypeEnum == ScheduleTypeEnum.CRON) {
            if (scheduleConf == null || !CronExpression.isValidExpression(scheduleConf)) {
                return fail("Cron" + I18nUtil.getString("system_unvalid"));
            }
        } else if (scheduleTypeEnum == ScheduleTypeEnum.FIX_RATE) {
            if (scheduleConf == null) {
                // add() used to answer with the bare field name here; it now reports the
                // same "invalid" message as update().
                return scheduleInvalid();
            }
            try {
                if (Integer.parseInt(scheduleConf) < 1) {
                    return scheduleInvalid();
                }
            } catch (NumberFormatException e) {
                return scheduleInvalid();
            }
        }
        return null;
    }

    /** Checks the route, misfire and block strategies. */
    private ReturnT<String> validateAdvanced(XxlJobInfo jobInfo) {
        if (ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null) == null) {
            return fail(I18nUtil.getString("jobinfo_field_executorRouteStrategy") + I18nUtil.getString("system_unvalid"));
        }
        if (MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), null) == null) {
            return fail(I18nUtil.getString("misfire_strategy") + I18nUtil.getString("system_unvalid"));
        }
        if (ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), null) == null) {
            return fail(I18nUtil.getString("jobinfo_field_executorBlockStrategy") + I18nUtil.getString("system_unvalid"));
        }
        return null;
    }

    /**
     * Checks that every comma-separated child job id is numeric and refers to an
     * existing job, then writes the re-joined list back to {@code jobInfo}.
     */
    private ReturnT<String> validateChildJobIds(XxlJobInfo jobInfo) {
        if (isBlank(jobInfo.getChildJobId())) {
            return null;
        }
        String[] childJobIds = jobInfo.getChildJobId().split(",");
        for (String childJobIdItem : childJobIds) {
            if (isBlank(childJobIdItem) || !isNumeric(childJobIdItem)) {
                return fail(MessageFormat.format(
                        (I18nUtil.getString("jobinfo_field_childJobId") + "({0})" + I18nUtil.getString("system_unvalid")), childJobIdItem));
            }
            XxlJobInfo childJobInfo = xxlJobInfoDao.loadById(Integer.parseInt(childJobIdItem));
            if (childJobInfo == null) {
                return fail(MessageFormat.format(
                        (I18nUtil.getString("jobinfo_field_childJobId") + "({0})" + I18nUtil.getString("system_not_found")), childJobIdItem));
            }
        }
        // join, avoid "xxx,,". String.join also copes with an empty split result
        // (input such as ","), where the manual concatenate-and-trim threw
        // StringIndexOutOfBoundsException.
        jobInfo.setChildJobId(String.join(",", childJobIds));
        return null;
    }

    /**
     * Computes the next trigger time, looking past the scheduler's pre-read window.
     *
     * @return the next trigger time in epoch milliseconds, or {@code null} when none
     *         can be produced (the cause is logged when it is an exception)
     */
    private Long resolveNextTriggerTime(XxlJobInfo jobInfo) {
        try {
            Date nextValidTime = JobScheduleHelper.generateNextValidTime(jobInfo, new Date(System.currentTimeMillis() + JobScheduleHelper.PRE_READ_MS));
            if (nextValidTime == null) {
                return null;
            }
            return nextValidTime.getTime();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return null;
        }
    }

    /** Tells whether the text parses as an {@code int}. */
    private boolean isNumeric(String str) {
        try {
            Integer.parseInt(str);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    /** Tells whether the text is {@code null} or empty after trimming. */
    private static boolean isBlank(String value) {
        return value == null || value.trim().length() == 0;
    }

    /** Builds a failure result with the given message. */
    private static ReturnT<String> fail(String message) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, message);
    }

    /** Failure result used for every invalid schedule type or configuration. */
    private static ReturnT<String> scheduleInvalid() {
        return fail(I18nUtil.getString("schedule_type") + I18nUtil.getString("system_unvalid"));
    }

    /** Failure result used when a job id does not resolve to a stored job. */
    private static ReturnT<String> jobNotFound() {
        return fail(I18nUtil.getString("jobinfo_field_id") + I18nUtil.getString("system_not_found"));
    }
}
GsonTool:
import com.google.gson.*;
import com.google.gson.reflect.TypeToken;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Date;
import java.util.List;
/**
* @author meng
*/
public class GsonTool {
private static Gson gson = null;
static {
GsonBuilder builder = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss");
builder.registerTypeAdapter(Date.class, new JsonDeserializer<Date>() {
public Date deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
return new Date(json.getAsJsonPrimitive().getAsLong());
}
});
gson = builder.create();
}
/**
* Object 转成 json
*
* @param src
* @return String
*/
public static String toJson(Object src) {
return gson.toJson(src);
}
/**
* json 转成 特定的cls的Object
*
* @param json
* @param classOfT
* @return
*/
public static <T> T fromJson(String json, Class<T> classOfT) {
return gson.fromJson(json, classOfT);
}
/**
* json 转成 特定的 rawClass<classOfT> 的Object
*
* @param json
* @param classOfT
* @param argClassOfT
* @return
*/
public static <T> T fromJson(String json, Class<T> classOfT, Class argClassOfT) {
Type type = new ParameterizedType4ReturnT(classOfT, new Class[]{argClassOfT});
return gson.fromJson(json, type);
}
public static class ParameterizedType4ReturnT implements ParameterizedType {
private final Class raw;
private final Type[] args;
public ParameterizedType4ReturnT(Class raw, Type[] args) {
this.raw = raw;
this.args = args != null ? args : new Type[0];
}
@Override
public Type[] getActualTypeArguments() {
return args;
}
@Override
public Type getRawType() {
return raw;
}
@Override
public Type getOwnerType() {
return null;
}
}
/**
* json 转成 特定的cls的list
*
* @param json
* @param classOfT
* @return
*/
public static <T> List<T> fromJsonList(String json, Class<T> classOfT) {
return gson.fromJson(
json,
new TypeToken<List<T>>() {
}.getType()
);
}
}
<!-- hutool -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.22</version>
</dependency>
<!-- fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
job类:
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * Job parameters supplied by the caller when creating a job.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class JobVo {
/**
 * Job description.
 */
private String jobDesc;
/**
 * Job schedule configuration.
 */
private String scheduleConf;
/**
 * Device ID.
 */
private String equid;
/**
 * WVP device ID.
 */
private String deviceId;
/**
 * WVP channel ID.
 */
private String channelId;
}
XxlJobInfo类:
package com.yunhoutech.aiel.backend.model;
import lombok.Data;
import java.util.Date;
/**
 * xxl-job info: plain data holder for one job definition.
 *
 * @author xuxueli 2016-1-12 18:25:49
 */
@Data
public class XxlJobInfo {
private int id; // primary key ID
private int jobGroup; // primary key ID of the executor
private String jobDesc; // job description
private Date addTime; // creation time
private Date updateTime; // last update time
private String author; // owner
private String alarmEmail; // alarm email
private String scheduleType; // schedule type
private String scheduleConf; // schedule configuration; its meaning depends on the schedule type
private String misfireStrategy; // misfire strategy
private String executorRouteStrategy; // executor route strategy
private String executorHandler; // executor: job handler name
private String executorParam; // executor: job parameter
private String executorBlockStrategy; // block handling strategy
private int executorTimeout; // job execution timeout, in seconds
private int executorFailRetryCount; // number of retries on failure
private String glueType; // GLUE type #com.xxl.job.core.glue.GlueTypeEnum
private String glueSource; // GLUE source code
private String glueRemark; // GLUE remark
private Date glueUpdatetime; // GLUE update time
private String childJobId; // child job IDs, comma-separated
private int triggerStatus; // trigger status: 0 - stopped, 1 - running
private long triggerLastTime; // last trigger time
private long triggerNextTime; // next trigger time
}
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.demo.backend.model.JobVo;
import com.demo.backend.model.XxlJobInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
/**
* @Author: meng
* @Description:
* @Date: 2023/7/10 11:00
* @Version: 1.0
*/
@Component
public class XxlJobUtil {
private Logger logger = LoggerFactory.getLogger(this.getClass());
public static final String CODE = "code";
public static final String XXL_JOB_ACCESS_TOKEN = "XXL-JOB-ACCESS-TOKEN";
private static final String ADD_URL = "/api/addJob";
private static final String INFO_URL = "/api/jobInfo";
private static final String UPDATE_URL = "/api/updateJob";
private static final String REMOVE_URL = "/api/removeJob";
private static final String PAUSE_URL = "/api/pauseJob";
private static final String START_URL = "/api/startJob";
private static final String STOP_URL = "/api/stopJob";
private static final String ADD_START_URL = "/api/addAndStart";
private static final String GET_GROUP_ID = "/api/getGroupId";
/**
* 新增job
* {\"jobGroup\":\"1\",\"jobDesc\":\"test_des\",\"author\":\"admin\",\"alarmEmail\":\"\",\"scheduleType\":\"CRON\",\"scheduleConf\":\"0 0 0 * * ? *\",
* \"cronGen_display\":\"0 0 0 * * ? *\",\"schedule_conf_CRON\":\"0 0 0 * * ? *\",\"schedule_conf_FIX_RATE\":\"\",\"schedule_conf_FIX_DELAY\":\"\",
* \"glueType\":\"BEAN\",\"executorHandler\":\"demoJobHandler\",\"executorParam\":\"\",\"executorRouteStrategy\":\"FIRST\",\"childJobId\":\"\",
* \"misfireStrategy\":\"DO_NOTHING\",\"executorBlockStrategy\":\"SERIAL_EXECUTION\",\"executorTimeout\":\"0\",\"executorFailRetryCount\":\"1\",
* \"glueRemark\":\"GLUE%E4%BB%A3%E7%A0%81%E5%88%9D%E5%A7%8B%E5%8C%96\",\"glueSource\":\"\"}
*/
public Integer addJob(JobVo jobVo) {
if (StrUtil.hasBlank(jobVo.getJobDesc(), jobVo.getScheduleConf(), jobVo.getEquid(), jobVo.getDeviceId(), jobVo.getChannelId())) {
logger.info("jobVo-参数都不能为空");
return null;
}
JSONObject requestInfo = new JSONObject();
requestInfo.put("jobGroup", 3);
requestInfo.put("jobDesc", jobVo.getJobDesc());
requestInfo.put("author", "admin");
requestInfo.put("alarmEmail", "");
requestInfo.put("scheduleType", "CRON");
requestInfo.put("scheduleConf", jobVo.getScheduleConf());
requestInfo.put("cronGen_display", jobVo.getScheduleConf());
requestInfo.put("schedule_conf_CRON", jobVo.getScheduleConf());
requestInfo.put("schedule_conf_FIX_RATE", "");
requestInfo.put("schedule_conf_FIX_DELAY", "");
requestInfo.put("glueType", "BEAN");
requestInfo.put("executorHandler", "pollPresetJobHandler");
requestInfo.put("executorParam", String.format("%s,%s,%s", jobVo.getEquid(), jobVo.getDeviceId(), jobVo.getChannelId()));
// 一致性hash
requestInfo.put("executorRouteStrategy", "CONSISTENT_HASH");
requestInfo.put("childJobId", "");
requestInfo.put("misfireStrategy", "DO_NOTHING");
requestInfo.put("executorBlockStrategy", "SERIAL_EXECUTION");
requestInfo.put("executorTimeout", "0");
requestInfo.put("executorFailRetryCount", "1");
requestInfo.put("glueRemark", "GLUE代码初始化");
requestInfo.put("glueSource", "");
String body = getRequestBody(requestInfo.toJSONString(), ADD_URL);
if (StrUtil.isNotBlank(body)) {
JSONObject jsonObject = JSON.parseObject(body);
if (jsonObject.getInteger(CODE) != 200) {
logger.info("addJob error:{}", JSON.parseObject(body));
return null;
}
logger.info("addJob success");
return jsonObject.getInteger("content");
}
return null;
}
/**
* job详情
*/
public XxlJobInfo infoJob(int id) {
XxlJobInfo xxlJobInfo = new XxlJobInfo();
xxlJobInfo.setId(id);
String body = getRequestBody(JSONObject.toJSONString(xxlJobInfo), INFO_URL);
if (StrUtil.isNotBlank(body)) {
JSONObject jsonObject = JSON.parseObject(body);
if (jsonObject.getInteger(CODE) != 200) {
logger.info("infoJob error:{}", JSON.parseObject(body));
return null;
}
logger.info("infoJob success");
return JSONObject.parseObject(jsonObject.getString("content"), XxlJobInfo.class);
}
return null;
}
/**
* 更新job
*/
public boolean updateJob(XxlJobInfo xxlJobInfo) {
if (ObjectUtil.isNull(xxlJobInfo)) {
logger.info("updateJob xxlJobInfo is null");
return false;
}
String body = getRequestBody(JSONObject.toJSONString(xxlJobInfo), UPDATE_URL);
if (StrUtil.isNotBlank(body)) {
JSONObject jsonObject = JSON.parseObject(body);
if (jsonObject.getInteger(CODE) != 200) {
logger.info("updateJob error:{}", JSON.parseObject(body));
return false;
}
}
logger.info("updateJob success");
return true;
}
/**
* 更新job
*/
public boolean updateJob(String jobId, String scheduleConf) {
if (StrUtil.hasBlank(jobId, scheduleConf)) {
logger.info("jobVo-参数都不能为空");
return false;
}
XxlJobInfo xxlJobInfo = infoJob(Integer.valueOf(jobId));
if (ObjectUtil.isNull(xxlJobInfo)) {
logger.info("updateJob xxlJobInfo is null");
return false;
}
xxlJobInfo.setScheduleConf(scheduleConf);
String body = getRequestBody(JSONObject.toJSONString(xxlJobInfo), UPDATE_URL);
if (StrUtil.isNotBlank(body)) {
JSONObject jsonObject = JSON.parseObject(body);
if (jsonObject.getInteger(CODE) != 200) {
logger.info("updateJob error:{}", JSON.parseObject(body));
return false;
}
}
logger.info("updateJob success");
return true;
}
/**
* 启动job
*/
public boolean startJob(int id) {
XxlJobInfo xxlJobInfo = new XxlJobInfo();
xxlJobInfo.setId(id);
String body = getRequestBody(JSONObject.toJSONString(xxlJobInfo), START_URL);
if (StrUtil.isNotBlank(body)) {
JSONObject jsonObject = JSON.parseObject(body);
if (jsonObject.getInteger(CODE) != 200) {
logger.info("startJob error:{}", JSON.parseObject(body));
return false;
}
}
logger.info("startJob success");
return true;
}
/**
* 停止job
*/
public boolean stopJob(int id) {
XxlJobInfo xxlJobInfo = new XxlJobInfo();
xxlJobInfo.setId(id);
String body = getRequestBody(JSONObject.toJSONString(xxlJobInfo), STOP_URL);
if (StrUtil.isNotBlank(body)) {
JSONObject jsonObject = JSON.parseObject(body);
if (jsonObject.getInteger(CODE) != 200) {
logger.info("stopJob error:{}", JSON.parseObject(body));
return false;
}
}
logger.info("stopJob success");
return true;
}
/**
* 删除job
*/
public boolean removeJob(int id) {
XxlJobInfo xxlJobInfo = new XxlJobInfo();
xxlJobInfo.setId(id);
// 先停止job,再删除job
if (!stopJob(id)) {
return false;
}
String body = getRequestBody(JSONObject.toJSONString(xxlJobInfo), REMOVE_URL);
if (StrUtil.isNotBlank(body)) {
JSONObject jsonObject = JSON.parseObject(body);
if (jsonObject.getInteger(CODE) != 200) {
logger.info("removeJob error:{}", JSON.parseObject(body));
return false;
}
}
logger.info("removeJob success");
return true;
}
/**
* 接口请求
* XXL_JOB_URL:请求链接
* XXL_JOB_TOKEN:权限token
*/
private String getRequestBody(String dateStr, String uri) {
HttpResponse httpResponse = HttpRequest.post(XXL_JOB_URL)
.body(dateStr).header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_UTF8_VALUE)
.header(XXL_JOB_ACCESS_TOKEN, XXL_JOB_TOKEN).execute();
return httpResponse.body();
}
}