09 DataX Database Table Sync Configuration

Published: December 18, 2023

Data Channel

Full-table data is synced by DataX directly from the MySQL business database to HDFS. The data flow is shown in the figure below.
[Figure: full-table data flow from the MySQL business database through DataX to HDFS]

DataX Configuration File

We need to write a DataX JSON configuration file. Taking activity_info as an example, the configuration is as follows:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [
                            "id",
                            "activity_name",
                            "activity_type",
                            "activity_desc",
                            "start_time",
                            "end_time",
                            "create_time"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://hadoop102:3306/gmall"
                                ],
                                "table": [
                                    "activity_info"
                                ]
                            }
                        ],
                        "password": "<your mysql password>",
                        "splitPk": "",
                        "username": "<your mysql account>"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "bigint"
                            },
                            {
                                "name": "activity_name",
                                "type": "string"
                            },
                            {
                                "name": "activity_type",
                                "type": "string"
                            },
                            {
                                "name": "activity_desc",
                                "type": "string"
                            },
                            {
                                "name": "start_time",
                                "type": "string"
                            },
                            {
                                "name": "end_time",
                                "type": "string"
                            },
                            {
                                "name": "create_time",
                                "type": "string"
                            }
                        ],
                        "compress": "gzip",
                        "defaultFS": "hdfs://hadoop102:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "activity_info",
                        "fileType": "text",
                        "path": "${targetdir}",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}

Note: Because the target path contains a date level that separates data from different days, the path parameter is not hard-coded; it must be passed in dynamically when the job is submitted, using the parameter name targetdir.
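For reference, the parameter is supplied through DataX's -p option at submit time. A minimal sketch, assuming the configuration above is saved as /opt/module/datax/job/import/activity_info.json (the file name is illustrative) and the date directory already exists on HDFS:

# Pass targetdir to the job via -p"-D..." when invoking datax.py
python /opt/module/datax/bin/datax.py -p"-Dtargetdir=/origin_data/gmall/db/activity_info_full/2023-06-14" /opt/module/datax/job/import/activity_info.json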

DataX Configuration File Generation Script

  1. For convenience, a batch generation script for the DataX configurations is provided. Create a gen_import_config.py script in the ~/bin directory.
# coding=utf-8
import json
import getopt
import os
import sys
import MySQLdb

# MySQL connection settings; adjust to match your environment
mysql_host = "hadoop102"
mysql_port = "3306"
mysql_user = "root"
mysql_passwd = "000000"

# HDFS NameNode settings; adjust to match your environment
hdfs_nn_host = "hadoop102"
hdfs_nn_port = "8020"

# Target directory for the generated configuration files; adjust as needed
output_path = "/opt/module/datax/job/import"


def get_connection():
    return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd)


def get_mysql_meta(database, table):
    connection = get_connection()
    cursor = connection.cursor()
    sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
    cursor.execute(sql, [database, table])
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()
    return fetchall


def get_mysql_columns(database, table):
    return map(lambda x: x[0], get_mysql_meta(database, table))


def get_hive_columns(database, table):
    def type_mapping(mysql_type):
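        # Map MySQL column types to Hive-compatible types for the HDFS writer;
        # a type missing from this dict (e.g. mediumtext) raises KeyError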
        mappings = {
            "bigint": "bigint",
            "int": "bigint",
            "smallint": "bigint",
            "tinyint": "bigint",
            "decimal": "string",
            "double": "double",
            "float": "float",
            "binary": "string",
            "char": "string",
            "varchar": "string",
            "datetime": "string",
            "time": "string",
            "timestamp": "string",
            "date": "string",
            "text": "string"
        }
        return mappings[mysql_type]

    meta = get_mysql_meta(database, table)
    return map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta)


def generate_json(source_database, source_table):
    job = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 3
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [{
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": mysql_user,
                        "password": mysql_passwd,
                        "column": get_mysql_columns(source_database, source_table),
                        "splitPk": "",
                        "connection": [{
                            "table": [source_table],
                            "jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + source_database]
                        }]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,
                        "fileType": "text",
                        "path": "${targetdir}",
                        "fileName": source_table,
                        "column": get_hive_columns(source_database, source_table),
                        "writeMode": "append",
                        "fieldDelimiter": "\t",
                        "compress": "gzip"
                    }
                }
            }]
        }
    }
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:
        json.dump(job, f)


def main(args):
    source_database = ""
    source_table = ""

    options, arguments = getopt.getopt(args, '-d:-t:', ['sourcedb=', 'sourcetbl='])
    for opt_name, opt_value in options:
        if opt_name in ('-d', '--sourcedb'):
            source_database = opt_value
        if opt_name in ('-t', '--sourcetbl'):
            source_table = opt_value

    generate_json(source_database, source_table)


if __name__ == '__main__':
    main(sys.argv[1:])

Additional notes:

  • The Python MySQL driver must be installed: sudo yum install -y MySQL-python
  • Script usage
    python gen_import_config.py -d database -t table
    
    Pass the database name with -d and the table name with -t; running the command above generates the DataX sync configuration file for that table.
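    For example (a usage sketch; the output location follows the output_path setting in the script above):
    
    # Writes /opt/module/datax/job/import/gmall.activity_info.json
    python ~/bin/gen_import_config.py -d gmall -t activity_info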
  2. Create a gen_import_config.sh script in the ~/bin directory
#!/bin/bash

python ~/bin/gen_import_config.py -d gmall -t activity_info
python ~/bin/gen_import_config.py -d gmall -t activity_rule
python ~/bin/gen_import_config.py -d gmall -t base_category1
python ~/bin/gen_import_config.py -d gmall -t base_category2
python ~/bin/gen_import_config.py -d gmall -t base_category3
python ~/bin/gen_import_config.py -d gmall -t base_dic
python ~/bin/gen_import_config.py -d gmall -t base_province
python ~/bin/gen_import_config.py -d gmall -t base_region
python ~/bin/gen_import_config.py -d gmall -t base_trademark
python ~/bin/gen_import_config.py -d gmall -t cart_info
python ~/bin/gen_import_config.py -d gmall -t coupon_info
python ~/bin/gen_import_config.py -d gmall -t sku_attr_value
python ~/bin/gen_import_config.py -d gmall -t sku_info
python ~/bin/gen_import_config.py -d gmall -t sku_sale_attr_value
python ~/bin/gen_import_config.py -d gmall -t spu_info
  3. Grant execute permission to the gen_import_config.sh script
chmod +x gen_import_config.sh
  4. Run the gen_import_config.sh script to generate the configuration files: [logan@hadoop101 bin]$ gen_import_config.sh
  5. Inspect the generated configuration files with ll /opt/module/datax/job/import/; there should be 15 in total
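    A quick way to confirm the count (a minimal check, assuming the output_path above was left unchanged):
    
    # Should print 15 once all tables have been processed
    ls /opt/module/datax/job/import/ | wc -l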

Testing the DataX Configuration Files

Taking activity_info as an example, test whether the configuration file generated by the script is usable.

  1. Create the target path. Because a DataX sync job requires the target path to exist in advance, it must be created manually; the target path for the activity_info table here is /origin_data/gmall/db/activity_info_full/2023-06-14 (see the sketch after this list).
  2. Run the DataX sync command.
    python /opt/module/datax/bin/datax.py -p"-Dtargetdir=/origin_data/gmall/db/activity_info_full/2023-06-14" /opt/module/datax/job/import/gmall.activity_info.json
    
  3. Check that the output appears under the corresponding HDFS path.
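A minimal sketch of the create-and-verify steps above (standard hadoop fs commands; the date matches the example):

# Step 1: create the date-level target path in advance, since the job requires it to exist
hadoop fs -mkdir -p /origin_data/gmall/db/activity_info_full/2023-06-14

# Step 3: after the job finishes, confirm that output files were written
hadoop fs -ls /origin_data/gmall/db/activity_info_full/2023-06-14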

Full-Table Data Sync Script

For ease of use and for subsequent job scheduling, we write a full-table data sync script here.

  1. Create mysql_to_hdfs_full.sh in the ~/bin directory
    #!/bin/bash
    
    DATAX_HOME=/opt/module/datax
    
    # If a date is passed in, do_date is that date; otherwise it defaults to yesterday
    if [ -n "$2" ] ;then
        do_date=$2
    else
        do_date=`date -d "-1 day" +%F`
    fi
    
    # Handle the target path: if it does not exist, create it; if it exists, empty it, so that the sync job can be re-run safely
    handle_targetdir() {
      hadoop fs -test -e $1
      if [[ $? -eq 1 ]]; then
        echo "路径$1不存在,正在创建......"
        hadoop fs -mkdir -p $1
      else
        echo "路径$1已经存在"
        fs_count=$(hadoop fs -count $1)
        content_size=$(echo $fs_count | awk '{print $3}')
        if [[ $content_size -eq 0 ]]; then
          echo "路径$1为空"
        else
          echo "路径$1不为空,正在清空......"
          hadoop fs -rm -r -f $1/*
        fi
      fi
    }
    
    # Run the data sync
    import_data() {
      datax_config=$1
      target_dir=$2
    
      handle_targetdir $target_dir
      python $DATAX_HOME/bin/datax.py -p"-Dtargetdir=$target_dir" $datax_config
    }
    
    case $1 in
    "activity_info")
      import_data /opt/module/datax/job/import/gmall.activity_info.json /origin_data/gmall/db/activity_info_full/$do_date
      ;;
    "activity_rule")
      import_data /opt/module/datax/job/import/gmall.activity_rule.json /origin_data/gmall/db/activity_rule_full/$do_date
      ;;
    "base_category1")
      import_data /opt/module/datax/job/import/gmall.base_category1.json /origin_data/gmall/db/base_category1_full/$do_date
      ;;
    "base_category2")
      import_data /opt/module/datax/job/import/gmall.base_category2.json /origin_data/gmall/db/base_category2_full/$do_date
      ;;
    "base_category3")
      import_data /opt/module/datax/job/import/gmall.base_category3.json /origin_data/gmall/db/base_category3_full/$do_date
      ;;
    "base_dic")
      import_data /opt/module/datax/job/import/gmall.base_dic.json /origin_data/gmall/db/base_dic_full/$do_date
      ;;
    "base_province")
      import_data /opt/module/datax/job/import/gmall.base_province.json /origin_data/gmall/db/base_province_full/$do_date
      ;;
    "base_region")
      import_data /opt/module/datax/job/import/gmall.base_region.json /origin_data/gmall/db/base_region_full/$do_date
      ;;
    "base_trademark")
      import_data /opt/module/datax/job/import/gmall.base_trademark.json /origin_data/gmall/db/base_trademark_full/$do_date
      ;;
    "cart_info")
      import_data /opt/module/datax/job/import/gmall.cart_info.json /origin_data/gmall/db/cart_info_full/$do_date
      ;;
    "coupon_info")
      import_data /opt/module/datax/job/import/gmall.coupon_info.json /origin_data/gmall/db/coupon_info_full/$do_date
      ;;
    "sku_attr_value")
      import_data /opt/module/datax/job/import/gmall.sku_attr_value.json /origin_data/gmall/db/sku_attr_value_full/$do_date
      ;;
    "sku_info")
      import_data /opt/module/datax/job/import/gmall.sku_info.json /origin_data/gmall/db/sku_info_full/$do_date
      ;;
    "sku_sale_attr_value")
      import_data /opt/module/datax/job/import/gmall.sku_sale_attr_value.json /origin_data/gmall/db/sku_sale_attr_value_full/$do_date
      ;;
    "spu_info")
      import_data /opt/module/datax/job/import/gmall.spu_info.json /origin_data/gmall/db/spu_info_full/$do_date
      ;;
    "all")
      import_data /opt/module/datax/job/import/gmall.activity_info.json /origin_data/gmall/db/activity_info_full/$do_date
      import_data /opt/module/datax/job/import/gmall.activity_rule.json /origin_data/gmall/db/activity_rule_full/$do_date
      import_data /opt/module/datax/job/import/gmall.base_category1.json /origin_data/gmall/db/base_category1_full/$do_date
      import_data /opt/module/datax/job/import/gmall.base_category2.json /origin_data/gmall/db/base_category2_full/$do_date
      import_data /opt/module/datax/job/import/gmall.base_category3.json /origin_data/gmall/db/base_category3_full/$do_date
      import_data /opt/module/datax/job/import/gmall.base_dic.json /origin_data/gmall/db/base_dic_full/$do_date
      import_data /opt/module/datax/job/import/gmall.base_province.json /origin_data/gmall/db/base_province_full/$do_date
      import_data /opt/module/datax/job/import/gmall.base_region.json /origin_data/gmall/db/base_region_full/$do_date
      import_data /opt/module/datax/job/import/gmall.base_trademark.json /origin_data/gmall/db/base_trademark_full/$do_date
      import_data /opt/module/datax/job/import/gmall.cart_info.json /origin_data/gmall/db/cart_info_full/$do_date
      import_data /opt/module/datax/job/import/gmall.coupon_info.json /origin_data/gmall/db/coupon_info_full/$do_date
      import_data /opt/module/datax/job/import/gmall.sku_attr_value.json /origin_data/gmall/db/sku_attr_value_full/$do_date
      import_data /opt/module/datax/job/import/gmall.sku_info.json /origin_data/gmall/db/sku_info_full/$do_date
      import_data /opt/module/datax/job/import/gmall.sku_sale_attr_value.json /origin_data/gmall/db/sku_sale_attr_value_full/$do_date
      import_data /opt/module/datax/job/import/gmall.spu_info.json /origin_data/gmall/db/spu_info_full/$do_date
      ;;
    esac
    
  2. Grant execute permission to mysql_to_hdfs_full.sh: chmod +x ~/bin/mysql_to_hdfs_full.sh
  3. Test the sync script: mysql_to_hdfs_full.sh all 2023-06-14
  4. Check the sync results: verify that the full-table data appears under the HDFS target paths (a quick check is sketched below); there are 15 full tables in total.
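    As a quick spot check (a minimal sketch; the warehouse root matches the paths used above), list the table directories and confirm that 15 *_full directories are present:
    
    hadoop fs -ls /origin_data/gmall/db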
Source: https://blog.csdn.net/qq_41758289/article/details/135047986