This article takes a look at PowerJob's RemoteEngine.
tech/powerjob/remote/framework/engine/RemoteEngine.java
public interface RemoteEngine {
EngineOutput start(EngineConfig engineConfig);
void close() throws IOException;
}
RemoteEngine defines two methods, start and close; start takes an EngineConfig and returns an EngineOutput.
tech/powerjob/remote/framework/engine/EngineConfig.java
@Data
@Accessors(chain = true)
public class EngineConfig implements Serializable {
/**
* Server type
*/
private ServerType serverType;
/**
* Type of the engine to start
*/
private String type;
/**
* Local address to bind to
*/
private Address bindAddress;
/**
* Actor instances; the caller instantiates them so that it can inject its own beans
*/
private List<Object> actorList;
}
EngineConfig defines four properties: serverType (SERVER or WORKER), type, bindAddress, and actorList.
tech/powerjob/remote/framework/engine/EngineOutput.java
@Getter
@Setter
public class EngineOutput {
private Transporter transporter;
}
EngineOutput holds a single property, transporter.
tech/powerjob/remote/framework/transporter/Transporter.java
public interface Transporter {
/**
* Protocol
* @return return protocol
*/
Protocol getProtocol();
/**
* send message
* @param url url
* @param request request
*/
void tell(URL url, PowerSerializable request);
/**
* ask by request
* @param url url
* @param request request
* @param clz response type
* @return CompletionStage
* @throws RemotingException remote exception
*/
<T> CompletionStage<T> ask(URL url, PowerSerializable request, Class<T> clz) throws RemotingException;
}
The Transporter interface defines three methods: getProtocol (which returns an AkkaProtocol or an HttpProtocol), tell, and ask.
tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java
@Slf4j
public class PowerJobRemoteEngine implements RemoteEngine {
private CSInitializer csInitializer;
@Override
public EngineOutput start(EngineConfig engineConfig) {
final String engineType = engineConfig.getType();
EngineOutput engineOutput = new EngineOutput();
log.info("[PowerJobRemoteEngine] [{}] start remote engine with config: {}", engineType, engineConfig);
List<ActorInfo> actorInfos = ActorFactory.load(engineConfig.getActorList());
csInitializer = CSInitializerFactory.build(engineType);
String type = csInitializer.type();
Stopwatch sw = Stopwatch.createStarted();
log.info("[PowerJobRemoteEngine] [{}] try to startup CSInitializer[type={}]", engineType, type);
csInitializer.init(new CSInitializerConfig()
.setBindAddress(engineConfig.getBindAddress())
.setServerType(engineConfig.getServerType())
);
// Build the transporter
Transporter transporter = csInitializer.buildTransporter();
engineOutput.setTransporter(transporter);
log.info("[PowerJobRemoteEngine] [{}] start to bind Handler", engineType);
actorInfos.forEach(actor -> actor.getHandlerInfos().forEach(handlerInfo -> log.info("[PowerJobRemoteEngine] [{}] PATH={}, handler={}", engineType, handlerInfo.getLocation().toPath(), handlerInfo.getMethod())));
// Bind the handlers
csInitializer.bindHandlers(actorInfos);
log.info("[PowerJobRemoteEngine] [{}] startup successfully, cost: {}", engineType, sw);
return engineOutput;
}
@Override
public void close() throws IOException {
csInitializer.close();
}
}
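PowerJobRemoteEngine holds a csInitializer field. Its start method first loads the actorInfos with ActorFactory.load(engineConfig.getActorList()), then builds the csInitializer with CSInitializerFactory.build(engineType) and calls its init method. Next it builds the transporter with csInitializer.buildTransporter(), and finally it binds the actorInfos through csInitializer.bindHandlers(actorInfos). Its close method closes the csInitializer.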
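The call order inside start can be sketched with plain JDK types. Everything below is a hypothetical stand-in (RecordingInitializer, the string transporter, the sample address); none of it is a PowerJob class. It only illustrates that init runs before buildTransporter, and bindHandlers runs last.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for illustration; these are NOT the PowerJob types.
class EngineStartSketch {

    /** Minimal stand-in for a CSInitializer that records every lifecycle call. */
    static class RecordingInitializer {
        final List<String> calls = new ArrayList<>();

        void init(String bindAddress) {
            calls.add("init:" + bindAddress);
        }

        String buildTransporter() {
            calls.add("buildTransporter");
            return "transporter";
        }

        void bindHandlers(List<Object> actors) {
            calls.add("bindHandlers:" + actors.size());
        }

        void close() {
            calls.add("close");
        }
    }

    /** Mirrors the order described above: init, then buildTransporter, then bindHandlers. */
    static String start(RecordingInitializer initializer, String bindAddress, List<Object> actors) {
        initializer.init(bindAddress);
        String transporter = initializer.buildTransporter();
        initializer.bindHandlers(actors);
        return transporter;
    }

    public static void main(String[] args) {
        RecordingInitializer initializer = new RecordingInitializer();
        start(initializer, "127.0.0.1:27777", Arrays.asList(new Object()));
        initializer.close();
        System.out.println(initializer.calls);
    }
}
```

Returning the transporter before the handlers are bound matches the listing: the caller receives the EngineOutput only after bindHandlers has completed.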
tech/powerjob/remote/framework/engine/impl/ActorFactory.java
@Slf4j
class ActorFactory {
static List<ActorInfo> load(List<Object> actorList) {
List<ActorInfo> actorInfos = Lists.newArrayList();
actorList.forEach(actor -> {
final Class<?> clz = actor.getClass();
try {
final Actor anno = clz.getAnnotation(Actor.class);
ActorInfo actorInfo = new ActorInfo().setActor(actor).setAnno(anno);
actorInfo.setHandlerInfos(loadHandlerInfos4Actor(actorInfo));
actorInfos.add(actorInfo);
} catch (Throwable t) {
log.error("[ActorFactory] process Actor[{}] failed!", clz);
ExceptionUtils.rethrow(t);
}
});
return actorInfos;
}
//......
}
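ActorFactory.load iterates over actorList, reads the @Actor annotation on each actor's class, then collects the @Handler annotation information from its methods and stores it in the actorInfo.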
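The body of loadHandlerInfos4Actor is elided in the listing, so the following is only a sketch of the general technique (reading a class-level annotation and then scanning methods for a second annotation). The @Actor and @Handler annotations here are hypothetical local definitions, and joining the two paths with "/" is an assumption made for the example.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class ActorScanSketch {

    // Hypothetical annotations modelled on @Actor and @Handler; not the PowerJob definitions.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface Actor {
        String path();
    }

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Handler {
        String path();
    }

    @Actor(path = "demo")
    static class DemoActor {
        @Handler(path = "ping")
        public String ping(String body) {
            return "pong:" + body;
        }

        @Handler(path = "echo")
        public String echo(String body) {
            return body;
        }

        public void notAHandler() {
        }
    }

    /** Returns "rootPath/handlerPath" for every annotated method, sorted for stable output. */
    static List<String> loadHandlerPaths(Object actor) {
        Class<?> clz = actor.getClass();
        Actor actorAnno = clz.getAnnotation(Actor.class);
        if (actorAnno == null) {
            // Sketch-only guard; the listing above does not show such a check.
            throw new IllegalArgumentException(clz + " is not annotated with @Actor");
        }
        List<String> paths = new ArrayList<>();
        for (Method method : clz.getDeclaredMethods()) {
            Handler handlerAnno = method.getAnnotation(Handler.class);
            if (handlerAnno != null) {
                paths.add(actorAnno.path() + "/" + handlerAnno.path());
            }
        }
        Collections.sort(paths);
        return paths;
    }

    public static void main(String[] args) {
        System.out.println(loadHandlerPaths(new DemoActor()));
    }
}
```

Both annotations need RUNTIME retention; otherwise getAnnotation returns null at run time.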
tech/powerjob/remote/framework/engine/impl/CSInitializerFactory.java
@Slf4j
class CSInitializerFactory {
static CSInitializer build(String targetType) {
Reflections reflections = new Reflections(OmsConstant.PACKAGE);
Set<Class<? extends CSInitializer>> cSInitializerClzSet = reflections.getSubTypesOf(CSInitializer.class);
log.info("[CSInitializerFactory] scan subTypeOf CSInitializer: {}", cSInitializerClzSet);
for (Class<? extends CSInitializer> clz : cSInitializerClzSet) {
try {
CSInitializer csInitializer = clz.getDeclaredConstructor().newInstance();
String type = csInitializer.type();
log.info("[CSInitializerFactory] new instance for CSInitializer[{}] successfully, type={}, object: {}", clz, type, csInitializer);
if (targetType.equalsIgnoreCase(type)) {
return csInitializer;
}
} catch (Exception e) {
log.error("[CSInitializerFactory] new instance for CSInitializer[{}] failed, maybe you should provide a non-parameter constructor", clz);
ExceptionUtils.rethrow(e);
}
}
throw new PowerJobException(String.format("can't load CSInitializer[%s], ensure your package name start with 'tech.powerjob' and import the dependencies!", targetType));
}
}
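CSInitializerFactory's build method uses org.reflections.Reflections to scan the tech.powerjob package for subtypes of CSInitializer, instantiates each one reflectively through its no-argument constructor, and returns the first instance whose type() matches targetType (case-insensitively). If none matches, it throws a PowerJobException.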
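The select-by-type loop can be sketched without the Reflections library. In this sketch a fixed candidate list stands in for the classpath scan, which is an assumption made to keep the example self-contained; the Initializer interface and the two implementations are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

class InitializerFactorySketch {

    /** Hypothetical stand-in for CSInitializer, reduced to the type() method. */
    interface Initializer {
        String type();
    }

    static class AkkaLike implements Initializer {
        public String type() {
            return "AKKA";
        }
    }

    static class HttpLike implements Initializer {
        public String type() {
            return "HTTP";
        }
    }

    // Assumption: a fixed list replaces the classpath scan performed by Reflections.
    static final List<Class<? extends Initializer>> CANDIDATES =
            Arrays.asList(AkkaLike.class, HttpLike.class);

    /** Instantiates each candidate and returns the first whose type matches, ignoring case. */
    static Initializer build(String targetType) {
        for (Class<? extends Initializer> clz : CANDIDATES) {
            Initializer candidate;
            try {
                candidate = clz.getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("no usable no-arg constructor on " + clz, e);
            }
            if (targetType.equalsIgnoreCase(candidate.type())) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("can't load initializer: " + targetType);
    }

    public static void main(String[] args) {
        System.out.println(build("akka").getClass().getSimpleName());
    }
}
```

As in the listing, every candidate is instantiated before its type is compared, so a candidate without a no-argument constructor fails the lookup even when a later candidate would have matched.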
tech/powerjob/remote/framework/cs/CSInitializer.java
public interface CSInitializer {
/**
* Type name, e.g. akka, netty4, httpJson
* @return the name
*/
String type();
/**
* initialize the framework
* @param config config
*/
void init(CSInitializerConfig config);
/**
* build a Transporter by based network framework
* @return Transporter
*/
Transporter buildTransporter();
/**
* bind Actor, publish handler's service
* @param actorInfos actor infos
*/
void bindHandlers(List<ActorInfo> actorInfos);
void close() throws IOException;
}
The CSInitializer interface defines the type, init, buildTransporter, bindHandlers, and close methods. It has two implementations: AkkaCSInitializer and HttpVertxCSInitializer.
tech/powerjob/remote/framework/cs/CSInitializerConfig.java
@Getter
@Setter
@Accessors(chain = true)
public class CSInitializerConfig implements Serializable {
private Address bindAddress;
private ServerType serverType;
}
CSInitializerConfig defines two properties: bindAddress and serverType.
tech/powerjob/remote/akka/AkkaCSInitializer.java
@Slf4j
public class AkkaCSInitializer implements CSInitializer {
private ActorSystem actorSystem;
private CSInitializerConfig config;
@Override
public String type() {
return tech.powerjob.common.enums.Protocol.AKKA.name();
}
@Override
public void init(CSInitializerConfig config) {
this.config = config;
Address bindAddress = config.getBindAddress();
log.info("[PowerJob-AKKA] bindAddress: {}", bindAddress);
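// Initialize the ActorSystem (on macOS, probing for a port in use with new ServerSocket does not work, perhaps because AKKA is written in Scala? Nothing to be done about it... we can only retry on exception)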
Map<String, Object> overrideConfig = Maps.newHashMap();
overrideConfig.put("akka.remote.artery.canonical.hostname", bindAddress.getHost());
overrideConfig.put("akka.remote.artery.canonical.port", bindAddress.getPort());
Config akkaBasicConfig = ConfigFactory.load(AkkaConstant.AKKA_CONFIG);
Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig);
log.info("[PowerJob-AKKA] try to start AKKA System.");
// Bind the current actorSystemName at startup
String actorSystemName = AkkaConstant.fetchActorSystemName(config.getServerType());
this.actorSystem = ActorSystem.create(actorSystemName, akkaFinalConfig);
// Handle abnormal situations that arise in the system
ActorRef troubleshootingActor = actorSystem.actorOf(Props.create(AkkaTroubleshootingActor.class), "troubleshooting");
actorSystem.eventStream().subscribe(troubleshootingActor, DeadLetter.class);
log.info("[PowerJob-AKKA] initialize actorSystem[{}] successfully!", actorSystem.name());
}
@Override
public Transporter buildTransporter() {
return new AkkaTransporter(actorSystem);
}
@Override
public void bindHandlers(List<ActorInfo> actorInfos) {
int cores = Runtime.getRuntime().availableProcessors();
actorInfos.forEach(actorInfo -> {
String rootPath = actorInfo.getAnno().path();
AkkaMappingService.ActorConfig actorConfig = AkkaMappingService.parseActorName(rootPath);
log.info("[PowerJob-AKKA] start to process actor[path={},config={}]", rootPath, JsonUtils.toJSONString(actorConfig));
actorSystem.actorOf(AkkaProxyActor.props(actorInfo)
.withDispatcher("akka.".concat(actorConfig.getDispatcherName()))
.withRouter(new RoundRobinPool(cores)), actorConfig.getActorName());
});
}
@Override
public void close() throws IOException {
actorSystem.terminate();
}
}
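AkkaCSInitializer's type method returns AKKA. Its init method loads the base Akka configuration with ConfigFactory.load(AkkaConstant.AKKA_CONFIG), overrides the hostname and port, and creates the actorSystem with ActorSystem.create(actorSystemName, akkaFinalConfig); it then creates an AkkaTroubleshootingActor and subscribes it to DeadLetter messages. buildTransporter returns an AkkaTransporter. bindHandlers creates one actor per ActorInfo, each behind a RoundRobinPool sized to the number of available processors. close calls actorSystem.terminate().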
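The override step relies on withFallback semantics: keys present in the overriding config win, and the fallback supplies everything else. A minimal sketch of that merge rule using plain maps follows; the flat keys and values are hypothetical, and real Typesafe Config objects are nested trees rather than flat maps.

```java
import java.util.HashMap;
import java.util.Map;

class ConfigFallbackSketch {

    /** Values in override take precedence; fallback fills in any keys that override lacks. */
    static Map<String, Object> withFallback(Map<String, Object> override, Map<String, Object> fallback) {
        Map<String, Object> merged = new HashMap<>(fallback);
        merged.putAll(override);
        return merged;
    }

    public static void main(String[] args) {
        // Hypothetical base settings standing in for the file loaded by ConfigFactory.load.
        Map<String, Object> base = new HashMap<>();
        base.put("hostname", "0.0.0.0");
        base.put("port", 25520);
        base.put("loglevel", "INFO");

        // Hypothetical bind address supplied at startup.
        Map<String, Object> override = new HashMap<>();
        override.put("hostname", "10.0.0.5");
        override.put("port", 10086);

        System.out.println(withFallback(override, base));
    }
}
```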
tech/powerjob/remote/akka/AkkaTransporter.java
public class AkkaTransporter implements Transporter {
private final ActorSystem actorSystem;
/**
* akka://<actor system>@<hostname>:<port>/<actor path>
*/
private static final String AKKA_NODE_PATH = "akka://%s@%s/user/%s";
public AkkaTransporter(ActorSystem actorSystem) {
this.actorSystem = actorSystem;
}
@Override
public Protocol getProtocol() {
return new AkkaProtocol();
}
@Override
public void tell(URL url, PowerSerializable request) {
ActorSelection actorSelection = fetchActorSelection(url);
actorSelection.tell(request, null);
}
@Override
@SuppressWarnings("unchecked")
public <T> CompletionStage<T> ask(URL url, PowerSerializable request, Class<T> clz) throws RemotingException {
ActorSelection actorSelection = fetchActorSelection(url);
return (CompletionStage<T>) Patterns.ask(actorSelection, request, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));
}
private ActorSelection fetchActorSelection(URL url) {
HandlerLocation location = url.getLocation();
String targetActorSystemName = AkkaConstant.fetchActorSystemName(url.getServerType());
String targetActorName = AkkaMappingService.parseActorName(location.getRootPath()).getActorName();
CommonUtils.requireNonNull(targetActorName, "can't find actor by URL: " + location);
String address = url.getAddress().toFullAddress();
return actorSystem.actorSelection(String.format(AKKA_NODE_PATH, targetActorSystemName, address, targetActorName));
}
}
AkkaTransporter's protocol is AkkaProtocol. Its tell method resolves an ActorSelection from the url and sends the request with actorSelection.tell; its ask method uses Patterns.ask(actorSelection, request, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS)).
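The path that fetchActorSelection resolves is built with the AKKA_NODE_PATH template. The sketch below reproduces only that formatting step; the actor system name, address, and actor name are made-up values, and it assumes toFullAddress() yields a "host:port" string.

```java
class AkkaPathSketch {

    // Same template string as AKKA_NODE_PATH in the AkkaTransporter listing.
    static final String AKKA_NODE_PATH = "akka://%s@%s/user/%s";

    /** Formats a remote actor path from its three parts. */
    static String actorPath(String actorSystemName, String hostAndPort, String actorName) {
        return String.format(AKKA_NODE_PATH, actorSystemName, hostAndPort, actorName);
    }

    public static void main(String[] args) {
        // Hypothetical values; the real ones come from AkkaConstant and AkkaMappingService.
        System.out.println(actorPath("demo-system", "192.168.1.10:10086", "demo_actor"));
        // prints akka://demo-system@192.168.1.10:10086/user/demo_actor
    }
}
```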
tech/powerjob/remote/http/HttpVertxCSInitializer.java
@Slf4j
public class HttpVertxCSInitializer implements CSInitializer {
private Vertx vertx;
private HttpServer httpServer;
private HttpClient httpClient;
private CSInitializerConfig config;
@Override
public String type() {
return tech.powerjob.common.enums.Protocol.HTTP.name();
}
@Override
public void init(CSInitializerConfig config) {
this.config = config;
vertx = VertxInitializer.buildVertx();
httpServer = VertxInitializer.buildHttpServer(vertx);
httpClient = VertxInitializer.buildHttpClient(vertx);
}
@Override
public Transporter buildTransporter() {
return new VertxTransporter(httpClient);
}
@Override
@SneakyThrows
public void bindHandlers(List<ActorInfo> actorInfos) {
Router router = Router.router(vertx);
// Handle request and response bodies
router.route().handler(BodyHandler.create());
actorInfos.forEach(actorInfo -> {
Optional.ofNullable(actorInfo.getHandlerInfos()).orElse(Collections.emptyList()).forEach(handlerInfo -> {
String handlerHttpPath = handlerInfo.getLocation().toPath();
ProcessType processType = handlerInfo.getAnno().processType();
Handler<RoutingContext> routingContextHandler = buildRequestHandler(actorInfo, handlerInfo);
Route route = router.post(handlerHttpPath);
if (processType == ProcessType.BLOCKING) {
route.blockingHandler(routingContextHandler, false);
} else {
route.handler(routingContextHandler);
}
});
});
// Start the vertx http server
final int port = config.getBindAddress().getPort();
final String host = config.getBindAddress().getHost();
httpServer.requestHandler(router)
.exceptionHandler(e -> log.error("[PowerJob] unknown exception in Actor communication!", e))
.listen(port, host)
.toCompletionStage()
.toCompletableFuture()
.get(1, TimeUnit.MINUTES);
log.info("[PowerJobRemoteEngine] startup vertx HttpServer successfully!");
}
private Handler<RoutingContext> buildRequestHandler(ActorInfo actorInfo, HandlerInfo handlerInfo) {
Method method = handlerInfo.getMethod();
Optional<Class<?>> powerSerializeClz = RemoteUtils.findPowerSerialize(method.getParameterTypes());
// Internal framework, strict mode: fail immediately if binding fails
if (!powerSerializeClz.isPresent()) {
throw new PowerJobException("can't find any 'PowerSerialize' object in handler args: " + handlerInfo.getLocation());
}
return ctx -> {
final RequestBody body = ctx.body();
final Object convertResult = body.asPojo(powerSerializeClz.get());
try {
Object response = method.invoke(actorInfo.getActor(), convertResult);
if (response != null) {
if (response instanceof String) {
ctx.end((String) response);
} else {
ctx.json(JsonObject.mapFrom(response));
}
return;
}
ctx.end();
} catch (Throwable t) {
// Note: this runs at framework runtime, so the log uses the standard PowerJob format
log.error("[PowerJob] invoke Handler[{}] failed!", handlerInfo.getLocation(), t);
ctx.fail(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), t);
}
};
}
@Override
public void close() throws IOException {
httpClient.close();
httpServer.close();
vertx.close();
}
}
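HttpVertxCSInitializer's type is HTTP. Its init method builds the vertx instance with VertxInitializer.buildVertx(), the httpServer with VertxInitializer.buildHttpServer(vertx), and the httpClient with VertxInitializer.buildHttpClient(vertx). buildTransporter returns a VertxTransporter. bindHandlers registers a POST route for each handler in the actorInfos, using blockingHandler when the handler's processType is BLOCKING and handler otherwise, and then starts the HTTP server on the bind address. close closes the httpClient, the httpServer, and vertx, in that order.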
tech/powerjob/remote/http/vertx/VertxTransporter.java
public class VertxTransporter implements Transporter {
private final HttpClient httpClient;
private static final Protocol PROTOCOL = new HttpProtocol();
public VertxTransporter(HttpClient httpClient) {
this.httpClient = httpClient;
}
@Override
public Protocol getProtocol() {
return PROTOCOL;
}
@Override
public void tell(URL url, PowerSerializable request) {
post(url, request, null);
}
@Override
public <T> CompletionStage<T> ask(URL url, PowerSerializable request, Class<T> clz) throws RemotingException {
return post(url, request, clz);
}
@SuppressWarnings("unchecked")
private <T> CompletionStage<T> post(URL url, PowerSerializable request, Class<T> clz) {
final String host = url.getAddress().getHost();
final int port = url.getAddress().getPort();
final String path = url.getLocation().toPath();
RequestOptions requestOptions = new RequestOptions()
.setMethod(HttpMethod.POST)
.setHost(host)
.setPort(port)
.setURI(path);
// Obtain an HTTP connection to the remote server
Future<HttpClientRequest> httpClientRequestFuture = httpClient.request(requestOptions);
// Transform -> send the request and obtain the response
Future<HttpClientResponse> responseFuture = httpClientRequestFuture.compose(httpClientRequest ->
httpClientRequest
.putHeader(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
.send(JsonObject.mapFrom(request).toBuffer())
);
return responseFuture.compose(httpClientResponse -> {
// throw exception
final int statusCode = httpClientResponse.statusCode();
if (statusCode != HttpResponseStatus.OK.code()) {
// This exception is propagated when CompletableFuture.get() is called
throw new RemotingException(String.format("request [host:%s,port:%s,url:%s] failed, status: %d, msg: %s",
host, port, path, statusCode, httpClientResponse.statusMessage()
));
}
return httpClientResponse.body().compose(x -> {
if (clz == null) {
return Future.succeededFuture(null);
}
if (clz.equals(String.class)) {
return Future.succeededFuture((T) x.toString());
}
return Future.succeededFuture(x.toJsonObject().mapTo(clz));
});
}).toCompletionStage();
}
}
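VertxTransporter's protocol is HttpProtocol. Its tell method calls post with a null response type, so no response body is decoded; ask calls the same post method but passes the expected response type.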
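The way tell and ask share one post method can be sketched with CompletableFuture alone. The fakeRoundTrip method is a hypothetical stand-in for the HTTP exchange, and the JSON mapping branch is replaced by an exception, so this shows only the dispatch on the response type.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

class PostSketch {

    // Hypothetical stand-in for the HTTP round trip: echoes the request as the response body.
    static CompletionStage<String> fakeRoundTrip(String requestBody) {
        return CompletableFuture.completedFuture("echo:" + requestBody);
    }

    /** Decodes the response according to clz; a null clz means the caller ignores the body. */
    @SuppressWarnings("unchecked")
    static <T> CompletionStage<T> post(String requestBody, Class<T> clz) {
        return fakeRoundTrip(requestBody).thenApply(body -> {
            if (clz == null) {
                return null;
            }
            if (clz.equals(String.class)) {
                return (T) body;
            }
            // The original maps JSON onto clz here; that step is omitted in this sketch.
            throw new IllegalArgumentException("unsupported response type: " + clz);
        });
    }

    /** Fire-and-forget: same request path, result discarded. */
    static void tell(String requestBody) {
        post(requestBody, null);
    }

    /** Request-response: same request path, typed result returned. */
    static <T> CompletionStage<T> ask(String requestBody, Class<T> clz) {
        return post(requestBody, clz);
    }

    public static void main(String[] args) {
        tell("fire-and-forget");
        System.out.println(ask("hi", String.class).toCompletableFuture().join());
    }
}
```

One consequence of this design is that tell still performs a full request and receives a response; it simply does not surface the result to the caller.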
In summary, PowerJob's RemoteEngine defines start and close, where start takes an EngineConfig and returns an EngineOutput. The PowerJobRemoteEngine implementation loads the actorInfos with ActorFactory.load, builds and initializes a csInitializer chosen by engine type (AkkaCSInitializer or HttpVertxCSInitializer), obtains the transporter from csInitializer.buildTransporter(), and binds the actorInfos with bindHandlers; its close method closes the csInitializer.