This article takes a look at PowerJob's WorkerHealthReporter.
tech/powerjob/worker/background/WorkerHealthReporter.java
@Slf4j
@RequiredArgsConstructor
public class WorkerHealthReporter implements Runnable {
    private final WorkerRuntime workerRuntime;
    @Override
    public void run() {
        // No available server, so the health info cannot be reported
        String currentServer = workerRuntime.getServerDiscoveryService().getCurrentServerAddress();
        if (StringUtils.isEmpty(currentServer)) {
            log.warn("[WorkerHealthReporter] no available server,fail to report health info!");
            return;
        }
        SystemMetrics systemMetrics;
        if (workerRuntime.getWorkerConfig().getSystemMetricsCollector() == null) {
            systemMetrics = SystemInfoUtils.getSystemMetrics();
        } else {
            systemMetrics = workerRuntime.getWorkerConfig().getSystemMetricsCollector().collect();
        }
        WorkerHeartbeat heartbeat = new WorkerHeartbeat();
        heartbeat.setSystemMetrics(systemMetrics);
        heartbeat.setWorkerAddress(workerRuntime.getWorkerAddress());
        heartbeat.setAppName(workerRuntime.getWorkerConfig().getAppName());
        heartbeat.setAppId(workerRuntime.getAppId());
        heartbeat.setHeartbeatTime(System.currentTimeMillis());
        heartbeat.setVersion(PowerJobWorkerVersion.getVersion());
        heartbeat.setProtocol(workerRuntime.getWorkerConfig().getProtocol().name());
        heartbeat.setClient("KingPenguin");
        heartbeat.setTag(workerRuntime.getWorkerConfig().getTag());
        // Report the number of task trackers
        heartbeat.setLightTaskTrackerNum(LightTaskTrackerManager.currentTaskTrackerSize());
        heartbeat.setHeavyTaskTrackerNum(HeavyTaskTrackerManager.currentTaskTrackerSize());
        // Overloaded if either tracker count has reached its configured maximum
        if (workerRuntime.getWorkerConfig().getMaxLightweightTaskNum() <= LightTaskTrackerManager.currentTaskTrackerSize()
                || workerRuntime.getWorkerConfig().getMaxHeavyweightTaskNum() <= HeavyTaskTrackerManager.currentTaskTrackerSize()) {
            heartbeat.setOverload(true);
        }
        // Get the list of currently deployed containers
        heartbeat.setContainerInfos(OmsContainerFactory.getDeployedContainerInfos());
        // Send the request (this second check is redundant: currentServer was already checked at the top of run())
        if (StringUtils.isEmpty(currentServer)) {
            return;
        }
        // log
        log.info("[WorkerHealthReporter] report health status,appId:{},appName:{},isOverload:{},maxLightweightTaskNum:{},currentLightweightTaskNum:{},maxHeavyweightTaskNum:{},currentHeavyweightTaskNum:{}" ,
                heartbeat.getAppId(),
                heartbeat.getAppName(),
                heartbeat.isOverload(),
                workerRuntime.getWorkerConfig().getMaxLightweightTaskNum(),
                heartbeat.getLightTaskTrackerNum(),
                workerRuntime.getWorkerConfig().getMaxHeavyweightTaskNum(),
                heartbeat.getHeavyTaskTrackerNum()
        );
        TransportUtils.reportWorkerHeartbeat(heartbeat, currentServer, workerRuntime.getTransporter());
    }
}
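WorkerHealthReporter implements the Runnable interface. Its run method first obtains currentServer and returns early with a warning if no server is available. It then collects systemMetrics, using the configured SystemMetricsCollector if one is set and SystemInfoUtils.getSystemMetrics() otherwise. Next it builds a WorkerHeartbeat carrying the tracker counts, the overload flag and the deployed containers. Finally it reports the heartbeat via TransportUtils.reportWorkerHeartbeat.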
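The overload check in run() boils down to a single comparison. A worker reports itself overloaded as soon as either tracker count reaches its configured ceiling. The following is a minimal sketch of that rule. The class and method names (OverloadRule, isOverloaded) are hypothetical, and the config getters are replaced by plain parameters:

```java
/** Hypothetical helper mirroring the overload condition in WorkerHealthReporter.run(). */
final class OverloadRule {

    private OverloadRule() {}

    /**
     * Returns true when either the lightweight or the heavyweight tracker count
     * has reached (not just exceeded) its configured maximum, matching the
     * "max <= current" comparison in the original code.
     */
    static boolean isOverloaded(int maxLight, int currentLight, int maxHeavy, int currentHeavy) {
        return maxLight <= currentLight || maxHeavy <= currentHeavy;
    }
}
```

Because the comparison uses <=, a worker configured with maxLightweightTaskNum = 10 is already flagged as overloaded when it holds exactly 10 light trackers.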
tech/powerjob/worker/common/utils/TransportUtils.java
public static void reportWorkerHeartbeat(WorkerHeartbeat req, String address, Transporter transporter) {
    final URL url = easyBuildUrl(ServerType.SERVER, S4W_PATH, S4W_HANDLER_WORKER_HEARTBEAT, address);
    transporter.tell(url, req);
}
public static URL easyBuildUrl(ServerType serverType, String rootPath, String handlerPath, String address) {
    HandlerLocation handlerLocation = new HandlerLocation()
            .setRootPath(rootPath)
            .setMethodPath(handlerPath);
    return new URL()
            .setServerType(serverType)
            .setAddress(Address.fromIpv4(address))
            .setLocation(handlerLocation);
}
reportWorkerHeartbeat sends the request via transporter.tell. The rootPath is server (S4W_PATH) and the handlerPath is workerHeartbeat (S4W_HANDLER_WORKER_HEARTBEAT).
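The URL built by easyBuildUrl identifies a handler only by its two path segments. The following is a rough illustration, assuming a transport that addresses handlers as /{rootPath}/{methodPath}. The HandlerPath class and its toPath method are hypothetical and are not PowerJob's HandlerLocation API. Under that assumption, the heartbeat handler would resolve to /server/workerHeartbeat:

```java
import java.util.Objects;

/** Hypothetical stand-in for a handler location: root path plus method path. */
final class HandlerPath {
    private final String rootPath;
    private final String methodPath;

    HandlerPath(String rootPath, String methodPath) {
        this.rootPath = Objects.requireNonNull(rootPath, "rootPath");
        this.methodPath = Objects.requireNonNull(methodPath, "methodPath");
    }

    /** Joins the two segments as "/root/method" (an assumed layout, for illustration only). */
    String toPath() {
        return "/" + rootPath + "/" + methodPath;
    }
}
```

Splitting the path this way lets one root (server) group all worker-to-server handlers, while the method segment picks the specific one.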
tech/powerjob/server/core/handler/AbWorkerRequestHandler.java
@Handler(path = S4W_HANDLER_WORKER_HEARTBEAT, processType = ProcessType.NO_BLOCKING)
public void processWorkerHeartbeat(WorkerHeartbeat heartbeat) {
    long startMs = System.currentTimeMillis();
    WorkerHeartbeatEvent event = new WorkerHeartbeatEvent()
            .setAppName(heartbeat.getAppName())
            .setAppId(heartbeat.getAppId())
            .setVersion(heartbeat.getVersion())
            .setProtocol(heartbeat.getProtocol())
            .setTag(heartbeat.getTag())
            .setWorkerAddress(heartbeat.getWorkerAddress())
            .setDelayMs(startMs - heartbeat.getHeartbeatTime())
            .setScore(heartbeat.getSystemMetrics().getScore());
    processWorkerHeartbeat0(heartbeat, event);
    monitorService.monitor(event);
}
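processWorkerHeartbeat converts the heartbeat into a WorkerHeartbeatEvent. The event's delayMs is the server receive time minus the worker's heartbeatTime. The method then calls processWorkerHeartbeat0 and monitorService.monitor(event).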
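delayMs subtracts a timestamp taken on the worker from one taken on the server. It therefore mixes network and queueing latency with any clock skew between the two hosts. The following is a minimal sketch of that computation. It uses a hypothetical HeartbeatDelay helper that takes the server time as a parameter instead of reading System.currentTimeMillis() directly:

```java
/** Hypothetical helper reproducing the delayMs computation in processWorkerHeartbeat. */
final class HeartbeatDelay {

    private HeartbeatDelay() {}

    /**
     * serverReceiveMs: server-side time when handling starts (startMs in the original).
     * workerSentMs: heartbeatTime stamped by the worker before sending.
     * A negative result means the worker clock is ahead of the server clock.
     */
    static long delayMs(long serverReceiveMs, long workerSentMs) {
        return serverReceiveMs - workerSentMs;
    }
}
```

A negative or unusually large delayMs in the monitoring data is therefore a hint to check NTP on the worker before blaming the network.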
tech/powerjob/server/core/handler/WorkerRequestHandlerImpl.java
protected void processWorkerHeartbeat0(WorkerHeartbeat heartbeat, WorkerHeartbeatEvent event) {
    WorkerClusterManagerService.updateStatus(heartbeat);
}
processWorkerHeartbeat0 updates the status via WorkerClusterManagerService.updateStatus(heartbeat).
tech/powerjob/server/remote/worker/WorkerClusterManagerService.java
public static void updateStatus(WorkerHeartbeat heartbeat) {
    Long appId = heartbeat.getAppId();
    String appName = heartbeat.getAppName();
    ClusterStatusHolder clusterStatusHolder = APP_ID_2_CLUSTER_STATUS.computeIfAbsent(appId, ignore -> new ClusterStatusHolder(appName));
    clusterStatusHolder.updateStatus(heartbeat);
}
updateStatus first gets the ClusterStatusHolder for the appId, creating it on the first heartbeat. It then updates that holder's status.
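The computeIfAbsent call both creates a holder on the first heartbeat and reuses it afterwards. The following is a simplified model of that registry. It assumes a ConcurrentHashMap, whose computeIfAbsent runs atomically. AppRegistry and its nested Holder are hypothetical stand-ins for the real map and ClusterStatusHolder:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, simplified model of the appId -> ClusterStatusHolder registry. */
final class AppRegistry {

    /** Stand-in for ClusterStatusHolder that only remembers the appName it was created with. */
    static final class Holder {
        final String appName;

        Holder(String appName) {
            this.appName = appName;
        }
    }

    private final Map<Long, Holder> appId2Holder = new ConcurrentHashMap<>();

    /** Atomically creates the holder on the first heartbeat for appId, then reuses it. */
    Holder holderFor(long appId, String appName) {
        return appId2Holder.computeIfAbsent(appId, ignore -> new Holder(appName));
    }

    int appCount() {
        return appId2Holder.size();
    }
}
```

One consequence of this pattern is that the appName from the first heartbeat for an appId is the one the holder keeps. Later heartbeats with the same appId reuse the existing holder.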
tech/powerjob/server/remote/worker/ClusterStatusHolder.java
public void updateStatus(WorkerHeartbeat heartbeat) {
    String workerAddress = heartbeat.getWorkerAddress();
    long heartbeatTime = heartbeat.getHeartbeatTime();
    WorkerInfo workerInfo = address2WorkerInfo.computeIfAbsent(workerAddress, ignore -> {
        WorkerInfo wf = new WorkerInfo();
        wf.refresh(heartbeat);
        return wf;
    });
    long oldTime = workerInfo.getLastActiveTime();
    if (heartbeatTime < oldTime) {
        log.warn("[ClusterStatusHolder-{}] receive the expired heartbeat from {}, serverTime: {}, heartTime: {}", appName, heartbeat.getWorkerAddress(), System.currentTimeMillis(), heartbeat.getHeartbeatTime());
        return;
    }
    workerInfo.refresh(heartbeat);
    List<DeployedContainerInfo> containerInfos = heartbeat.getContainerInfos();
    if (!CollectionUtils.isEmpty(containerInfos)) {
        containerInfos.forEach(containerInfo -> {
            Map<String, DeployedContainerInfo> infos = containerId2Infos.computeIfAbsent(containerInfo.getContainerId(), ignore -> Maps.newConcurrentMap());
            infos.put(workerAddress, containerInfo);
        });
    }
}
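ClusterStatusHolder's updateStatus method first gets the WorkerInfo for the worker address, creating it if needed. If the heartbeat's heartbeatTime is earlier than the stored lastActiveTime, the heartbeat is treated as expired and the method returns. Otherwise it calls workerInfo.refresh(heartbeat). Finally it records each entry of heartbeat.getContainerInfos() in containerId2Infos, keyed first by container ID and then by worker address.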
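The stale-heartbeat guard and the two-level container index can be modelled with plain maps. The following sketch is simplified and entirely hypothetical. WorkerTable stores only the last heartbeat time per address. A map of containerId to version stands in for the list of DeployedContainerInfo objects:

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, simplified version of ClusterStatusHolder.updateStatus. */
final class WorkerTable {

    private final Map<String, Long> address2LastActive = new ConcurrentHashMap<>();
    // containerId -> (workerAddress -> container version)
    private final Map<String, Map<String, String>> containerId2Workers = new ConcurrentHashMap<>();

    /** Returns false, changing nothing, when the heartbeat is older than the stored one. */
    boolean update(String workerAddress, long heartbeatTime, Map<String, String> containerVersions) {
        // On the first heartbeat the stored time equals heartbeatTime, so the check below passes.
        long oldTime = address2LastActive.computeIfAbsent(workerAddress, ignore -> heartbeatTime);
        if (heartbeatTime < oldTime) {
            return false; // expired heartbeat, e.g. delivered out of order
        }
        // As in the original, this check-then-update is not atomic across concurrent heartbeats.
        address2LastActive.put(workerAddress, heartbeatTime);
        containerVersions.forEach((containerId, version) ->
                containerId2Workers
                        .computeIfAbsent(containerId, ignore -> new ConcurrentHashMap<>())
                        .put(workerAddress, version));
        return true;
    }

    Long lastActive(String workerAddress) {
        return address2LastActive.get(workerAddress);
    }

    Map<String, String> workersRunning(String containerId) {
        return containerId2Workers.getOrDefault(containerId, Collections.emptyMap());
    }
}
```

Because the comparison is a strict <, a heartbeat carrying the same timestamp as the stored one is still accepted.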
tech/powerjob/server/common/module/WorkerInfo.java
public void refresh(WorkerHeartbeat workerHeartbeat) {
    address = workerHeartbeat.getWorkerAddress();
    lastActiveTime = workerHeartbeat.getHeartbeatTime();
    protocol = workerHeartbeat.getProtocol();
    client = workerHeartbeat.getClient();
    tag = workerHeartbeat.getTag();
    systemMetrics = workerHeartbeat.getSystemMetrics();
    containerInfos = workerHeartbeat.getContainerInfos();
    lightTaskTrackerNum = workerHeartbeat.getLightTaskTrackerNum();
    heavyTaskTrackerNum = workerHeartbeat.getHeavyTaskTrackerNum();
    if (workerHeartbeat.isOverload()) {
        overloading = true;
        lastOverloadTime = workerHeartbeat.getHeartbeatTime();
        log.warn("[WorkerInfo] worker {} is overload!", getAddress());
    } else {
        overloading = false;
    }
}
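WorkerInfo's refresh method updates lastActiveTime, overloading and other fields from the workerHeartbeat. When the heartbeat reports overload, it also records lastOverloadTime.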
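The overload bookkeeping in refresh is asymmetric. The overloading flag follows the latest heartbeat, while lastOverloadTime is only ever moved forward and is never cleared. The following sketch isolates that behaviour. OverloadState is a hypothetical subset of WorkerInfo:

```java
/** Hypothetical subset of WorkerInfo showing how overload state is tracked across heartbeats. */
final class OverloadState {
    private boolean overloading;
    private long lastOverloadTime; // stays 0 until the first overloaded heartbeat

    void refresh(boolean overloadReported, long heartbeatTime) {
        if (overloadReported) {
            overloading = true;
            lastOverloadTime = heartbeatTime;
        } else {
            // The flag is cleared, but lastOverloadTime keeps the most recent overload moment.
            overloading = false;
        }
    }

    boolean isOverloading() {
        return overloading;
    }

    long getLastOverloadTime() {
        return lastOverloadTime;
    }
}
```

Keeping lastOverloadTime after the flag clears lets other components reason about how recently a worker was overloaded, not just whether it is overloaded right now.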
tech/powerjob/server/extension/defaultimpl/workerfilter/DisconnectedWorkerFilter.java
@Slf4j
@Component
public class DisconnectedWorkerFilter implements WorkerFilter {
    @Override
    public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfo) {
        boolean timeout = workerInfo.timeout();
        if (timeout) {
            log.info("[Job-{}] filter worker[{}] due to timeout(lastActiveTime={})", jobInfo.getId(), workerInfo.getAddress(), workerInfo.getLastActiveTime());
        }
        return timeout;
    }
}
DisconnectedWorkerFilter implements the WorkerFilter interface. Its filter method returns workerInfo.timeout(), and a return value of true means the worker is filtered out.
tech/powerjob/server/common/module/WorkerInfo.java
private static final long WORKER_TIMEOUT_MS = 60000;
public boolean timeout() {
    long timeout = System.currentTimeMillis() - lastActiveTime;
    return timeout > WORKER_TIMEOUT_MS;
}
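The timeout method computes the difference between the current time and lastActiveTime and compares it with the default WORKER_TIMEOUT_MS (60s). The worker counts as timed out only when the gap is strictly greater than 60 seconds.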
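timeout() reads System.currentTimeMillis() directly, so its boundary behaviour is easiest to see with the clock passed in. The following sketch is a hypothetical, testable variant of the check. HeartbeatTimeout and timedOut are illustrative names, and the 60,000 ms constant matches WORKER_TIMEOUT_MS:

```java
/** Hypothetical, testable variant of WorkerInfo.timeout() with the current time passed in. */
final class HeartbeatTimeout {
    static final long WORKER_TIMEOUT_MS = 60_000L;

    private HeartbeatTimeout() {}

    /** True only when strictly more than 60 s have passed since the last accepted heartbeat. */
    static boolean timedOut(long nowMs, long lastActiveTime) {
        return nowMs - lastActiveTime > WORKER_TIMEOUT_MS;
    }
}
```

A worker whose last heartbeat is exactly 60,000 ms old is therefore still considered alive. At 60,001 ms it is filtered out.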
tech/powerjob/server/remote/worker/WorkerClusterQueryService.java
public List<WorkerInfo> getSuitableWorkers(JobInfoDO jobInfo) {
    List<WorkerInfo> workers = Lists.newLinkedList(getWorkerInfosByAppId(jobInfo.getAppId()).values());
    workers.removeIf(workerInfo -> filterWorker(workerInfo, jobInfo));
    DispatchStrategy dispatchStrategy = DispatchStrategy.of(jobInfo.getDispatchStrategy());
    switch (dispatchStrategy) {
        case RANDOM:
            Collections.shuffle(workers);
            break;
        case HEALTH_FIRST:
            workers.sort((o1, o2) -> o2.getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore());
            break;
        default:
            // do nothing
    }
    // Limit the cluster size (0 means unlimited)
    if (!workers.isEmpty() && jobInfo.getMaxWorkerCount() > 0 && workers.size() > jobInfo.getMaxWorkerCount()) {
        workers = workers.subList(0, jobInfo.getMaxWorkerCount());
    }
    return workers;
}
private boolean filterWorker(WorkerInfo workerInfo, JobInfoDO jobInfo) {
    for (WorkerFilter filter : workerFilters) {
        if (filter.filter(workerInfo, jobInfo)) {
            return true;
        }
    }
    return false;
}
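getSuitableWorkers removes every worker for which filterWorker(workerInfo, jobInfo) returns true, meaning any one of the filters matched. It then orders the remaining workers according to the dispatch strategy. If maxWorkerCount is positive, it keeps at most that many workers.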
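The selection pipeline runs in three steps: drop any worker a filter rejects, order by strategy, then truncate. The following is a simplified sketch of that pipeline. WorkerSelector, its nested Worker and Strategy types, and the use of java.util.function.Predicate as the filter type are all hypothetical stand-ins for WorkerInfo, DispatchStrategy and WorkerFilter:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Random;
import java.util.function.Predicate;

/** Hypothetical, simplified pipeline modelled on getSuitableWorkers: filter, order, truncate. */
final class WorkerSelector {

    enum Strategy { RANDOM, HEALTH_FIRST }

    /** Minimal worker view: address, health score and last heartbeat time. */
    static final class Worker {
        final String address;
        final int score;
        final long lastActiveTime;

        Worker(String address, int score, long lastActiveTime) {
            this.address = address;
            this.score = score;
            this.lastActiveTime = lastActiveTime;
        }
    }

    private WorkerSelector() {}

    /** Each filter returns true to exclude a worker, like WorkerFilter.filter. */
    static List<Worker> select(List<Worker> candidates, List<Predicate<Worker>> filters,
                               Strategy strategy, int maxWorkerCount, Random random) {
        List<Worker> workers = new ArrayList<>(candidates);
        // anyMatch stops at the first matching filter, like the loop in filterWorker.
        workers.removeIf(w -> filters.stream().anyMatch(f -> f.test(w)));
        switch (strategy) {
            case RANDOM:
                Collections.shuffle(workers, random);
                break;
            case HEALTH_FIRST:
                // Highest score first; comparingInt avoids the overflow risk of subtracting scores.
                workers.sort(Comparator.comparingInt((Worker w) -> w.score).reversed());
                break;
            default:
                break;
        }
        // A non-positive limit means "no limit", matching the "0 means unlimited" rule.
        if (maxWorkerCount > 0 && workers.size() > maxWorkerCount) {
            workers = new ArrayList<>(workers.subList(0, maxWorkerCount));
        }
        return workers;
    }
}
```

Passing a timeout predicate (now - lastActiveTime > 60_000) as one of the filters reproduces the effect of DisconnectedWorkerFilter. A worker with a stale heartbeat never reaches the sorting step, however high its score.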
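In summary, PowerJob's WorkerHealthReporter implements the Runnable interface. Its run method first obtains currentServer, then collects systemMetrics, builds a WorkerHeartbeat, and finally reports it via TransportUtils.reportWorkerHeartbeat. reportWorkerHeartbeat sends the request with transporter.tell, using rootPath server and handlerPath workerHeartbeat. On the server side, WorkerClusterManagerService.updateStatus(heartbeat) updates the cluster status. It mainly does this through WorkerInfo's refresh method, which updates lastActiveTime, overloading and other fields from the workerHeartbeat. DisconnectedWorkerFilter implements the WorkerFilter interface, and its filter method returns workerInfo.timeout(). As a result, workers whose heartbeats have timed out are excluded when suitable workers are selected for a job.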