DataX 概述、部署、数据同步运用示例

news/2024/5/20 1:30:31 标签: hadoop, 数据仓库, hdfs, mysql, DataX, 数据同步

文章目录

DataX_2">什么是 DataX

DataX 是阿里巴巴集团开源的、通用的数据抽取工具,广泛使用的离线数据同步工具/平台。它设计用于支持多种数据源之间的高效数据传输,可以实现不同数据源之间的数据同步、迁移、ETL(抽取、转换、加载)等数据操作。

主要特点和功能包括:

  1. 多数据源支持DataX支持多种数据源,包括关系型数据库(如 MySQL、Oracle、SQL Server)、NoSQL 数据库(如 MongoDB、HBase、Redis)、HDFS、FTP、Hive 等。

  2. 扩展性DataX具有良好的扩展性,可以通过编写插件来支持新的数据源或者数据目的地,以满足不同数据存储系统的需求。

  3. 灵活配置:通过配置文件,用户可以灵活地定义数据抽取任务,包括数据源、目的地、字段映射、转换规则等,以适应不同的数据处理需求。

  4. 高效传输DataX 采用分布式、并行的数据传输策略,可以将数据以高效、快速的方式传输到目标数据源,提高数据处理的效率。

  5. 任务调度和监控DataX 提供了任务调度和监控的功能,可以通过控制台查看任务运行情况,监控任务的健康状态,以及处理异常情况。

  6. 开源社区支持DataX 是开源项目,拥有活跃的开源社区支持,可以获取到丰富的文档、示例和开发者社区的帮助。

在这里插入图片描述

DataX__25">DataX 设计框架

DataX 本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的 Reader 插件,以及向目标端写入数据的 Writer 插件,理论上 DataX 框架可以支持任意数据源类型的数据同步工作。同时 DataX 插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。

DataX 本身作为离线数据同步框架,采用 Framework + plugin 架构构建。将数据源读取和写入抽象成为 Reader/Writer 插件,纳入到整个同步框架中。

  • Reader:Reader 为数据采集模块,负责采集数据源的数据,将数据发送给Framework。

  • Writer: Writer 为数据写入模块,负责不断向 Framework 取数据,并将数据写入到目的端。

  • Framework:Framework 用于连接 readerwriter,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

DataX__41">DataX 核心架构

  1. DataX 完成单个数据同步的作业,我们称之为 Job,DataX 接受到一个 Job 之后,将启动一个进程来完成整个作业同步过程。DataX Job 模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。

  2. DataXJob 启动后,会根据不同的源端切分策略,将 Job 切分成多个小的 Task(子任务),以便于并发执行。Task 便是 DataX 作业的最小单元,每一个 Task 都会负责一部分数据的同步工作。

  3. 切分多个 Task 之后,DataX Job 会调用 Scheduler 模块,根据配置的并发数据量,将拆分成的 Task 重新组合,组装成 TaskGroup(任务组)。

  4. 每一个 TaskGroup 负责以一定的并发运行完毕分配好的所有 Task,默认单个任务组的并发数量为 5

  5. DataX 作业运行起来之后, Job 监控并等待多个 TaskGroup 模块任务完成,等待所有 TaskGroup 任务完成后 Job 成功退出。否则,异常退出,进程退出值非 0

DataX__58">DataX 部署

DataX 3.0 下载地址

1. 解压压缩包

tar -zxvf datax.tar.gz -C /opt/module/

2. 配置环境变量

sudo vim /etc/profile.d/my.sh

# 添加如下内容:
#DATAX_HOME
export DATAX_HOME=/opt/module/datax

刷新环境变量:source /etc/profile.d/my.sh

3. 运行测试程序

运行 DataX 自带的测试程序。

cd $DATAX_HOME

python bin/datax.py job/job.json

其中 datax.py 是执行的主程序,job.json 中存储的是数据源插件配置(其中有数据源的连接信息、存储信息、配置信息等)。

正常运行完成后,会看到输出相关日志内容:

在这里插入图片描述

DataX 无需进行其它配置,解压后即可快速使用。

DataX__MySQL__HDFS_98">DataX 数据同步 MySQL —> HDFS

1.创建 MySQL 测试库表

CREATE DATABASE `test_datax` CHARACTER SET 'utf8mb4';
USE `test_datax`;

