本文主要研究一下PowerJob的DesignateServer
tech/powerjob/server/remote/server/redirector/DesignateServer.java
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DesignateServer {
/**
* 转发请求需要 AppInfo 下的 currentServer 信息,因此必须要有 appId 作为入参,该字段指定了 appId 字段的参数名称,默认为 appId
* @return appId 参数名称
*/
String appIdParameterName() default "appId";
}
DesignateServer注解定义了appIdParameterName属性,默认是appId
tech/powerjob/server/remote/server/redirector/DesignateServerAspect.java
@Slf4j
@Aspect
@Component
@Order(0)
@RequiredArgsConstructor
public class DesignateServerAspect {
private final TransportService transportService;
private final AppInfoRepository appInfoRepository;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Around(value = "@annotation(designateServer))")
public Object execute(ProceedingJoinPoint point, DesignateServer designateServer) throws Throwable {
// 参数
Object[] args = point.getArgs();
// 方法名
String methodName = point.getSignature().getName();
// 类名
String className = point.getSignature().getDeclaringTypeName();
Signature signature = point.getSignature();
// 方法签名
MethodSignature methodSignature = (MethodSignature) signature;
String[] parameterNames = methodSignature.getParameterNames();
String[] parameterTypes = Arrays.stream(methodSignature.getParameterTypes()).map(Class::getName).toArray(String[]::new);
Long appId = null;
for (int i = 0; i < parameterNames.length; i++) {
if (StringUtils.equals(parameterNames[i], designateServer.appIdParameterName())) {
appId = Long.parseLong(String.valueOf(args[i]));
break;
}
}
if (appId == null) {
throw new PowerJobException("can't find appId in params for:" + signature);
}
// 获取执行机器
AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new PowerJobException("can't find app info"));
String targetServer = appInfo.getCurrentServer();
// 目标IP为空,本地执行
if (StringUtils.isEmpty(targetServer)) {
return point.proceed();
}
// 目标IP与本地符合则本地执行
if (Objects.equals(targetServer, transportService.defaultProtocol().getAddress())) {
return point.proceed();
}
log.info("[DesignateServerAspect] the method[{}] should execute in server[{}], so this request will be redirect to remote server!", signature.toShortString(), targetServer);
// 转发请求,远程执行后返回结果
RemoteProcessReq remoteProcessReq = new RemoteProcessReq()
.setClassName(className)
.setMethodName(methodName)
.setParameterTypes(parameterTypes)
.setArgs(args);
final URL friendUrl = ServerURLFactory.process2Friend(targetServer);
CompletionStage<AskResponse> askCS = transportService.ask(Protocol.HTTP.name(), friendUrl, remoteProcessReq, AskResponse.class);
AskResponse askResponse = askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
if (!askResponse.isSuccess()) {
throw new PowerJobException("remote process failed: " + askResponse.getMessage());
}
// 考虑范型情况
Method method = methodSignature.getMethod();
JavaType returnType = getMethodReturnJavaType(method);
return OBJECT_MAPPER.readValue(askResponse.getData(), returnType);
}
//......
}
DesignateServerAspect拦截了@DesignateServer注解,它先解析方法参数名,取出参数名与@DesignateServer的appIdParameterName一致的参数值,再通过appInfoRepository.findById找到AppInfoDO,获取appInfo.getCurrentServer();若currentServer就是本机则执行point.proceed(),否则构建RemoteProcessReq,通过transportService.ask转发请求
tech/powerjob/server/core/instance/InstanceLogService.java
/**
* 获取日志的下载链接
* @param appId AOP 专用
* @param instanceId 任务实例 ID
* @return 下载链接
*/
@DesignateServer
public String fetchDownloadUrl(Long appId, Long instanceId) {
String url = "http://" + NetUtils.getLocalHost() + ":" + port + "/instance/downloadLog?instanceId=" + instanceId;
log.info("[InstanceLog-{}] downloadURL for appId[{}]: {}", instanceId, appId, url);
return url;
}
fetchDownloadUrl指定了@DesignateServer注解,会根据appId的值限定在指定server执行
PowerJob的DesignateServer注解定义了appIdParameterName属性,默认是appId;DesignateServerAspect拦截了@DesignateServer注解,它判断currentServer就是本机则执行point.proceed(),否则构建RemoteProcessReq,通过transportService.ask转发请求到指定server执行。