Centos安装Datax

发布时间:2024年01月11日

一、DataX简介

????????DataX 是阿里云 DataWorks数据集成 的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS, databend 等各种异构数据源之间高效的数据同步功能。
源码地址点这里

二、DataX的数据源支持

????????DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入,目前支持数据如下图:

类型数据源Reader(读)Writer(写)文档
RDBMS 关系型数据库MySQL
Oracle
OceanBase
SQLServer
PostgreSQL
DRDS
Kingbase
通用RDBMS(支持所有关系型数据库)
阿里云数仓数据存储ODPS
ADB
ADS
OSS
OCS
Hologres
AnalyticDB For PostgreSQL
阿里云中间件datahub读 、写
SLS读 、写
图数据库阿里云 GDB
Neo4j
NoSQL数据存储OTS
Hbase0.94
Hbase1.1
Phoenix4.x
Phoenix5.x
MongoDB
Cassandra
数仓数据存储StarRocks读 、
ApacheDoris
ClickHouse
Databend
Hive
kudu
selectdb
无结构化数据存储TxtFile
FTP
HDFS
Elasticsearch
时间序列数据库OpenTSDB
TSDB
TDengine

三、安装DataX

1、下载DataX

在源码中可以下载到DataX安装包:datax.tar.gz。
在这里插入图片描述

2、解压

tar -zxvf datax.tar.gz

在这里插入图片描述

3、检验是否安装成功

# 如下路径更换为自己的路径
python /wz_program/datax/bin/datax.py /wz_program/datax/job/job.json

在这里插入图片描述
出现如下内容则说明已安装成功:
在这里插入图片描述

4、使用

????????DataX使用只需要根据自己同步的数据的数据源与数据的目的地选择对应的Reader和Writer,将Reader和Writer信息配置到一个json文件中,然后执行同步命令即可完成数据同步。

四、实践案例

描述:将mysql数据库user_info表中的1500条数据同步到HDFS的/user_info中(HDFS需要提前安装好)。

1、环境信息

mysql信息如下:

CREATE TABLE `user_info` (
  `id` int NOT NULL AUTO_INCREMENT,
  `user_id` int NOT NULL,
  `username` varchar(255) NOT NULL,
  `email` varchar(255) NOT NULL,
  `phone_number` varchar(30) DEFAULT NULL,
  `status` enum('active','inactive') DEFAULT 'active',
  `score` int unsigned DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=16280 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci

在这里插入图片描述
HDFS信息如下:
在这里插入图片描述
特别注意:DataX向HDFS同步数据时,一定要保证目标路径已存在,否则会同步失败。
在这里插入图片描述

hadoop fs -mkdir /user_info

2、编写同步的配置文件(user_info.json)

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [
                            "id",
                            "user_id",
                            "username",
                            "email",
                            "phone_number",
                            "status",
                            "score"
                        ],
                        "where": "id>=3",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://*************:3306/hadoop"
                                ],
                                "table": [
                                    "user_info"
                                ]
                            }
                        ],
                        "password": "**********",
                        "splitPk": "",
                        "username": "root"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "bigint"
                            },
                            {
                                "name": "user_id",
                                "type": "bigint"
                            },
                            {
                                "name": "username",
                                "type": "string"
                            },
                            {
                                "name": "email",
                                "type": "string"
                            },
                            {
                                "name": "phone_number",
                                "type": "string"
                            },
                            {
                                "name": "status",
                                "type": "string"
                            },
                            {
                                "name": "score",
                                "type": "string"
                            }
                        ],
                        "compress": "gzip",
                        "defaultFS": "hdfs://hadoop001:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "user_info",
                        "fileType": "text",
                        "path": "/user_info",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}

MySQLReader配置说明:

{
   "name": "mysqlreader",    #Reader的名称,固定写法,可以从官方文档中获取到(如上DataX的数据源支持中的表格)
   "parameter": {
       "column": [   #需要同步的字段,["*"]则表示所有列
           "id",
           "user_id",
           "username",
           "email",
           "phone_number",
           "status",
           "score"
       ],
       "where": "id>=3",   #where过滤条件,可以过滤掉不需要同步的数据
       "connection": [
           {
               "jdbcUrl": [
                   "jdbc:mysql://*************:3306/hadoop"   #需要同步的数据库url
               ],
               "table": [
                   "user_info"     #需要同步的数据库表名
               ]
           }
       ],
       "password": "**********",   #数据库密码
       "splitPk": "",     #分片字段,如果不指定则只会有单个Task
       "username": "root"  #数据库用户名
   }
}

HDFSWriter的配置说明:

{
    "name": "hdfswriter",  #Writer的名称,固定写法,可以从官方文档中获取到(如上DataX的数据源支持中的表格)
    "parameter": {
        "column": [  #列信息,包括列明和类型的设置
            {
                "name": "id",
                "type": "bigint"
            },
            {
                "name": "user_id",
                "type": "bigint"
            },
            {
                "name": "username",
                "type": "string"
            },
            {
                "name": "email",
                "type": "string"
            },
            {
                "name": "phone_number",
                "type": "string"
            },
            {
                "name": "status",
                "type": "string"
            },
            {
                "name": "score",
                "type": "string"
            }
        ],
        "compress": "gzip",   #HDFS压缩类型,text文件支持gzip和bzip2;orc文件支持NONE和SNAPPY
        "defaultFS": "hdfs://hadoop001:8020",   #HDFS文件系统namenode节点地址
        "fieldDelimiter": "\t",   #同步到HDFS文件字段的分隔符
        "fileName": "user_info",   #HDFS文件名前缀,如下图所示
        "fileType": "text",  #HDFS文件类型,目前支持text和orc
        "path": "/user_info",  #HDFS文件系统目标路径
        "writeMode": "append"  #数据写入模式(append:追加;nonConflict:若写入目录有同名文件【前缀相同】,则会报错)
    }
}

在这里插入图片描述

3、执行同步

python /wz_program/datax/bin/datax.py /wz_program/datax/job/user_info.json

在这里插入图片描述

4、验证同步结果

进入hdfs查询同步后的文件,前缀即为我们配置的名称,下载该文件查询同步的结果:
在这里插入图片描述

同步结果如下:
在这里插入图片描述

文章来源:https://blog.csdn.net/qq_26709459/article/details/135531332
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。