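Kettle Local Engine Usage Notes (Part 2): Problems Encountered and Their Solutions

Published: 2024-01-08

🔼 Previous part: Kettle Local Engine Usage Notes (Part 1)

📚 Preface

Picking up where the previous part left off, this post continues to work through the errors hit while running the Kettle Local engine.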

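📚 Problem Log

📗 Problem 1: Database type not found!…database type with plugin id [Oracle] couldn’t be found!

Cause: the Kettle environment was never initialized

Below is the main method of Spoon's launcher class (for the matching source, see "PDI/Kettle-9 Source Download and Build"). Its main job is to initialize the Kettle environment and load the various plugins.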

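  • Asynchronous initialization: a Future<KettleException> object is used to run the initialization task asynchronously. The call executor.submit(new Callable<KettleException>() {...}) submits a Callable task to the thread-pool executor. Once the task completes, its result can be read from the Future (here, the KettleException raised during initialization, or null if none occurred).

  • UI plugin object type registration: registerUIPluginObjectTypes() registers the plugin object types that belong to the user interface, so that Spoon can display and handle objects of those types. (This only concerns the Spoon GUI; secondary development does not need it.)

  • Setting the client environment: KettleClientEnvironment.getInstance().setClient(KettleClientEnvironment.ClientType.SPOON); marks the current runtime as the Spoon client type, that is, PDI's graphical tool.

  • Initializing the Kettle environment: KettleEnvironment.init(); is the key step of the whole initialization. It loads all core libraries and plugins and sets up the required runtime environment. If this step fails (for example, because a database driver plugin is missing or fails to load), a KettleException is thrown.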
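
The "return the exception through a Future" pattern described above can be tried without Kettle on the classpath. The following is a minimal sketch using only the JDK; initEnvironment is a hypothetical stand-in for KettleEnvironment.init() and is not part of any Kettle API.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class AsyncInitSketch {

    /** Hypothetical stand-in for KettleEnvironment.init(); fails on demand. */
    public static void initEnvironment(boolean shouldFail) throws Exception {
        if (shouldFail) {
            throw new Exception("plugin registry could not be loaded");
        }
    }

    /**
     * Runs the initialization on a worker thread and returns the exception it
     * produced, or null when initialization succeeded.
     */
    public static Exception runInit(final boolean shouldFail) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Exception> future = executor.submit(new Callable<Exception>() {
                @Override
                public Exception call() {
                    try {
                        initEnvironment(shouldFail);
                        return null;
                    } catch (Exception e) {
                        // Return the failure as the task result so the caller can inspect it
                        return e;
                    }
                }
            });
            // get() blocks until the task has finished
            return future.get();
        } finally {
            // Release the worker thread in every case
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("success case -> " + runInit(false));
        System.out.println("failure case -> " + runInit(true).getMessage());
    }
}
```

The important detail is that the catch block returns the exception. If it only printed the stack trace and returned null, the caller's null check on the Future result could never detect a failed initialization.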

I was using MySQL, yet the error mentions Oracle. The plugins failed to load, and the database type falls back to its default, which is Oracle.

Solution: add a listener that initializes the Kettle environment

Copy KettleStarterListener straight from the data-integration project (it is modeled on Spoon.java):

package com.renxiaozhao.api;

import org.pentaho.di.core.KettleClientEnvironment;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.exception.KettleException;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

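/**
 * Kettle startup listener: initializes the Kettle environment when the
 * servlet context starts and shuts it down when the context is destroyed.
 */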
@Component
public class KettleStarterListener implements ServletContextListener {
    private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KettleStarterListener.class);


    @Override
    public void contextInitialized(ServletContextEvent sce) {
        System.setProperty("KETTLE_CONTEXT_PATH", sce.getServletContext().getContextPath());
        /*
         *  The following lines are from Spoon.main
         *  because they are application-wide context.
         */
        ExecutorService executor = Executors.newCachedThreadPool();
        Future<KettleException> pluginRegistryFuture = executor.submit(new Callable<KettleException>() {

            @Override
            public KettleException call() throws Exception {
                KettleClientEnvironment.getInstance().setClient(KettleClientEnvironment.ClientType.SPOON);
                try {
                    logger.info("Loading the Kettle environment: KettleEnvironment.init(false)");
                    KettleEnvironment.init(false);
                    logger.info("Kettle environment loaded.");
                } catch (KettleException e) {
                    // Return the failure so the caller can inspect it instead of swallowing it
                    return e;
                }

                return null;
            }
        });
        try {
            KettleException registryException = pluginRegistryFuture.get();
            if (registryException != null) {
                throw registryException;
            }
        } catch (Throwable t) {
            logger.error("Failed to initialize the Kettle environment", t);
        } finally {
            // Shut down the thread pool whether or not initialization succeeded
            executor.shutdown();
        }

    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        //shutdown
        KettleEnvironment.shutdown();
    }
}

📗 Problem 2: NullPointerException because the encryption/decryption plugin failed to load: Encr.decryptPasswordOptionallyEncrypted( XMLHandler.getTagValue( con, "password" ) )


2024-01-08 14:26:23.437 [qtp571696027-84] WARN  org.eclipse.jetty.server.HttpChannel -
				/renxiaozhao/kettle/run
org.springframework.web.util.NestedServletException: Request processing failed; nested exception is org.pentaho.di.core.exception.KettleXMLException: 
错误从XML文件读取转换

错误从XML文件读取转换

Unable to load database connection info from XML node
 at java.lang.Thread.run (Thread.java:748)
 at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run (QueuedThreadPool.java:683)
 at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob (QueuedThreadPool.java:765)
 at org.eclipse.jetty.io.ChannelEndPoint$2.run (ChannelEndPoint.java:118)
 at org.eclipse.jetty.io.FillInterest.fillable (FillInterest.java:103)
 at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded (AbstractConnection.java:305)
 at org.eclipse.jetty.server.HttpConnection.onFillable (HttpConnection.java:260)
 at org.eclipse.jetty.server.HttpChannel.handle (HttpChannel.java:364)
 at org.eclipse.jetty.server.Server.handle (Server.java:502)
 at org.eclipse.jetty.server.handler.HandlerWrapper.handle (HandlerWrapper.java:132)
 at org.eclipse.jetty.server.handler.ScopedHandler.handle (ScopedHandler.java:144)
 at org.eclipse.jetty.server.handler.ContextHandler.doScope (ContextHandler.java:1247)
 at org.eclipse.jetty.server.handler.ScopedHandler.nextScope (ScopedHandler.java:201)
 at org.eclipse.jetty.server.session.SessionHandler.doScope (SessionHandler.java:1557)
 at org.eclipse.jetty.servlet.ServletHandler.doScope (ServletHandler.java:480)
 at org.eclipse.jetty.server.handler.ScopedHandler.nextScope (ScopedHandler.java:203)
 at org.eclipse.jetty.server.handler.ContextHandler.doHandle (ContextHandler.java:1345)
 at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle (ScopedHandler.java:255)
 at org.eclipse.jetty.server.session.SessionHandler.doHandle (SessionHandler.java:1588)
 at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle (ScopedHandler.java:257)
 at org.eclipse.jetty.server.handler.HandlerWrapper.handle (HandlerWrapper.java:132)
 at org.eclipse.jetty.security.SecurityHandler.handle (SecurityHandler.java:548)
 at org.eclipse.jetty.server.handler.ScopedHandler.handle (ScopedHandler.java:146)
 at org.eclipse.jetty.servlet.ServletHandler.doHandle (ServletHandler.java:540)
 at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter (ServletHandler.java:1610)
 at org.springframework.web.filter.OncePerRequestFilter.doFilter (OncePerRequestFilter.java:107)
 at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal (CharacterEncodingFilter.java:200)
 at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter (ServletHandler.java:1610)
 at org.springframework.web.filter.OncePerRequestFilter.doFilter (OncePerRequestFilter.java:107)
 at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal (HiddenHttpMethodFilter.java:93)
 at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter (ServletHandler.java:1610)
 at org.springframework.web.filter.OncePerRequestFilter.doFilter (OncePerRequestFilter.java:107)
 at org.springframework.web.filter.FormContentFilter.doFilterInternal (FormContentFilter.java:92)
 at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter (ServletHandler.java:1610)
 at org.springframework.web.filter.OncePerRequestFilter.doFilter (OncePerRequestFilter.java:107)
 at org.springframework.web.filter.RequestContextFilter.doFilterInternal (RequestContextFilter.java:99)
 at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter (ServletHandler.java:1623)
 at org.eclipse.jetty.servlet.ServletHolder.handle (ServletHolder.java:867)
 at javax.servlet.http.HttpServlet.service (HttpServlet.java:750)
 at org.springframework.web.servlet.FrameworkServlet.service (FrameworkServlet.java:882)
 at javax.servlet.http.HttpServlet.service (HttpServlet.java:665)
 at org.springframework.web.servlet.FrameworkServlet.doPost (FrameworkServlet.java:908)
 at org.springframework.web.servlet.FrameworkServlet.processRequest (FrameworkServlet.java:1005)
 at org.springframework.web.servlet.DispatcherServlet.doService (DispatcherServlet.java:942)
 at org.springframework.web.servlet.DispatcherServlet.doDispatch (DispatcherServlet.java:1038)
 at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle (AbstractHandlerMethodAdapter.java:87)
 at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal (RequestMappingHandlerAdapter.java:800)
 at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod (RequestMappingHandlerAdapter.java:895)
 at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle (ServletInvocableHandlerMethod.java:102)
 at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest (InvocableHandlerMethod.java:138)
 at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke (InvocableHandlerMethod.java:189)
 at java.lang.reflect.Method.invoke (Method.java:498)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
 at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
 at sun.reflect.NativeMethodAccessorImpl.invoke0 (NativeMethodAccessorImpl.java:-2)
 at com.renxiaozhao.api.controller.PdiUseDemoController.executeJob (PdiUseDemoController.java:33)
 at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl$$EnhancerBySpringCGLIB$$f64c592d.executeJob (<generated>:-1)
 at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept (CglibAopProxy.java:684)
 at org.springframework.cglib.proxy.MethodProxy.invoke (MethodProxy.java:218)
 at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl$$FastClassBySpringCGLIB$$56121f70.invoke (<generated>:-1)
 at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl.executeJob (PdiUseDemoServiceImpl.java:37)
 at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl.execute (PdiUseDemoServiceImpl.java:47)
 at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl.buildTransMeta (PdiUseDemoServiceImpl.java:102)
 at org.pentaho.di.trans.TransMeta.loadXML (TransMeta.java:3030)
 at org.pentaho.di.core.database.DatabaseMeta.<init> (DatabaseMeta.java:1009)
 at org.pentaho.di.core.encryption.Encr.decryptPasswordOptionallyEncrypted (Encr.java:148)

   ...
 at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest (InvocableHandlerMethod.java:138)
 at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke (InvocableHandlerMethod.java:189)
 at java.lang.reflect.Method.invoke (Method.java:498)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
 at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
 at sun.reflect.NativeMethodAccessorImpl.invoke0 (NativeMethodAccessorImpl.java:-2)
 at com.renxiaozhao.api.controller.PdiUseDemoController.executeJob (PdiUseDemoController.java:33)
 at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl$$EnhancerBySpringCGLIB$$f64c592d.executeJob (<generated>:-1)
 at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept (CglibAopProxy.java:684)
 at org.springframework.cglib.proxy.MethodProxy.invoke (MethodProxy.java:218)
 at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl$$FastClassBySpringCGLIB$$56121f70.invoke (<generated>:-1)
 at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl.executeJob (PdiUseDemoServiceImpl.java:37)
 at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl.execute (PdiUseDemoServiceImpl.java:47)
 at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl.buildTransMeta (PdiUseDemoServiceImpl.java:102)
 at org.pentaho.di.trans.TransMeta.loadXML (TransMeta.java:3030)
 at org.pentaho.di.core.database.DatabaseMeta.<init> (DatabaseMeta.java:1009)
 at org.pentaho.di.core.encryption.Encr.decryptPasswordOptionallyEncrypted (Encr.java:148)

	at org.pentaho.di.core.database.DatabaseMeta.<init>(DatabaseMeta.java:1030)
	at org.pentaho.di.trans.TransMeta.loadXML(TransMeta.java:3030)
	... 63 common frames omitted
Caused by: java.lang.NullPointerException: null
	at org.pentaho.di.core.encryption.Encr.decryptPasswordOptionallyEncrypted(Encr.java:148)
	at org.pentaho.di.core.database.DatabaseMeta.<init>(DatabaseMeta.java:1009)
	... 64 common frames omitted

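Cause: the encryption/decryption plugin was not loaded

I am using Kettle 9.2, while the open-source tool I referenced uses Kettle 8. Comparing the two shows that 9.2 has an additional pentaho-encryption-support plugin, and that plugin has to be loaded.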

Solution: add kettle-password-encoder-plugins.xml

The kettle-password-encoder-plugins.xml configuration file defines and configures the password encoder plugins used by PDI/Kettle. Make sure the file is on the classpath so that the system can find and load these plugins at startup.

<password-encoder-plugins>

  <password-encoder-plugin id="Kettle">
    <description>Kettle Password Encoder</description>
    <classname>org.pentaho.support.encryption.KettleTwoWayPasswordEncoder</classname>
  </password-encoder-plugin>
 
</password-encoder-plugins>
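
To confirm that the file really is visible on the classpath and declares what you expect, a small JDK-only check can help. This is a sketch under stated assumptions: the class name EncoderPluginConfigCheck and the method readPlugins are made up for illustration, and the check only reads the XML; it does not reproduce how Kettle itself registers the plugin.

```java
import java.io.ByteArrayInputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

public class EncoderPluginConfigCheck {

    static final String FILE_NAME = "kettle-password-encoder-plugins.xml";

    /** Parses the XML text and returns plugin id -> classname, in document order. */
    public static Map<String, String> readPlugins(String xml) throws Exception {
        Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
        NodeList nodes = doc.getElementsByTagName("password-encoder-plugin");
        Map<String, String> plugins = new LinkedHashMap<>();
        for (int i = 0; i < nodes.getLength(); i++) {
            Element plugin = (Element) nodes.item(i);
            String className = plugin.getElementsByTagName("classname")
                    .item(0).getTextContent().trim();
            plugins.put(plugin.getAttribute("id"), className);
        }
        return plugins;
    }

    public static void main(String[] args) throws Exception {
        // Is the file visible to the class loader at all?
        URL url = Thread.currentThread().getContextClassLoader().getResource(FILE_NAME);
        System.out.println(FILE_NAME + " on classpath: " + (url != null ? url : "NOT FOUND"));

        // Same content as the configuration file shown above
        String sample = "<password-encoder-plugins>"
                + "<password-encoder-plugin id=\"Kettle\">"
                + "<description>Kettle Password Encoder</description>"
                + "<classname>org.pentaho.support.encryption.KettleTwoWayPasswordEncoder</classname>"
                + "</password-encoder-plugin>"
                + "</password-encoder-plugins>";
        System.out.println(readPlugins(sample));
    }
}
```

If the first line prints NOT FOUND when run inside the application, the file is not where the class loader looks (for a Maven project, src/main/resources is the usual place).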

📗 Problem 3: NullPointerException on steps: Request processing failed; nested exception is java.lang.NullPointerException


2024/01/08 15:32:19 - 123 - 转换已经从资源库预先载入.
2024-01-08 15:32:19.392 [qtp1772102816-79] INFO  org.pentaho.di.trans.Trans -
				[D:\tmp\test\test.xml]  转换已经从资源库预先载入.
2024/01/08 15:32:19 - 123 - 需要运行的步骤数 : 2  , 节点数 : 1
2024-01-08 15:32:19.393 [qtp1772102816-79] WARN  org.eclipse.jetty.server.HttpChannel -
				/renxiaozhao/kettle/run
org.springframework.web.util.NestedServletException: Request processing failed; nested exception is java.lang.NullPointerException
	at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1013)
	at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:908)
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:665)
	at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:882)
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:750)
	at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:867)
	at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1623)
	at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
	at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1610)
	at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:92)
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
	at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1610)
	at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:93)
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
	at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1610)
	at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200)
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
	at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1610)
	at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:540)
	at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:146)
	at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:548)
	at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:257)
	at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1588)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
	at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1345)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)
	at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:480)
	at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1557)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)
	at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1247)
	at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144)
	at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
	at org.eclipse.jetty.server.Server.handle(Server.java:502)
	at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:364)
	at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:260)
	at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305)
	at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
	at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:118)
	at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:765)
	at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:683)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException: null
	at org.pentaho.di.trans.Trans.startThreads(Trans.java:1329)
	at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl.execute(PdiUseDemoServiceImpl.java:80)
	at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl.executeJob(PdiUseDemoServiceImpl.java:37)
	at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl$$FastClassBySpringCGLIB$$56121f70.invoke(<generated>)
	at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
	at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:684)
	at com.renxiaozhao.service.impl.PdiUseDemoServiceImpl$$EnhancerBySpringCGLIB$$3379b89f.executeJob(<generated>)
	at com.renxiaozhao.api.controller.PdiUseDemoController.executeJob(PdiUseDemoController.java:33)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:189)
	at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138)
	at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102)
	at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895)
	at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:800)
	at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
	at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1038)
	at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:942)
	at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1005)
	... 42 common frames omitted

Cause: the preparation code (prepareExecution) was missing

![Screenshot](https://img-blog.csdnimg.cn/direct/3158bbfd7e834c31898fcf4f1cf6a11d.png)

Solution: add the preparation code

The open-source project did include this code. I assumed it was unnecessary and left it out.

package com.renxiaozhao.service.impl;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.renxiaozhao.bean.entity.SportEntity;
import com.renxiaozhao.dao.mapper.SportMapper;
import com.renxiaozhao.service.inf.PdiUseDemoService;
import com.renxiaozhao.service.util.JSONLinkedObject;
import com.renxiaozhao.service.util.XML;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang.StringUtils;
import org.pentaho.di.core.exception.KettleMissingPluginsException;
import org.pentaho.di.core.exception.KettleXMLException;
import org.pentaho.di.core.logging.LogLevel;
import org.pentaho.di.core.logging.LoggingObjectType;
import org.pentaho.di.core.logging.SimpleLoggingObject;
import org.pentaho.di.core.variables.Variables;
import org.pentaho.di.core.xml.XMLHandler;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransAdapter;
import org.pentaho.di.trans.TransExecutionConfiguration;
import org.pentaho.di.trans.TransMeta;
import org.springframework.stereotype.Service;
import org.w3c.dom.Document;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@Slf4j
@Service
public class PdiUseDemoServiceImpl extends ServiceImpl<SportMapper,SportEntity> implements PdiUseDemoService {

    @Override
    public void executeJob(String jobJson) throws Exception {
        execute(jobJson);
    }

    @Override
    public void executeJobByXml(String jobXml) throws Exception {
        execute(jobXml);
    }

    private void execute(String jobJson) throws Exception {
        // Build the TransMeta object
        TransMeta transMeta = buildTransMeta(jobJson);
        TransExecutionConfiguration executionConfiguration = new TransExecutionConfiguration();
        // Set defaults so that the run configuration is set up correctly
        executionConfiguration.setExecutingLocally(true);
        executionConfiguration.setExecutingRemotely(false);
        executionConfiguration.setExecutingClustered(false);
        // Enable safe mode
        executionConfiguration.setSafeModeEnabled(true);
        executionConfiguration.getUsedVariables(transMeta);
        executionConfiguration.setLogLevel(LogLevel.DEBUG);
        // Execute with the local engine by default
        executionConfiguration.setRunConfiguration("Pentaho local");
        // Set the command-line variables
        executionConfiguration.setVariables(new HashMap<>());
        // Create the Trans
        Trans trans = new Trans(transMeta);
        String spoonLogObjectId = UUID.randomUUID().toString();
        SimpleLoggingObject spoonLoggingObject = new SimpleLoggingObject(Thread.currentThread().getName() + "-" + Thread.currentThread().getId()
                , LoggingObjectType.SPOON, null);
        spoonLoggingObject.setContainerObjectId(spoonLogObjectId);
        spoonLoggingObject.setLogLevel(executionConfiguration.getLogLevel());
        trans.setParent(spoonLoggingObject);
        trans.setLogLevel(executionConfiguration.getLogLevel());
        trans.setReplayDate(executionConfiguration.getReplayDate());
        trans.setRepository(executionConfiguration.getRepository());
        trans.setMonitored(false);
        if (trans != null) {
            Map<String, String> arguments = executionConfiguration.getArguments();
            final String[] args;
            if (arguments != null) {
                args = convertArguments(arguments);
            } else {
                args = null;
            }

            trans.getLogChannel().logBasic("Starting the project");

            trans.setSafeModeEnabled(executionConfiguration.isSafeModeEnabled());
            trans.setGatheringMetrics(executionConfiguration.isGatheringMetrics());

            // Prepare the transformation; this allocates the steps that startThreads() relies on
            trans.prepareExecution(args);

        }
        // Register a completion listener, then start the transformation
        trans.addTransListener(new TransAdapter() {
            @Override
            public void transFinished(Trans trans) {
                log.info("Project execution finished");
            }
        });
        trans.startThreads();

    }
    private String[] convertArguments(Map<String, String> arguments) {
        String[] argumentNames = arguments.keySet().toArray(new String[0]);
        Arrays.sort(argumentNames);

        String[] args = new String[argumentNames.length];
        for (int i = 0; i < args.length; i++) {
            String argumentName = argumentNames[i];
            args[i] = arguments.get(argumentName);
        }
        return args;
    }
    public TransMeta buildTransMeta(String jobJson) throws IOException, KettleXMLException, KettleMissingPluginsException {
        Document document;
        // Convert JSON to XML when the input is not already XML
        if (!jobJson.startsWith("<?xml")) {
            jobJson = StringEscapeUtils.unescapeXml(jobJson);
            jobJson = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + XML.toString(new JSONLinkedObject(jobJson));

            log.info("Converted JSON to XML; resulting XML: {}", jobJson);
        }

        // Write to a temporary directory
        File outFile = new File("D:\\tmp\\test", "test.xml");
        FileUtils.writeStringToFile(outFile, jobJson);

        // Load the XML
        document = XMLHandler.loadXMLString(jobJson);

        TransMeta transMeta = new TransMeta();
        transMeta.loadXML(
                document.getDocumentElement(), outFile.getPath(), null, null, true, new Variables(),
                (message, rememberText, rememberPropertyName) -> {
                    // Yes means: overwrite
                    return true;
                });

        if (transMeta.hasMissingPlugins()) {
            log.info("[{}] is missing plugins required for execution.", jobJson);
        }

        return transMeta;
    }
}
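
Why skipping the preparation call ends in a NullPointerException can be illustrated with a toy model. The sketch below is not Kettle code: MiniTrans is a hypothetical class that only mimics the shape of the problem, namely that the step list is allocated during preparation and dereferenced when the threads are started.

```java
import java.util.ArrayList;
import java.util.List;

public class PrepareBeforeStartSketch {

    /** Toy stand-in for a transformation; NOT Kettle's Trans class. */
    static class MiniTrans {
        private List<String> steps; // stays null until prepareExecution() runs

        void prepareExecution() {
            steps = new ArrayList<>();
            steps.add("table input");
            steps.add("table output");
        }

        int startThreads() {
            // Dereferences steps without a null check
            return steps.size();
        }
    }

    /** Describes what happens when starting with or without the preparation step. */
    public static String run(boolean prepareFirst) {
        MiniTrans trans = new MiniTrans();
        if (prepareFirst) {
            trans.prepareExecution();
        }
        try {
            return "started " + trans.startThreads() + " steps";
        } catch (NullPointerException e) {
            return "NullPointerException: prepareExecution() was skipped";
        }
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

In the real service code the order is the same: trans.prepareExecution(args) must run before trans.startThreads().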

📚 Successful Run

The data in src1 was successfully synchronized to tgt.

📚 Miscellaneous

📗 Kettle Client Types


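  • SPOON: the Spoon GUI tool, PDI's graphical designer, where users create, edit, and run transformations and jobs.
  • PAN: the Pan runtime. Pan is a command-line tool that executes transformations that have already been designed.
  • KITCHEN: the Kitchen runtime. Kitchen is another command-line tool, used to schedule and execute jobs.
  • CARTE: the Carte service, a lightweight web server that lets users manage and execute transformations and jobs through a web interface and exposes a remote invocation interface.
  • DI_SERVER: Pentaho Data Integration Server, or its modern successor Pentaho Server (see "Deploying Kettle on Linux (pentaho-server-ce-9.4.0.0-343): Notes / Configuring MySQL Storage"). It is a full data integration platform with centralized management, scheduling, and monitoring of jobs and transformations.
  • SCALE: in some versions or contexts this may refer to Pentaho's distributed processing component Scale, which spreads big-data workloads across multiple machines for parallel processing.
  • OTHER: any other unknown or unspecified client type, which may be used for extensions or for scenarios that appear in the future.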

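📗 prepareExecution: Preparing a Transformation Before Execution

The method body is long, which already hints at how important it is 😂:

  • Setting arguments and parameters: the method takes a string array, arguments, as input. These are usually external arguments passed to the transformation from the command line or by the calling program, and they are bound to the parameters defined inside the transformation.
  • Initializing status flags
    • The preparing flag is set to true, indicating that the transformation is in its pre-execution preparation phase.
    • The startDate variable is cleared (set to null), indicating that this execution has not started yet.
    • The running flag is set to false, because the transformation is not actually running yet.
  • Preparing steps and hops
    • For each step in the transformation, the corresponding StepMetaInterface and StepInterface instances are created and initialized from the step's configuration and added to internal data structures for later management.
    • The hops between all steps in the transformation are checked so that the data flow is ready for execution.
  • Logging: calls such as log.snap(Metrics.METRIC_TRANSFORMATION_EXECUTION_START) record the point in time at which execution starts, along with other performance metrics, for monitoring and analysis.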
/**
   * Prepares the transformation for execution. This includes setting the arguments and parameters as well as preparing
   * and tracking the steps and hops in the transformation.
   *
   * @param arguments the arguments to use for this transformation
   * @throws KettleException in case the transformation could not be prepared (initialized)
   */
public void prepareExecution( String[] arguments ) throws KettleException {
    setPreparing( true );
    startDate = null;
    setRunning( false );

    log.snap( Metrics.METRIC_TRANSFORMATION_EXECUTION_START );
    log.snap( Metrics.METRIC_TRANSFORMATION_INIT_START );

    ExtensionPointHandler.callExtensionPoint( log, KettleExtensionPoint.TransformationPrepareExecution.id, this );

    checkCompatibility();

    // Set the arguments on the transformation...
    //
    if ( arguments != null ) {
      setArguments( arguments );
    }

    activateParameters();
    transMeta.activateParameters();
    ConnectionUtil.init( transMeta );

    if ( transMeta.getName() == null ) {
      if ( transMeta.getFilename() != null ) {
        log.logBasic( BaseMessages.getString( PKG, "Trans.Log.DispacthingStartedForFilename", transMeta
          .getFilename() ) );
      }
    } else {
      log.logBasic( BaseMessages.getString( PKG, "Trans.Log.DispacthingStartedForTransformation", transMeta
        .getName() ) );
    }

    if ( getArguments() != null ) {
      if ( log.isDetailed() ) {
        log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.NumberOfArgumentsDetected", String.valueOf(
          getArguments().length ) ) );
      }
    }

    if ( isSafeModeEnabled() ) {
      if ( log.isDetailed() ) {
        log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.SafeModeIsEnabled", transMeta.getName() ) );
      }
    }

    if ( getReplayDate() != null ) {
      SimpleDateFormat df = new SimpleDateFormat( REPLAY_DATE_FORMAT );
      log.logBasic( BaseMessages.getString( PKG, "Trans.Log.ThisIsAReplayTransformation" ) + df.format(
        getReplayDate() ) );
    } else {
      if ( log.isDetailed() ) {
        log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.ThisIsNotAReplayTransformation" ) );
      }
    }

    // setInternalKettleVariables(this); --> Let's not do this, when running
    // without file, for example remote, it spoils the fun

    // extra check to see if the servlet print writer has some value in case
    // folks want to test it locally...
    //
    if ( servletPrintWriter == null ) {
      String encoding = System.getProperty( "KETTLE_DEFAULT_SERVLET_ENCODING", null );
      if ( encoding == null ) {
        servletPrintWriter = new PrintWriter( new OutputStreamWriter( System.out ) );
      } else {
        try {
          servletPrintWriter = new PrintWriter( new OutputStreamWriter( System.out, encoding ) );
        } catch ( UnsupportedEncodingException ex ) {
          servletPrintWriter = new PrintWriter( new OutputStreamWriter( System.out ) );
        }
      }
    }

    // Keep track of all the row sets and allocated steps
    //
    steps = new ArrayList<>();
    rowsets = new ArrayList<>();

    List<StepMeta> hopsteps = transMeta.getTransHopSteps( false );

    if ( log.isDetailed() ) {
      log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.FoundDefferentSteps", String.valueOf( hopsteps
        .size() ) ) );
      log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.AllocatingRowsets" ) );
    }
    // First allocate all the rowsets required!
    // Note that a mapping doesn't receive ANY input or output rowsets...
    //
    for ( int i = 0; i < hopsteps.size(); i++ ) {
      StepMeta thisStep = hopsteps.get( i );
      if ( thisStep.isMapping() ) {
        continue; // handled and allocated by the mapping step itself.
      }

      if ( log.isDetailed() ) {
        log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.AllocateingRowsetsForStep", String.valueOf( i ),
          thisStep.getName() ) );
      }

      List<StepMeta> nextSteps = transMeta.findNextSteps( thisStep );
      int nrTargets = nextSteps.size();

      for ( int n = 0; n < nrTargets; n++ ) {
        // What's the next step?
        StepMeta nextStep = nextSteps.get( n );
        if ( nextStep.isMapping() ) {
          continue; // handled and allocated by the mapping step itself.
        }

        // How many times do we start the source step?
        int thisCopies = thisStep.getCopies();

        if ( thisCopies < 0 ) {
          // This can only happen if a variable is used that didn't resolve to a positive integer value
          //
          throw new KettleException( BaseMessages.getString( PKG, "Trans.Log.StepCopiesNotCorrectlyDefined", thisStep
            .getName() ) );
        }

        // How many times do we start the target step?
        int nextCopies = nextStep.getCopies();

        // Are we re-partitioning?
        boolean repartitioning;
        if ( thisStep.isPartitioned() ) {
          repartitioning = !thisStep.getStepPartitioningMeta().equals( nextStep.getStepPartitioningMeta() );
        } else {
          repartitioning = nextStep.isPartitioned();
        }

        int nrCopies;
        if ( log.isDetailed() ) {
          log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.copiesInfo", String.valueOf( thisCopies ), String
            .valueOf( nextCopies ) ) );
        }
        int dispatchType;
        if ( thisCopies == 1 && nextCopies == 1 ) {
          dispatchType = TYPE_DISP_1_1;
          nrCopies = 1;
        } else if ( thisCopies == 1 && nextCopies > 1 ) {
          dispatchType = TYPE_DISP_1_N;
          nrCopies = nextCopies;
        } else if ( thisCopies > 1 && nextCopies == 1 ) {
          dispatchType = TYPE_DISP_N_1;
          nrCopies = thisCopies;
        } else if ( thisCopies == nextCopies && !repartitioning ) {
          dispatchType = TYPE_DISP_N_N;
          nrCopies = nextCopies;
        } else {
          // > 1!
          dispatchType = TYPE_DISP_N_M;
          nrCopies = nextCopies;
        } // Allocate a rowset for each destination step

        // Allocate the rowsets
        //
        if ( dispatchType != TYPE_DISP_N_M ) {
          for ( int c = 0; c < nrCopies; c++ ) {
            RowSet rowSet;
            switch ( transMeta.getTransformationType() ) {
              case Normal:
                // This is a temporary patch until the batching rowset has proven
                // to be working in all situations.
                // Currently there are stalling problems when dealing with small
                // amounts of rows.
                //
                Boolean batchingRowSet =
                  ValueMetaString.convertStringToBoolean( System.getProperty( Const.KETTLE_BATCHING_ROWSET ) );
                if ( batchingRowSet != null && batchingRowSet.booleanValue() ) {
                  rowSet = new BlockingBatchingRowSet( transMeta.getSizeRowset() );
                } else {
                  rowSet = new BlockingRowSet( transMeta.getSizeRowset() );
                }
                break;

              case SerialSingleThreaded:
                rowSet = new SingleRowRowSet();
                break;

              case SingleThreaded:
                rowSet = new QueueRowSet();
                break;

              default:
                throw new KettleException( "Unhandled transformation type: " + transMeta.getTransformationType() );
            }

            switch ( dispatchType ) {
              case TYPE_DISP_1_1:
                rowSet.setThreadNameFromToCopy( thisStep.getName(), 0, nextStep.getName(), 0 );
                break;
              case TYPE_DISP_1_N:
                rowSet.setThreadNameFromToCopy( thisStep.getName(), 0, nextStep.getName(), c );
                break;
              case TYPE_DISP_N_1:
                rowSet.setThreadNameFromToCopy( thisStep.getName(), c, nextStep.getName(), 0 );
                break;
              case TYPE_DISP_N_N:
                rowSet.setThreadNameFromToCopy( thisStep.getName(), c, nextStep.getName(), c );
                break;
              default:
                break;
            }
            rowsets.add( rowSet );
            if ( log.isDetailed() ) {
              log.logDetailed( BaseMessages.getString( PKG, "Trans.TransformationAllocatedNewRowset", rowSet
                .toString() ) );
            }
          }
        } else {
          // For each N source steps we have M target steps
          //
          // From each input step we go to all output steps.
          // This allows maximum flexibility for re-partitioning,
          // distribution...
          for ( int s = 0; s < thisCopies; s++ ) {
            for ( int t = 0; t < nextCopies; t++ ) {
              BlockingRowSet rowSet = new BlockingRowSet( transMeta.getSizeRowset() );
              rowSet.setThreadNameFromToCopy( thisStep.getName(), s, nextStep.getName(), t );
              rowsets.add( rowSet );
              if ( log.isDetailed() ) {
                log.logDetailed( BaseMessages.getString( PKG, "Trans.TransformationAllocatedNewRowset", rowSet
                  .toString() ) );
              }
            }
          }
        }
      }
      log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.AllocatedRowsets", String.valueOf( rowsets.size() ),
        String.valueOf( i ), thisStep.getName() ) + " " );
    }

    if ( log.isDetailed() ) {
      log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.AllocatingStepsAndStepData" ) );
    }

    // Allocate the steps & the data...
    //
    for ( int i = 0; i < hopsteps.size(); i++ ) {
      StepMeta stepMeta = hopsteps.get( i );
      String stepid = stepMeta.getStepID();

      if ( log.isDetailed() ) {
        log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.TransformationIsToAllocateStep", stepMeta.getName(),
          stepid ) );
      }

      // How many copies are launched of this step?
      int nrCopies = stepMeta.getCopies();

      if ( log.isDebug() ) {
        log.logDebug( BaseMessages.getString( PKG, "Trans.Log.StepHasNumberRowCopies", String.valueOf( nrCopies ) ) );
      }

      // At least run once...
      for ( int c = 0; c < nrCopies; c++ ) {
        // Make sure we haven't started it yet!
        if ( !hasStepStarted( stepMeta.getName(), c ) ) {
          StepMetaDataCombi combi = new StepMetaDataCombi();

          combi.stepname = stepMeta.getName();
          combi.copy = c;

          // The meta-data
          combi.stepMeta = stepMeta;
          combi.meta = stepMeta.getStepMetaInterface();

          // Allocate the step data
          StepDataInterface data = combi.meta.getStepData();
          combi.data = data;

          // Allocate the step
          StepInterface step = combi.meta.getStep( stepMeta, data, c, transMeta, this );

          // Copy the variables of the transformation to the step...
          // don't share. Each copy of the step has its own variables.
          //
          step.initializeVariablesFrom( this );
          step.setUsingThreadPriorityManagment( transMeta.isUsingThreadPriorityManagment() );

          // Pass the connected repository & metaStore to the steps runtime
          //
          step.setRepository( repository );
          step.setMetaStore( metaStore );

          // If the step is partitioned, set the partitioning ID and some other
          // things as well...
          if ( stepMeta.isPartitioned() ) {
            List<String> partitionIDs = stepMeta.getStepPartitioningMeta().getPartitionSchema().getPartitionIDs();
            if ( partitionIDs != null && !partitionIDs.isEmpty() ) {
              step.setPartitionID( partitionIDs.get( c ) ); // Pass the partition ID
              // to the step
            }
          }

          // Save the step too
          combi.step = step;

          // Pass logging level and metrics gathering down to the step level.
          //
          if ( combi.step instanceof LoggingObjectInterface ) {
            LogChannelInterface logChannel = combi.step.getLogChannel();
            logChannel.setLogLevel( logLevel );
            logChannel.setGatheringMetrics( log.isGatheringMetrics() );
          }

          // Add to the bunch...
          steps.add( combi );

          if ( log.isDetailed() ) {
            log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.TransformationHasAllocatedANewStep", stepMeta
              .getName(), String.valueOf( c ) ) );
          }
        }
      }
    }

    // Now we need to verify if certain rowsets are not meant to be for error
    // handling...
    // Loop over the steps and for every step verify the output rowsets
    // If a rowset is going to a target step in the steps error handling
    // metadata, set it to the errorRowSet.
    // The input rowsets are already in place, so the next step just accepts the
    // rows.
    // Metadata wise we need to do the same trick in TransMeta
    //
    for ( int s = 0; s < steps.size(); s++ ) {
      StepMetaDataCombi combi = steps.get( s );
      if ( combi.stepMeta.isDoingErrorHandling() ) {
        combi.step.identifyErrorOutput();

      }
    }

    // Now (optionally) write start log record!
    // Make sure we synchronize appropriately to avoid duplicate batch IDs.
    //
    Object syncObject = this;
    if ( parentJob != null ) {
      syncObject = parentJob; // parallel execution in a job
    }
    if ( parentTrans != null ) {
      syncObject = parentTrans; // multiple sub-transformations
    }
    synchronized ( syncObject ) {
      calculateBatchIdAndDateRange();
      beginProcessing();
    }

    // Set the partition-to-rowset mapping
    //
    for ( int i = 0; i < steps.size(); i++ ) {
      StepMetaDataCombi sid = steps.get( i );

      StepMeta stepMeta = sid.stepMeta;
      StepInterface baseStep = sid.step;

      baseStep.setPartitioned( stepMeta.isPartitioned() );

      // Now let's take a look at the source and target relation
      //
      // If this source step is not partitioned, and the target step is: it
      // means we need to re-partition the incoming data.
      // If both steps are partitioned on the same method and schema, we don't
      // need to re-partition
      // If both steps are partitioned on a different method or schema, we need
      // to re-partition as well.
      // If both steps are not partitioned, we don't need to re-partition
      //
      boolean isThisPartitioned = stepMeta.isPartitioned();
      PartitionSchema thisPartitionSchema = null;
      if ( isThisPartitioned ) {
        thisPartitionSchema = stepMeta.getStepPartitioningMeta().getPartitionSchema();
      }

      boolean isNextPartitioned = false;
      StepPartitioningMeta nextStepPartitioningMeta = null;
      PartitionSchema nextPartitionSchema = null;

      List<StepMeta> nextSteps = transMeta.findNextSteps( stepMeta );
      int nrNext = nextSteps.size();
      for ( int p = 0; p < nrNext; p++ ) {
        StepMeta nextStep = nextSteps.get( p );
        if ( nextStep.isPartitioned() ) {
          isNextPartitioned = true;
          nextStepPartitioningMeta = nextStep.getStepPartitioningMeta();
          nextPartitionSchema = nextStepPartitioningMeta.getPartitionSchema();
        }
      }

      baseStep.setRepartitioning( StepPartitioningMeta.PARTITIONING_METHOD_NONE );

      // If the next step is partitioned differently, set re-partitioning, when
      // running locally.
      //
      if ( ( !isThisPartitioned && isNextPartitioned ) || ( isThisPartitioned && isNextPartitioned
        && !thisPartitionSchema.equals( nextPartitionSchema ) ) ) {
        baseStep.setRepartitioning( nextStepPartitioningMeta.getMethodType() );
      }

      // For partitioning to a set of remote steps (repartitioning from a master
      // to a set of remote output steps)
      //
      StepPartitioningMeta targetStepPartitioningMeta = baseStep.getStepMeta().getTargetStepPartitioningMeta();
      if ( targetStepPartitioningMeta != null ) {
        baseStep.setRepartitioning( targetStepPartitioningMeta.getMethodType() );
      }
    }

    setPreparing( false );
    setInitializing( true );

    // Do a topology sort... Over 150 step (copies) things might be slowing down too much.
    //
    if ( isMonitored() && steps.size() < 150 ) {
      doTopologySortOfSteps();
    }

    if ( log.isDetailed() ) {
      log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.InitialisingSteps", String.valueOf( steps.size() ) ) );
    }

    StepInitThread[] initThreads = new StepInitThread[ steps.size() ];
    Thread[] threads = new Thread[ steps.size() ];

    // Initialize all the threads...
    //
    for ( int i = 0; i < steps.size(); i++ ) {
      final StepMetaDataCombi sid = steps.get( i );

      // Do the init code in the background!
      // Init all steps at once, but ALL steps need to finish before we can
      // continue properly!
      //
      initThreads[ i ] = new StepInitThread( sid, log );

      // Put it in a separate thread!
      //
      threads[ i ] = new Thread( initThreads[ i ] );
      threads[ i ].setName( "init of " + sid.stepname + "." + sid.copy + " (" + threads[ i ].getName() + ")" );

      ExtensionPointHandler.callExtensionPoint( log, KettleExtensionPoint.StepBeforeInitialize.id, initThreads[ i ] );

      threads[ i ].start();
    }

    for ( int i = 0; i < threads.length; i++ ) {
      try {
        threads[ i ].join();
        ExtensionPointHandler.callExtensionPoint( log, KettleExtensionPoint.StepAfterInitialize.id, initThreads[ i ] );
      } catch ( Exception ex ) {
        log.logError( "Error with init thread: " + ex.getMessage(), ex.getMessage() );
        log.logError( Const.getStackTracker( ex ) );
      }
    }

    setInitializing( false );
    boolean ok = true;

    // All steps are initialized now: see if there was one that didn't do it
    // correctly!
    //
    for ( int i = 0; i < initThreads.length; i++ ) {
      StepMetaDataCombi combi = initThreads[ i ].getCombi();
      if ( !initThreads[ i ].isOk() ) {
        log.logError( BaseMessages.getString( PKG, "Trans.Log.StepFailedToInit", combi.stepname + "." + combi.copy ) );
        combi.data.setStatus( StepExecutionStatus.STATUS_STOPPED );
        ok = false;
      } else {
        combi.data.setStatus( StepExecutionStatus.STATUS_IDLE );
        if ( log.isDetailed() ) {
          log.logDetailed( BaseMessages.getString( PKG, "Trans.Log.StepInitialized", combi.stepname + "."
            + combi.copy ) );
        }
      }
    }

    if ( !ok ) {
      // One or more steps failed on initialization.
      // Transformation is now stopped.
      setStopped( true );

      // Halt the other threads as well, signal end-of-the line to the outside world...
      // Also explicitly call dispose() to clean up resources opened during init();
      //
      for ( int i = 0; i < initThreads.length; i++ ) {
        StepMetaDataCombi combi = initThreads[ i ].getCombi();

        // Dispose will overwrite the status, but we set it back right after
        // this.
        combi.step.dispose( combi.meta, combi.data );

        if ( initThreads[ i ].isOk() ) {
          combi.data.setStatus( StepExecutionStatus.STATUS_HALTED );
        } else {
          combi.data.setStatus( StepExecutionStatus.STATUS_STOPPED );
        }
      }

      // Just for safety, fire the trans finished listeners...
      try {
        fireTransFinishedListeners();
      } catch ( KettleException e ) {
        // a finished-listener threw an exception
        log.logError( BaseMessages.getString( PKG, "Trans.FinishListeners.Exception" ) );
        // we will not pass this exception up to prepareExecution() entry point.
      } finally {
        // Flag the transformation as finished even if exception was thrown
        setFinished( true );
      }

      // Pass along the log during preview. Otherwise it becomes hard to see
      // what went wrong.
      //
      if ( preview ) {
        String logText = KettleLogStore.getAppender().getBuffer( getLogChannelId(), true ).toString();
        throw new KettleException( BaseMessages.getString( PKG, "Trans.Log.FailToInitializeAtLeastOneStep" ) + Const.CR
          + logText );
      } else {
        throw new KettleException( BaseMessages.getString( PKG, "Trans.Log.FailToInitializeAtLeastOneStep" )
          + Const.CR );
      }
    }

    log.snap( Metrics.METRIC_TRANSFORMATION_INIT_STOP );

    KettleEnvironment.setExecutionInformation( this, repository );

    setReadyToStart( true );
  }
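
The rowset allocation in the method above depends on how many copies the source and target steps run and on whether the hop re-partitions the data. The following is a minimal, standalone sketch of that decision only. It does not use any Kettle classes; `DispatchSketch`, `DispatchType`, `dispatchType` and `rowSetCount` are hypothetical names introduced for illustration, and the enum values stand in for the `TYPE_DISP_*` constants seen in the listing.

```java
public class DispatchSketch {

    // Hypothetical stand-ins for TYPE_DISP_1_1, TYPE_DISP_1_N, TYPE_DISP_N_1,
    // TYPE_DISP_N_N and TYPE_DISP_N_M in the listing above.
    enum DispatchType { ONE_TO_ONE, ONE_TO_N, N_TO_ONE, N_TO_N, N_TO_M }

    // Mirrors the if/else chain in the listing: the first matching branch wins.
    static DispatchType dispatchType(int thisCopies, int nextCopies, boolean repartitioning) {
        if (thisCopies == 1 && nextCopies == 1) {
            return DispatchType.ONE_TO_ONE;
        } else if (thisCopies == 1 && nextCopies > 1) {
            return DispatchType.ONE_TO_N;
        } else if (thisCopies > 1 && nextCopies == 1) {
            return DispatchType.N_TO_ONE;
        } else if (thisCopies == nextCopies && !repartitioning) {
            return DispatchType.N_TO_N;
        }
        return DispatchType.N_TO_M;
    }

    // Number of rowsets allocated for one hop, following the allocation loops:
    // one loop over nrCopies for the first four types, a nested loop for N_TO_M.
    static int rowSetCount(int thisCopies, int nextCopies, boolean repartitioning) {
        switch (dispatchType(thisCopies, nextCopies, repartitioning)) {
            case ONE_TO_ONE:
                return 1;
            case ONE_TO_N:
            case N_TO_N:
                return nextCopies;
            case N_TO_ONE:
                return thisCopies;
            default:
                // Every source copy gets a rowset to every target copy.
                return thisCopies * nextCopies;
        }
    }

    public static void main(String[] args) {
        System.out.println(dispatchType(3, 3, false) + " -> " + rowSetCount(3, 3, false));
        System.out.println(dispatchType(3, 3, true) + " -> " + rowSetCount(3, 3, true));
        System.out.println(dispatchType(2, 4, false) + " -> " + rowSetCount(2, 4, false));
    }
}
```

Under this reading, two steps with three copies each share three rowsets when no re-partitioning is needed, but nine when the hop re-partitions, which is why partitioned transformations with many copies allocate noticeably more buffers.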
Source: https://blog.csdn.net/qq_36434219/article/details/135456390