阿杰,我帮你整理一个 DataX 的全面介绍,涵盖 简介、部署、原理和使用方法,方便你快速理解和上手。


一、DataX 简介

  • DataX 是阿里巴巴开源的 离线数据同步工具,用于 异构数据源之间的高效数据同步
  • 支持多种数据源,包括:
    • 关系型数据库:MySQL、Oracle、SQL Server、PostgreSQL
    • 大数据系统:Hive、HDFS、HBase
    • NoSQL:MongoDB、Redis
    • 文件:CSV、TXT、Parquet 等
  • 特点
    1. 插件化设计:通过 Reader(读取)和 Writer(写入)插件,支持多种数据源组合。
    2. 易用性:配置 JSON 文件即可完成复杂数据同步任务。
    3. 高性能:支持多线程分片、增量同步。
    4. 开源免费:可根据业务需要进行二次开发。

二、DataX 部署

1. 环境要求

  • 操作系统:Linux / Windows / MacOS
  • JDK:Java 1.8+
  • Python:仅部分脚本需要 Python 环境(DataX 自带 Python 执行环境)

2. 下载与解压

  1. 从 GitHub 官方仓库下载: git clone https://github.com/alibaba/DataX.git
  2. 解压到目标目录,例如: /opt/datax

3. 配置环境

  • 配置 JAVA_HOMEexport JAVA_HOME=/usr/local/java export PATH=$JAVA_HOME/bin:$PATH
  • DataX 默认使用 Python 执行器,一般无需额外安装 Python。

4. 插件安装

  • DataX 内置多种 Reader/Writer 插件:
    • mysqlreader / mysqlwriter
    • hdfsreader / hdfswriter
    • oraclewriter / oraclereader
  • 直接在 DataX 解压目录 plugin/readerplugin/writer 查看。

三、DataX 工作原理

DataX 的工作原理可以分为以下步骤:

  1. 任务配置(JSON)
    • 用户定义一个 JSON 文件,包括:
      • Reader 插件及配置(源数据)
      • Writer 插件及配置(目标数据)
      • 同步字段、分片数量、并发线程等
  2. 启动任务
    • 使用 python ${DATAX_HOME}/bin/datax.py job.json 启动任务
  3. 任务拆分与并行执行
    • DataX 根据配置将任务拆分为多个 Channel(数据流通道)
    • Reader 将数据分片读出,Writer 并行写入
  4. 数据同步完成
    • 汇总任务执行情况
    • 输出日志和统计信息

DataX 架构图示意:

          +------------------+
          |      Job         |
          +------------------+
                  |
        +---------+---------+
        |                   |
  +-----v-----+       +-----v-----+
  |  Reader   |       |  Writer   |
  +-----+-----+       +-----+-----+
        |                   |
  +-----v-------------------v-----+
  |           Channel              |
  +--------------------------------+

四、DataX 使用方法

1. 配置 JSON 示例(MySQL → MySQL)

{
  "job": {
    "setting": {
      "speed": {
        "channel": 3
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "123456",
            "column": ["id","name","age"],
            "splitPk": "id",
            "connection": [
              {
                "table": ["source_table"],
                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/source_db"]
              }
            ]
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "username": "root",
            "password": "123456",
            "column": ["id","name","age"],
            "connection": [
              {
                "table": ["target_table"],
                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/target_db"
              }
            ]
          }
        }
      }
    ]
  }
}

2. 执行任务

python ${DATAX_HOME}/bin/datax.py job_mysql_to_mysql.json
  • 执行过程中会输出 每个 Channel 的读写进度
  • 任务完成后生成 日志文件,默认保存在 log 目录

3. 高级功能

  1. 增量同步
    • 利用 Reader 的 where 条件,只读取增量数据
    • Writer 支持 replaceinsertupdate 等模式
  2. 分片并发
    • JSON 中 speed.channel 设置通道数,提高吞吐量
  3. 异常处理
    • DataX 会自动重试失败 Channel
    • 日志中会显示失败原因,方便排查
  4. 支持 Hadoop/Hive/HBase
    • 可做 异构数据迁移,如 MySQL → HDFS → Hive