-- 商品属性表
DROP TABLE IF EXISTS sku_info;
CREATE TABLE sku_info(
    `sku_id`      varchar(100) COMMENT "商品id",
    `name`        varchar(200) COMMENT "商品名称",
    `category_id` varchar(100) COMMENT "所属分类id",
    `from_date`   varchar(100) COMMENT "上架日期",
    `price`       decimal(10,2) COMMENT "商品单价"
) COMMENT "商品属性表";

insert into sku_info
values ('1', 'xiaomi 10', '1', '2020-01-01', 2000),
       ('2', '手机壳', '1', '2020-02-01', 10),
       ('3', 'apple 12', '1', '2020-03-01', 5000),
       ('4', 'xiaomi 13', '1', '2020-04-01', 6000),
       ('5', '破壁机', '2', '2020-01-01', 500),
       ('6', '洗碗机', '2', '2020-02-01', 2000),
       ('7', '热水壶', '2', '2020-03-01', 100),
       ('8', '微波炉', '2', '2020-04-01', 600),
       ('9', '自行车', '3', '2020-01-01', 1000),
       ('10', '帐篷', '3', '2020-02-01', 100),
       ('11', '烧烤架', '3', '2020-02-01', 50),
       ('12', '遮阳伞', '3', '2020-03-01', 20);

2.配置 DataX 插件

我们进入 DataX 官网,下滑找到对应组件插件的文档。

我们这里是从 MySQL 中读数据,然后存储到 HDFS 上,所以我们这里就先去查找 MySQL Reader 插件文档。

然后就会找到一个 MySQL Reader 的插件配置样例,如下所示:

{
    "job": {
        "setting": {
         	// 指定通道数量(并发)
            "speed": {
                 "channel": 3
            },
            // 允许的误差范围
            "errorLimit": { 
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
            	// 读取配置
                "reader": {
                	// 固定写法——mysqlreader
                    "name": "mysqlreader",
                    "parameter": {
                        // 账号密码
                        "username": "root",
                        "password": "root",
                        // 选取需要进行同步的字段
                        "column": [
                            "id",
                            "name"
                        ],
                        // 通过数据切片进行数据同步
                        "splitPk": "db_id",
                        
                        // 指定库表连接信息
                        "connection": [
                            {
                                "table": [
                                    "table"
                                ],
                                "jdbcUrl": [
     "jdbc:mysql://127.0.0.1:3306/database"
                                ]
                            }
                        ]
                    }
                },
                // 写入配置
               "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print":true
                    }
                }
            }
        ]
    }
}

了解 MySQL Reader 插件配置后,我们现在来学习 HDFS Writer 插件配置如何编写。

HDFS Writer 插件配置样例如下所示:

