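First, in pom.xml and package.xml, keep only the modules you need plus the transformer module.
In the datax-transformer module, define your own class and have it extend Transformer.
The class looks like this: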
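import com.alibaba.datax.common.element.LongColumn;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.transformer.Transformer;

public class MysqlTransFormerTest extends Transformer {
    public MysqlTransFormerTest() {
        // This name is what the job JSON refers to; the two must match exactly.
        setTransformerName("dx_test_transfomer");
    }

    // record is each row that was read in.
    // paras are the arguments passed in, i.e. the "parameter" entry in the JSON config.
    @Override
    public Record evaluate(Record record, Object... paras) {
        // Number of columns in the row
        System.out.println(record.getColumnNumber());
        // Data of the given column (byte size, raw data, type), e.g. {"byteSize":2,"rawData":"aa","type":5}
        System.out.println(record.getColumn(0));
        // Add 1 to the number in column 0 and write it back
        Long aLong = record.getColumn(0).asLong();
        if (aLong != null) { // skip null values instead of failing on unboxing
            long l = aLong.longValue();
            record.setColumn(0, new LongColumn(l + 1));
        }
        return record;
    }
}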
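The class above always works on column 0. As a rough illustration of how the target column could be taken from the arguments instead, here is a plain-Java model that runs without DataX on the classpath. The `List<Long>` record and the `IncrementSketch` class are hypothetical stand-ins for illustration only, and the idea that the first vararg carries the column index as an `Integer` is an assumption based on how the built-in transformers appear to read their arguments.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of a column-increment transformer.
// The List<Long> "record" is a stand-in for DataX's Record, not the real type.
class IncrementSketch {

    // Assumption: paras[0] carries the target column index as an Integer.
    static List<Long> evaluate(List<Long> record, Object... paras) {
        if (paras.length < 1 || !(paras[0] instanceof Integer)) {
            throw new IllegalArgumentException("paras[0] must be the column index");
        }
        int columnIndex = (Integer) paras[0];
        Long value = record.get(columnIndex);
        // Leave null columns untouched instead of unboxing them.
        if (value != null) {
            record.set(columnIndex, value + 1);
        }
        return record;
    }

    public static void main(String[] args) {
        List<Long> record = new ArrayList<>(Arrays.asList(41L, null));
        System.out.println(evaluate(record, 0)); // [42, null]
        System.out.println(evaluate(record, 1)); // [42, null]
    }
}
```

The null check mirrors the guard that a real transformer would want around `asLong()`, since a null column value cannot be unboxed.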
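During initialization, DataX loads TransformerRegistry and registers the Transformers listed there. The custom Transformer has to be added to the static block that performs this registration.
In the datax-core module, open TransformerRegistry (under transport, then transformer), find the static block, and add the new transformer as the last line: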
static {
    /**
     * add native transformer
     * local storage and from server will be delay load.
     */
    registTransformer(new SubstrTransformer());
    registTransformer(new PadTransformer());
    registTransformer(new ReplaceTransformer());
    registTransformer(new FilterTransformer());
    registTransformer(new GroovyTransformer());
    registTransformer(new DigestTransformer());
    registTransformer(new MysqlTransFormerTest());
}
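To make the registration step more concrete, the following is a minimal sketch of a name-keyed registry that is filled from a static block. `RegistrySketch`, `NamedTransformer`, `register`, and `lookup` are hypothetical names chosen for this illustration; this is not the DataX implementation, only the general pattern it seems to follow.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal model of a name-keyed transformer registry.
// All types here are illustrative stand-ins, not DataX classes.
class RegistrySketch {

    interface NamedTransformer {
        String name();
    }

    private static final Map<String, NamedTransformer> REGISTRY = new HashMap<>();

    // Runs once when the class is loaded, so everything registered here
    // is available before the first lookup.
    static {
        register(() -> "dx_substr");
        register(() -> "dx_test_transfomer"); // the custom transformer
    }

    static void register(NamedTransformer t) {
        if (REGISTRY.putIfAbsent(t.name(), t) != null) {
            throw new IllegalStateException("duplicate transformer name: " + t.name());
        }
    }

    static NamedTransformer lookup(String name) {
        NamedTransformer t = REGISTRY.get(name);
        if (t == null) {
            throw new IllegalArgumentException("transformer not registered: " + name);
        }
        return t;
    }

    public static void main(String[] args) {
        System.out.println(lookup("dx_test_transfomer").name());
        try {
            lookup("dx_missing");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The lookup is by exact string, which is why the name set in the transformer's constructor and the name used in the job file have to agree character for character, including any misspelling.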
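Note: in the job configuration, columnIndex must always be set for the transformer. Your own transformer does not have to use it, but the DataX code requires it to be present.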
{
    "job": {
        "setting": {
            "speed": {
                "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "xxxxxx",
                        "column": [
                            "id",
                            "name"
                        ],
                        "connection": [
                            {
                                "table": [
                                    "tt1"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://127.0.0.1:3306/test"
                                ]
                            }
                        ]
                    }
                },
                "transformer": [
                    {
                        "name": "dx_test_transfomer",
                        "parameter": {
                            "columnIndex": 1
                        }
                    }
                ],
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "xxxxxx",
                        "column": [
                            "id",
                            "name"
                        ],
                        "session": [
                            "set session sql_mode='ANSI'"
                        ],
                        "preSql": [
                            "delete from tt2"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=gbk",
                                "table": [
                                    "tt2"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}
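The columnIndex requirement can be pictured as a check performed while the transformer's "parameter" object is turned into the arguments for evaluate. The sketch below models that step with a plain `Map` standing in for the parsed JSON. `ParameterSketch` and `buildArgs` are hypothetical names, and the layout (column index first, then any strings from an optional "paras" list) is an assumption about how the arguments are assembled, not code taken from DataX.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Models turning a transformer's "parameter" object into evaluate() arguments.
// The Map stands in for parsed JSON; none of this is DataX code.
class ParameterSketch {

    // Assumed layout: column index first, then any extra "paras" entries.
    static Object[] buildArgs(Map<String, Object> parameter) {
        Object columnIndex = parameter.get("columnIndex");
        if (!(columnIndex instanceof Integer)) {
            throw new IllegalArgumentException("columnIndex is required");
        }
        List<Object> args = new ArrayList<>();
        args.add(columnIndex);
        Object extra = parameter.get("paras");
        if (extra instanceof List) {
            args.addAll((List<?>) extra);
        }
        return args.toArray();
    }

    public static void main(String[] args) {
        Map<String, Object> parameter = new HashMap<>();
        parameter.put("columnIndex", 1);
        System.out.println(Arrays.toString(buildArgs(parameter))); // [1]

        parameter.put("paras", Arrays.asList("a", "b"));
        System.out.println(Arrays.toString(buildArgs(parameter))); // [1, a, b]

        try {
            buildArgs(new HashMap<>());
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // columnIndex is required
        }
    }
}
```

Under this model, a transformer that ignores the column index still receives it as its first argument, which fits the note that the setting is mandatory even when unused.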