Migrating MongoDB with DataX

Published: December 20, 2023

Source Code Modification

  • In the current version, mongodbreader does not handle columns of binary type when migrating MongoDB. The relevant source is src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/MongoDBReader.java:

    if (tempCol == null) {
        // continue; do not continue here directly, or the record's columns end up misaligned at the destination
        record.addColumn(new StringColumn(null));
    }else if (tempCol instanceof Double) {
        //TODO deal with Double.isNaN()
        record.addColumn(new DoubleColumn((Double) tempCol));
    } else if (tempCol instanceof Boolean) {
        record.addColumn(new BoolColumn((Boolean) tempCol));
    } else if (tempCol instanceof Date) {
        record.addColumn(new DateColumn((Date) tempCol));
    } else if (tempCol instanceof Integer) {
        record.addColumn(new LongColumn((Integer) tempCol));
    }else if (tempCol instanceof Long) {
        record.addColumn(new LongColumn((Long) tempCol));
    } else {
        if(KeyConstant.isArrayType(column.getString(KeyConstant.COLUMN_TYPE))) {
            String splitter = column.getString(KeyConstant.COLUMN_SPLITTER);
            if(Strings.isNullOrEmpty(splitter)) {
                throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,
                    MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription());
            } else {
                ArrayList array = (ArrayList)tempCol;
                String tempArrayStr = Joiner.on(splitter).join(array);
                record.addColumn(new StringColumn(tempArrayStr));
            }
        } else {
            record.addColumn(new StringColumn(tempCol.toString()));
        }
    }
    
  • Change it to:

    if (tempCol == null) {
        // continue; do not continue here directly, or the record's columns end up misaligned at the destination
        record.addColumn(new StringColumn(null));
    }else if (tempCol instanceof Double) {
        //TODO deal with Double.isNaN()
        record.addColumn(new DoubleColumn((Double) tempCol));
    } else if (tempCol instanceof Boolean) {
        record.addColumn(new BoolColumn((Boolean) tempCol));
    } else if (tempCol instanceof Date) {
        record.addColumn(new DateColumn((Date) tempCol));
    } else if (tempCol instanceof Integer) {
        record.addColumn(new LongColumn((Integer) tempCol));
    }else if (tempCol instanceof Long) {
        record.addColumn(new LongColumn((Long) tempCol));
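    } else if (tempCol instanceof Binary) {
        // Handle MongoDB Binary values.
        // Assumed imports: org.bson.types.Binary and com.alibaba.datax.common.element.BytesColumn
        Binary binaryData = (Binary) tempCol;
        byte[] binaryBytes = binaryData.getData();
        // Add the byte array to the record as a DataX bytes column
        record.addColumn(new BytesColumn(binaryBytes));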
    } else {
        if(KeyConstant.isArrayType(column.getString(KeyConstant.COLUMN_TYPE))) {
            String splitter = column.getString(KeyConstant.COLUMN_SPLITTER);
            if(Strings.isNullOrEmpty(splitter)) {
                throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,
                    MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription());
            } else {
                ArrayList array = (ArrayList)tempCol;
                String tempArrayStr = Joiner.on(splitter).join(array);
                record.addColumn(new StringColumn(tempArrayStr));
            }
        } else {
            record.addColumn(new StringColumn(tempCol.toString()));
        }
    }
    
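    • After modifying the source, the project must be repackaged. Since only mongodbreader was changed, when packaging you can consider taking the root…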
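
To illustrate why binary values should travel in a bytes column rather than a string column, here is a minimal standalone sketch. It does not use DataX or the MongoDB driver, and it does not reproduce what the original `tempCol.toString()` branch actually emits for a Binary value. It only shows the general point that arbitrary bytes do not survive a trip through text. The class and method names (`BinaryRoundTrip`, `viaString`, `viaBytes`) are hypothetical and exist only for this example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class BinaryRoundTrip {
    // Simulates carrying binary data in a text-based column:
    // decode the bytes as UTF-8 text, then encode that text back to bytes.
    static byte[] viaString(byte[] raw) {
        String asText = new String(raw, StandardCharsets.UTF_8);
        return asText.getBytes(StandardCharsets.UTF_8);
    }

    // Simulates carrying binary data in a bytes-based column:
    // the array is passed through unchanged (copied here for safety).
    static byte[] viaBytes(byte[] raw) {
        return Arrays.copyOf(raw, raw.length);
    }

    public static void main(String[] args) {
        // 0xFF and 0xFE can never appear in valid UTF-8,
        // so decoding replaces them with a replacement character.
        byte[] raw = {(byte) 0xFF, (byte) 0xFE, 0x00, 0x01};
        System.out.println("string path preserved: " + Arrays.equals(raw, viaString(raw)));
        System.out.println("bytes path preserved:  " + Arrays.equals(raw, viaBytes(raw)));
    }
}
```

The string path reports `false` and the bytes path reports `true`, which is the motivation for adding a dedicated Binary branch ahead of the generic string fallback.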

Migration Script

  • Write the job script: 1.json

    {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "mongodbreader",
                        "parameter": {
                            "address": ["ip1:27017"],
                            "collectionName": "data",
                            "column": [
                                {
                                    "name": "_id",
                                    "type": "long"
                                },
                                {
                                    "name": "fileContent",
                                    "type": "bytes"
                                }
                            ],
                            "dbName": "monitor",
                            "userName": "root",
                            "userPassword": "123456",
                            "query": {
                                "_id": {
                                    "$lt": 21
                                }
                            }
                        }
                    },
                    "writer": {
                        "name": "mongodbwriter",
                        "parameter": {
                            "address": ["ip2:27017"],
                            "collectionName": "data",
                            "column": [
                                {
                                    "name": "_id",
                                    "type": "long"
                                },
                                {
                                    "name": "fileContent",
                                    "type": "bytes"
                                }
                            ],
                            "writeMode": {
                                "isReplace": "true",
                                "replaceKey": "_id"
                            },
                            "dbName": "test",
                            "userName": "root",
                            "userPassword": "123456"
                        }
                    }
                }
            ],
            "setting": {
                "speed": {
                    "channel": "2"
                }
            }
        }
    }
    
    • The query node in the reader is the query condition; the demo above selects records whose _id is less than 21.
  • Run the command:

    python datax.py G:\Code\1.json
    
    • datax.py is in the target directory produced by packaging; relative path: target\datax\datax\bin
Source: https://blog.csdn.net/yudaxiaye/article/details/135076633