This article takes a look at PowerJob's LightTaskTracker.
tech/powerjob/worker/core/tracker/task/TaskTracker.java
@Slf4j
public abstract class TaskTracker {
/**
* Creation time of the TaskTracker
*/
protected final long createTime;
/**
* Task instance ID; used so frequently that it is extracted from InstanceInfo and stored separately
*/
protected final long instanceId;
/**
* Task instance information
*/
protected final InstanceInfo instanceInfo;
/**
* Appended workflow context data
*
* @since 2021/02/05
*/
protected final Map<String, String> appendedWfContext;
/**
* Worker runtime metadata
*/
protected final WorkerRuntime workerRuntime;
/**
* Whether the task has finished
*/
protected final AtomicBoolean finished;
/**
* After several consecutive report failures, give up reporting, treat the result as unreachable, and consider the TaskTracker down
*/
protected int reportFailedCnt = 0;
protected static final int MAX_REPORT_FAILED_THRESHOLD = 5;
protected TaskTracker(ServerScheduleJobReq req, WorkerRuntime workerRuntime) {
this.createTime = System.currentTimeMillis();
this.workerRuntime = workerRuntime;
this.instanceId = req.getInstanceId();
this.instanceInfo = new InstanceInfo();
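// PowerJob rarely needs value copying, and a third-party copying library could introduce class conflicts, so hand-written copying was judged to have the best ROI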
instanceInfo.setJobId(req.getJobId());
instanceInfo.setInstanceId(req.getInstanceId());
instanceInfo.setWfInstanceId(req.getWfInstanceId());
instanceInfo.setExecuteType(req.getExecuteType());
instanceInfo.setProcessorType(req.getProcessorType());
instanceInfo.setProcessorInfo(req.getProcessorInfo());
instanceInfo.setJobParams(req.getJobParams());
instanceInfo.setInstanceParams(req.getInstanceParams());
instanceInfo.setThreadConcurrency(req.getThreadConcurrency());
instanceInfo.setTaskRetryNum(req.getTaskRetryNum());
instanceInfo.setLogConfig(req.getLogConfig());
// Special handling for the timeout
if (instanceInfo.getInstanceTimeoutMS() <= 0) {
instanceInfo.setInstanceTimeoutMS(Integer.MAX_VALUE);
}
// Only tasks inside a workflow may append context data to the workflow
this.appendedWfContext = req.getWfInstanceId() == null ? Collections.emptyMap() : Maps.newConcurrentMap();
this.finished = new AtomicBoolean(false);
}
/**
* Destroy
*/
public abstract void destroy();
/**
* Stop the task
*/
public abstract void stopTask();
/**
* Query the detailed running status of the task instance
*
* @return detailed running status of the task instance
*/
public abstract InstanceDetail fetchRunningStatus();
//......
}
TaskTracker is an abstract class. Its constructor takes a ServerScheduleJobReq and a WorkerRuntime and builds an InstanceInfo from the request; it declares the abstract methods destroy, stopTask, and fetchRunningStatus.
tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java
@Slf4j
public class LightTaskTracker extends TaskTracker {
/**
* statusReportScheduledFuture
*/
private final ScheduledFuture<?> statusReportScheduledFuture;
/**
* timeoutCheckScheduledFuture
*/
private final ScheduledFuture<?> timeoutCheckScheduledFuture;
/**
* processFuture
*/
private final Future<ProcessResult> processFuture;
/**
* Execution thread
*/
private final AtomicReference<Thread> executeThread;
/**
* Processor information
*/
private final ProcessorBean processorBean;
/**
* Context
*/
private final TaskContext taskContext;
/**
* Task status
*/
private TaskStatus status;
/**
* Time at which the task started executing
*/
private Long taskStartTime;
/**
* Time at which the task finished executing, or at which it was killed
*/
private Long taskEndTime;
/**
* Task processing result
*/
private ProcessResult result;
private final AtomicBoolean timeoutFlag = new AtomicBoolean(false);
protected final AtomicBoolean stopFlag = new AtomicBoolean(false);
protected final AtomicBoolean destroyFlag = new AtomicBoolean(false);
public LightTaskTracker(ServerScheduleJobReq req, WorkerRuntime workerRuntime) {
super(req, workerRuntime);
try {
taskContext = constructTaskContext(req, workerRuntime);
// Waiting to be processed
status = TaskStatus.WORKER_RECEIVED;
// Load the Processor
processorBean = workerRuntime.getProcessorLoader().load(new ProcessorDefinition().setProcessorType(req.getProcessorType()).setProcessorInfo(req.getProcessorInfo()));
executeThread = new AtomicReference<>();
long delay = Integer.parseInt(System.getProperty(PowerJobDKey.WORKER_STATUS_CHECK_PERIOD, "15")) * 1000L;
// Add a random value to the initial delay so that under high concurrency the requests do not all cluster in the same time window
long initDelay = RandomUtils.nextInt(5000, 10000);
// Report task status
statusReportScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleWithFixedDelay(this::checkAndReportStatus, initDelay, delay, TimeUnit.MILLISECONDS);
// Timeout control
if (instanceInfo.getInstanceTimeoutMS() != Integer.MAX_VALUE) {
if (instanceInfo.getInstanceTimeoutMS() < 1000L) {
timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(this::timeoutCheck, instanceInfo.getInstanceTimeoutMS(), instanceInfo.getInstanceTimeoutMS() / 10, TimeUnit.MILLISECONDS);
} else {
// For tasks allowed to run longer than 1 s, the minimum granularity of the timeout check is 1 s
timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(this::timeoutCheck, instanceInfo.getInstanceTimeoutMS(), 1000L, TimeUnit.MILLISECONDS);
}
} else {
timeoutCheckScheduledFuture = null;
}
// Submit the task to the thread pool
processFuture = workerRuntime.getExecutorManager().getLightweightTaskExecutorService().submit(this::processTask);
} catch (Exception e) {
log.error("[TaskTracker-{}] fail to create TaskTracker for req:{} ", instanceId, req);
destroy();
throw e;
}
}
//......
}
LightTaskTracker extends TaskTracker. Its constructor builds a ProcessorDefinition from the ServerScheduleJobReq and loads it with workerRuntime.getProcessorLoader().load. It then schedules checkAndReportStatus through workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleWithFixedDelay; if an instance timeout is set, it also schedules timeoutCheck; finally it submits processTask through workerRuntime.getExecutorManager().getLightweightTaskExecutorService().submit.
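To make the scheduling parameters in the constructor concrete, here is a minimal sketch of how the initial delay and the timeout-check period are derived. The class TimeoutScheduleSketch and its method names are hypothetical, and ThreadLocalRandom is used here as a stand-in for the RandomUtils call in the source.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper, not part of PowerJob: mirrors the arithmetic in the constructor.
final class TimeoutScheduleSketch {

    // Period between timeout checks: one tenth of the timeout for
    // sub-second timeouts, otherwise a fixed 1000 ms.
    static long checkPeriodMs(long instanceTimeoutMs) {
        return instanceTimeoutMs < 1000L ? instanceTimeoutMs / 10 : 1000L;
    }

    // Random initial delay in [5000, 10000) ms for the status report,
    // so trackers created at the same moment do not all report together.
    static long randomInitDelayMs() {
        return ThreadLocalRandom.current().nextLong(5000L, 10000L);
    }
}
```

One thing this arithmetic suggests: a timeout below 10 ms gives a period of 0, and if the executor is a standard ScheduledExecutorService, scheduleAtFixedRate rejects a non-positive period with IllegalArgumentException.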
private synchronized void checkAndReportStatus() {
if (destroyFlag.get()) {
// Already destroyed, no need to report status
log.info("[TaskTracker-{}] has been destroyed,final status is {},needn't to report status!", instanceId, status);
return;
}
TaskTrackerReportInstanceStatusReq reportInstanceStatusReq = new TaskTrackerReportInstanceStatusReq();
reportInstanceStatusReq.setAppId(workerRuntime.getAppId());
reportInstanceStatusReq.setJobId(instanceInfo.getJobId());
reportInstanceStatusReq.setInstanceId(instanceId);
reportInstanceStatusReq.setWfInstanceId(instanceInfo.getWfInstanceId());
reportInstanceStatusReq.setTotalTaskNum(1);
reportInstanceStatusReq.setReportTime(System.currentTimeMillis());
reportInstanceStatusReq.setStartTime(createTime);
reportInstanceStatusReq.setSourceAddress(workerRuntime.getWorkerAddress());
reportInstanceStatusReq.setSucceedTaskNum(0);
reportInstanceStatusReq.setFailedTaskNum(0);
if (stopFlag.get()) {
if (finished.get()) {
// Interrupted successfully
destroy();
return;
}
final Thread workerThread = executeThread.get();
if (!finished.get() && workerThread != null) {
// Failed to interrupt the task, force stop
try {
if (tryForceStopThread(workerThread)) {
finished.set(true);
taskEndTime = System.currentTimeMillis();
result = new ProcessResult(false, SystemInstanceResult.USER_STOP_INSTANCE_FORCE_STOP);
log.warn("[TaskTracker-{}] task need stop, force stop thread {} success!", instanceId, workerThread.getName());
// A terminated task does not need to report status
destroy();
return;
}
} catch (Exception e) {
log.warn("[TaskTracker-{}] task need stop,fail to stop thread {}", instanceId, workerThread.getName(), e);
}
}
}
if (finished.get()) {
if (result.isSuccess()) {
reportInstanceStatusReq.setSucceedTaskNum(1);
reportInstanceStatusReq.setInstanceStatus(InstanceStatus.SUCCEED.getV());
} else {
reportInstanceStatusReq.setFailedTaskNum(1);
reportInstanceStatusReq.setInstanceStatus(InstanceStatus.FAILED.getV());
}
// Handle workflow context
if (taskContext.getWorkflowContext().getWfInstanceId() != null) {
reportInstanceStatusReq.setAppendedWfContext(taskContext.getWorkflowContext().getAppendedContextData());
}
reportInstanceStatusReq.setResult(suit(result.getMsg()));
reportInstanceStatusReq.setEndTime(taskEndTime);
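// Small trick: when reporting the final status, reset the report time and add a small offset, so that when a running status and the final status are reported concurrently, the final status carries the later report time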
reportInstanceStatusReq.setReportTime(System.currentTimeMillis() + 1);
reportFinalStatusThenDestroy(workerRuntime, reportInstanceStatusReq);
return;
}
// Unfinished task: only report status
reportInstanceStatusReq.setInstanceStatus(InstanceStatus.RUNNING.getV());
log.info("[TaskTracker-{}] report status({}) success,real status is {}", instanceId, reportInstanceStatusReq, status);
TransportUtils.ttReportInstanceStatus(reportInstanceStatusReq, workerRuntime.getServerDiscoveryService().getCurrentServerAddress(), workerRuntime.getTransporter());
}
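checkAndReportStatus builds a TaskTrackerReportInstanceStatusReq and then branches on stopFlag and finished: a task that was stopped (interrupted or force-stopped) is destroyed without a report, a finished task reports its final status through reportFinalStatusThenDestroy, and an unfinished task reports RUNNING through TransportUtils.ttReportInstanceStatus.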
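The branching in checkAndReportStatus can be summarized as a small decision function. This is only a sketch: ReportDecisionSketch, Action, and the boolean parameters are hypothetical names, and forceStopSucceeded stands for "a worker thread exists and tryForceStopThread returned true".

```java
// Hypothetical summary of the branches, not PowerJob code.
final class ReportDecisionSketch {

    enum Action { SKIP, DESTROY_WITHOUT_REPORT, REPORT_SUCCEED, REPORT_FAILED, REPORT_RUNNING }

    static Action decide(boolean destroyed, boolean stopRequested, boolean finished,
                         boolean forceStopSucceeded, boolean resultSuccess) {
        if (destroyed) {
            return Action.SKIP;                      // tracker already destroyed
        }
        if (stopRequested) {
            if (finished) {
                return Action.DESTROY_WITHOUT_REPORT; // interrupt worked
            }
            if (forceStopSucceeded) {
                return Action.DESTROY_WITHOUT_REPORT; // force stop worked
            }
            // otherwise fall through and keep reporting
        }
        if (finished) {
            return resultSuccess ? Action.REPORT_SUCCEED : Action.REPORT_FAILED;
        }
        return Action.REPORT_RUNNING;
    }
}
```

Note that a stop request that can neither be interrupted nor force-stopped falls through to the RUNNING report, which matches the source: the next scheduled run tries again.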
private void timeoutCheck() {
if (taskStartTime == null || System.currentTimeMillis() - taskStartTime < instanceInfo.getInstanceTimeoutMS()) {
return;
}
if (finished.get() && result != null) {
timeoutCheckScheduledFuture.cancel(true);
return;
}
// First time the timeout is detected
if (timeoutFlag.compareAndSet(false, true)) {
// Timed out: only try to interrupt the task
log.warn("[TaskTracker-{}] task timeout,taskStarTime:{},currentTime:{},runningTimeLimit:{}, try to interrupt it.", instanceId, taskStartTime, System.currentTimeMillis(), instanceInfo.getInstanceTimeoutMS());
processFuture.cancel(true);
return;
}
if (finished.get()) {
// Interrupted successfully
log.warn("[TaskTracker-{}] task timeout,taskStarTime:{},endTime:{}, interrupt success.", instanceId, taskStartTime, taskEndTime);
return;
}
Thread workerThread = executeThread.get();
if (workerThread == null) {
return;
}
// Failed to interrupt the task, force terminate
try {
if (tryForceStopThread(workerThread)) {
finished.set(true);
taskEndTime = System.currentTimeMillis();
result = new ProcessResult(false, SystemInstanceResult.INSTANCE_EXECUTE_TIMEOUT_FORCE_STOP);
log.warn("[TaskTracker-{}] task timeout, force stop thread {} success!", instanceId, workerThread.getName());
}
} catch (Exception e) {
log.warn("[TaskTracker-{}] task timeout,fail to stop thread {}", instanceId, workerThread.getName(), e);
}
}
private boolean tryForceStopThread(Thread thread) {
String threadName = thread.getName();
String allowStopThread = System.getProperty(PowerJobDKey.WORKER_ALLOWED_FORCE_STOP_THREAD);
if (!StringUtils.equalsIgnoreCase(allowStopThread, Boolean.TRUE.toString())) {
log.warn("[TaskTracker-{}] PowerJob not allowed to force stop a thread by config", instanceId);
return false;
}
log.warn("[TaskTracker-{}] fail to interrupt the thread[{}], try to force stop.", instanceId, threadName);
try {
thread.stop();
return true;
} catch (Throwable t) {
log.warn("[TaskTracker-{}] stop thread[{}] failed, msg: {}", instanceId, threadName, t.getMessage());
}
return false;
}
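timeoutCheck first returns early if the task has not started or has not yet exceeded its timeout. If the task has already finished with a result, it cancels timeoutCheckScheduledFuture, that is, the periodic check itself rather than the task. On the first timeout detection it flips timeoutFlag with compareAndSet and tries to interrupt the task via processFuture.cancel(true). If a later check finds the task still not finished, it falls back to tryForceStopThread, which calls the deprecated thread.stop method and does so only when the PowerJobDKey.WORKER_ALLOWED_FORCE_STOP_THREAD system property is set to true.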
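The first, cooperative phase of the timeout handling can be reproduced with plain JDK classes. The sketch below is an illustration, not PowerJob code: InterruptOnTimeoutSketch and runAndInterrupt are hypothetical names, and the sleeping task stands in for a processor that blocks.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class InterruptOnTimeoutSketch {

    // Starts a blocking task, waits timeoutMs, then interrupts it once.
    // Returns true if the task observed the interrupt.
    static boolean runAndInterrupt(long timeoutMs) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicBoolean timeoutFlag = new AtomicBoolean(false);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(1);
        try {
            Future<?> processFuture = pool.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(60_000L);       // simulated long-running work
                } catch (InterruptedException e) {
                    sawInterrupt.set(true);      // cooperative cancellation path
                } finally {
                    done.countDown();
                }
            });
            started.await();                     // make sure the task is running
            Thread.sleep(timeoutMs);             // simulated timeout expiry
            // Only the first detection triggers the interrupt, as with timeoutFlag.
            if (timeoutFlag.compareAndSet(false, true)) {
                processFuture.cancel(true);
            }
            done.await(5, TimeUnit.SECONDS);
            return sawInterrupt.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Interruption only works when the task reaches a blocking or interrupt-aware call, which is why the source keeps a force-stop fallback. On JDK 20 and later, Thread.stop throws UnsupportedOperationException, so on those runtimes the fallback in the source would end up in its catch block and return false.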
private ProcessResult processTask() {
executeThread.set(Thread.currentThread());
// Record the time at which the task started executing
taskStartTime = System.currentTimeMillis();
status = TaskStatus.WORKER_PROCESSING;
// At the start of execution, submit the task for the timeout check
ProcessResult res = null;
do {
Thread.currentThread().setContextClassLoader(processorBean.getClassLoader());
if (res != null && !res.isSuccess()) {
// Retry
taskContext.setCurrentRetryTimes(taskContext.getCurrentRetryTimes() + 1);
log.warn("[TaskTracker-{}] process failed, TaskTracker will have a retry,current retryTimes : {}", instanceId, taskContext.getCurrentRetryTimes());
}
try {
res = processorBean.getProcessor().process(taskContext);
} catch (InterruptedException e) {
log.warn("[TaskTracker-{}] task has been interrupted !", instanceId, e);
Thread.currentThread().interrupt();
if (timeoutFlag.get()) {
res = new ProcessResult(false, SystemInstanceResult.INSTANCE_EXECUTE_TIMEOUT_INTERRUPTED);
} else if (stopFlag.get()) {
res = new ProcessResult(false, SystemInstanceResult.USER_STOP_INSTANCE_INTERRUPTED);
} else {
res = new ProcessResult(false, e.toString());
}
} catch (Exception e) {
log.warn("[TaskTracker-{}] process failed !", instanceId, e);
res = new ProcessResult(false, e.toString());
}
if (res == null) {
log.warn("[TaskTracker-{}] processor return null !", instanceId);
res = new ProcessResult(false, "Processor return null");
}
} while (!res.isSuccess() && taskContext.getCurrentRetryTimes() < taskContext.getMaxRetryTimes() && !timeoutFlag.get() && !stopFlag.get());
executeThread.set(null);
taskEndTime = System.currentTimeMillis();
finished.set(true);
result = res;
status = result.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED;
// Cancel the timeout check task
if (timeoutCheckScheduledFuture != null) {
timeoutCheckScheduledFuture.cancel(true);
}
log.info("[TaskTracker-{}] task complete ! create time:{},queue time:{},use time:{},result:{}", instanceId, createTime, taskStartTime - createTime, System.currentTimeMillis() - taskStartTime, result);
// Report once immediately after execution completes
checkAndReportStatus();
return result;
}
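processTask runs a do-while loop, so the processor is always invoked at least once. The loop repeats while the result is not successful, the retry count is below the maximum, the task has not timed out, and stopFlag is false. Inside the loop it calls processorBean.getProcessor().process(taskContext) and catches InterruptedException and Exception, turning each into a failed ProcessResult. After the loop it records the end time, marks the task finished, cancels timeoutCheckScheduledFuture, and finally calls checkAndReportStatus to report.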
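The retry semantics of that loop are easy to get wrong by one, so here is a stripped-down sketch. RetryLoopSketch and the Attempt interface are hypothetical; the timeout and stop flags are left out to keep the focus on the counting.

```java
// Hypothetical reduction of the processTask loop, not PowerJob code.
final class RetryLoopSketch {

    // One processing attempt; returns true on success and may throw.
    interface Attempt {
        boolean run(int currentRetryTimes) throws Exception;
    }

    // Returns how many times the attempt was executed.
    static int attemptsUntilDone(Attempt attempt, int maxRetryTimes) {
        int currentRetryTimes = 0;
        int attempts = 0;
        Boolean ok = null;
        do {
            if (ok != null && !ok) {
                currentRetryTimes++;      // previous attempt failed: this is a retry
            }
            attempts++;
            try {
                ok = attempt.run(currentRetryTimes);
            } catch (Exception e) {
                ok = false;               // an exception counts as a failed attempt
            }
        } while (!ok && currentRetryTimes < maxRetryTimes);
        return attempts;
    }
}
```

With maxRetryTimes set to 2, a processor that always fails runs three times in total: the initial attempt plus two retries.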
/**
* Static factory method for creating a TaskTracker
*
* @param req job scheduling request from the server
* @return LightTaskTracker
*/
public static LightTaskTracker create(ServerScheduleJobReq req, WorkerRuntime workerRuntime) {
try {
return new LightTaskTracker(req, workerRuntime);
} catch (Exception e) {
reportCreateErrorToServer(req, workerRuntime, e);
}
return null;
}
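LightTaskTracker provides a static create method for building a LightTaskTracker; if the constructor throws, it reports the error to the server through reportCreateErrorToServer and returns null.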
tech/powerjob/worker/actors/TaskTrackerActor.java
@Slf4j
@Actor(path = WTT_PATH)
public class TaskTrackerActor {
private final WorkerRuntime workerRuntime;
public TaskTrackerActor(WorkerRuntime workerRuntime) {
this.workerRuntime = workerRuntime;
}
/**
* Handler for job scheduling requests from the server
*/
@Handler(path = WTT_HANDLER_RUN_JOB)
public void onReceiveServerScheduleJobReq(ServerScheduleJobReq req) {
log.debug("[TaskTrackerActor] server schedule job by request: {}.", req);
Long instanceId = req.getInstanceId();
// Distinguish the lightweight task model from the heavyweight task model
if (isLightweightTask(req)) {
final LightTaskTracker taskTracker = LightTaskTrackerManager.getTaskTracker(instanceId);
if (taskTracker != null) {
log.warn("[TaskTrackerActor] LightTaskTracker({}) for instance(id={}) already exists.", taskTracker, instanceId);
return;
}
// Check whether the worker is already overloaded
if (LightTaskTrackerManager.currentTaskTrackerSize() >= workerRuntime.getWorkerConfig().getMaxLightweightTaskNum() * LightTaskTrackerManager.OVERLOAD_FACTOR) {
// ignore this request
log.warn("[TaskTrackerActor] this worker is overload,ignore this request(instanceId={}),current size = {}!",instanceId,LightTaskTrackerManager.currentTaskTrackerSize());
return;
}
if (LightTaskTrackerManager.currentTaskTrackerSize() >= workerRuntime.getWorkerConfig().getMaxLightweightTaskNum()) {
log.warn("[TaskTrackerActor] this worker will be overload soon,current size = {}!",LightTaskTrackerManager.currentTaskTrackerSize());
}
// Create the lightweight task
LightTaskTrackerManager.atomicCreateTaskTracker(instanceId, ignore -> LightTaskTracker.create(req, workerRuntime));
} else {
HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.getTaskTracker(instanceId);
if (taskTracker != null) {
log.warn("[TaskTrackerActor] HeavyTaskTracker({}) for instance(id={}) already exists.", taskTracker, instanceId);
return;
}
// Check whether the worker is already overloaded
if (HeavyTaskTrackerManager.currentTaskTrackerSize() >= workerRuntime.getWorkerConfig().getMaxHeavyweightTaskNum()) {
// ignore this request
log.warn("[TaskTrackerActor] this worker is overload,ignore this request(instanceId={})! current size = {},", instanceId, HeavyTaskTrackerManager.currentTaskTrackerSize());
return;
}
// Create atomically to prevent multiple instances
HeavyTaskTrackerManager.atomicCreateTaskTracker(instanceId, ignore -> HeavyTaskTracker.create(req, workerRuntime));
}
}
//......
}
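TaskTrackerActor has the path taskTracker and handles jobInstance requests from the server as well as task requests from workers. Its onReceiveServerScheduleJobReq method has the path runJob and receives a ServerScheduleJobReq. It uses isLightweightTask to decide which task model applies. For a lightweight task it first looks up an existing tracker with LightTaskTrackerManager.getTaskTracker(instanceId) and returns if one exists. It then checks the number of LightTaskTrackers on this worker: at or above maxLightweightTaskNum * OVERLOAD_FACTOR the request is ignored, while at or above maxLightweightTaskNum it only logs a warning. Finally, LightTaskTrackerManager.atomicCreateTaskTracker maintains the mapping from instanceId to LightTaskTracker and, if no entry exists, creates one through LightTaskTracker.create(req, workerRuntime).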
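The admission logic for lightweight tasks can be sketched with a ConcurrentHashMap. Everything in this block is an assumption made for illustration: TrackerRegistrySketch is a hypothetical class, String stands in for LightTaskTracker, the OVERLOAD_FACTOR value of 1.3 is a placeholder, and computeIfAbsent is one plausible way to implement an atomic create.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical registry, not the real LightTaskTrackerManager.
final class TrackerRegistrySketch {

    static final double OVERLOAD_FACTOR = 1.3;   // assumed value, for illustration only

    private final Map<Long, String> trackers = new ConcurrentHashMap<>();
    private final int maxLightweightTaskNum;

    TrackerRegistrySketch(int maxLightweightTaskNum) {
        this.maxLightweightTaskNum = maxLightweightTaskNum;
    }

    // Returns true if the request was accepted and a tracker is registered.
    boolean tryCreate(long instanceId, Function<Long, String> creator) {
        if (trackers.containsKey(instanceId)) {
            return false;                        // duplicate request for this instance
        }
        if (trackers.size() >= maxLightweightTaskNum * OVERLOAD_FACTOR) {
            return false;                        // hard overload: ignore the request
        }
        // Atomic per key: the creator runs at most once for a given instanceId.
        trackers.computeIfAbsent(instanceId, creator);
        return true;
    }

    int size() {
        return trackers.size();
    }
}
```

The size check and the insert are not one atomic step, so under concurrent requests the count can briefly exceed the threshold; the soft limit with a warning and a hard limit scaled by a factor tolerates that kind of slack.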
private boolean isLightweightTask(ServerScheduleJobReq serverScheduleJobReq) {
final ExecuteType executeType = ExecuteType.valueOf(serverScheduleJobReq.getExecuteType());
// Anything not executed standalone is definitely not lightweight
if (executeType != ExecuteType.STANDALONE){
return false;
}
TimeExpressionType timeExpressionType = TimeExpressionType.valueOf(serverScheduleJobReq.getTimeExpressionType());
// Fixed-rate and fixed-delay tasks are definitely not lightweight either
return timeExpressionType != TimeExpressionType.FIXED_DELAY && timeExpressionType != TimeExpressionType.FIXED_RATE;
}
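isLightweightTask works as follows: if executeType is not STANDALONE, the task is not lightweight; otherwise it checks the request's timeExpressionType, and only tasks whose type is neither FIXED_DELAY nor FIXED_RATE count as lightweight.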
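The rule is compact enough to check against a few cases. In the sketch below the two enums are local stand-ins defined only for this example; apart from STANDALONE, FIXED_RATE, and FIXED_DELAY, which appear in the source, the listed constants are assumptions.

```java
// Hypothetical stand-alone version of the check, not PowerJob code.
final class LightweightCheckSketch {

    enum ExecuteType { STANDALONE, BROADCAST, MAP, MAP_REDUCE }

    enum TimeExpressionType { API, CRON, FIXED_RATE, FIXED_DELAY }

    static boolean isLightweight(ExecuteType executeType, TimeExpressionType timeExpressionType) {
        // Anything other than standalone execution is never lightweight.
        if (executeType != ExecuteType.STANDALONE) {
            return false;
        }
        // Fixed-rate and fixed-delay jobs are excluded as well.
        return timeExpressionType != TimeExpressionType.FIXED_DELAY
                && timeExpressionType != TimeExpressionType.FIXED_RATE;
    }
}
```

So a standalone CRON job would take the LightTaskTracker path, while a standalone FIXED_RATE job or any MAP job would go to HeavyTaskTracker.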
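LightTaskTracker extends TaskTracker. Its constructor builds a ProcessorDefinition from the ServerScheduleJobReq, loads it with workerRuntime.getProcessorLoader().load, schedules checkAndReportStatus and timeoutCheck, and finally submits processTask (there is no separate start method; all of this happens in the constructor). TaskTrackerActor has the path taskTracker and handles jobInstance requests from the server as well as task requests from workers. Its onReceiveServerScheduleJobReq method has the path runJob; it receives a ServerScheduleJobReq and creates and runs a LightTaskTracker for the request's instanceId (a ConcurrentHashMap maintains the mapping from instanceId to LightTaskTracker to avoid duplicate execution).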