阿杰,我帮你整理一个 DataX 的全面介绍,涵盖 简介、部署、原理和使用方法,方便你快速理解和上手。
一、DataX 简介
- DataX 是阿里巴巴开源的 离线数据同步工具,用于 异构数据源之间的高效数据同步。
- 支持多种数据源,包括:
- 关系型数据库:MySQL、Oracle、SQL Server、PostgreSQL
- 大数据系统:Hive、HDFS、HBase
- NoSQL:MongoDB、Redis
- 文件:CSV、TXT、Parquet 等
- 特点:
- 插件化设计:通过 Reader(读取)和 Writer(写入)插件,支持多种数据源组合。
- 易用性:配置 JSON 文件即可完成复杂数据同步任务。
- 高性能:支持多线程分片、增量同步。
- 开源免费:可根据业务需要进行二次开发。
二、DataX 部署
1. 环境要求
- 操作系统:Linux / Windows / MacOS
- JDK:Java 1.8+
- Python:仅部分脚本需要 Python 环境(DataX 自带 Python 执行环境)
2. 下载与解压
- 从 GitHub 官方仓库下载:
git clone https://github.com/alibaba/DataX.git
- 解压到目标目录,例如:
/opt/datax
3. 配置环境
- 配置
JAVA_HOME
:export JAVA_HOME=/usr/local/java export PATH=$JAVA_HOME/bin:$PATH
- DataX 默认使用 Python 执行器,一般无需额外安装 Python。
4. 插件安装
- DataX 内置多种 Reader/Writer 插件:
- mysqlreader / mysqlwriter
- hdfsreader / hdfswriter
- oraclewriter / oraclereader
- 直接在 DataX 解压目录
plugin/reader
和plugin/writer
查看。
三、DataX 工作原理
DataX 的工作原理可以分为以下步骤:
- 任务配置(JSON)
- 用户定义一个 JSON 文件,包括:
- Reader 插件及配置(源数据)
- Writer 插件及配置(目标数据)
- 同步字段、分片数量、并发线程等
- 用户定义一个 JSON 文件,包括:
- 启动任务
- 使用
python ${DATAX_HOME}/bin/datax.py job.json
启动任务
- 使用
- 任务拆分与并行执行
- DataX 根据配置将任务拆分为多个 Channel(数据流通道)
- Reader 将数据分片读出,Writer 并行写入
- 数据同步完成
- 汇总任务执行情况
- 输出日志和统计信息
DataX 架构图示意:
+------------------+
| Job |
+------------------+
|
+---------+---------+
| |
+-----v-----+ +-----v-----+
| Reader | | Writer |
+-----+-----+ +-----+-----+
| |
+-----v-------------------v-----+
| Channel |
+--------------------------------+
四、DataX 使用方法
1. 配置 JSON 示例(MySQL → MySQL)
{
"job": {
"setting": {
"speed": {
"channel": 3
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": ["id","name","age"],
"splitPk": "id",
"connection": [
{
"table": ["source_table"],
"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/source_db"]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"username": "root",
"password": "123456",
"column": ["id","name","age"],
"connection": [
{
"table": ["target_table"],
"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/target_db"
}
]
}
}
}
]
}
}
2. 执行任务
python ${DATAX_HOME}/bin/datax.py job_mysql_to_mysql.json
- 执行过程中会输出 每个 Channel 的读写进度
- 任务完成后生成 日志文件,默认保存在
log
目录
3. 高级功能
- 增量同步:
- 利用 Reader 的
where
条件,只读取增量数据 - Writer 支持
replace
、insert
、update
等模式
- 利用 Reader 的
- 分片并发:
- JSON 中
speed.channel
设置通道数,提高吞吐量
- JSON 中
- 异常处理:
- DataX 会自动重试失败 Channel
- 日志中会显示失败原因,方便排查
- 支持 Hadoop/Hive/HBase:
- 可做 异构数据迁移,如 MySQL → HDFS → Hive
五、总结
- DataX 核心定位:跨数据源的数据同步工具
- 特点:开源、插件化、支持异构源、可并行、多线程
- 使用流程:
- 配置 JSON 文件(Reader/Writer/字段/并发)
- 启动任务 (
datax.py job.json
) - 查看日志,监控进度
- 适合场景:
- 数据仓库 ETL
- 异构数据库迁移
- 日志/指标数据汇总
明白阿杰,我帮你整理一个 DataX MySQL → Hive 的完整实战示例,包含 增量同步、分片设置、日志分析,让你可以直接上手。
一、前置条件
- 环境准备:
- MySQL 数据库(源数据库)
- Hive/Hadoop 环境(目标数据库)
- DataX 已部署,JAVA 环境配置正确
- 插件准备:
mysqlreader
hdfswriter
(写入 Hive 可用 HDFS,再建外部表)
二、示例场景
- 源表:
user_info
(MySQL)- 字段:
id
,name
,age
,update_time
- 字段:
- 目标表:
user_info_hive
(Hive)- 字段一致,外部表存储在
/user/hive/warehouse/user_info_hive/
- 字段一致,外部表存储在
- 增量规则:
update_time > '${last_sync_time}'
- 每次同步后更新
${last_sync_time}
三、DataX JSON 配置示例
{
"job": {
"setting": {
"speed": {
"channel": 4 // 并行 4 个线程
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": ["id","name","age","update_time"],
"splitPk": "id",
"connection": [
{
"table": ["user_info"],
"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/source_db"],
"where": "update_time > '${last_sync_time}'"
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://namenode:8020",
"fileType": "text",
"path": "/user/hive/warehouse/user_info_hive/",
"fileName": "user_info",
"writeMode": "append",
"fieldDelimiter": "\t",
"column": [
{"name":"id","type":"string"},
{"name":"name","type":"string"},
{"name":"age","type":"string"},
{"name":"update_time","type":"string"}
]
}
}
}
]
}
}
四、执行步骤
- 创建 Hive 外部表(第一次执行):
CREATE EXTERNAL TABLE user_info_hive(
id STRING,
name STRING,
age STRING,
update_time STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/user/hive/warehouse/user_info_hive/';
- 执行 DataX 同步任务:
python ${DATAX_HOME}/bin/datax.py mysql_to_hive_job.json
- 查看日志:
- DataX 会输出每个 Channel 的读写进度:
Channel-1: read 1000 records, write 1000 records Channel-2: read 950 records, write 950 records ... Total: 3950 records
- 日志文件在
${DATAX_HOME}/log
下,可查找失败记录或错误信息。
五、增量同步策略
- 变量
${last_sync_time}
:- 可通过脚本读取上次同步时间
- 动态替换到 JSON 的
where
条件中
- 示例 Bash 更新策略:
# 获取上次同步时间
LAST_TIME=$(cat /opt/datax/last_sync_time.txt)
# 替换 JSON 模板变量
sed "s/\${last_sync_time}/$LAST_TIME/g" mysql_to_hive_job_template.json > mysql_to_hive_job.json
# 执行 DataX
python ${DATAX_HOME}/bin/datax.py mysql_to_hive_job.json
# 同步完成后更新时间
date "+%Y-%m-%d %H:%M:%S" > /opt/datax/last_sync_time.txt
六、日志分析与监控
- 日志目录:
${DATAX_HOME}/log
- 关键字段:
Total Records
:总同步记录数Channel-X Records
:每个线程读写记录数Error Record
:错误记录数
- 异常处理:
errorLimit.record
:失败记录数超过该值任务失败errorLimit.percentage
:失败占比超过该值任务失败
七、总结
- DataX MySQL → Hive 实战特点:
- 并行分片同步,提高效率
- 增量同步,减少全量开销
- 可通过日志追踪和监控,保证数据质量
- 外部表 + TextFile 形式,便于 Hive 查询
发表回复