Picking up from the previous post, we continue fixing the errors from running Kettle locally.
Below is what the main method of Spoon's startup class does (for the source code, see my earlier post on downloading and building the PDI/Kettle-9 sources). It mainly initializes the Kettle environment and loads the various plugins:
- Asynchronous initialization: a Future<KettleException> object carries the result of the initialization task, which runs asynchronously. The line executor.submit(new Callable<KettleException>() {...}) hands a Callable task to the thread-pool executor; once the task completes, the result (here, any KettleException that was thrown) can be retrieved through the Future.
- UI plugin object type registration: registerUIPluginObjectTypes() registers the plugin object types tied to the user interface, so these objects can be displayed and handled in the Spoon UI. (This is specific to the Spoon GUI; an embedded integration does not need it.)
- Client environment: KettleClientEnvironment.getInstance().setClient(KettleClientEnvironment.ClientType.SPOON); marks the current runtime as the Spoon client type, i.e. PDI's graphical tool.
- Kettle environment initialization: KettleEnvironment.init(); is the key step of the whole bootstrap. It loads all the core libraries and plugins and builds the required runtime environment; if it fails (for example, because a database driver plugin is missing or fails to load), it throws a KettleException. A quick post-init check is sketched below.
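A minimal sketch of that check (my addition, not from the original post; KettleEnvironment.isInitialized() is part of the public Kettle API):

import org.pentaho.di.core.KettleEnvironment;

public final class KettleReadyCheck {

    private KettleReadyCheck() {
    }

    // Refuse to serve any Kettle request until initialization has finished.
    public static void assertReady() {
        if (!KettleEnvironment.isInitialized()) {
            throw new IllegalStateException("Kettle environment is not initialized yet");
        }
    }
}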
Here I am using MySQL, yet the error complains about Oracle: plugin loading failed, and the database type falls back to the default, which is Oracle.
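A quick way to confirm which database plugins actually loaded (my own diagnostic sketch using the standard PluginRegistry API; run it after KettleEnvironment.init()):

import org.pentaho.di.core.plugins.DatabasePluginType;
import org.pentaho.di.core.plugins.PluginInterface;
import org.pentaho.di.core.plugins.PluginRegistry;

// List the database-type plugins the registry loaded. If MYSQL is missing
// here, the <type>MYSQL</type> node in the transformation XML cannot be
// resolved to a database type.
for (PluginInterface plugin : PluginRegistry.getInstance().getPlugins(DatabasePluginType.class)) {
    System.out.println(plugin.getIds()[0] + " - " + plugin.getName());
}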
The fix is to copy KettleStarterListener straight from the data-integration project (it is modeled on Spoon.java), as follows:
package com.renxiaozhao.api;

import org.pentaho.di.core.KettleClientEnvironment;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.exception.KettleException;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Kettle startup listener.
 */
@Component
public class KettleStarterListener implements ServletContextListener {

    private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KettleStarterListener.class);

    @Override
    public void contextInitialized(ServletContextEvent sce) {
        System.setProperty("KETTLE_CONTEXT_PATH", sce.getServletContext().getContextPath());
        /*
         * The following lines are from Spoon.main
         * because they are application-wide context.
         */
        ExecutorService executor = Executors.newCachedThreadPool();
        Future<KettleException> pluginRegistryFuture = executor.submit(new Callable<KettleException>() {
            @Override
            public KettleException call() throws Exception {
                KettleClientEnvironment.getInstance().setClient(KettleClientEnvironment.ClientType.SPOON);
                try {
                    logger.info("Loading the Kettle environment: KettleEnvironment.init(false)");
                    KettleEnvironment.init(false);
                    logger.info("Kettle environment loaded.");
                } catch (KettleException e) {
                    // Return the exception so the caller can rethrow it
                    // (printing and returning null would swallow the failure).
                    return e;
                }
                return null;
            }
        });
        KettleException registryException;
        try {
            registryException = pluginRegistryFuture.get();
            // Shut down the thread pool.
            executor.shutdown();
            if (registryException != null) {
                throw registryException;
            }
        } catch (Throwable t) {
            t.printStackTrace();
        }
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        // Shutdown.
        KettleEnvironment.shutdown();
    }
}
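A side note from me (not from the original post): in an embedded Spring Boot container, a ServletContextListener bean declared with @Component is normally registered automatically; if it does not fire in your setup, an explicit registration bean does the same job:

import org.springframework.boot.web.servlet.ServletListenerRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KettleListenerConfig {

    // Explicit registration (sketch); also lets you control listener ordering.
    @Bean
    public ServletListenerRegistrationBean<KettleStarterListener> kettleStarterListener() {
        return new ServletListenerRegistrationBean<>(new KettleStarterListener());
    }
}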
Running again gets past environment initialization but now fails while the transformation XML is being parsed:

2024-01-08 14:26:23.437 [qtp571696027-84] WARN org.eclipse.jetty.server.HttpChannel -
/renxiaozhao/kettle/run
org.springframework.web.util.NestedServletException: Request processing failed; nested exception is org.pentaho.di.core.exception.KettleXMLException:
Error reading transformation from XML file
Error reading transformation from XML file
Unable to load database connection info from XML node
    at org.pentaho.di.core.database.DatabaseMeta.<init>(DatabaseMeta.java:1030)
    at org.pentaho.di.trans.TransMeta.loadXML(TransMeta.java:3030)
    at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl.buildTransMeta(PdiUseDemoServiceImpl.java:102)
    at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl.execute(PdiUseDemoServiceImpl.java:47)
    at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl.executeJob(PdiUseDemoServiceImpl.java:37)
    at com.renxiaozhao.api.controller.PdiUseDemoController.executeJob(PdiUseDemoController.java:33)
    ... (reflection, Spring MVC, and Jetty frames down to java.lang.Thread.run(Thread.java:748) omitted)
    ... 63 common frames omitted
Caused by: java.lang.NullPointerException: null
    at org.pentaho.di.core.encryption.Encr.decryptPasswordOptionallyEncrypted(Encr.java:148)
    at org.pentaho.di.core.database.DatabaseMeta.<init>(DatabaseMeta.java:1009)
    ... 64 common frames omitted
I am running Kettle 9.2 here, while the open-source tool I referenced is built on Kettle 8. Comparing the two shows that 9.2 adds a pentaho-encryption-support plugin, so that plugin has to be loaded as well (add the pentaho-encryption-support jar matching your Kettle version to the classpath).
The kettle-password-encoder-plugins.xml configuration file defines and configures the password-encoder plugins used by PDI/Kettle. Make sure it sits on the classpath so the system can discover and load these plugins at startup:
<password-encoder-plugins>
  <password-encoder-plugin id="Kettle">
    <description>Kettle Password Encoder</description>
    <classname>org.pentaho.support.encryption.KettleTwoWayPasswordEncoder</classname>
  </password-encoder-plugin>
</password-encoder-plugins>
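To verify the encoder plugin is picked up, a quick round-trip sketch (my addition; both Encr methods are standard Kettle API and require KettleEnvironment.init() to have run):

import org.pentaho.di.core.encryption.Encr;

// With kettle-password-encoder-plugins.xml on the classpath this round-trips;
// without it, decryptPasswordOptionallyEncrypted is where the NPE above occurs.
String encrypted = Encr.encryptPasswordIfNotUsingVariables("my-password");
String decrypted = Encr.decryptPasswordOptionallyEncrypted(encrypted);
System.out.println("my-password".equals(decrypted));  // expect true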
With the encoder plugin in place, the run gets further: the transformation pre-loads and the steps are counted, but then a new error appears: Request processing failed; nested exception is java.lang.NullPointerException

2024/01/08 15:32:19 - 123 - Transformation has been pre-loaded from the repository.
2024-01-08 15:32:19.392 [qtp1772102816-79] INFO org.pentaho.di.trans.Trans -
[D:\tmp\test\test.xml] Transformation has been pre-loaded from the repository.
2024/01/08 15:32:19 - 123 - Number of steps to run: 2, number of hops: 1
2024-01-08 15:32:19.393 [qtp1772102816-79] WARN org.eclipse.jetty.server.HttpChannel -
/renxiaozhao/kettle/run
org.springframework.web.util.NestedServletException: Request processing failed; nested exception is java.lang.NullPointerException
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1013)
at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:908)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:665)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:882)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:750)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:867)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1623)
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1610)
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:92)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1610)
at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:93)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1610)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1610)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:540)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:146)
at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:548)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:257)
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1588)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1345)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:480)
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1557)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1247)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
at org.eclipse.jetty.server.Server.handle(Server.java:502)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:364)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:260)
at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305)
at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:118)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:765)
at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:683)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException: null
at org.pentaho.di.trans.Trans.startThreads(Trans.java:1329)
at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl.execute(PdiUseDemoServiceImpl.java:80)
at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl.executeJob(PdiUseDemoServiceImpl.java:37)
at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl$$FastClassBySpringCGLIB$$56121f70.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:684)
at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl$$EnhancerBySpringCGLIB$$3379b89f.executeJob(<generated>)
at com.renxiaozhao.api.controller.PdiUseDemoController.executeJob(PdiUseDemoController.java:33)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:189)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:800)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1038)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:942)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1005)
... 42 common frames omitted
The culprit was code that the open-source project did include but that I had assumed was useless and left out: the trans.prepareExecution(args) block. Without it, the transformation's internal step list is never allocated, so startThreads() hits the NullPointerException above. The complete service implementation, with that block restored:
package com.renxiaozhao.service.impl;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.renxiaozhao.bean.entity.SportEntity;
import com.renxiaozhao.dao.mapper.SportMapper;
import com.renxiaozhao.service.inf.PdiUseDemoService;
import com.renxiaozhao.service.util.JSONLinkedObject;
import com.renxiaozhao.service.util.XML;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringEscapeUtils;
import org.pentaho.di.core.exception.KettleMissingPluginsException;
import org.pentaho.di.core.exception.KettleXMLException;
import org.pentaho.di.core.logging.LogLevel;
import org.pentaho.di.core.logging.LoggingObjectType;
import org.pentaho.di.core.logging.SimpleLoggingObject;
import org.pentaho.di.core.variables.Variables;
import org.pentaho.di.core.xml.XMLHandler;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransAdapter;
import org.pentaho.di.trans.TransExecutionConfiguration;
import org.pentaho.di.trans.TransMeta;
import org.springframework.stereotype.Service;
import org.w3c.dom.Document;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@Slf4j
@Service
public class PdiUseDemoServiceImpl extends ServiceImpl<SportMapper, SportEntity> implements PdiUseDemoService {

    @Override
    public void executeJob(String jobJson) throws Exception {
        execute(jobJson);
    }

    @Override
    public void executeJobByXml(String jobXml) throws Exception {
        execute(jobXml);
    }

    private void execute(String jobJson) throws Exception {
        // Build the TransMeta object.
        TransMeta transMeta = buildTransMeta(jobJson);
        TransExecutionConfiguration executionConfiguration = new TransExecutionConfiguration();
        // Set defaults so the run configuration is applied correctly.
        executionConfiguration.setExecutingLocally(true);
        executionConfiguration.setExecutingRemotely(false);
        executionConfiguration.setExecutingClustered(false);
        // Enable safe mode.
        executionConfiguration.setSafeModeEnabled(true);
        executionConfiguration.getUsedVariables(transMeta);
        executionConfiguration.setLogLevel(LogLevel.DEBUG);
        // Default to the local execution engine.
        executionConfiguration.setRunConfiguration("Pentaho local");
        // Set the command-line variables.
        executionConfiguration.setVariables(new HashMap<>());
        // Create the Trans.
        Trans trans = new Trans(transMeta);
        String spoonLogObjectId = UUID.randomUUID().toString();
        SimpleLoggingObject spoonLoggingObject = new SimpleLoggingObject(
                Thread.currentThread().getName() + "-" + Thread.currentThread().getId(),
                LoggingObjectType.SPOON, null);
        spoonLoggingObject.setContainerObjectId(spoonLogObjectId);
        spoonLoggingObject.setLogLevel(executionConfiguration.getLogLevel());
        trans.setParent(spoonLoggingObject);
        trans.setLogLevel(executionConfiguration.getLogLevel());
        trans.setReplayDate(executionConfiguration.getReplayDate());
        trans.setRepository(executionConfiguration.getRepository());
        trans.setMonitored(false);
        Map<String, String> arguments = executionConfiguration.getArguments();
        final String[] args;
        if (arguments != null) {
            args = convertArguments(arguments);
        } else {
            args = null;
        }
        trans.getLogChannel().logBasic("Starting the transformation");
        trans.setSafeModeEnabled(executionConfiguration.isSafeModeEnabled());
        trans.setGatheringMetrics(executionConfiguration.isGatheringMetrics());
        // Prepare the execution -- this is the block I had left out.
        trans.prepareExecution(args);
        // Start the transformation.
        trans.addTransListener(new TransAdapter() {
            @Override
            public void transFinished(Trans trans) {
                log.info("Transformation finished");
            }
        });
        trans.startThreads();
    }

    private String[] convertArguments(Map<String, String> arguments) {
        String[] argumentNames = arguments.keySet().toArray(new String[0]);
        Arrays.sort(argumentNames);
        String[] args = new String[argumentNames.length];
        for (int i = 0; i < args.length; i++) {
            String argumentName = argumentNames[i];
            args[i] = arguments.get(argumentName);
        }
        return args;
    }

    public TransMeta buildTransMeta(String jobJson) throws IOException, KettleXMLException, KettleMissingPluginsException {
        Document document;
        if (!jobJson.startsWith("<?xml")) {
            // Convert JSON to XML.
            jobJson = StringEscapeUtils.unescapeXml(jobJson);
            jobJson = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + XML.toString(new JSONLinkedObject(jobJson));
            log.info("Converted JSON to XML; result: {}", jobJson);
        }
        // Write to a temporary directory.
        File outFile = new File("D:\\tmp\\test", "test.xml");
        FileUtils.writeStringToFile(outFile, jobJson);
        // Load the XML.
        document = XMLHandler.loadXMLString(jobJson);
        TransMeta transMeta = new TransMeta();
        transMeta.loadXML(
                document.getDocumentElement(), outFile.getPath(), null, null, true, new Variables(),
                (message, rememberText, rememberPropertyName) -> {
                    // Yes means: overwrite.
                    return true;
                });
        if (transMeta.hasMissingPlugins()) {
            log.info("[{}] is missing required plugins.", jobJson);
        }
        return transMeta;
    }
}
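For completeness, a hypothetical invocation of the service (the file path and wiring are my own illustration, not from the original project):

// Run a transformation exported from Spoon as a .ktr/.xml file.
String ktrXml = FileUtils.readFileToString(new File("D:\\tmp\\test\\demo.ktr"), "UTF-8");
pdiUseDemoService.executeJobByXml(ktrXml);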
With prepareExecution restored, the data in src1 now syncs to tgt successfully.
To recap, PDI can execute transformations and jobs through several environments:

- Spoon: the GUI design tool, where users create, edit, and run transformations and jobs.
- Pan: a command-line runtime for executing transformations (transformation) that have already been designed.
- Kitchen: another command-line runtime, used to schedule and execute jobs (job).
- Carte: a lightweight web application server that lets users manage and execute transformations and jobs through a web interface, and exposes a remote invocation API.
- Pentaho Data Integration Server, or the modern Pentaho Server (see my earlier post "Linux部署Kettle(pentaho-server-ce-9.4.0.0-343)记录/配置MySQL存储"): a complete data integration platform with centralized management, scheduling, and monitoring of jobs and transformations.
- Pentaho's distributed processing component Scale, which spreads big-data workloads across multiple machines for parallel processing.

Back to the fix: the missing call was Trans.prepareExecution. Its body is long, and one look tells you it is an important method 😂. Roughly, it:
- Binds parameters: arguments come in as input, typically external values passed on the command line or by the calling program, and are bound to the parameters defined inside the transformation.
- Sets the preparing flag to true, indicating the transformation is in its pre-execution preparation phase.
- Resets the startDate variable (to null), showing that this run has not started yet.
- Sets the running flag to false, since the transformation is not actually running at this point.
- Initializes the steps and hops: for every step (Step), it creates and initializes the matching StepMetaInterface and StepInterface instances from the step's configuration and adds them to internal data structures for later management; it also verifies that the hops are correct, readying the data flow for execution.
- Performs logging calls such as log.snap(Metrics.METRIC_TRANSFORMATION_EXECUTION_START) to record when execution starts, along with other performance metrics for monitoring and analysis.

/**
* Prepares the transformation for execution. This includes setting the arguments and parameters as well as preparing
* and tracking the steps and hops in the transformation.
*
* @param arguments the arguments to use for this transformation
* @throws KettleException in case the transformation could not be prepared (initialized)
*/
public void prepareExecution( String[] arguments ) throws KettleException {
  setPreparing( true );
  startDate = null;
  setRunning( false );

  log.snap( Metrics.METRIC_TRANSFORMATION_EXECUTION_START );
  log.snap( Metrics.METRIC_TRANSFORMATION_INIT_START );

  ExtensionPointHandler.callExtensionPoint( log, KettleExtensionPoint.TransformationPrepareExecution.id, this );

  checkCompatibility();

  // Set the arguments on the transformation...
  //
  if ( arguments != null ) {
    setArguments( arguments );
  }

  activateParameters();
  transMeta.activateParameters();
  ConnectionUtil.init( transMeta );

  if ( transMeta.getName() == null ) {
    if ( transMeta.getFilename() != null ) {
      log.logBasic( BaseMessages.getString( PKG, "Trans.Log.DispacthingStartedForFilename", transMeta
        .getFilename() ) );
    }
  } else {
    log.logBasic( BaseMessages.getString( PKG, "Trans.Log.DispacthingStartedForTransformation", transMeta
      .getName() ) );
  }

  if ( getArguments() != null ) {
    if ( log.isDetailed() ) {
      log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.NumberOfArgumentsDetected", String.valueOf(
        getArguments().length ) ) );
    }
  }

  if ( isSafeModeEnabled() ) {
    if ( log.isDetailed() ) {
      log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.SafeModeIsEnabled", transMeta.getName() ) );
    }
  }

  if ( getReplayDate() != null ) {
    SimpleDateFormat df = new SimpleDateFormat( REPLAY_DATE_FORMAT );
    log.logBasic( BaseMessages.getString( PKG, "Trans.Log.ThisIsAReplayTransformation" ) + df.format(
      getReplayDate() ) );
  } else {
    if ( log.isDetailed() ) {
      log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.ThisIsNotAReplayTransformation" ) );
    }
  }

  // setInternalKettleVariables(this); --> Let's not do this, when running
  // without file, for example remote, it spoils the fun

  // extra check to see if the servlet print writer has some value in case
  // folks want to test it locally...
  //
  if ( servletPrintWriter == null ) {
    String encoding = System.getProperty( "KETTLE_DEFAULT_SERVLET_ENCODING", null );
    if ( encoding == null ) {
      servletPrintWriter = new PrintWriter( new OutputStreamWriter( System.out ) );
    } else {
      try {
        servletPrintWriter = new PrintWriter( new OutputStreamWriter( System.out, encoding ) );
      } catch ( UnsupportedEncodingException ex ) {
        servletPrintWriter = new PrintWriter( new OutputStreamWriter( System.out ) );
      }
    }
  }

  // Keep track of all the row sets and allocated steps
  //
  steps = new ArrayList<>();
  rowsets = new ArrayList<>();

  List<StepMeta> hopsteps = transMeta.getTransHopSteps( false );

  if ( log.isDetailed() ) {
    log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.FoundDefferentSteps", String.valueOf( hopsteps
      .size() ) ) );
    log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.AllocatingRowsets" ) );
  }

  // First allocate all the rowsets required!
  // Note that a mapping doesn't receive ANY input or output rowsets...
  //
  for ( int i = 0; i < hopsteps.size(); i++ ) {
    StepMeta thisStep = hopsteps.get( i );
    if ( thisStep.isMapping() ) {
      continue; // handled and allocated by the mapping step itself.
    }

    if ( log.isDetailed() ) {
      log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.AllocateingRowsetsForStep", String.valueOf( i ),
        thisStep.getName() ) );
    }

    List<StepMeta> nextSteps = transMeta.findNextSteps( thisStep );
    int nrTargets = nextSteps.size();

    for ( int n = 0; n < nrTargets; n++ ) {
      // What's the next step?
      StepMeta nextStep = nextSteps.get( n );
      if ( nextStep.isMapping() ) {
        continue; // handled and allocated by the mapping step itself.
      }

      // How many times do we start the source step?
      int thisCopies = thisStep.getCopies();
      if ( thisCopies < 0 ) {
        // This can only happen if a variable is used that didn't resolve to a positive integer value
        //
        throw new KettleException( BaseMessages.getString( PKG, "Trans.Log.StepCopiesNotCorrectlyDefined", thisStep
          .getName() ) );
      }

      // How many times do we start the target step?
      int nextCopies = nextStep.getCopies();

      // Are we re-partitioning?
      boolean repartitioning;
      if ( thisStep.isPartitioned() ) {
        repartitioning = !thisStep.getStepPartitioningMeta().equals( nextStep.getStepPartitioningMeta() );
      } else {
        repartitioning = nextStep.isPartitioned();
      }

      int nrCopies;
      if ( log.isDetailed() ) {
        log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.copiesInfo", String.valueOf( thisCopies ), String
          .valueOf( nextCopies ) ) );
      }
      int dispatchType;
      if ( thisCopies == 1 && nextCopies == 1 ) {
        dispatchType = TYPE_DISP_1_1;
        nrCopies = 1;
      } else if ( thisCopies == 1 && nextCopies > 1 ) {
        dispatchType = TYPE_DISP_1_N;
        nrCopies = nextCopies;
      } else if ( thisCopies > 1 && nextCopies == 1 ) {
        dispatchType = TYPE_DISP_N_1;
        nrCopies = thisCopies;
      } else if ( thisCopies == nextCopies && !repartitioning ) {
        dispatchType = TYPE_DISP_N_N;
        nrCopies = nextCopies;
      } else {
        // > 1!
        dispatchType = TYPE_DISP_N_M;
        nrCopies = nextCopies;
      } // Allocate a rowset for each destination step

      // Allocate the rowsets
      //
      if ( dispatchType != TYPE_DISP_N_M ) {
        for ( int c = 0; c < nrCopies; c++ ) {
          RowSet rowSet;
          switch ( transMeta.getTransformationType() ) {
            case Normal:
              // This is a temporary patch until the batching rowset has proven
              // to be working in all situations.
              // Currently there are stalling problems when dealing with small
              // amounts of rows.
              //
              Boolean batchingRowSet =
                ValueMetaString.convertStringToBoolean( System.getProperty( Const.KETTLE_BATCHING_ROWSET ) );
              if ( batchingRowSet != null && batchingRowSet.booleanValue() ) {
                rowSet = new BlockingBatchingRowSet( transMeta.getSizeRowset() );
              } else {
                rowSet = new BlockingRowSet( transMeta.getSizeRowset() );
              }
              break;

            case SerialSingleThreaded:
              rowSet = new SingleRowRowSet();
              break;

            case SingleThreaded:
              rowSet = new QueueRowSet();
              break;

            default:
              throw new KettleException( "Unhandled transformation type: " + transMeta.getTransformationType() );
          }

          switch ( dispatchType ) {
            case TYPE_DISP_1_1:
              rowSet.setThreadNameFromToCopy( thisStep.getName(), 0, nextStep.getName(), 0 );
              break;
            case TYPE_DISP_1_N:
              rowSet.setThreadNameFromToCopy( thisStep.getName(), 0, nextStep.getName(), c );
              break;
            case TYPE_DISP_N_1:
              rowSet.setThreadNameFromToCopy( thisStep.getName(), c, nextStep.getName(), 0 );
              break;
            case TYPE_DISP_N_N:
              rowSet.setThreadNameFromToCopy( thisStep.getName(), c, nextStep.getName(), c );
              break;
            default:
              break;
          }
          rowsets.add( rowSet );
          if ( log.isDetailed() ) {
            log.logDetailed( BaseMessages.getString( PKG, "Trans.TransformationAllocatedNewRowset", rowSet
              .toString() ) );
          }
        }
      } else {
        // For each N source steps we have M target steps
        //
        // From each input step we go to all output steps.
        // This allows maximum flexibility for re-partitioning,
        // distribution...
        for ( int s = 0; s < thisCopies; s++ ) {
          for ( int t = 0; t < nextCopies; t++ ) {
            BlockingRowSet rowSet = new BlockingRowSet( transMeta.getSizeRowset() );
            rowSet.setThreadNameFromToCopy( thisStep.getName(), s, nextStep.getName(), t );
            rowsets.add( rowSet );
            if ( log.isDetailed() ) {
              log.logDetailed( BaseMessages.getString( PKG, "Trans.TransformationAllocatedNewRowset", rowSet
                .toString() ) );
            }
          }
        }
      }
    }
    log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.AllocatedRowsets", String.valueOf( rowsets.size() ),
      String.valueOf( i ), thisStep.getName() ) + " " );
  }

  if ( log.isDetailed() ) {
    log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.AllocatingStepsAndStepData" ) );
  }

  // Allocate the steps & the data...
  //
  for ( int i = 0; i < hopsteps.size(); i++ ) {
    StepMeta stepMeta = hopsteps.get( i );
    String stepid = stepMeta.getStepID();

    if ( log.isDetailed() ) {
      log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.TransformationIsToAllocateStep", stepMeta.getName(),
        stepid ) );
    }

    // How many copies are launched of this step?
    int nrCopies = stepMeta.getCopies();

    if ( log.isDebug() ) {
      log.logDebug( BaseMessages.getString( PKG, "Trans.Log.StepHasNumberRowCopies", String.valueOf( nrCopies ) ) );
    }

    // At least run once...
    for ( int c = 0; c < nrCopies; c++ ) {
      // Make sure we haven't started it yet!
      if ( !hasStepStarted( stepMeta.getName(), c ) ) {
        StepMetaDataCombi combi = new StepMetaDataCombi();

        combi.stepname = stepMeta.getName();
        combi.copy = c;

        // The meta-data
        combi.stepMeta = stepMeta;
        combi.meta = stepMeta.getStepMetaInterface();

        // Allocate the step data
        StepDataInterface data = combi.meta.getStepData();
        combi.data = data;

        // Allocate the step
        StepInterface step = combi.meta.getStep( stepMeta, data, c, transMeta, this );

        // Copy the variables of the transformation to the step...
        // don't share. Each copy of the step has its own variables.
        //
        step.initializeVariablesFrom( this );
        step.setUsingThreadPriorityManagment( transMeta.isUsingThreadPriorityManagment() );

        // Pass the connected repository & metaStore to the steps runtime
        //
        step.setRepository( repository );
        step.setMetaStore( metaStore );

        // If the step is partitioned, set the partitioning ID and some other
        // things as well...
        if ( stepMeta.isPartitioned() ) {
          List<String> partitionIDs = stepMeta.getStepPartitioningMeta().getPartitionSchema().getPartitionIDs();
          if ( partitionIDs != null && !partitionIDs.isEmpty() ) {
            step.setPartitionID( partitionIDs.get( c ) ); // Pass the partition ID
            // to the step
          }
        }

        // Save the step too
        combi.step = step;

        // Pass logging level and metrics gathering down to the step level.
        // /
        if ( combi.step instanceof LoggingObjectInterface ) {
          LogChannelInterface logChannel = combi.step.getLogChannel();
          logChannel.setLogLevel( logLevel );
          logChannel.setGatheringMetrics( log.isGatheringMetrics() );
        }

        // Add to the bunch...
        steps.add( combi );

        if ( log.isDetailed() ) {
          log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.TransformationHasAllocatedANewStep", stepMeta
            .getName(), String.valueOf( c ) ) );
        }
      }
    }
  }

  // Now we need to verify if certain rowsets are not meant to be for error
  // handling...
  // Loop over the steps and for every step verify the output rowsets
  // If a rowset is going to a target step in the steps error handling
  // metadata, set it to the errorRowSet.
  // The input rowsets are already in place, so the next step just accepts the
  // rows.
  // Metadata wise we need to do the same trick in TransMeta
  //
  for ( int s = 0; s < steps.size(); s++ ) {
    StepMetaDataCombi combi = steps.get( s );
    if ( combi.stepMeta.isDoingErrorHandling() ) {
      combi.step.identifyErrorOutput();
    }
  }

  // Now (optionally) write start log record!
  // Make sure we synchronize appropriately to avoid duplicate batch IDs.
  //
  Object syncObject = this;
  if ( parentJob != null ) {
    syncObject = parentJob; // parallel execution in a job
  }
  if ( parentTrans != null ) {
    syncObject = parentTrans; // multiple sub-transformations
  }
  synchronized ( syncObject ) {
    calculateBatchIdAndDateRange();
    beginProcessing();
  }

  // Set the partition-to-rowset mapping
  //
  for ( int i = 0; i < steps.size(); i++ ) {
    StepMetaDataCombi sid = steps.get( i );

    StepMeta stepMeta = sid.stepMeta;
    StepInterface baseStep = sid.step;

    baseStep.setPartitioned( stepMeta.isPartitioned() );

    // Now let's take a look at the source and target relation
    //
    // If this source step is not partitioned, and the target step is: it
    // means we need to re-partition the incoming data.
    // If both steps are partitioned on the same method and schema, we don't
    // need to re-partition
    // If both steps are partitioned on a different method or schema, we need
    // to re-partition as well.
    // If both steps are not partitioned, we don't need to re-partition
    //
    boolean isThisPartitioned = stepMeta.isPartitioned();
    PartitionSchema thisPartitionSchema = null;
    if ( isThisPartitioned ) {
      thisPartitionSchema = stepMeta.getStepPartitioningMeta().getPartitionSchema();
    }

    boolean isNextPartitioned = false;
    StepPartitioningMeta nextStepPartitioningMeta = null;
    PartitionSchema nextPartitionSchema = null;

    List<StepMeta> nextSteps = transMeta.findNextSteps( stepMeta );
    int nrNext = nextSteps.size();
    for ( int p = 0; p < nrNext; p++ ) {
      StepMeta nextStep = nextSteps.get( p );
      if ( nextStep.isPartitioned() ) {
        isNextPartitioned = true;
        nextStepPartitioningMeta = nextStep.getStepPartitioningMeta();
        nextPartitionSchema = nextStepPartitioningMeta.getPartitionSchema();
      }
    }

    baseStep.setRepartitioning( StepPartitioningMeta.PARTITIONING_METHOD_NONE );

    // If the next step is partitioned differently, set re-partitioning, when
    // running locally.
    //
    if ( ( !isThisPartitioned && isNextPartitioned ) || ( isThisPartitioned && isNextPartitioned
      && !thisPartitionSchema.equals( nextPartitionSchema ) ) ) {
      baseStep.setRepartitioning( nextStepPartitioningMeta.getMethodType() );
    }

    // For partitioning to a set of remove steps (repartitioning from a master
    // to a set or remote output steps)
    //
    StepPartitioningMeta targetStepPartitioningMeta = baseStep.getStepMeta().getTargetStepPartitioningMeta();
    if ( targetStepPartitioningMeta != null ) {
      baseStep.setRepartitioning( targetStepPartitioningMeta.getMethodType() );
    }
  }

  setPreparing( false );
  setInitializing( true );

  // Do a topology sort... Over 150 step (copies) things might be slowing down too much.
  //
  if ( isMonitored() && steps.size() < 150 ) {
    doTopologySortOfSteps();
  }

  if ( log.isDetailed() ) {
    log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.InitialisingSteps", String.valueOf( steps.size() ) ) );
  }

  StepInitThread[] initThreads = new StepInitThread[ steps.size() ];
  Thread[] threads = new Thread[ steps.size() ];

  // Initialize all the threads...
  //
  for ( int i = 0; i < steps.size(); i++ ) {
    final StepMetaDataCombi sid = steps.get( i );

    // Do the init code in the background!
    // Init all steps at once, but ALL steps need to finish before we can
    // continue properly!
    //
    initThreads[ i ] = new StepInitThread( sid, log );

    // Put it in a separate thread!
    //
    threads[ i ] = new Thread( initThreads[ i ] );
    threads[ i ].setName( "init of " + sid.stepname + "." + sid.copy + " (" + threads[ i ].getName() + ")" );

    ExtensionPointHandler.callExtensionPoint( log, KettleExtensionPoint.StepBeforeInitialize.id, initThreads[ i ] );

    threads[ i ].start();
  }

  for ( int i = 0; i < threads.length; i++ ) {
    try {
      threads[ i ].join();
      ExtensionPointHandler.callExtensionPoint( log, KettleExtensionPoint.StepAfterInitialize.id, initThreads[ i ] );
    } catch ( Exception ex ) {
      log.logError( "Error with init thread: " + ex.getMessage(), ex.getMessage() );
      log.logError( Const.getStackTracker( ex ) );
    }
  }

  setInitializing( false );
  boolean ok = true;

  // All step are initialized now: see if there was one that didn't do it
  // correctly!
  //
  for ( int i = 0; i < initThreads.length; i++ ) {
    StepMetaDataCombi combi = initThreads[ i ].getCombi();
    if ( !initThreads[ i ].isOk() ) {
      log.logError( BaseMessages.getString( PKG, "Trans.Log.StepFailedToInit", combi.stepname + "." + combi.copy ) );
      combi.data.setStatus( StepExecutionStatus.STATUS_STOPPED );
      ok = false;
    } else {
      combi.data.setStatus( StepExecutionStatus.STATUS_IDLE );
      if ( log.isDetailed() ) {
        log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.StepInitialized", combi.stepname + "."
          + combi.copy ) );
      }
    }
  }

  if ( !ok ) {
    // One or more steps failed on initialization.
    // Transformation is now stopped.
    setStopped( true );

    // Halt the other threads as well, signal end-of-the line to the outside world...
    // Also explicitly call dispose() to clean up resources opened during init();
    //
    for ( int i = 0; i < initThreads.length; i++ ) {
      StepMetaDataCombi combi = initThreads[ i ].getCombi();

      // Dispose will overwrite the status, but we set it back right after
      // this.
      combi.step.dispose( combi.meta, combi.data );

      if ( initThreads[ i ].isOk() ) {
        combi.data.setStatus( StepExecutionStatus.STATUS_HALTED );
      } else {
        combi.data.setStatus( StepExecutionStatus.STATUS_STOPPED );
      }
    }

    // Just for safety, fire the trans finished listeners...
    try {
      fireTransFinishedListeners();
    } catch ( KettleException e ) {
      // listeners produces errors
      log.logError( BaseMessages.getString( PKG, "Trans.FinishListeners.Exception" ) );
      // we will not pass this exception up to prepareExecution() entry point.
    } finally {
      // Flag the transformation as finished even if exception was thrown
      setFinished( true );
    }

    // Pass along the log during preview. Otherwise it becomes hard to see
    // what went wrong.
    //
    if ( preview ) {
      String logText = KettleLogStore.getAppender().getBuffer( getLogChannelId(), true ).toString();
      throw new KettleException( BaseMessages.getString( PKG, "Trans.Log.FailToInitializeAtLeastOneStep" ) + Const.CR
        + logText );
    } else {
      throw new KettleException( BaseMessages.getString( PKG, "Trans.Log.FailToInitializeAtLeastOneStep" )
        + Const.CR );
    }
  }

  log.snap( Metrics.METRIC_TRANSFORMATION_INIT_STOP );

  KettleEnvironment.setExecutionInformation( this, repository );

  setReadyToStart( true );
}
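The takeaway for anyone embedding Kettle: prepareExecution() must run before startThreads(), because it is what allocates the internal steps and rowsets lists; skip it and startThreads() dies with exactly the NullPointerException shown earlier. The standard launch sequence is sketched below (Trans.execute() is a convenience wrapper that performs both calls):

// Standard launch sequence (sketch):
Trans trans = new Trans(transMeta);
trans.prepareExecution(null);  // allocates rowsets and steps; skipping this => NPE in startThreads()
trans.startThreads();          // fire off one thread per step copy
trans.waitUntilFinished();     // block until the transformation completes
if (trans.getErrors() > 0) {
    throw new KettleException("Transformation finished with errors");
}
// Equivalent shortcut: trans.execute(null) == prepareExecution(null) + startThreads().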