This article takes a look at PowerJob's OhMyClassLoader.
tech/powerjob/worker/container/OhMyClassLoader.java
@Slf4j
public class OhMyClassLoader extends URLClassLoader {

    public OhMyClassLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    /**
     * Proactively load the classes; otherwise the class loader stays empty and the Spring IOC container has nothing to initialize
     * @param packageName package path, used to proactively load the classes written by the user
     * @throws Exception loading exception
     */
    public void load(String packageName) throws Exception {
        URL[] urLs = getURLs();
        for (URL jarURL : urLs) {
            JarFile jarFile = new JarFile(new File(jarURL.toURI()));
            Enumeration<JarEntry> entries = jarFile.entries();
            while (entries.hasMoreElements()) {
                JarEntry jarEntry = entries.nextElement();
                if (jarEntry.isDirectory()) {
                    continue;
                }
                String name = jarEntry.getName();
                if (!name.endsWith(".class")) {
                    continue;
                }
                // Convert org/spring/AAA.class -> org.spring.AAA
                String tmp = name.substring(0, name.length() - 6);
                String res = StringUtils.replace(tmp, "/", ".");
                if (res.startsWith(packageName)) {
                    loadClass(res);
                    log.info("[OhMyClassLoader] load class({}) successfully.", res);
                }
            }
        }
    }
}
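OhMyClassLoader extends URLClassLoader and defines a load method. The method iterates over the URLs, creates a JarFile for each one, walks jarFile.entries(), picks the entries ending in .class, and checks whether the converted class name starts with packageName; if it does, it calls the loadClass method inherited from the parent class.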
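The name conversion and prefix check inside load can be isolated as a small sketch. ClassNameFilter and its two methods are hypothetical names used only for illustration; they are not part of PowerJob, and the sketch uses String.replace from the JDK instead of StringUtils.

```java
final class ClassNameFilter {

    private static final String CLASS_SUFFIX = ".class";

    // Converts a jar entry name such as org/spring/AAA.class to org.spring.AAA.
    // Returns null for entries that are not class files.
    static String toClassName(String entryName) {
        if (!entryName.endsWith(CLASS_SUFFIX)) {
            return null;
        }
        String withoutSuffix = entryName.substring(0, entryName.length() - CLASS_SUFFIX.length());
        return withoutSuffix.replace('/', '.');
    }

    // Same kind of check as in load(): a plain startsWith on the dotted class name.
    static boolean shouldLoad(String className, String packageName) {
        return className != null && className.startsWith(packageName);
    }
}
```

Because the check is a plain string prefix match, a packageName of org.spring also matches classes in a sibling package such as org.springx.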
tech/powerjob/worker/container/OmsJarContainer.java
@Slf4j
public class OmsJarContainer implements OmsContainer {

    private final Long containerId;
    private final String name;
    private final String version;
    private final File localJarFile;
    private final Long deployedTime;

    // Reference counter
    private final AtomicInteger referenceCount = new AtomicInteger(0);

    private OhMyClassLoader containerClassLoader;
    private ClassPathXmlApplicationContext container;

    private final Map<String, BasicProcessor> processorCache = Maps.newConcurrentMap();

    public OmsJarContainer(Long containerId, String name, String version, File localJarFile) {
        this.containerId = containerId;
        this.name = name;
        this.version = version;
        this.localJarFile = localJarFile;
        this.deployedTime = System.currentTimeMillis();
    }

    @Override
    public void init() throws Exception {

        log.info("[OmsJarContainer-{}] start to init container(name={},jarPath={})", containerId, name, localJarFile.getPath());

        URL jarURL = localJarFile.toURI().toURL();

        // Create the class loader (its parent is the Worker's class loader)
        this.containerClassLoader = new OhMyClassLoader(new URL[]{jarURL}, this.getClass().getClassLoader());

        // Parse the properties
        Properties properties = new Properties();
        try (InputStream propertiesURLStream = containerClassLoader.getResourceAsStream(ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME)) {

            if (propertiesURLStream == null) {
                log.error("[OmsJarContainer-{}] can't find {} in jar {}.", containerId, ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME, localJarFile.getPath());
                throw new PowerJobException("invalid jar file because of no " + ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME);
            }

            properties.load(propertiesURLStream);
            log.info("[OmsJarContainer-{}] load container properties successfully: {}", containerId, properties);
        }
        String packageName = properties.getProperty(ContainerConstant.CONTAINER_PACKAGE_NAME_KEY);
        if (StringUtils.isEmpty(packageName)) {
            log.error("[OmsJarContainer-{}] get package name failed, developer should't modify the properties file!", containerId);
            throw new PowerJobException("invalid jar file");
        }

        // Load the user classes
        containerClassLoader.load(packageName);

        // Create the Spring IOC container (the Spring config file must be given as a relative path)
        // The thread context class loader has to be switched in order to load JDBC drivers (SPI)
        ClassLoader oldCL = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(containerClassLoader);
        try {
            this.container = new ClassPathXmlApplicationContext(new String[]{ContainerConstant.SPRING_CONTEXT_FILE_NAME}, false);
            this.container.setClassLoader(containerClassLoader);
            this.container.refresh();
        } finally {
            Thread.currentThread().setContextClassLoader(oldCL);
        }

        log.info("[OmsJarContainer-{}] init container(name={},jarPath={}) successfully", containerId, name, localJarFile.getPath());
    }

    //......
}
The init method of OmsJarContainer uses the localJarFile passed to the constructor as the URL for a new OhMyClassLoader, then reads oms-worker-container.properties, takes its PACKAGE_NAME property as packageName, and loads the classes with containerClassLoader.load.
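The init method also swaps the thread context class loader while the Spring context is refreshed and restores it in a finally block. That pattern can be sketched on its own as follows. ContextClassLoaderSwap and callWith are hypothetical names, not PowerJob APIs, and the Supplier-based signature is an assumption made to keep the example free of checked exceptions.

```java
import java.util.function.Supplier;

final class ContextClassLoaderSwap {

    // Runs the action with the given loader installed as the thread context
    // class loader, and restores the previous loader afterwards, even if the
    // action throws.
    static <T> T callWith(ClassLoader loader, Supplier<T> action) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return action.get();
        } finally {
            current.setContextClassLoader(previous);
        }
    }
}
```

Code that runs inside the action and looks up classes through Thread.currentThread().getContextClassLoader(), as SPI-style lookups typically do, then sees the container's class loader instead of the Worker's.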
tech/powerjob/worker/container/OmsContainerFactory.java
public static synchronized void deployContainer(ServerDeployContainerRequest request) {

    Long containerId = request.getContainerId();
    String containerName = request.getContainerName();
    String version = request.getVersion();

    log.info("[OmsContainer-{}] start to deploy container(name={},version={},downloadUrl={})", containerId, containerName, version, request.getDownloadURL());

    OmsContainer oldContainer = CARGO.get(containerId);
    if (oldContainer != null && version.equals(oldContainer.getVersion())) {
        log.info("[OmsContainer-{}] version={} already deployed, so skip this deploy task.", containerId, version);
        return;
    }

    String filePath = CONTAINER_DIR + containerId + "/" + version + ".jar";
    // Download the container to the local disk
    File jarFile = new File(filePath);

    try {
        if (!jarFile.exists()) {
            FileUtils.forceMkdirParent(jarFile);
            FileUtils.copyURLToFile(new URL(request.getDownloadURL()), jarFile, 5000, 300000);
            log.info("[OmsContainer-{}] download jar successfully, path={}", containerId, jarFile.getPath());
        }

        // Create the new container
        OmsContainer newContainer = new OmsJarContainer(containerId, containerName, version, jarFile);
        newContainer.init();

        // Replace the container
        CARGO.put(containerId, newContainer);
        log.info("[OmsContainer-{}] deployed new version:{} successfully!", containerId, version);

        if (oldContainer != null) {
            // Destroy the old container
            oldContainer.destroy();
        }

    } catch (Exception e) {
        log.error("[OmsContainer-{}] deployContainer(name={},version={}) failed.", containerId, containerName, version, e);
        // If deployment fails, delete the jar (the failure may have been caused by a download error; keeping the jar would make this version permanently impossible to redeploy)
        CommonUtils.executeIgnoreException(() -> FileUtils.forceDelete(jarFile));
    }
}
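OmsContainerFactory provides the deployContainer method. If a container with the same version is already deployed, it returns early; otherwise it downloads the jar for the given containerId and version (unless the file already exists locally), creates an OmsJarContainer, calls its init method, puts the new container into CARGO, and destroys the old one.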
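The ordering in deployContainer (skip when the version matches, put the new container in place first, destroy the old one afterwards) can be shown with a reduced in-memory sketch. VersionedRegistry and Entry are hypothetical stand-ins; the download and init steps of the real method are left out and only marked by a comment.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class VersionedRegistry {

    // Hypothetical stand-in for a deployed container: a version plus a destroyed flag.
    static final class Entry {
        final String version;
        boolean destroyed;

        Entry(String version) {
            this.version = version;
        }
    }

    private final Map<Long, Entry> cargo = new ConcurrentHashMap<>();

    // Returns true if a new entry was deployed, false if that version was already present.
    synchronized boolean deploy(long id, String version) {
        Entry old = cargo.get(id);
        if (old != null && version.equals(old.version)) {
            return false; // same version already deployed: skip
        }
        Entry fresh = new Entry(version); // the real method downloads the jar and calls init() here
        cargo.put(id, fresh);             // replace first, so lookups see the new entry
        if (old != null) {
            old.destroyed = true;         // then destroy the old entry
        }
        return true;
    }

    Entry get(long id) {
        return cargo.get(id);
    }
}
```

Replacing before destroying means a lookup made after the swap never returns an entry that has already been destroyed.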
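To summarize: OhMyClassLoader extends URLClassLoader and defines a load method that iterates over the URLs, creates a JarFile for each one, walks jarFile.entries(), picks the entries ending in .class, and checks whether the class name starts with packageName; if it does, it calls the loadClass method inherited from the parent class. However, the JarFile does not appear to be closed here, which may lead to a resource leak.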
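One way to address the unclosed JarFile would be try-with-resources, since JarFile implements Closeable. The following is a minimal sketch of the scanning part only, under the assumption that collecting the class names is enough for illustration. JarClassScanner is a hypothetical class, not PowerJob code, and it does not call loadClass.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

final class JarClassScanner {

    // Lists the class names in the jar whose dotted name starts with packageName.
    // The JarFile is closed by try-with-resources when the scan finishes or fails.
    static List<String> classNames(File jar, String packageName) {
        List<String> result = new ArrayList<>();
        try (JarFile jarFile = new JarFile(jar)) {
            Enumeration<JarEntry> entries = jarFile.entries();
            while (entries.hasMoreElements()) {
                JarEntry entry = entries.nextElement();
                String name = entry.getName();
                if (entry.isDirectory() || !name.endsWith(".class")) {
                    continue;
                }
                // Convert org/spring/AAA.class -> org.spring.AAA
                String className = name.substring(0, name.length() - 6).replace('/', '.');
                if (className.startsWith(packageName)) {
                    result.add(className);
                }
            }
        } catch (IOException e) {
            // Wrapped so that callers of this sketch need no checked exception handling.
            throw new UncheckedIOException(e);
        }
        return result;
    }
}
```

In the original load method the same try-with-resources block could wrap the loop body for each URL, with loadClass called where this sketch adds the name to the list.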