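In the current version, mongodbreader does not handle binary-typed columns when migrating MongoDB data. A binary value matches none of the type checks below, so it falls through to the final else branch. For a non-array column, that branch writes the string returned by tempCol.toString() instead of passing the bytes on. The relevant code is in src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/MongoDBReader.java: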
if (tempCol == null) {
    // Do not simply "continue" here: skipping the column would misalign the record's columns at the destination.
    record.addColumn(new StringColumn(null));
} else if (tempCol instanceof Double) {
    //TODO deal with Double.isNaN()
    record.addColumn(new DoubleColumn((Double) tempCol));
} else if (tempCol instanceof Boolean) {
    record.addColumn(new BoolColumn((Boolean) tempCol));
} else if (tempCol instanceof Date) {
    record.addColumn(new DateColumn((Date) tempCol));
} else if (tempCol instanceof Integer) {
    record.addColumn(new LongColumn((Integer) tempCol));
} else if (tempCol instanceof Long) {
    record.addColumn(new LongColumn((Long) tempCol));
} else {
    if (KeyConstant.isArrayType(column.getString(KeyConstant.COLUMN_TYPE))) {
        String splitter = column.getString(KeyConstant.COLUMN_SPLITTER);
        if (Strings.isNullOrEmpty(splitter)) {
            throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,
                    MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription());
        } else {
            ArrayList array = (ArrayList) tempCol;
            String tempArrayStr = Joiner.on(splitter).join(array);
            record.addColumn(new StringColumn(tempArrayStr));
        }
    } else {
        record.addColumn(new StringColumn(tempCol.toString()));
    }
}
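Change it to the following. The change adds a branch for org.bson.types.Binary before the catch-all else. If the file does not already import them, also add import org.bson.types.Binary; and import com.alibaba.datax.common.element.BytesColumn; at the top: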
if (tempCol == null) {
    // Do not simply "continue" here: skipping the column would misalign the record's columns at the destination.
    record.addColumn(new StringColumn(null));
} else if (tempCol instanceof Double) {
    //TODO deal with Double.isNaN()
    record.addColumn(new DoubleColumn((Double) tempCol));
} else if (tempCol instanceof Boolean) {
    record.addColumn(new BoolColumn((Boolean) tempCol));
} else if (tempCol instanceof Date) {
    record.addColumn(new DateColumn((Date) tempCol));
} else if (tempCol instanceof Integer) {
    record.addColumn(new LongColumn((Integer) tempCol));
} else if (tempCol instanceof Long) {
    record.addColumn(new LongColumn((Long) tempCol));
} else if (tempCol instanceof Binary) {
    // Handle MongoDB Binary values
    Binary binaryData = (Binary) tempCol;
    byte[] binaryBytes = binaryData.getData();
    // Pass the raw bytes on as a DataX bytes column
    record.addColumn(new BytesColumn(binaryBytes));
} else {
    if (KeyConstant.isArrayType(column.getString(KeyConstant.COLUMN_TYPE))) {
        String splitter = column.getString(KeyConstant.COLUMN_SPLITTER);
        if (Strings.isNullOrEmpty(splitter)) {
            throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,
                    MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription());
        } else {
            ArrayList array = (ArrayList) tempCol;
            String tempArrayStr = Joiner.on(splitter).join(array);
            record.addColumn(new StringColumn(tempArrayStr));
        }
    } else {
        record.addColumn(new StringColumn(tempCol.toString()));
    }
}
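The point of the change is branch order. The Binary check has to run before the catch-all else, otherwise a binary value still reaches toString(). The sketch below reproduces that dispatch so it can run without DataX or the MongoDB driver. StandInBinary is a hypothetical stand-in for org.bson.types.Binary that mirrors only getData(). The returned strings stand in for the DataX Column classes. It is an illustration of the branch logic under those assumptions, not the plugin code itself.

```java
import java.util.Arrays;
import java.util.Date;

// Standalone sketch of the reader's type dispatch (no DataX / MongoDB dependencies).
public class BinaryDispatchSketch {

    // Hypothetical stand-in for org.bson.types.Binary; only getData() is mirrored.
    static final class StandInBinary {
        private final byte[] data;

        StandInBinary(byte[] data) {
            this.data = data;
        }

        byte[] getData() {
            return data;
        }
    }

    // Returns the name of the column type the reader would create (a stand-in for
    // record.addColumn(...)). handleBinary=false models the original code,
    // handleBinary=true models the modified code with the extra Binary branch.
    static String describe(Object tempCol, boolean handleBinary) {
        if (tempCol == null) {
            return "StringColumn(null)";
        } else if (tempCol instanceof Double) {
            return "DoubleColumn";
        } else if (tempCol instanceof Boolean) {
            return "BoolColumn";
        } else if (tempCol instanceof Date) {
            return "DateColumn";
        } else if (tempCol instanceof Integer || tempCol instanceof Long) {
            return "LongColumn";
        } else if (handleBinary && tempCol instanceof StandInBinary) {
            // Checked before the catch-all, so the raw bytes are kept.
            byte[] bytes = ((StandInBinary) tempCol).getData();
            return "BytesColumn" + Arrays.toString(bytes);
        } else {
            // Catch-all: the original code ends up here for binary values (toString()).
            return "StringColumn";
        }
    }

    public static void main(String[] args) {
        Object value = new StandInBinary(new byte[] {1, 2, 3});
        System.out.println(describe(value, false)); // StringColumn
        System.out.println(describe(value, true));  // BytesColumn[1, 2, 3]
    }
}
```

With the Binary branch disabled the value is classified as a string column. With it enabled, the byte content survives as-is.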
Write the job file 1.json:
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mongodbreader",
                    "parameter": {
                        "address": ["ip1:27017"],
                        "collectionName": "data",
                        "column": [
                            {
                                "name": "_id",
                                "type": "long"
                            },
                            {
                                "name": "fileContent",
                                "type": "bytes"
                            }
                        ],
                        "dbName": "monitor",
                        "userName": "root",
                        "userPassword": "123456",
                        "query": {
                            "_id": {
                                "$lt": 21
                            }
                        }
                    }
                },
                "writer": {
                    "name": "mongodbwriter",
                    "parameter": {
                        "address": ["ip2:27017"],
                        "collectionName": "data",
                        "column": [
                            {
                                "name": "_id",
                                "type": "long"
                            },
                            {
                                "name": "fileContent",
                                "type": "bytes"
                            }
                        ],
                        "writeMode": {
                            "isReplace": "true",
                            "replaceKey": "_id"
                        },
                        "dbName": "test",
                        "userName": "root",
                        "userPassword": "123456"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "2"
            }
        }
    }
}
The query node in the reader is the query filter. In the demo above it selects the records whose _id is less than 21. Run the job with:
python datax.py G:\Code\1.json
datax.py is in the packaged build output under the target directory, at the relative path target\datax\datax\bin.