{
    "setting": {},
    "job": {
         // 指定通道数量(并发)
        "setting": {
            "speed": {
                "channel": 2
            }
        },
        "content": [
            {
                // 读取配置
                "reader": {
                    // 固定写法——txtfilereader
                    "name": "txtfilereader",
                    // 指定读取路径、编码、字段
                    "parameter": {
                        "path": ["/Users/shf/workplace/txtWorkplace/job/dataorcfull.txt"],
                        "encoding": "UTF-8",
                        "column": [
                            {
                                "index": 0,
                                "type": "long"
                            },
                            {
                                "index": 1,
                                "type": "long"
                            },
                            {
                                "index": 2,
                                "type": "long"
                            },
                            {
                                "index": 3,
                                "type": "long"
                            },
                            {
                                "index": 4,
                                "type": "DOUBLE"
                            },
                            {
                                "index": 5,
                                "type": "DOUBLE"
                            },
                            {
                                "index": 6,
                                "type": "STRING"
                            },
                            {
                                "index": 7,
                                "type": "STRING"
                            },
                            {
                                "index": 8,
                                "type": "STRING"
                            },
                            {
                                "index": 9,
                                "type": "BOOLEAN"
                            },
                            {
                                "index": 10,
                                "type": "date"
                            },
                            {
                                "index": 11,
                                "type": "date"
                            }
                        ],
                        "fieldDelimiter": "\t"
                    }
                },
                // 写入配置
                "writer": {
                	// 固定写法——hdfswriter
                    "name": "hdfswriter",
                    // 指定存储路径、格式、文件前缀名、字段
                    "parameter": {
                        "defaultFS": "hdfs://xxx:port",
                        "fileType": "orc",
                        "path": "/user/hive/warehouse/writerorc.db/orcfull",
                        "fileName": "xxxx",
                        "column": [
                            {
                                "name": "col1",
                                "type": "TINYINT"
                            },
                            {
                                "name": "col2",
                                "type": "SMALLINT"
                            },
                            {
                                "name": "col3",
                                "type": "INT"
                            },
                            {
                                "name": "col4",
                                "type": "BIGINT"
                            },
                            {
                                "name": "col5",
                                "type": "FLOAT"
                            },
                            {
                                "name": "col6",
                                "type": "DOUBLE"
                            },
                            {
                                "name": "col7",
                                "type": "STRING"
                            },
                            {
                                "name": "col8",
                                "type": "VARCHAR"
                            },
                            {
                                "name": "col9",
                                "type": "CHAR"
                            },
                            {
                                "name": "col10",
                                "type": "BOOLEAN"
                            },
                            {
                                "name": "col11",
                                "type": "date"
                            },
                            {
                                "name": "col12",
                                "type": "TIMESTAMP"
                            }
                        ],
                        // 指定存储模式
                        "writeMode": "append",
                        // 指定存储间隔符
                        "fieldDelimiter": "\t",
                        // 指定数据压缩模式
                        "compress":"NONE"
                    }
                }
            }
        ]
    }
}

3. 编写 DataX 同步 MySQL 数据到 HDFS 插件配置

cd $DATAX_HOME/job

vim test.json

添加下列内容:

