This article takes a look at PowerJobAutoConfiguration, the Spring Boot auto-configuration that starts a PowerJob worker.
tech/powerjob/worker/autoconfigure/PowerJobProperties.java
@ConfigurationProperties(prefix = "powerjob")
public class PowerJobProperties {
private final Worker worker = new Worker();
public Worker getWorker() {
return worker;
}
//......
}
PowerJobProperties uses the configuration prefix powerjob; the main settings all live on its nested worker object.
/**
* Powerjob worker configuration properties.
*/
@Setter
@Getter
public static class Worker {
/**
* Whether to enable PowerJob Worker
*/
private boolean enabled = true;
/**
* Name of application, String type. Total length of this property should be no more than 255
* characters. This is one of the required properties when registering a new application. This
* property should be assigned with the same value as what you entered for the appName.
*/
private String appName;
/**
* Akka port of Powerjob-worker, optional value. Default value of this property is 27777.
* If multiple PowerJob-worker nodes were deployed, different, unique ports should be assigned.
* Deprecated, please use 'port'
*/
@Deprecated
private int akkaPort = RemoteConstant.DEFAULT_WORKER_PORT;
/**
* port
*/
private Integer port;
/**
* Address(es) of Powerjob-server node(s). Ip:port or domain.
* Example of single Powerjob-server node:
* <p>
* 127.0.0.1:7700
* </p>
* Example of Powerjob-server cluster:
* <p>
* 192.168.0.10:7700,192.168.0.11:7700,192.168.0.12:7700
* </p>
*/
private String serverAddress;
/**
* Protocol for communication between WORKER and server
*/
private Protocol protocol = Protocol.AKKA;
/**
* Local store strategy for H2 database. {@code disk} or {@code memory}.
*/
private StoreStrategy storeStrategy = StoreStrategy.DISK;
/**
* Max length of response result. Result that is longer than the value will be truncated.
* {@link ProcessResult} max length for #msg
*/
private int maxResultLength = 8192;
/**
* If test mode is set as true, Powerjob-worker no longer connects to the server or validates appName.
* Test mode is used for conditions that your have no powerjob-server in your develop env, so you can't start up the application
*/
private boolean enableTestMode = false;
/**
* Max length of appended workflow context value length. Appended workflow context value that is longer than the value will be ignored.
* {@link WorkflowContext} max length for #appendedContextData
*/
private int maxAppendedWfContextLength = 8192;
private String tag;
/**
* Max numbers of LightTaskTacker
*/
private Integer maxLightweightTaskNum = 1024;
/**
* Max numbers of HeavyTaskTacker
*/
private Integer maxHeavyweightTaskNum = 64;
/**
* Interval(s) of worker health report
*/
private Integer healthReportInterval = 10;
}
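Worker defines the following properties: enabled (default true); appName; port (an Integer with no default of its own; when it is unset, the deprecated akkaPort is used, which defaults to RemoteConstant.DEFAULT_WORKER_PORT, 27777 according to its Javadoc); serverAddress (multiple ip:port entries separated by commas); protocol (default AKKA, HTTP is also supported); storeStrategy (default DISK, MEMORY is also supported; it selects the storage mode of the local H2 database); maxResultLength (maximum length of a returned result, default 8192); enableTestMode (default false, intended for debugging when no server is deployed); maxAppendedWfContextLength (default 8192); tag; maxLightweightTaskNum (default 1024); maxHeavyweightTaskNum (default 64); and healthReportInterval (default 10 seconds).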
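To make the property names concrete, here is a minimal sketch that parses a sample configuration with plain java.util.Properties. It assumes the kebab-case spelling of the keys (powerjob.worker.app-name and so on), which Spring Boot's relaxed binding maps onto the camelCase fields shown above. The values, the class name WorkerPropertiesSketch and its helper methods are made up for illustration; they are not part of PowerJob.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not a PowerJob class: it only illustrates the key names.
final class WorkerPropertiesSketch {

    // Sample values are assumptions; only the property names come from Worker.
    static final String SAMPLE = String.join("\n",
            "powerjob.worker.enabled=true",
            "powerjob.worker.app-name=demo-app",
            "powerjob.worker.port=27777",
            "powerjob.worker.server-address=127.0.0.1:7700,127.0.0.1:7701",
            "powerjob.worker.protocol=http",
            "powerjob.worker.store-strategy=disk");

    // Parses properties text the same way an application.properties file is read.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }

    // serverAddress is a single comma-separated string; split it into entries.
    static List<String> serverAddresses(Properties props) {
        return Arrays.asList(props.getProperty("powerjob.worker.server-address").split(","));
    }

    public static void main(String[] args) {
        Properties props = load(SAMPLE);
        System.out.println(props.getProperty("powerjob.worker.app-name"));
        System.out.println(serverAddresses(props));
    }
}
```

In a real application these keys would simply go into application.properties (or the YAML equivalent) and Spring Boot would bind them to PowerJobProperties.Worker.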
tech/powerjob/worker/common/PowerJobWorkerConfig.java
@Getter
@Setter
public class PowerJobWorkerConfig {
/**
* AppName, recommend to use the name of this project
* Applications should be registered by powerjob-console in advance to prevent error.
*/
private String appName;
/**
* Worker port
* Random port is enabled when port is set with non-positive number.
*/
private int port = RemoteConstant.DEFAULT_WORKER_PORT;
/**
* Address of powerjob-server node(s)
* Do not mistake for ActorSystem port. Do not add any prefix, i.e. http://.
*/
private List<String> serverAddress = Lists.newArrayList();
/**
* Protocol for communication between WORKER and server
*/
private Protocol protocol = Protocol.AKKA;
/**
* Max length of response result. Result that is longer than the value will be truncated.
* {@link ProcessResult} max length for #msg
*/
private int maxResultLength = 8096;
/**
* User-defined context object, which is passed through to the TaskContext#userContext property
* Usage Scenarios: The container Java processor needs to use the Spring bean of the host application, where you can pass in the ApplicationContext and get the bean in the Processor
*/
private Object userContext;
/**
* Internal persistence method, DISK or MEMORY
* Normally you don't need to care about this configuration
*/
private StoreStrategy storeStrategy = StoreStrategy.DISK;
/**
* If test mode is set as true, Powerjob-worker no longer connects to the server or validates appName.
* Test mode is used for conditions that your have no powerjob-server in your develop env so you can't startup the application
*/
private boolean enableTestMode = false;
/**
* Max length of appended workflow context value length. Appended workflow context value that is longer than the value will be ignore.
* {@link WorkflowContext} max length for #appendedContextData
*/
private int maxAppendedWfContextLength = 8192;
/**
* user-customized system metrics collector
*/
private SystemMetricsCollector systemMetricsCollector;
/**
* Processor factory for custom logic, generally used for IOC framework processor bean injection that is not officially supported by PowerJob
*/
private List<ProcessorFactory> processorFactoryList;
private String tag;
/**
* Max numbers of LightTaskTacker
*/
private Integer maxLightweightTaskNum = 1024;
/**
* Max numbers of HeavyTaskTacker
*/
private Integer maxHeavyweightTaskNum = 64;
/**
* Interval(s) of worker health report
*/
private Integer healthReportInterval = 10;
}
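PowerJobWorkerConfig largely mirrors PowerJobProperties.Worker. The differences visible in the two listings are: serverAddress is a List<String> rather than a comma-separated string; port is a primitive int that defaults to RemoteConstant.DEFAULT_WORKER_PORT; maxResultLength defaults to 8096 rather than 8192; there is no enabled flag or akkaPort; and it adds userContext, systemMetricsCollector and processorFactoryList, which have no counterpart in Worker.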
tech/powerjob/worker/autoconfigure/PowerJobAutoConfiguration.java
@Configuration
@EnableConfigurationProperties(PowerJobProperties.class)
@ConditionalOnProperty(prefix = "powerjob.worker", name = "enabled", havingValue = "true", matchIfMissing = true)
public class PowerJobAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public PowerJobSpringWorker initPowerJob(PowerJobProperties properties) {
PowerJobProperties.Worker worker = properties.getWorker();
/*
* Address of PowerJob-server node(s). Do not mistake for ActorSystem port. Do not add
* any prefix, i.e. http://.
*/
CommonUtils.requireNonNull(worker.getServerAddress(), "serverAddress can't be empty! " +
"if you don't want to enable powerjob, please config program arguments: powerjob.worker.enabled=false");
List<String> serverAddress = Arrays.asList(worker.getServerAddress().split(","));
/*
* Create OhMyConfig object for setting properties.
*/
PowerJobWorkerConfig config = new PowerJobWorkerConfig();
/*
* Configuration of worker port. Random port is enabled when port is set with non-positive number.
*/
if (worker.getPort() != null) {
config.setPort(worker.getPort());
} else {
int port = worker.getAkkaPort();
if (port <= 0) {
port = NetUtils.getRandomPort();
}
config.setPort(port);
}
/*
* appName, name of the application. Applications should be registered in advance to prevent
* error. This property should be the same with what you entered for appName when getting
* registered.
*/
config.setAppName(worker.getAppName());
config.setServerAddress(serverAddress);
config.setProtocol(worker.getProtocol());
/*
* For non-Map/MapReduce tasks, {@code memory} is recommended for speeding up calculation.
* Map/MapReduce tasks may produce batches of subtasks, which could lead to OutOfMemory
* exception or error, {@code disk} should be applied.
*/
config.setStoreStrategy(worker.getStoreStrategy());
/*
* When enabledTestMode is set as true, PowerJob-worker no longer connects to PowerJob-server
* or validate appName.
*/
config.setEnableTestMode(worker.isEnableTestMode());
/*
* Max length of appended workflow context . Appended workflow context value that is longer than the value will be ignored.
*/
config.setMaxAppendedWfContextLength(worker.getMaxAppendedWfContextLength());
config.setTag(worker.getTag());
config.setMaxHeavyweightTaskNum(worker.getMaxHeavyweightTaskNum());
config.setMaxLightweightTaskNum(worker.getMaxLightweightTaskNum());
config.setHealthReportInterval(worker.getHealthReportInterval());
/*
* Create PowerJobSpringWorker object and set properties.
*/
return new PowerJobSpringWorker(config);
}
}
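PowerJobAutoConfiguration enables PowerJobProperties and is applied automatically unless powerjob.worker.enabled is set to false. It then declares a PowerJobSpringWorker bean. The method name initPowerJob is not ideal, because the name of a @Bean method becomes the bean name by default, so the bean ends up registered as initPowerJob. The method mainly converts PowerJobProperties.Worker into a PowerJobWorkerConfig: serverAddress must be non-null and is split on commas; port is used as is when it is set, otherwise the deprecated akkaPort is used, and a random port is chosen when akkaPort is less than or equal to 0. Finally, it creates the PowerJobSpringWorker from the PowerJobWorkerConfig.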
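The port selection in initPowerJob can be sketched in isolation as follows. This is an illustration under assumptions, not PowerJob code: WorkerPortSketch, resolvePort and randomPort are hypothetical names, and randomPort stands in for NetUtils.getRandomPort(), whose actual implementation is not shown in the listing.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.util.function.IntSupplier;

// Hypothetical sketch of the port fallback logic, not a PowerJob class.
final class WorkerPortSketch {

    // Asks the OS for a free port; an assumed stand-in for NetUtils.getRandomPort().
    static int randomPort() {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new IllegalStateException("no free port available", e);
        }
    }

    // Mirrors the branch in initPowerJob: an explicit port wins and is passed through
    // unchanged; otherwise akkaPort is used, with a random port when it is <= 0.
    static int resolvePort(Integer port, int akkaPort, IntSupplier randomPort) {
        if (port != null) {
            return port;
        }
        return akkaPort > 0 ? akkaPort : randomPort.getAsInt();
    }

    public static void main(String[] args) {
        System.out.println(resolvePort(8080, 27777, WorkerPortSketch::randomPort));
        System.out.println(resolvePort(null, 27777, WorkerPortSketch::randomPort));
        System.out.println(resolvePort(null, 0, WorkerPortSketch::randomPort));
    }
}
```

Passing the random-port source as an IntSupplier is a choice made for this sketch so that the fallback branch can be exercised without opening a socket.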
tech/powerjob/worker/PowerJobSpringWorker.java
public class PowerJobSpringWorker implements ApplicationContextAware, InitializingBean, DisposableBean {
/**
* Composition over inheritance: hold a PowerJobWorker and reset the ProcessorFactory internally, which is more elegant
*/
private PowerJobWorker powerJobWorker;
private final PowerJobWorkerConfig config;
public PowerJobSpringWorker(PowerJobWorkerConfig config) {
this.config = config;
}
@Override
public void afterPropertiesSet() throws Exception {
powerJobWorker = new PowerJobWorker(config);
powerJobWorker.init();
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
BuiltInSpringProcessorFactory springProcessorFactory = new BuiltInSpringProcessorFactory(applicationContext);
BuildInSpringMethodProcessorFactory springMethodProcessorFactory = new BuildInSpringMethodProcessorFactory(applicationContext);
// append BuiltInSpringProcessorFactory
List<ProcessorFactory> processorFactories = Lists.newArrayList(
Optional.ofNullable(config.getProcessorFactoryList())
.orElse(Collections.emptyList()));
processorFactories.add(springProcessorFactory);
processorFactories.add(springMethodProcessorFactory);
config.setProcessorFactoryList(processorFactories);
}
@Override
public void destroy() throws Exception {
powerJobWorker.destroy();
}
}
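PowerJobSpringWorker implements the ApplicationContextAware, InitializingBean and DisposableBean interfaces. Its afterPropertiesSet method creates a PowerJobWorker and calls init on it, and its destroy method calls powerJobWorker.destroy(). Its setApplicationContext method builds the processorFactories list from any factories already present in the config, appends springProcessorFactory and springMethodProcessorFactory, and writes the list back to the config. Spring invokes setApplicationContext before afterPropertiesSet, so the list is complete by the time the worker is initialized.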
tech/powerjob/worker/PowerJobWorker.java
@Slf4j
public class PowerJobWorker {
private final RemoteEngine remoteEngine;
protected final WorkerRuntime workerRuntime;
private final AtomicBoolean initialized = new AtomicBoolean(false);
public PowerJobWorker(PowerJobWorkerConfig config) {
this.workerRuntime = new WorkerRuntime();
this.remoteEngine = new PowerJobRemoteEngine();
workerRuntime.setWorkerConfig(config);
}
//......
}
PowerJobWorker defines the remoteEngine, workerRuntime and initialized fields; its constructor stores the PowerJobWorkerConfig in workerRuntime.
public void init() throws Exception {
if (!initialized.compareAndSet(false, true)) {
log.warn("[PowerJobWorker] please do not repeat the initialization");
return;
}
Stopwatch stopwatch = Stopwatch.createStarted();
log.info("[PowerJobWorker] start to initialize PowerJobWorker...");
PowerJobWorkerConfig config = workerRuntime.getWorkerConfig();
CommonUtils.requireNonNull(config, "can't find PowerJobWorkerConfig, please set PowerJobWorkerConfig first");
try {
PowerBannerPrinter.print();
// validate appName
if (!config.isEnableTestMode()) {
assertAppName();
} else {
log.warn("[PowerJobWorker] using TestMode now, it's dangerous if this is production env.");
}
// initialize metadata
String workerAddress = NetUtils.getLocalHost() + ":" + config.getPort();
workerRuntime.setWorkerAddress(workerAddress);
// initialize thread pools
final ExecutorManager executorManager = new ExecutorManager(workerRuntime.getWorkerConfig());
workerRuntime.setExecutorManager(executorManager);
// initialize ProcessorLoader
ProcessorLoader processorLoader = buildProcessorLoader(workerRuntime);
workerRuntime.setProcessorLoader(processorLoader);
// initialize actors
TaskTrackerActor taskTrackerActor = new TaskTrackerActor(workerRuntime);
ProcessorTrackerActor processorTrackerActor = new ProcessorTrackerActor(workerRuntime);
WorkerActor workerActor = new WorkerActor(workerRuntime, taskTrackerActor);
// initialize the communication engine
EngineConfig engineConfig = new EngineConfig()
.setType(config.getProtocol().name())
.setServerType(ServerType.WORKER)
.setBindAddress(new Address().setHost(NetUtils.getLocalHost()).setPort(config.getPort()))
.setActorList(Lists.newArrayList(taskTrackerActor, processorTrackerActor, workerActor));
EngineOutput engineOutput = remoteEngine.start(engineConfig);
workerRuntime.setTransporter(engineOutput.getTransporter());
// connect to the server
ServerDiscoveryService serverDiscoveryService = new ServerDiscoveryService(workerRuntime.getAppId(), workerRuntime.getWorkerConfig());
serverDiscoveryService.start(workerRuntime.getExecutorManager().getCoreExecutor());
workerRuntime.setServerDiscoveryService(serverDiscoveryService);
log.info("[PowerJobWorker] PowerJobRemoteEngine initialized successfully.");
// initialize the logging system
OmsLogHandler omsLogHandler = new OmsLogHandler(workerAddress, workerRuntime.getTransporter(), serverDiscoveryService);
workerRuntime.setOmsLogHandler(omsLogHandler);
// initialize local storage
TaskPersistenceService taskPersistenceService = new TaskPersistenceService(workerRuntime.getWorkerConfig().getStoreStrategy());
taskPersistenceService.init();
workerRuntime.setTaskPersistenceService(taskPersistenceService);
log.info("[PowerJobWorker] local storage initialized successfully.");
// initialize scheduled tasks
workerRuntime.getExecutorManager().getCoreExecutor().scheduleAtFixedRate(new WorkerHealthReporter(workerRuntime), 0, config.getHealthReportInterval(), TimeUnit.SECONDS);
workerRuntime.getExecutorManager().getCoreExecutor().scheduleWithFixedDelay(omsLogHandler.logSubmitter, 0, 5, TimeUnit.SECONDS);
log.info("[PowerJobWorker] PowerJobWorker initialized successfully, using time: {}, congratulations!", stopwatch);
}catch (Exception e) {
log.error("[PowerJobWorker] initialize PowerJobWorker failed, using {}.", stopwatch, e);
throw e;
}
}
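The init method is guarded by the initialized flag, so a repeated call only logs a warning and returns. It first prints the banner via PowerBannerPrinter.print() and, unless test mode is enabled, runs assertAppName to validate the appName. It then sets workerAddress, executorManager and processorLoader on workerRuntime, creates taskTrackerActor, processorTrackerActor and workerActor, and passes them to remoteEngine.start(engineConfig), storing the returned transporter on workerRuntime. After that it starts serverDiscoveryService, sets up omsLogHandler, initializes taskPersistenceService, and schedules WorkerHealthReporter (fixed rate, every healthReportInterval seconds) and omsLogHandler.logSubmitter (fixed delay of 5 seconds).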
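The compareAndSet guard at the top of init is a common way to make an initializer run at most once, even under concurrent calls. A minimal sketch of the pattern follows; InitOnceSketch and its initRuns counter are hypothetical and exist only to make the behaviour observable. As in the listing, the flag is not reset if initialization fails, so this sketch does not model a retry either.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the run-once guard used by PowerJobWorker.init().
final class InitOnceSketch {

    private final AtomicBoolean initialized = new AtomicBoolean(false);
    // Counts how many times the real initialization body ran (illustration only).
    private final AtomicInteger initRuns = new AtomicInteger();

    // Returns true if this call performed the initialization, false if it was skipped.
    boolean init() {
        // Only the first caller flips false -> true; every later caller fails the CAS.
        if (!initialized.compareAndSet(false, true)) {
            return false;
        }
        initRuns.incrementAndGet(); // stands in for the actual initialization work
        return true;
    }

    int initRuns() {
        return initRuns.get();
    }

    public static void main(String[] args) {
        InitOnceSketch worker = new InitOnceSketch();
        System.out.println(worker.init());
        System.out.println(worker.init());
        System.out.println(worker.initRuns());
    }
}
```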
private void assertAppName() {
PowerJobWorkerConfig config = workerRuntime.getWorkerConfig();
String appName = config.getAppName();
Objects.requireNonNull(appName, "appName can't be empty!");
String url = "http://%s/server/assert?appName=%s";
for (String server : config.getServerAddress()) {
String realUrl = String.format(url, server, appName);
try {
String resultDTOStr = CommonUtils.executeWithRetry0(() -> HttpUtils.get(realUrl));
ResultDTO resultDTO = JsonUtils.parseObject(resultDTOStr, ResultDTO.class);
if (resultDTO.isSuccess()) {
Long appId = Long.valueOf(resultDTO.getData().toString());
log.info("[PowerJobWorker] assert appName({}) succeed, the appId for this application is {}.", appName, appId);
workerRuntime.setAppId(appId);
return;
}else {
log.error("[PowerJobWorker] assert appName failed, this appName is invalid, please register the appName {} first.", appName);
throw new PowerJobException(resultDTO.getMessage());
}
}catch (PowerJobException oe) {
throw oe;
}catch (Exception ignore) {
log.warn("[PowerJobWorker] assert appName by url({}) failed, please check the server address.", realUrl);
}
}
log.error("[PowerJobWorker] no available server in {}.", config.getServerAddress());
throw new PowerJobException("no server available!");
}
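The assertAppName method iterates over the configured servers and calls http://%s/server/assert?appName=%s on each to obtain the appId for the appName, which it then stores in workerRuntime. It returns as soon as one server answers successfully. If a server answers but rejects the appName, a PowerJobException is thrown immediately; if the request itself fails, the next server is tried, and when no server is reachable a PowerJobException("no server available!") is thrown.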
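The failover behaviour of assertAppName can be sketched without any HTTP dependency. In the following illustration the HTTP call is replaced by a Function parameter, Result is a hypothetical stand-in for ResultDTO, and IllegalStateException stands in for PowerJobException; none of these names are PowerJob's own API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of the server failover loop, not PowerJob code.
final class AssertAppNameSketch {

    // Assumed stand-in for the server's ResultDTO.
    static final class Result {
        final boolean success;
        final String data;
        final String message;

        Result(boolean success, String data, String message) {
            this.success = success;
            this.data = data;
            this.message = message;
        }
    }

    // Tries each server in order: the first successful answer wins, a rejection
    // aborts immediately, and an unreachable server moves on to the next one.
    static long resolveAppId(List<String> servers, String appName,
                             Function<String, Result> httpGet) {
        for (String server : servers) {
            String url = String.format("http://%s/server/assert?appName=%s", server, appName);
            Result result;
            try {
                result = httpGet.apply(url);
            } catch (RuntimeException unreachable) {
                continue; // transport failure: try the next server
            }
            if (result.success) {
                return Long.parseLong(result.data);
            }
            throw new IllegalStateException(result.message); // appName rejected
        }
        throw new IllegalStateException("no server available!");
    }

    public static void main(String[] args) {
        Function<String, Result> fakeHttp = url -> {
            if (url.contains("down:7700")) {
                throw new RuntimeException("connection refused");
            }
            return new Result(true, "42", null);
        };
        System.out.println(resolveAppId(Arrays.asList("down:7700", "up:7700"), "demo", fakeHttp));
    }
}
```

The distinction between a rejected appName and an unreachable server matters: the former is a configuration error that no other server would fix, so retrying elsewhere would only delay the failure.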
private ProcessorLoader buildProcessorLoader(WorkerRuntime runtime) {
List<ProcessorFactory> customPF = Optional.ofNullable(runtime.getWorkerConfig().getProcessorFactoryList()).orElse(Collections.emptyList());
List<ProcessorFactory> finalPF = Lists.newArrayList(customPF);
// append the 2 system ProcessorFactory instances at the end
finalPF.add(new BuiltInDefaultProcessorFactory());
finalPF.add(new JarContainerProcessorFactory(runtime));
return new PowerJobProcessorLoader(finalPF);
}
buildProcessorLoader takes the existing processorFactoryList (which, in the Spring setup, already contains BuiltInSpringProcessorFactory and BuildInSpringMethodProcessorFactory) and appends BuiltInDefaultProcessorFactory and JarContainerProcessorFactory to a copy of it.
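Putting setApplicationContext and buildProcessorLoader together gives the final order of the factory list. The sketch below uses plain strings in place of the real factory objects, and the class and method names are hypothetical; it only illustrates the null-safe copy-then-append pattern used in both places.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch: strings stand in for ProcessorFactory instances.
final class ProcessorFactoryOrderSketch {

    // Null-safe mutable copy, as done via Optional.ofNullable(...).orElse(emptyList()).
    private static List<String> copyOf(List<String> configured) {
        return new ArrayList<>(Optional.ofNullable(configured).orElse(Collections.emptyList()));
    }

    // Step 1 (setApplicationContext): append the two Spring-aware factories.
    static List<String> appendSpringFactories(List<String> configured) {
        List<String> factories = copyOf(configured);
        factories.add("BuiltInSpringProcessorFactory");
        factories.add("BuildInSpringMethodProcessorFactory");
        return factories;
    }

    // Step 2 (buildProcessorLoader): append the two system factories last.
    static List<String> appendSystemFactories(List<String> configured) {
        List<String> factories = copyOf(configured);
        factories.add("BuiltInDefaultProcessorFactory");
        factories.add("JarContainerProcessorFactory");
        return factories;
    }

    public static void main(String[] args) {
        List<String> custom = Arrays.asList("MyCustomProcessorFactory"); // made-up name
        System.out.println(appendSystemFactories(appendSpringFactories(custom)));
    }
}
```

User-supplied factories therefore come first, followed by the Spring ones and then the system defaults.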
public void destroy() throws Exception {
workerRuntime.getExecutorManager().shutdown();
remoteEngine.close();
}
The destroy method shuts down the executors via workerRuntime.getExecutorManager().shutdown() and then calls remoteEngine.close().
In summary, PowerJobAutoConfiguration creates a PowerJobSpringWorker from the PowerJobProperties.Worker settings, and PowerJobSpringWorker brings PowerJobWorker under the Spring container's lifecycle. Its setApplicationContext method adds BuiltInSpringProcessorFactory and BuildInSpringMethodProcessorFactory to the config's processorFactoryList. PowerJobWorker's init method validates the appName, initializes the thread pools, ProcessorLoader, actors, remoteEngine, serverDiscoveryService, omsLogHandler and taskPersistenceService, and schedules WorkerHealthReporter and logSubmitter.