This article takes a look at TransportServiceAware in PowerJob.
tech/powerjob/server/remote/aware/TransportServiceAware.java
public interface TransportServiceAware extends PowerJobAware {

    void setTransportService(TransportService transportService);
}
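TransportServiceAware extends PowerJobAware and declares a single method, setTransportService, through which an implementing component receives the TransportService.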
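To make the "aware" callback idea concrete, here is a minimal, self-contained sketch. Only the shape of TransportServiceAware comes from the source. The simplified TransportService (returning a list of strings), the PingResponder class, the injectTransportService helper and the protocol names are assumptions made for illustration, not PowerJob's actual wiring.

```java
import java.util.Arrays;
import java.util.List;

public class AwareInjectionSketch {

    /** Hypothetical, simplified stand-in for PowerJob's TransportService. */
    interface TransportService {
        List<String> allProtocols();
    }

    /** Marker interface playing the role of PowerJobAware. */
    interface PowerJobAware {
    }

    /** Same shape as the interface shown above. */
    interface TransportServiceAware extends PowerJobAware {
        void setTransportService(TransportService transportService);
    }

    /** Hypothetical component that needs the transport service once it exists. */
    static class PingResponder implements TransportServiceAware {
        private TransportService transportService;

        @Override
        public void setTransportService(TransportService transportService) {
            this.transportService = transportService;
        }

        List<String> ping() {
            return transportService.allProtocols();
        }
    }

    /** Hypothetical bootstrap step: hand the service to every aware component. */
    static void injectTransportService(List<?> components, TransportService service) {
        for (Object component : components) {
            if (component instanceof TransportServiceAware) {
                ((TransportServiceAware) component).setTransportService(service);
            }
        }
    }

    public static void main(String[] args) {
        PingResponder responder = new PingResponder();
        // Placeholder protocol names, chosen only for the demo.
        TransportService service = () -> Arrays.asList("HTTP", "AKKA");
        // The plain String is skipped because it does not implement the aware interface.
        injectTransportService(Arrays.asList(responder, "not aware"), service);
        System.out.println(responder.ping()); // prints [HTTP, AKKA]
    }
}
```

The point of the pattern is that the component never looks the service up itself: whoever creates the service pushes it in through the setter.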
tech/powerjob/server/remote/server/FriendActor.java
@Slf4j
@Component
@Actor(path = S4S_PATH)
public class FriendActor implements TransportServiceAware {

    private TransportService transportService;

    /**
     * Handles liveness-check (ping) requests
     */
    @Handler(path = S4S_HANDLER_PING, processType = ProcessType.NO_BLOCKING)
    public AskResponse onReceivePing(Ping ping) {
        return AskResponse.succeed(transportService.allProtocols());
    }

    @Handler(path = S4S_HANDLER_PROCESS, processType = ProcessType.BLOCKING)
    public AskResponse onReceiveRemoteProcessReq(RemoteProcessReq req) {

        AskResponse response = new AskResponse();
        response.setSuccess(true);
        try {
            response.setData(JsonUtils.toBytes(RemoteRequestProcessor.processRemoteRequest(req)));
        } catch (Throwable t) {
            log.error("[FriendActor] process remote request[{}] failed!", req, t);
            response.setSuccess(false);
            response.setMessage(ExceptionUtils.getMessage(t));
        }
        return response;
    }

    @Override
    public void setTransportService(TransportService transportService) {
        this.transportService = transportService;
    }
}
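FriendActor handles communication between servers. It defines two handlers: onReceivePing, which returns transportService.allProtocols(), and onReceiveRemoteProcessReq, which runs RemoteRequestProcessor.processRemoteRequest(req), serializes the result into the response data with JsonUtils.toBytes, and reports any Throwable as a failed AskResponse carrying the exception message.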
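The error handling in onReceiveRemoteProcessReq follows a common pattern: the handler never lets an exception escape, and instead encodes success or failure in the response object. The sketch below isolates that pattern with plain JDK types. The Response class and the handle method are hypothetical stand-ins rather than PowerJob's AskResponse API, and the message format only imitates the "ClassName: message" style.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Callable;

public class HandlerResponseSketch {

    /** Hypothetical, simplified stand-in for AskResponse. */
    static class Response {
        boolean success;
        byte[] data;
        String message;
    }

    /** Runs the task and reports any failure in the response instead of throwing. */
    static Response handle(Callable<String> task) {
        Response response = new Response();
        response.success = true;
        try {
            response.data = task.call().getBytes(StandardCharsets.UTF_8);
        } catch (Throwable t) {
            // Mirror the handler above: flip the flag and keep a short description.
            response.success = false;
            response.message = t.getClass().getSimpleName() + ": " + t.getMessage();
        }
        return response;
    }

    public static void main(String[] args) {
        Response ok = handle(() -> "{\"result\":42}");
        Response failed = handle(() -> {
            throw new IllegalStateException("bean not found");
        });
        System.out.println(ok.success + " " + new String(ok.data, StandardCharsets.UTF_8));
        System.out.println(failed.success + " " + failed.message);
    }
}
```

With this shape the calling server can always deserialize a response and branch on the success flag, rather than having to interpret a transport-level error.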
tech/powerjob/server/remote/server/redirector/RemoteProcessReq.java
@Getter
@Setter
@Accessors(chain = true)
public class RemoteProcessReq implements PowerSerializable {

    private String className;
    private String methodName;
    private String[] parameterTypes;
    private Object[] args;
}
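RemoteProcessReq implements PowerSerializable and carries four properties: className, methodName, parameterTypes and args. Together they describe a method call to be performed on the receiving server.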
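Because of @Accessors(chain = true), each Lombok-generated setter returns the object itself, so a request can be assembled in one expression. The following sketch reproduces this with hand-written setters so that it compiles without Lombok. The Req class, the describe helper and the target com.example.DemoService#stop(Long) are hypothetical and serve only as an illustration.

```java
import java.util.Arrays;

public class RemoteProcessReqSketch {

    /** Hand-written stand-in for the Lombok-generated class: every setter returns this. */
    static class Req {
        private String className;
        private String methodName;
        private String[] parameterTypes;
        private Object[] args;

        Req setClassName(String className) { this.className = className; return this; }
        Req setMethodName(String methodName) { this.methodName = methodName; return this; }
        Req setParameterTypes(String[] parameterTypes) { this.parameterTypes = parameterTypes; return this; }
        Req setArgs(Object[] args) { this.args = args; return this; }

        /** Human-readable summary of the described call (demo helper only). */
        String describe() {
            return className + "#" + methodName + Arrays.toString(parameterTypes)
                    + " args=" + Arrays.toString(args);
        }
    }

    /** Describes a call to the hypothetical method com.example.DemoService#stop(Long). */
    static Req stopRequest(long instanceId) {
        return new Req()
                .setClassName("com.example.DemoService")
                .setMethodName("stop")
                .setParameterTypes(new String[]{"java.lang.Long"})
                .setArgs(new Object[]{instanceId});
    }

    public static void main(String[] args) {
        // prints com.example.DemoService#stop[java.lang.Long] args=[42]
        System.out.println(stopRequest(42L).describe());
    }
}
```

Note that parameterTypes holds class names as strings rather than Class objects, which keeps the request serializable and leaves class loading to the receiver.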
tech/powerjob/server/remote/server/redirector/RemoteRequestProcessor.java
public class RemoteRequestProcessor {

    public static Object processRemoteRequest(RemoteProcessReq req) throws ClassNotFoundException {
        Object[] args = req.getArgs();
        String[] parameterTypes = req.getParameterTypes();
        Class<?>[] parameters = new Class[parameterTypes.length];

        for (int i = 0; i < parameterTypes.length; i++) {
            parameters[i] = Class.forName(parameterTypes[i]);
            Object arg = args[i];
            if (arg != null) {
                args[i] = JSONObject.parseObject(JSONObject.toJSONBytes(arg), parameters[i]);
            }
        }

        Class<?> clz = Class.forName(req.getClassName());

        Object bean = SpringUtils.getBean(clz);
        Method method = ReflectionUtils.findMethod(clz, req.getMethodName(), parameters);

        assert method != null;
        return ReflectionUtils.invokeMethod(method, bean, args);
    }
}
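RemoteRequestProcessor.processRemoteRequest first resolves each entry of parameterTypes with Class.forName and converts every non-null argument to that type through a JSON round trip. It then loads the target class with Class.forName, obtains the corresponding bean from Spring via SpringUtils.getBean, looks up the method with ReflectionUtils.findMethod, and finally invokes it with ReflectionUtils.invokeMethod.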
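The same flow can be sketched with nothing but the JDK. This is a simplified illustration under stated assumptions, not PowerJob's implementation: a plain map (BEANS) replaces the Spring lookup, java.lang.reflect replaces Spring's ReflectionUtils, the JSON conversion of arguments is left out, and GreetingService is a hypothetical target.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

public class ReflectiveDispatchSketch {

    /** Hypothetical service standing in for a Spring-managed bean. */
    public static class GreetingService {
        public String greet(String name, Integer times) {
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < times; i++) {
                sb.append("hi ").append(name).append(';');
            }
            return sb.toString();
        }
    }

    /** Hypothetical bean registry replacing the Spring lookup. */
    static final Map<Class<?>, Object> BEANS = new HashMap<>();

    /** Resolves class, parameter types and method by name, then invokes the method. */
    static Object dispatch(String className, String methodName,
                           String[] parameterTypes, Object[] args) {
        try {
            Class<?>[] parameters = new Class<?>[parameterTypes.length];
            for (int i = 0; i < parameterTypes.length; i++) {
                parameters[i] = Class.forName(parameterTypes[i]);
            }
            Class<?> clz = Class.forName(className);
            Object bean = BEANS.get(clz);
            if (bean == null) {
                throw new IllegalStateException("no bean registered for " + className);
            }
            Method method = clz.getMethod(methodName, parameters);
            return method.invoke(bean, args);
        } catch (ReflectiveOperationException e) {
            // Unknown class or method, or a failure inside the invoked method.
            throw new IllegalStateException("dispatch failed for " + className + "#" + methodName, e);
        }
    }

    public static void main(String[] args) {
        BEANS.put(GreetingService.class, new GreetingService());
        Object result = dispatch(
                GreetingService.class.getName(), "greet",
                new String[]{"java.lang.String", "java.lang.Integer"},
                new Object[]{"Ada", 2});
        System.out.println(result); // prints hi Ada;hi Ada;
    }
}
```

One consequence of resolving types with Class.forName is that primitive names such as "int" cannot be looked up this way, so the sketch uses java.lang.Integer for the second parameter.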
To sum up: TransportServiceAware extends PowerJobAware and declares the setTransportService method. FriendActor implements it and handles communication between servers through two handlers: onReceivePing returns transportService.allProtocols(), while onReceiveRemoteProcessReq delegates to RemoteRequestProcessor.processRemoteRequest(req).