{
    "job": {
        "setting": {
            "speed": {
                 "channel": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "000000",
                        "column": [
                            "sku_id",
                            "name",
                            "category_id",
                            "from_date",
                            "price"
                        ],
                        "splitPk": "",
                        "connection": [
                            {
                                "table": [
                                    "sku_info"
                                ],
                                "jdbcUrl": [
     "jdbc:mysql://127.0.0.1:3306/test_datax?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowPublicKeyRetrieval=true"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://hadoop120:8020",
                        "fileType": "text",
                        "path": "/testInputPath",
                        "fileName": "sku_info",
                        "column": [
                            {
                                "name": "sku_id",
                                "type": "STRING"
                            },
                            {
                                "name": "name",
                                "type": "STRING"
                            },
                            {
                                "name": "category_id",
                                "type": "STRING"
                            },
                            {
                                "name": "from_date",
                                "type": "STRING"
                            },
                            {
                                "name": "price",
                                "type": "DOUBLE"
                            }
                        ],
                        "writeMode": "append",
                        "fieldDelimiter": "\t",
                        "compress":"gzip"
                    }
                }
            }
        ]
    }
}

上面的插件配置信息表明,将 MySQL 中 test_datax.sku_info 表的全量数据同步到 HDFS 中的 /testInputPath 路径下,如果指定输出文件类型为 text,那么必须指定压缩模式为 gzipbzip2

启动 Hadoop 集群,提前创建 HDFS 中的存储路径

hadoop fs -mkdir /testInputPath

运行 DataX,进行数据同步

cd $DATAX_HOME

python bin/datax.py job/test.json

执行完成后如下所示:

在这里插入图片描述

在 HDFS 上查看存储路径下的文件:

在这里插入图片描述

如果想要直接查看 HDFS 文件中存储的内容,可以运行下列命令:

hadoop fs -cat /testInputPath/* | zcat

在这里插入图片描述

DataX__HDFS__MySQL_475">DataX 数据同步 HDFS —> MySQL

将 HDFS 中 /testInputPath 路径下的数据全量导入到 MySQL 中 test_datax 库中的 test_sku_info 表(该表需要提前创建)。

1. 编写 HDFS 读取插件

{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "path": "/testInputPath/*",
                        "defaultFS": "hdfs://hadoop120:8020",
                        "column": [
                        	"*"
                        ],
                        "fileType": "text",
                        "encoding": "UTF-8",
                        "fieldDelimiter": "\t",
                        "compress": "gzip"
                    }

                },
                "writer": {
                	// 待编写
                }
            }
        ]
    }
}

这里需要注意 HDFS 的数据存放格式、间隔符、压缩方式等。

2. 编写 MySQL 写入插件

{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [
            {
                 "reader": {
                   // 待编写
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "000000",
                        "column": [
                        	"*"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://hadoop120:3306/test_datax?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowPublicRetrieval=true",
                                "table": [
                                    "test_sku_info"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

导入数据到 MySQL 时,需要注意写入模式 writeMode,因为 HDFS 中的存储数据内容并没有主键的概念,而 MySQL 中有主键,如果模式选择不正确,会带来不必要的麻烦,模式说明如下:

在这里插入图片描述

分别表示插入写 insert、覆盖写 replace、更新写 update

3. 合并读写插件

{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "path": "/testInputPath/*",
                        "defaultFS": "hdfs://hadoop120:8020",
                        "column": [
                            "*"
                        ],
                        "fileType": "text",
                        "encoding": "UTF-8",
                        "fieldDelimiter": "\t",
                        "compress": "gzip"
                    }

                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "000000",
                        "column": [
                            "*"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://hadoop120:3306/test_datax?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowPublicRetrieval=true",
                                "table": [
                                    "test_sku_info"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

合并完成后,将其存储到 $DATAX_HOME/job/hdfs_to_mysql.json 中。

4. 创建 MySQL 中的存储库

USE `test_datax`;

DROP TABLE IF EXISTS `test_sku_info`;
CREATE TABLE `test_sku_info`  (
  `sku_id` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '商品id',
  `name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '商品名称',
  `category_id` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '所属分类id',
  `from_date` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '上架日期',
  `price` decimal(10, 2) NULL DEFAULT NULL COMMENT '商品单价'
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci COMMENT = '商品属性表' ROW_FORMAT = Dynamic;

5. 执行数据同步

将 HDFS 中 /testInputPath 路径下的数据全量导入到 MySQL 中 test_datax 库中的 test_sku_info 表。

cd $DATAX_HOME

python bin/datax.py job/hdfs_to_mysql.json

在这里插入图片描述

运行完成后,进入 MySQL 中查看同步过来的内容:

select * from test_datax.test_sku_info limit 10;

在这里插入图片描述

数据同步完成,其它数据源数据同步方式也基本上差不多,多看看官方插件配置文档就会了。

更多内容请查看:DataX 官网

DataX__664">DataX 优化

DataX3.0 提供了包括通道(并发)、记录流、字节流三种流控模式,可以根据随意控制你的作业速度,让你的作业在库可以承受的范围内达到最佳的同步速度。

速度优化

"speed": {
   "channel": 5,
   "byte": 1048576,
   "record": 10000
}
  • "channel": 5 这个参数指定了通道的数量。通道可以理解为并行处理数据的通道数。在数据传输过程中,可以同时处理多个通道,提高数据传输效率。

  • "byte": 1048576 这个参数指定了每个通道传输的字节数,这里的数值是 1048576,即 1 MB

  • "record": 10000 这个参数指定了每个通道传输的记录数,这里的数值是 10000

配置时必须遵守以下规则:

  • 若配置了总 record 记录限速,则必须配置单个 channelrecord 记录限速。

  • 若配置了总 byte 字节限速,则必须配置单个 channelbyte 字节限速。

  • 若配置了总 record 记录限速和总 byte 字节限速,channel 并发数参数就会失效。

配置格式:

{
    "core": {
        "transport": {
            "channel": {
                "speed": {
                    "byte": 1048576 //单个channel的byte字节限速1M/s
                }
            }
        }
    },
    "job": {
        "setting": {
            "speed": {
                "byte" : 5242880 //总byte字节限速5M/s
            }
        },
        ......
    }
}

内存优化

在运行数据同步的时候直接指定堆内存大小。

cd $DATAX_HOME

python bin/datax.py --jvm="-Xms4G -Xmx4G" job/job.json

在这里,将 JVM 最大和最小堆内存都设置为 4GB

同步 MySQL 中 NULL 值数据到 HDFS 出现错误

解决datax抽mysql数据到hdfs之null值变成‘‘(引号)的问题

配置文件变量传参

有时候我们不想将 JSON 插件配置文件直接固定写死,那么我们就可以通过传参的方式来进行动态设置。

例如,我想要将 MySQL 数据同步到 HDFS 上,每天的同步日期都要进行修改,这时候就可以通过变量传参的方式来解决。

{
    "job": {
        "setting": {
            "speed": {
                 "channel": 1,
            },
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "000000",
                        "connection": [
                            {
                                "querySql": [
                                    "select sku_id,name,category_id,from_date,price from sku_info;"
                                ],
                                "jdbcUrl": [
     "jdbc:mysql://127.0.0.1:3306/test_datax?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowPublicKeyRetrieval=true"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://hadoop120:8020",
                        "fileType": "text",
                        "path": "/testInputPath/${dt}",
                        "fileName": "sku_info",
                        "column": [
                          "*"
                        ],
                        "writeMode": "append",
                        "fieldDelimiter": "\t",
                        "compress":"gzip"
                    }
                }
            }
        ]
    }
}

这里使用了 MySQL 数据读取另一种写法(querySql

在这里插入图片描述

以 SQL 作为查询语句,同时也支持使用 Where 语句进行条件过滤。

在写入配置 "writer" 中设置了需要传参的日期变量 dt

	"path": "/testInputPath/${dt}",

模拟将其存储路径按照日期进行动态设置。

注意: 执行数据同步前需要在 HDFS 上先创建对应存储路径。

hadoop fs -mkdir /testInputPath/2023-09-16

现在来进行数据同步测试

cd $DATAX_HOME

python bin/datax.py job/var.json -p "-Ddt=2023-09-16"

正常执行完成后,结果如下所示:

在这里插入图片描述

数据同步完成

在这里插入图片描述


http://www.niftyadmin.cn/n/5033920.html

相关文章

Linux 信号集 及其 部分函数

这几个函数都是对自己自定义的信号集操作 int sigemptyset(sigset_t *set) 功能:清空信号集中的数据,将所有的标志位置为0 参数:set需要操作的信号集 返回值:成功0失败-1 int sigfillset(sigset_t *set) 功能:清空…

数字虚拟人制作简明指南

如何在线创建虚拟人? 虚拟人,也称为数字化身、虚拟助理或虚拟代理,是一种可以通过各种在线平台与用户进行逼真交互的人工智能人。 在线创建虚拟人变得越来越流行,因为它为个人和企业带来了许多好处。 推荐:用 NSDT编辑…

Vue3数组重新赋值问题

Vue3数组重新赋值问题 1. reactive2. ref总结 vue3中使用组合式式API时定义响应式数据需要使用reactive或者ref,两者使用时有些许不同,下面通过重新赋值数组来说明两者的不同 1. reactive 主要用来定义复杂一些的响应式数据 先清空再赋值 const datas reactive([{id:1,name…

听GPT 讲Istio源代码--pilot(3)

File: istio/pilot/pkg/security/authz/model/generator.go 在Istio项目中,generator.go文件实现了Istio授权模型的生成器。该文件定义了一系列结构体和函数,用于生成授权策略、主体和权限。 下面是对每个结构体的详细介绍: generator结构体是…

python爬虫爬取电影数据并做可视化

思路: 1、发送请求,解析html里面的数据 2、保存到csv文件 3、数据处理 4、数据可视化 需要用到的库: import requests,csv #请求库和保存库 import pandas as pd #读取csv文件以及操作数据 from lxml import etree #解析html库 from …

《DevOps实践指南》- 读书笔记(九)

DevOps实践指南 25. 附录附录 1 DevOps 的大融合精益运动敏捷运动Velocity 大会运动敏捷基础设施运动持续交付运动丰田套路运动精益创业运动精益用户体验运动Rugged Computing 运动 附录 2 约束理论和核心的长期冲突附录 3 恶性循环列表附录 4 交接和队列的危害附录 5 工业安全…

【前端】js下载url文件

不打开新窗口进行下载 function download(res) { var elemIF document.createElement("iframe"); elemIF.src res; elemIF.style.display "none"; document.body.appendChild(elemIF); } window.open(url, _blank); a标签 const ele …

【题解】有效括号

代码: class Solution {public static void main(String[] args) {Solution solution new Solution();System.out.println(solution.isValid("]"));}public boolean isValid(String s) {Stack<Character> stack new Stack<>();for (char c : s.toChar…