This article takes a look at PowerJob's FileCleanupProcessor.
tech/powerjob/worker/core/processor/sdk/BroadcastProcessor.java
public interface BroadcastProcessor extends BasicProcessor {
/**
* Runs before the broadcast executes on all nodes; it is executed only once, on a single machine
*/
default ProcessResult preProcess(TaskContext context) throws Exception {
return new ProcessResult(true);
}
/**
* Runs after the broadcast has finished on all nodes; it is executed only once, on a single machine
*/
default ProcessResult postProcess(TaskContext context, List<TaskResult> taskResults) throws Exception {
return defaultResult(taskResults);
}
static ProcessResult defaultResult(List<TaskResult> taskResults) {
long succeed = 0, failed = 0;
for (TaskResult ts : taskResults) {
if (ts.isSuccess()) {
succeed ++ ;
}else {
failed ++;
}
}
return new ProcessResult(failed == 0, String.format("succeed:%d, failed:%d", succeed, failed));
}
}
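BroadcastProcessor extends BasicProcessor and adds two default methods: preProcess, which runs once on a single machine before the broadcast, and postProcess, which runs once after all nodes have finished. By default preProcess simply succeeds, and postProcess delegates to the static defaultResult, which counts succeeded and failed task results and reports success only if none failed.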
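To see what these defaults give an implementation, here is a minimal sketch built on simplified, hypothetical stand-ins: the records ProcessResult and TaskResult and the String context are not PowerJob's real types, and the real methods also declare `throws Exception`. A processor that overrides only process inherits a successful preProcess and a postProcess that fails the instance as soon as one task result is a failure.

```java
import java.util.List;

// Simplified stand-ins for PowerJob's types (hypothetical, for illustration only).
public class BroadcastDefaultsDemo {

    record ProcessResult(boolean success, String msg) {}

    record TaskResult(String taskId, boolean success) {}

    // Mirrors the shape of BroadcastProcessor; String replaces TaskContext.
    interface BroadcastProcessor {
        ProcessResult process(String context);

        // Default: nothing to prepare, report success.
        default ProcessResult preProcess(String context) {
            return new ProcessResult(true, null);
        }

        // Default: same counting rule as defaultResult, fail if any task failed.
        default ProcessResult postProcess(String context, List<TaskResult> taskResults) {
            long succeed = taskResults.stream().filter(TaskResult::success).count();
            long failed = taskResults.size() - succeed;
            return new ProcessResult(failed == 0, String.format("succeed:%d, failed:%d", succeed, failed));
        }
    }

    // Implements only process(); preProcess and postProcess come from the defaults.
    static class HelloProcessor implements BroadcastProcessor {
        @Override
        public ProcessResult process(String context) {
            return new ProcessResult(true, "hello from " + context);
        }
    }

    public static void main(String[] args) {
        BroadcastProcessor p = new HelloProcessor();
        System.out.println(p.preProcess("ctx").success()); // true
        System.out.println(p.postProcess("ctx", List.of(
                new TaskResult("1", true), new TaskResult("2", false))).msg()); // succeed:1, failed:1
    }
}
```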
tech/powerjob/official/processors/impl/FileCleanupProcessor.java
public class FileCleanupProcessor implements BroadcastProcessor {
@Override
public ProcessResult preProcess(TaskContext context) throws Exception {
if (SecurityUtils.disable(SecurityUtils.ENABLE_FILE_CLEANUP_PROCESSOR)) {
String msg = String.format("FileCleanupProcessor is not enabled, please set '-D%s=true' to enable it", SecurityUtils.ENABLE_FILE_CLEANUP_PROCESSOR);
context.getOmsLogger().warn(msg);
return new ProcessResult(false, msg);
}
return new ProcessResult(true);
}
@Override
public ProcessResult process(TaskContext taskContext) throws Exception {
OmsLogger logger = taskContext.getOmsLogger();
logger.info("using params: {}", taskContext.getJobParams());
LongAdder cleanNum = new LongAdder();
Stopwatch sw = Stopwatch.createStarted();
List<CleanupParams> cleanupParamsList = JSONArray.parseArray(taskContext.getJobParams(), CleanupParams.class);
cleanupParamsList.forEach(params -> {
logger.info("start to process: {}", JSON.toJSON(params));
if (StringUtils.isEmpty(params.filePattern) || StringUtils.isEmpty(params.dirPath)) {
logger.warn("skip due to invalid params!");
return;
}
File dir = new File(params.dirPath);
if (!dir.exists()) {
logger.warn("skip due to dirPath[{}] not exists", params.dirPath);
return;
}
if (!dir.isDirectory()) {
logger.warn("skip due to dirPath[{}] is not a directory", params.dirPath);
return;
}
logger.info("start to search directory: {}", params.dirPath);
Collection<File> files = FileUtils.listFiles(dir, null, true);
logger.info("total file num: {}", files.size());
Pattern filePattern = Pattern.compile(params.filePattern);
files.forEach(file -> {
String fileName = file.getName();
String filePath = file.getAbsolutePath();
if (!filePattern.matcher(fileName).matches()) {
logger.info("file[{}] won't be deleted due to filename not match the pattern: {}", fileName, params.filePattern);
return;
}
// last modify time interval, xxx hours
int interval = (int) Math.ceil((System.currentTimeMillis() - file.lastModified()) / 3600000.0);
if (interval < params.retentionTime) {
logger.info("file[{}] won't be deleted because it does not meet the time requirement", filePath);
return;
}
try {
FileUtils.forceDelete(file);
cleanNum.increment();
logger.info("delete file[{}] successfully!", filePath);
} catch (Exception e) {
logger.error("delete file[{}] failed!", filePath, e);
}
});
});
return new ProcessResult(true, String.format("cost:%s,clean:%d", sw.toString(), cleanNum.longValue()));
}
@Data
public static class CleanupParams {
private String dirPath;
private String filePattern;
private Integer retentionTime;
}
}
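FileCleanupProcessor implements the BroadcastProcessor interface. Its preProcess checks whether powerjob.official-processor.file-cleanup.enable is turned on (it is set as a JVM system property, -Dpowerjob.official-processor.file-cleanup.enable=true); if not, it logs a warning and returns a failed result. Its process method parses the job params into a List<CleanupParams>, skips any entry with an empty dirPath or filePattern or a dirPath that is missing or not a directory, recursively lists the files under dirPath, and keeps those whose file name fully matches filePattern. For each match it computes the time since the last modification, rounded up to whole hours (so a file modified a minute ago already counts as one hour old), and if that is greater than or equal to retentionTime it deletes the file with FileUtils.forceDelete(file).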
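The job params are a JSON array of CleanupParams objects, for example `[{"dirPath":"/tmp/powerjob-logs","filePattern":".*\\.log","retentionTime":24}]` (the path is hypothetical). The per-file decision can be isolated in a small sketch with no file I/O; the class and method names below are hypothetical, and the rules are copied from the process method above: the name must fully match the pattern (Matcher.matches(), not find()), and the age rounded up to whole hours must reach retentionTime.

```java
import java.util.regex.Pattern;

// Hypothetical standalone sketch of FileCleanupProcessor's per-file rule (no file I/O).
public class RetentionCheckDemo {

    private static final double MILLIS_PER_HOUR = 3_600_000.0;

    // Elapsed time since the last modification, rounded UP to whole hours, as in the original.
    static int elapsedHours(long lastModifiedMillis, long nowMillis) {
        return (int) Math.ceil((nowMillis - lastModifiedMillis) / MILLIS_PER_HOUR);
    }

    // Delete only if the whole file name matches and the rounded-up age is >= retentionHours.
    static boolean shouldDelete(String fileName, long lastModifiedMillis, long nowMillis,
                                Pattern filePattern, int retentionHours) {
        if (!filePattern.matcher(fileName).matches()) {
            return false;
        }
        return elapsedHours(lastModifiedMillis, nowMillis) >= retentionHours;
    }

    public static void main(String[] args) {
        Pattern logs = Pattern.compile(".*\\.log");
        long hour = 3_600_000L;
        long now = 100 * hour;                 // an arbitrary "now", in epoch millis
        long oneMinuteAgo = now - 60_000L;
        long thirtyHoursAgo = now - 30 * hour;

        System.out.println(shouldDelete("app.log", thirtyHoursAgo, now, logs, 24));    // true
        System.out.println(shouldDelete("app.log", oneMinuteAgo, now, logs, 24));      // false
        System.out.println(shouldDelete("app.log.gz", thirtyHoursAgo, now, logs, 24)); // false, no full match
        System.out.println(shouldDelete("app.log", oneMinuteAgo, now, logs, 1));       // true, 1 minute rounds up to 1 hour
    }
}
```

The last line shows a consequence of rounding up: with retentionTime set to 1, every matching file that is older than "now" qualifies for deletion.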
tech/powerjob/worker/core/processor/runnable/HeavyProcessorRunnable.java
private void handleBroadcastRootTask(Long instanceId, TaskContext taskContext) {
BasicProcessor processor = processorBean.getProcessor();
ProcessResult processResult;
// The first task of a broadcast run executes only the preProcess part
if (processor instanceof BroadcastProcessor) {
BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor;
try {
processResult = broadcastProcessor.preProcess(taskContext);
} catch (Throwable e) {
log.warn("[ProcessorRunnable-{}] broadcast task preProcess failed.", instanceId, e);
processResult = new ProcessResult(false, e.toString());
}
} else {
processResult = new ProcessResult(true, "NO_PREPOST_TASK");
}
// Notify the TaskTracker to create the broadcast subtasks
reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), ProcessorReportTaskStatusReq.BROADCAST, taskContext.getWorkflowContext().getAppendedContextData());
}
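For the root task of a broadcast, HeavyProcessorRunnable.handleBroadcastRootTask runs only preProcess when the processor is a BroadcastProcessor; any other processor gets a successful "NO_PREPOST_TASK" result. If preProcess throws, the exception is turned into a failed ProcessResult. The result is then reported to the TaskTracker with the ProcessorReportTaskStatusReq.BROADCAST flag, which tells it to create the broadcast subtasks.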
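The branching in handleBroadcastRootTask can be reproduced in isolation. The sketch below uses hypothetical stand-in types (a String instead of TaskContext, no TaskStatus or reporting) and only shows which ProcessResult the root task ends up with for a plain processor, a BroadcastProcessor whose preProcess refuses to run (as FileCleanupProcessor does when disabled), and one whose preProcess throws.

```java
// Hypothetical stand-ins; String replaces TaskContext and no status is actually reported.
public class RootTaskDemo {

    record ProcessResult(boolean success, String msg) {}

    interface BasicProcessor {
        ProcessResult process(String context) throws Exception;
    }

    interface BroadcastProcessor extends BasicProcessor {
        default ProcessResult preProcess(String context) throws Exception {
            return new ProcessResult(true, null);
        }
    }

    // Mirrors handleBroadcastRootTask: only a BroadcastProcessor runs preProcess,
    // and any Throwable from it becomes a failed result.
    static ProcessResult rootTaskResult(BasicProcessor processor, String context) {
        if (processor instanceof BroadcastProcessor broadcastProcessor) {
            try {
                return broadcastProcessor.preProcess(context);
            } catch (Throwable e) {
                return new ProcessResult(false, e.toString());
            }
        }
        return new ProcessResult(true, "NO_PREPOST_TASK");
    }

    // preProcess refuses to run, similar to FileCleanupProcessor when the switch is off.
    static final BroadcastProcessor DISABLED = new BroadcastProcessor() {
        @Override
        public ProcessResult process(String context) {
            return new ProcessResult(true, "processed");
        }

        @Override
        public ProcessResult preProcess(String context) {
            return new ProcessResult(false, "not enabled");
        }
    };

    // preProcess throws an unchecked exception.
    static final BroadcastProcessor BROKEN = new BroadcastProcessor() {
        @Override
        public ProcessResult process(String context) {
            return new ProcessResult(true, "processed");
        }

        @Override
        public ProcessResult preProcess(String context) {
            throw new IllegalStateException("boom");
        }
    };

    public static void main(String[] args) {
        BasicProcessor plain = context -> new ProcessResult(true, "processed");
        System.out.println(rootTaskResult(plain, "ctx").msg());    // NO_PREPOST_TASK
        System.out.println(rootTaskResult(DISABLED, "ctx").msg()); // not enabled
        System.out.println(rootTaskResult(BROKEN, "ctx").msg());   // java.lang.IllegalStateException: boom
    }
}
```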
private void handleLastTask(String taskId, Long instanceId, TaskContext taskContext, ExecuteType executeType) {
final BasicProcessor processor = processorBean.getProcessor();
ProcessResult processResult;
Stopwatch stopwatch = Stopwatch.createStarted();
log.debug("[ProcessorRunnable-{}] the last task(taskId={}) start to process.", instanceId, taskId);
List<TaskResult> taskResults = workerRuntime.getTaskPersistenceService().getAllTaskResult(instanceId, task.getSubInstanceId());
try {
switch (executeType) {
case BROADCAST:
if (processor instanceof BroadcastProcessor) {
BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor;
processResult = broadcastProcessor.postProcess(taskContext, taskResults);
} else {
processResult = BroadcastProcessor.defaultResult(taskResults);
}
break;
case MAP_REDUCE:
if (processor instanceof MapReduceProcessor) {
MapReduceProcessor mapReduceProcessor = (MapReduceProcessor) processor;
processResult = mapReduceProcessor.reduce(taskContext, taskResults);
} else {
processResult = new ProcessResult(false, "not implement the MapReduceProcessor");
}
break;
default:
processResult = new ProcessResult(false, "IMPOSSIBLE OR BUG");
}
} catch (Throwable e) {
processResult = new ProcessResult(false, e.toString());
log.warn("[ProcessorRunnable-{}] execute last task(taskId={}) failed.", instanceId, taskId, e);
}
TaskStatus status = processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED;
reportStatus(status, suit(processResult.getMsg()), null, taskContext.getWorkflowContext().getAppendedContextData());
log.info("[ProcessorRunnable-{}] the last task execute successfully, using time: {}", instanceId, stopwatch);
}
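HeavyProcessorRunnable.handleLastTask loads all task results of the instance and, for the BROADCAST execute type, passes them to postProcess when the processor is a BroadcastProcessor, falling back to BroadcastProcessor.defaultResult otherwise (MAP_REDUCE is handled the same way with reduce). Any exception is turned into a failed ProcessResult before the final status is reported.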
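Putting the root task and the last task together, the broadcast lifecycle can be simulated in a single JVM. This is a hypothetical sketch, not PowerJob code: in PowerJob the stages run on different workers and are coordinated by the TaskTracker, and the real postProcess input comes from getAllTaskResult, whereas here it is just one result per simulated worker. The sketch also assumes that a failed preProcess means no broadcast subtasks are created.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-JVM simulation of the broadcast lifecycle; not PowerJob code.
public class BroadcastLifecycleDemo {

    record ProcessResult(boolean success, String msg) {}

    record TaskResult(String taskId, boolean success, String result) {}

    interface BroadcastProcessor {
        default ProcessResult preProcess() throws Exception {
            return new ProcessResult(true, null);
        }

        ProcessResult process(String worker) throws Exception;

        default ProcessResult postProcess(List<TaskResult> taskResults) throws Exception {
            long failed = taskResults.stream().filter(r -> !r.success()).count();
            long succeed = taskResults.size() - failed;
            return new ProcessResult(failed == 0, String.format("succeed:%d, failed:%d", succeed, failed));
        }
    }

    static ProcessResult run(BroadcastProcessor processor, List<String> workers) {
        try {
            // 1. Root task: preProcess runs once. Assumption: on failure no subtasks are created.
            ProcessResult pre = processor.preProcess();
            if (!pre.success()) {
                return pre;
            }
            // 2. One subtask per worker: process runs on every node.
            List<TaskResult> results = new ArrayList<>();
            for (String worker : workers) {
                ProcessResult r;
                try {
                    r = processor.process(worker);
                } catch (Exception e) {
                    r = new ProcessResult(false, e.toString());
                }
                results.add(new TaskResult(worker, r.success(), r.msg()));
            }
            // 3. Last task: postProcess runs once with all collected results.
            return processor.postProcess(results);
        } catch (Throwable e) {
            return new ProcessResult(false, e.toString());
        }
    }

    // Refuses to start, like FileCleanupProcessor when its switch is off.
    static final BroadcastProcessor DISABLED = new BroadcastProcessor() {
        @Override
        public ProcessResult preProcess() {
            return new ProcessResult(false, "not enabled");
        }

        @Override
        public ProcessResult process(String worker) {
            return new ProcessResult(true, "cleaned on " + worker);
        }
    };

    public static void main(String[] args) {
        List<String> workers = List.of("worker-a", "worker-b", "worker-c");
        BroadcastProcessor flaky = worker -> new ProcessResult(!worker.equals("worker-b"), "cleaned on " + worker);
        System.out.println(run(flaky, workers).msg());    // succeed:2, failed:1
        System.out.println(run(DISABLED, workers).msg()); // not enabled
    }
}
```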
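In summary, PowerJob's FileCleanupProcessor implements the BroadcastProcessor interface. Its preProcess, which runs once on a single machine, checks whether powerjob.official-processor.file-cleanup.enable is turned on. Its process method, which runs on every node of the broadcast, takes a List<CleanupParams>, walks each configured directory, picks the files whose names match filePattern, and calls FileUtils.forceDelete(file) on those whose time since last modification is greater than or equal to retentionTime hours.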