This article takes a look at PowerJob's HttpProcessor.
tech/powerjob/worker/core/processor/sdk/BasicProcessor.java
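public interface BasicProcessor {

    /**
     * Core processing logic.
     * The workflow context is available via {@link TaskContext#getWorkflowContext()}.
     *
     * @param context task context; jobParams and instanceParams give the parameters configured in the console and the task instance parameters passed through OpenAPI, respectively
     * @return the processing result; msg has a length limit and is truncated when too long; returning null is not allowed
     * @throws Exception throwing is allowed but not recommended; business code should preferably handle exceptions itself
     */
    ProcessResult process(TaskContext context) throws Exception;
}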
BasicProcessor is the basic processor for standalone (single-machine) execution; it defines the process method.
tech/powerjob/official/processors/CommonBasicProcessor.java
@Slf4j
public abstract class CommonBasicProcessor implements BasicProcessor {

    @Override
    public ProcessResult process(TaskContext ctx) throws Exception {
        OmsLogger omsLogger = ctx.getOmsLogger();
        String securityDKey = getSecurityDKey();
        if (SecurityUtils.disable(securityDKey)) {
            String msg = String.format("%s is not enabled, please set '-D%s=true' to enable it", this.getClass().getSimpleName(), securityDKey);
            omsLogger.warn(msg);
            return new ProcessResult(false, msg);
        }
        String status = "unknown";
        Stopwatch sw = Stopwatch.createStarted();
        omsLogger.info("using params: {}", CommonUtils.parseParams(ctx));
        try {
            ProcessResult result = process0(ctx);
            omsLogger.info("execute succeed, using {}, result: {}", sw, result);
            status = result.isSuccess() ? "succeed" : "failed";
            return result;
        } catch (Throwable t) {
            status = "exception";
            omsLogger.error("execute failed!", t);
            return new ProcessResult(false, ExceptionUtils.getMessage(t));
        } finally {
            log.info("{}|{}|{}|{}|{}", getClass().getSimpleName(), ctx.getJobId(), ctx.getInstanceId(), status, sw);
        }
    }

    protected abstract ProcessResult process0(TaskContext taskContext) throws Exception;

    protected String getSecurityDKey() {
        return null;
    }
}
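CommonBasicProcessor is an abstract class that implements the BasicProcessor interface. It declares the abstract process0 method and provides getSecurityDKey, which returns null by default. Its process method first calls SecurityUtils.disable(securityDKey) to check whether the processor is enabled and, if it is not, returns a failed result immediately. Otherwise it runs process0 inside a try/catch, turning any Throwable into a failed ProcessResult, and in the finally block it logs the class name, job ID, instance ID, status, and elapsed time.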
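The control flow described here is a template method: a fixed outer process wraps a subclass-provided process0. The following is a minimal sketch of that shape using only JDK types. SketchResult, GuardedProcessor, the three subclasses, and the property name sketch.gated.enabled are all hypothetical stand-ins, and the gate check is an assumption about how a -D flag might be read, not the actual SecurityUtils implementation.

```java
/** Hypothetical stand-in for ProcessResult, used only in this sketch. */
final class SketchResult {
    final boolean success;
    final String msg;

    SketchResult(boolean success, String msg) {
        this.success = success;
        this.msg = msg;
    }
}

/** Mirrors the shape of CommonBasicProcessor#process with plain JDK types. */
abstract class GuardedProcessor {

    /** Template method: check the gate, delegate, and turn any failure into a result. */
    public final SketchResult process(String params) {
        String key = securityKey();
        // Assumption: a null key means "no gate"; otherwise -D<key>=true is required.
        if (key != null && !Boolean.parseBoolean(System.getProperty(key))) {
            return new SketchResult(false, getClass().getSimpleName()
                    + " is not enabled, please set '-D" + key + "=true' to enable it");
        }
        long start = System.nanoTime();
        String status = "unknown";
        try {
            SketchResult result = process0(params);
            status = result.success ? "succeed" : "failed";
            return result;
        } catch (Throwable t) {
            status = "exception";
            return new SketchResult(false, String.valueOf(t));
        } finally {
            // Runs on every path, including the early returns inside try/catch.
            long millis = (System.nanoTime() - start) / 1_000_000;
            System.out.println(getClass().getSimpleName() + "|" + status + "|" + millis + "ms");
        }
    }

    protected abstract SketchResult process0(String params) throws Exception;

    protected String securityKey() {
        return null;
    }
}

class EchoProcessor extends GuardedProcessor {
    @Override
    protected SketchResult process0(String params) {
        return new SketchResult(true, "echo:" + params);
    }
}

class GatedEchoProcessor extends EchoProcessor {
    @Override
    protected String securityKey() {
        return "sketch.gated.enabled";
    }
}

class BrokenProcessor extends GuardedProcessor {
    @Override
    protected SketchResult process0(String params) {
        throw new IllegalStateException("boom");
    }
}
```

In this sketch a subclass only supplies process0; the gate, the exception handling, and the final status line stay in one place, which is the same division of labor the source shows between CommonBasicProcessor and its subclasses.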
public class HttpProcessor extends CommonBasicProcessor {

    /**
     * Default timeout is 60 seconds.
     */
    private static final int DEFAULT_TIMEOUT = 60;
    private static final int HTTP_SUCCESS_CODE = 200;
    private static final Map<Integer, OkHttpClient> CLIENT_STORE = new ConcurrentHashMap<>();

    @Override
    public ProcessResult process0(TaskContext taskContext) throws Exception {
        OmsLogger omsLogger = taskContext.getOmsLogger();
        HttpParams httpParams = JSON.parseObject(CommonUtils.parseParams(taskContext), HttpParams.class);
        if (httpParams == null) {
            String message = "httpParams is null, please check jobParam configuration.";
            omsLogger.warn(message);
            return new ProcessResult(false, message);
        }
        if (StringUtils.isEmpty(httpParams.url)) {
            return new ProcessResult(false, "url can't be empty!");
        }
        if (!httpParams.url.startsWith("http")) {
            httpParams.url = "http://" + httpParams.url;
        }
        omsLogger.info("request url: {}", httpParams.url);
        // set default method
        if (StringUtils.isEmpty(httpParams.method)) {
            httpParams.method = "GET";
            omsLogger.info("using default request method: GET");
        } else {
            httpParams.method = httpParams.method.toUpperCase();
            omsLogger.info("request method: {}", httpParams.method);
        }
        // set default mediaType
        if (!"GET".equals(httpParams.method)) {
            // set default request body
            if (StringUtils.isEmpty(httpParams.body)) {
                httpParams.body = new JSONObject().toJSONString();
                omsLogger.warn("try to use default request body:{}", httpParams.body);
            }
            if (JSONValidator.from(httpParams.body).validate() && StringUtils.isEmpty(httpParams.mediaType)) {
                httpParams.mediaType = "application/json";
                omsLogger.warn("try to use 'application/json' as media type");
            }
        }
        // set default timeout
        if (httpParams.timeout == null) {
            httpParams.timeout = DEFAULT_TIMEOUT;
        }
        omsLogger.info("request timeout: {} seconds", httpParams.timeout);
        OkHttpClient client = getClient(httpParams.timeout);
        Request.Builder builder = new Request.Builder().url(httpParams.url);
        if (httpParams.headers != null) {
            httpParams.headers.forEach((k, v) -> {
                builder.addHeader(k, v);
                omsLogger.info("add header {}:{}", k, v);
            });
        }
        switch (httpParams.method) {
            case "PUT":
            case "DELETE":
            case "POST":
                MediaType mediaType = MediaType.parse(httpParams.mediaType);
                omsLogger.info("mediaType: {}", mediaType);
                RequestBody requestBody = RequestBody.create(mediaType, httpParams.body);
                builder.method(httpParams.method, requestBody);
                break;
            default:
                builder.get();
        }
        Response response = client.newCall(builder.build()).execute();
        omsLogger.info("response: {}", response);
        String msgBody = "";
        if (response.body() != null) {
            msgBody = response.body().string();
        }
        int responseCode = response.code();
        String res = String.format("code:%d, body:%s", responseCode, msgBody);
        boolean success = true;
        if (responseCode != HTTP_SUCCESS_CODE) {
            success = false;
            omsLogger.warn("{} url: {} failed, response code is {}, response body is {}",
                    httpParams.method, httpParams.url, responseCode, msgBody);
        }
        return new ProcessResult(success, res);
    }

    //......
}
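HttpProcessor extends CommonBasicProcessor. Its process0 method obtains the parameters through CommonUtils.parseParams(taskContext) and parses them into an HttpParams object, filling in defaults along the way: an "http://" prefix for the URL, GET as the method, an empty JSON object as the body of non-GET requests, and a 60-second timeout. It then builds a Request.Builder, attaches a RequestBody for PUT, DELETE, and POST, and executes the request with client.newCall(builder.build()).execute(). The result counts as a success only when the HTTP status code is exactly 200.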
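The defaulting rules in process0 are easy to misread, so here is a small sketch that isolates them with plain JDK code. HttpParamDefaults and its method names are hypothetical; the logic is copied from the excerpt above, without OkHttp or logging.

```java
/** Hypothetical helper that isolates the defaulting rules seen in process0. */
final class HttpParamDefaults {

    private HttpParamDefaults() {
    }

    /**
     * Prepends "http://" unless the value already starts with "http".
     * The caller is expected to have rejected empty URLs first, as process0 does.
     */
    static String normalizeUrl(String url) {
        return url.startsWith("http") ? url : "http://" + url;
    }

    /** An empty method falls back to GET; anything else is upper-cased. */
    static String normalizeMethod(String method) {
        return (method == null || method.isEmpty()) ? "GET" : method.toUpperCase();
    }

    /** Only PUT, DELETE and POST get a request body; every other value takes the default GET branch. */
    static boolean sendsBody(String normalizedMethod) {
        switch (normalizedMethod) {
            case "PUT":
            case "DELETE":
            case "POST":
                return true;
            default:
                return false;
        }
    }
}
```

Two consequences follow directly from the source: the prefix check is startsWith("http") rather than a scheme check, so a host such as httpbin.org is left without a scheme, and a method outside PUT, DELETE, and POST (PATCH, for example) falls through to builder.get().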
tech/powerjob/official/processors/util/CommonUtils.java
public class CommonUtils {

    private CommonUtils() {
    }

    public static String parseParams(TaskContext context) {
        // inside a workflow, always prefer jobParams
        if (context.getWorkflowContext().getWfInstanceId() != null) {
            return context.getJobParams();
        }
        if (StringUtils.isNotEmpty(context.getInstanceParams())) {
            return context.getInstanceParams();
        }
        return context.getJobParams();
    }
}
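The parseParams method of CommonUtils returns jobParams when the task runs inside a workflow; otherwise it prefers instanceParams and falls back to jobParams only when instanceParams is empty.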
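The precedence can be written as a pure function over three values, which makes the three cases easy to check. ParamPrecedence and choose are hypothetical names; the branches mirror parseParams, with the workflow instance ID passed in directly instead of being read from a TaskContext.

```java
/** Hypothetical, dependency-free restatement of the parseParams precedence. */
final class ParamPrecedence {

    private ParamPrecedence() {
    }

    static String choose(Long wfInstanceId, String instanceParams, String jobParams) {
        // Inside a workflow, jobParams always wins.
        if (wfInstanceId != null) {
            return jobParams;
        }
        // Outside a workflow, non-empty instanceParams take priority.
        if (instanceParams != null && !instanceParams.isEmpty()) {
            return instanceParams;
        }
        return jobParams;
    }
}
```

For example, choose(null, "{\"url\":\"a\"}", "{\"url\":\"b\"}") yields the instance parameters, while the same call with a non-null workflow instance ID yields the job parameters.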
@Data
public static class HttpParams {
    /**
     * POST / GET / PUT / DELETE
     */
    private String method;
    /**
     * the request url
     */
    private String url;
    /**
     * application/json
     * application/xml
     * image/png
     * image/jpeg
     * image/gif
     */
    private String mediaType;

    private String body;

    private Map<String, String> headers;
    /**
     * timeout for complete calls
     */
    private Integer timeout;
}
HttpParams defines the method, url, mediaType, body, headers, and timeout properties.
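Since process0 parses the parameter string as JSON into HttpParams, the job parameters are presumably a JSON object whose keys match these field names. The sketch below builds one such string; the class name SampleHttpJobParams, the URL, the header, and every value are placeholders, and body is written as an escaped string because the field is declared as a String.

```java
/** Hypothetical example of a parameter string matching the HttpParams fields. */
final class SampleHttpJobParams {

    private SampleHttpJobParams() {
    }

    /** Returns a JSON object with one key per HttpParams field; all values are placeholders. */
    static String json() {
        return "{"
                + "\"method\":\"POST\","
                + "\"url\":\"http://example.com/api/ping\","
                + "\"mediaType\":\"application/json\","
                + "\"body\":\"{\\\"hello\\\":\\\"world\\\"}\","
                + "\"headers\":{\"X-Trace-Id\":\"demo\"},"
                + "\"timeout\":10"
                + "}";
    }
}
```

According to the defaults in process0, method, mediaType, body, headers, and timeout can all be omitted; only url is mandatory.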
private static final Map<Integer, OkHttpClient> CLIENT_STORE = new ConcurrentHashMap<>();

private static OkHttpClient getClient(Integer timeout) {
    return CLIENT_STORE.computeIfAbsent(timeout, ignore -> new OkHttpClient.Builder()
            .connectTimeout(Duration.ZERO)
            .readTimeout(Duration.ZERO)
            .writeTimeout(Duration.ZERO)
            .callTimeout(timeout, TimeUnit.SECONDS)
            .build());
}
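getClient caches OkHttpClient instances keyed by timeout, so requests with the same timeout share one client. The connect, read, and write timeouts are set to zero, which in OkHttp means no timeout, so the only limit in effect is callTimeout, which bounds the complete call.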
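The caching idiom is ConcurrentHashMap.computeIfAbsent with the timeout as the key. The sketch below reproduces it with a stand-in client so that it runs without OkHttp; TimeoutKeyedCache, StubClient, and BUILD_COUNT are hypothetical names added only to make the reuse observable.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration of caching one client per distinct timeout. */
final class TimeoutKeyedCache {

    /** Stand-in for OkHttpClient; it only remembers the timeout it was built with. */
    static final class StubClient {
        final int callTimeoutSeconds;

        StubClient(int callTimeoutSeconds) {
            this.callTimeoutSeconds = callTimeoutSeconds;
        }
    }

    private static final Map<Integer, StubClient> STORE = new ConcurrentHashMap<>();

    /** Counts how many clients were actually constructed. */
    static final AtomicInteger BUILD_COUNT = new AtomicInteger();

    private TimeoutKeyedCache() {
    }

    static StubClient getClient(Integer timeout) {
        // The mapping function runs only when the key has no entry yet.
        return STORE.computeIfAbsent(timeout, t -> {
            BUILD_COUNT.incrementAndGet();
            return new StubClient(t);
        });
    }
}
```

One consequence of keying by timeout is that the map gains an entry for every distinct timeout value ever used and nothing is evicted; with a handful of timeouts that is a cheap way to reuse clients.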
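To sum up: PowerJob's HttpProcessor extends CommonBasicProcessor, takes its input as HttpParams, and issues the request with OkHttp. CommonBasicProcessor is an abstract class that implements BasicProcessor, declares the abstract process0 method, and provides getSecurityDKey. Its process method first checks SecurityUtils.disable(securityDKey) and returns immediately if the processor is not enabled; otherwise it runs process0 inside a try/catch and logs the outcome in the finally block.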