五、总结

  • DataX 核心定位:跨数据源的数据同步工具
  • 特点:开源、插件化、支持异构源、可并行、多线程
  • 使用流程
    1. 配置 JSON 文件(Reader/Writer/字段/并发)
    2. 启动任务 (datax.py job.json)
    3. 查看日志,监控进度
  • 适合场景
    • 数据仓库 ETL
    • 异构数据库迁移
    • 日志/指标数据汇总

明白阿杰,我帮你整理一个 DataX MySQL → Hive 的完整实战示例,包含 增量同步、分片设置、日志分析,让你可以直接上手。


一、前置条件

  1. 环境准备
    • MySQL 数据库(源数据库)
    • Hive/Hadoop 环境(目标数据库)
    • DataX 已部署,JAVA 环境配置正确
  2. 插件准备
    • mysqlreader
    • hdfswriter(写入 Hive 可用 HDFS,再建外部表)

二、示例场景

  • 源表user_info(MySQL)
    • 字段:id, name, age, update_time
  • 目标表user_info_hive(Hive)
    • 字段一致,外部表存储在 /user/hive/warehouse/user_info_hive/
  • 增量规则
    • update_time > '${last_sync_time}'
    • 每次同步后更新 ${last_sync_time}

三、DataX JSON 配置示例

{
  "job": {
    "setting": {
      "speed": {
        "channel": 4  // 并行 4 个线程
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "123456",
            "column": ["id","name","age","update_time"],
            "splitPk": "id",
            "connection": [
              {
                "table": ["user_info"],
                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/source_db"],
                "where": "update_time > '${last_sync_time}'"
              }
            ]
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "defaultFS": "hdfs://namenode:8020",
            "fileType": "text",
            "path": "/user/hive/warehouse/user_info_hive/",
            "fileName": "user_info",
            "writeMode": "append",
            "fieldDelimiter": "\t",
            "column": [
              {"name":"id","type":"string"},
              {"name":"name","type":"string"},
              {"name":"age","type":"string"},
              {"name":"update_time","type":"string"}
            ]
          }
        }
      }
    ]
  }
}

四、执行步骤

  1. 创建 Hive 外部表(第一次执行):
CREATE EXTERNAL TABLE user_info_hive(
    id STRING,
    name STRING,
    age STRING,
    update_time STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/user/hive/warehouse/user_info_hive/';
  1. 执行 DataX 同步任务
python ${DATAX_HOME}/bin/datax.py mysql_to_hive_job.json
  1. 查看日志
  • DataX 会输出每个 Channel 的读写进度: Channel-1: read 1000 records, write 1000 records Channel-2: read 950 records, write 950 records ... Total: 3950 records
  • 日志文件在 ${DATAX_HOME}/log 下,可查找失败记录或错误信息。

五、增量同步策略

  1. 变量 ${last_sync_time}
    • 可通过脚本读取上次同步时间
    • 动态替换到 JSON 的 where 条件中
  2. 示例 Bash 更新策略
# 获取上次同步时间
LAST_TIME=$(cat /opt/datax/last_sync_time.txt)

# 替换 JSON 模板变量
sed "s/\${last_sync_time}/$LAST_TIME/g" mysql_to_hive_job_template.json > mysql_to_hive_job.json

# 执行 DataX
python ${DATAX_HOME}/bin/datax.py mysql_to_hive_job.json

# 同步完成后更新时间
date "+%Y-%m-%d %H:%M:%S" > /opt/datax/last_sync_time.txt

六、日志分析与监控

  1. 日志目录${DATAX_HOME}/log
  2. 关键字段
    • Total Records:总同步记录数
    • Channel-X Records:每个线程读写记录数
    • Error Record:错误记录数
  3. 异常处理
    • errorLimit.record:失败记录数超过该值任务失败
    • errorLimit.percentage:失败占比超过该值任务失败

七、总结

  • DataX MySQL → Hive 实战特点:
    1. 并行分片同步,提高效率
    2. 增量同步,减少全量开销
    3. 可通过日志追踪和监控,保证数据质量
    4. 外部表 + TextFile 形式,便于 Hive 查询