DataX Custom Data-Processing Interface: Implementing and Using a Transformer

Published: January 11, 2024

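1. Write a custom Transformer

First, in pom.xml and package.xml, keep the modules you need plus the transformer module.
Then, in the datax-transformer module, create your own class that extends the abstract Transformer class.
For example: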

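import com.alibaba.datax.common.element.LongColumn;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.transformer.Transformer;

public class MysqlTransFormerTest extends Transformer {

    public MysqlTransFormerTest() {
        // The name that the job JSON refers to in the transformer's "name" field
        setTransformerName("dx_test_transfomer");
    }

    // record: one row read by the reader
    // paras: the values from this transformer's "parameter" block in the job JSON
    @Override
    public Record evaluate(Record record, Object... paras) {
        // Number of columns in this row
        System.out.println(record.getColumnNumber());
        // Column 0 as (byte size, raw data, type), e.g. {"byteSize":2,"rawData":"aa","type":5}
        System.out.println(record.getColumn(0));

        // Add 1 to the number in column 0 and write it back
        Long aLong = record.getColumn(0).asLong();
        if (aLong != null) { // asLong() returns null for a NULL value; avoid an NPE
            record.setColumn(0, new LongColumn(aLong + 1));
        }
        return record;
    }
}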
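The evaluate() logic above can be tried without a DataX build. The sketch below is a minimal, self-contained stand-in: a row is modelled as a plain List<Object> (an assumption, not DataX's Record), and the class name IncrementColumnSketch is hypothetical. Unlike the example above, it reads the target column from paras[0], assuming that slot holds the configured columnIndex as it does for DataX's built-in transformers such as dx_substr; it also leaves NULL values untouched instead of failing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical, self-contained sketch of the evaluate() logic, without DataX on the
 * classpath. A row is modelled as a List<Object>; real DataX code would use Record,
 * getColumn(i).asLong() and setColumn(i, new LongColumn(...)) instead.
 */
public class IncrementColumnSketch {

    // Assumption: paras[0] is the configured columnIndex (an Integer).
    static List<Object> evaluate(List<Object> record, Object... paras) {
        int columnIndex = (Integer) paras[0];
        Object value = record.get(columnIndex);
        if (value == null) {
            return record; // NULL stays NULL instead of throwing an NPE
        }
        long current = ((Number) value).longValue();
        record.set(columnIndex, current + 1); // write the incremented value back
        return record;
    }

    public static void main(String[] args) {
        List<Object> row = new ArrayList<>(Arrays.asList(41L, "aa"));
        System.out.println(evaluate(row, 0)); // prints [42, aa]
    }
}
```

Reading the index from paras[0] rather than hard-coding 0 lets the same class be pointed at different columns from the job JSON.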

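2. Register the custom Transformer

When DataX starts, it loads TransformerRegistry, whose static initializer block registers the available transformers. Add your custom Transformer to that block.
In the datax-core module, open TransformerRegistry under transport/transformer, find the static block, and add the last line below: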

static {
    /**
     * add native transformer
     * local storage and from server will be delay load.
     */

    registTransformer(new SubstrTransformer());
    registTransformer(new PadTransformer());
    registTransformer(new ReplaceTransformer());
    registTransformer(new FilterTransformer());
    registTransformer(new GroovyTransformer());
    registTransformer(new DigestTransformer());
    registTransformer(new MysqlTransFormerTest()); // custom transformer; also import its class in TransformerRegistry
}
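Registration matters because the job JSON refers to a transformer only by name, so the registry is essentially a name-to-transformer map filled once at startup. The sketch below is a simplified model, not DataX's TransformerRegistry: RegistrySketch, register and lookup are hypothetical names, and a Long stands in for a record. Based on our reading of the DataX source, it assumes that transformers added through registTransformer(...) count as native, that native names must start with dx_ (dx_test_transfomer does), and that registering the same name twice is an error; verify this against your DataX version.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

/**
 * Hypothetical model of name-based transformer registration. It imitates two checks we
 * believe DataX applies to native transformers: a "dx_" name prefix and no duplicates.
 */
public class RegistrySketch {

    // Declared before the static block so it is initialized first.
    private static final Map<String, UnaryOperator<Long>> REGISTERED = new HashMap<>();

    static void register(String name, UnaryOperator<Long> fn) {
        if (!name.startsWith("dx_")) {
            throw new IllegalArgumentException("native transformer name must start with 'dx_': " + name);
        }
        if (REGISTERED.containsKey(name)) {
            throw new IllegalStateException("duplicate transformer name: " + name);
        }
        REGISTERED.put(name, fn);
    }

    // The job JSON's "name" field is resolved through a lookup like this one.
    static UnaryOperator<Long> lookup(String name) {
        UnaryOperator<Long> fn = REGISTERED.get(name);
        if (fn == null) {
            throw new IllegalArgumentException("transformer not registered: " + name);
        }
        return fn;
    }

    // Registration runs once, in a static initializer, like TransformerRegistry's block.
    static {
        register("dx_test_transfomer", v -> v + 1);
    }

    public static void main(String[] args) {
        System.out.println(lookup("dx_test_transfomer").apply(41L)); // prints 42
    }
}
```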

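3. Configure the transformer in the job JSON

Note: columnIndex must be configured. Your own code is free to ignore it, but the DataX framework requires it and reports an error if it is missing.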

{
    "job": {
        "setting": {
            "speed": {
                 "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "xxxxxx",
                        "column": [
                            "id",
                            "name"
                        ],
                        "connection": [
                            {
                                "table": [
                                    "tt1"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://127.0.0.1:3306/test"
                                ]
                            }
                        ]
                    }
                },
                "transformer": [
                    {
                        "name": "dx_test_transfomer",
                        "parameter": {
                            "columnIndex": 1
                        }
                    }
                ],
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "xxxxxx",
                        "column": [
                            "id",
                            "name"
                        ],
                        "session": [
                            "set session sql_mode='ANSI'"
                        ],
                        "preSql": [
                            "delete from tt2"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=gbk",
                                "table": [
                                    "tt2"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}
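Why columnIndex is mandatory becomes clearer once you see how the "parameter" block reaches evaluate(). The sketch below models our understanding of it, inferred from DataX's built-in transformers (dx_substr, for example, casts paras[0] to Integer and parses the following elements from String): the framework passes columnIndex first, then the strings from an optional "paras" list. ParasSketch and buildParas are hypothetical names, not DataX APIs; check the behaviour against the DataX version you build.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical model of how a transformer's JSON "parameter" block becomes the
 * Object... paras argument of evaluate(): columnIndex first, then the "paras" strings.
 */
public class ParasSketch {

    // columnIndex is an Integer so that "not configured" can be modelled as null.
    static Object[] buildParas(Integer columnIndex, List<String> paras) {
        if (columnIndex == null) {
            // Mirrors the note above: a transformer without columnIndex is rejected.
            throw new IllegalArgumentException("columnIndex must be set");
        }
        int extra = (paras == null) ? 0 : paras.size();
        Object[] result = new Object[extra + 1];
        result[0] = columnIndex;
        for (int i = 0; i < extra; i++) {
            result[i + 1] = paras.get(i);
        }
        return result;
    }

    public static void main(String[] args) {
        // {"columnIndex": 1}, as in the job above
        System.out.println(Arrays.toString(buildParas(1, null))); // prints [1]
        // a dx_substr-style block: {"columnIndex": 1, "paras": ["0", "2"]}
        System.out.println(Arrays.toString(buildParas(1, Arrays.asList("0", "2")))); // prints [1, 0, 2]
    }
}
```

Under this model, the custom transformer in section 1 receives paras = [1] for the job above and simply does not use it.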

4. Start the program (or package and deploy DataX), and the transformer is ready to use.
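Once the job runs, each record read by the reader passes through the entries of the "transformer" array before it reaches the writer. The sketch below models that flow under two assumptions drawn from our reading of DataX: transformers are applied in the order they are listed, and a transformer that returns null drops the record (this is how we understand dx_filter to work). ChainSketch is a hypothetical name, a Long stands in for a record, and this is not DataX's actual exchanger code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

/**
 * Hypothetical model of the runtime transformer chain: steps run in configured order,
 * and a null result filters the record out before it reaches the writer.
 */
public class ChainSketch {

    static List<Long> run(List<Long> input, List<UnaryOperator<Long>> chain) {
        List<Long> output = new ArrayList<>();
        for (Long record : input) {
            Long result = record;
            for (UnaryOperator<Long> step : chain) {
                result = step.apply(result);
                if (result == null) {
                    break; // filtered: later steps and the writer never see it
                }
            }
            if (result != null) {
                output.add(result);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        UnaryOperator<Long> increment = v -> v + 1;             // like dx_test_transfomer
        UnaryOperator<Long> dropLarge = v -> v > 10 ? null : v; // a filter-style step
        System.out.println(run(Arrays.asList(1L, 2L, 10L), Arrays.asList(increment, dropLarge)));
        // prints [2, 3]
    }
}
```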

Source: https://blog.csdn.net/T_Ghost/article/details/135457205