数据同步工具DataX从Mysql同步数据到HDFS实战

news/2024/5/20 3:39:54 标签: hdfs, mysql, hadoop, 数据同步工具, DataX

目录

  • 1. 查看数据同步模板
  • 2. 高可用HA的HDFS配置
  • 3. MysqlReader针对Mysql类型转换说明
  • 4. HdfsWriter支持大部分Hive类型
  • 5. Mysql准备数据如下
  • 6. 新建job/mysql2hdfs.json
  • 7. 执行job
  • 8. 查看hdfs的文件

1. 查看数据同步模板

我自己在下面的模板文件中添加了一些说明注释

[root@bigdata001 datax]# bin/datax.py -r mysqlreader -w hdfswriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Please refer to the mysqlreader document:
     https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md 

Please refer to the hdfswriter document:
     https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md 
 
Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json 
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": [],                                             # 可以填写["*"]表示同步所有列。还支持["1", "2.3", "true", "'bazhen.csy'", "null", "upper('a')"], 分别表示整形、浮点数、布尔值、字符串、空指针,表达式
                        "connection": [
                            {
                                "jdbcUrl": [],                                     # 支持多个连接地址。会依次进行连接测试,选择一个可用的进行查询数据
                                "table": []                                         # 支持同步多个表。多个表必须schema相同
                            }
                        ], 
                        "password": "", 
                        "username": "", 
                        "where": "",
                        "splitPk": "",                                            # 一般是主键,只支持整形字段。先计算min(splitPk)、max(splitPk),再进行范围分区划分,将job划分成多个task。不指定则只有一个task
                        "querySql": ["select id, name from person where id < 10;"]    # 这个是我自己添加的。有了querySql会自动忽略column、table、where
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [],                                            # 必须和reader的列数量对应
                        "compress": "",                                       # 默认不填写,表示不压缩。text文件支持gzip、bzip2; orc文件支持NONE、SNAPPY
                        "defaultFS": "", 
                        "fieldDelimiter": "", 
                        "fileName": "", 
                        "fileType": "",                                          # 目前仅支持text和orc。其中orc需指定compress为SNAPPY
                        "path": "",                                                # 该路径必须存在
                        "writeMode": ""                                       # append:表示新建一个文件插入数据;nonConflict:有fileName为前缀的文件直接报错
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}
[root@bigdata001 datax]#

2. 高可用HA的HDFS配置

配置参考如下:

                        "defaultFS": "hdfs://192.168.8.111:9000", 
						"hadoopConfig": {
						    "dfs.nameservices": "nnha",
						    "dfs.ha.namenodes.nnha": "nn1,nn2,nn3",
						    "dfs.namenode.rpc-address.nnha.nn1": "192.168.8.111:9870",
						    "dfs.namenode.rpc-address.nnha.nn2": "192.168.8.112:9870",
						    "dfs.namenode.rpc-address.nnha.nn3": "192.168.8.113:9870",
						    "dfs.client.failover.proxy.provider.nnha": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
						},
                        "fieldDelimiter": "|"

3. MysqlReader针对Mysql类型转换说明

DataX内部类型Mysql数据类型
Longint, tinyint, smallint, mediumint, int, bigint
Doublefloat, double, decimal
Stringvarchar, char, tinytext, text, mediumtext, longtext, year
Datedate, datetime, timestamp, time
Booleanbit, bool
Bytestinyblob, mediumblob, blob, longblob, varbinary

4. HdfsWriter支持大部分Hive类型

DataX内部类型HIVE数据类型
LongTINYINT,SMALLINT,INT,BIGINT
DoubleFLOAT,DOUBLE
StringSTRING,VARCHAR,CHAR
BooleanBOOLEAN
DateDATE,TIMESTAMP

5. Mysql准备数据如下

mysql> create table person(
    -> id bigint,
    -> name varchar(64)
    -> );
Query OK, 0 rows affected (0.06 sec)

mysql> 
mysql> insert into person(id, name) values(1, 'yi'), (2, 'er');
Query OK, 2 rows affected (0.02 sec)
Records: 2  Duplicates: 0  Warnings: 0

mysql> 
mysql> select * from person;
+------+------+
| id   | name |
+------+------+
|    1 | yi   |
|    2 | er   |
+------+------+
2 rows in set (0.00 sec)

mysql> 

mysql2hdfsjson_133">6. 新建job/mysql2hdfs.json

内容如下:

[root@bigdata001 datax]# cat job/mysql2hdfs.json 
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": ["id", "name"], 
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://192.168.8.115:3306/test"], 
                                "table": ["person"]
                            }
                        ], 
                        "password": "Root_123", 
                        "username": "root", 
                        "where": ""
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [{"name": "id", "type": "int"}, {"name": "name", "type": "string"}], 
                        "compress": "", 
                        "defaultFS": "hdfs://192.168.8.111:9000", 
                        "fieldDelimiter": "|", 
                        "fileName": "person.txt", 
                        "fileType": "text", 
                        "path": "/", 
                        "writeMode": "append"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
[root@bigdata001 datax]#

7. 执行job

会先写入临时文件,如果成功,则将临时文件rename,再删除临时文件;如果失败,直接删除临时文件

[root@bigdata001 datax]# bin/datax.py job/mysql2hdfs.json 

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
......省略部分......
2022-06-14 10:16:33.551 [0-0-0-writer] INFO  HdfsWriter$Task - begin do write...
2022-06-14 10:16:33.551 [0-0-0-writer] INFO  HdfsWriter$Task - write to file : [hdfs://192.168.8.111:9000/__af1d80fc_c721_4973_a54f_18d97902156f/person.txt__8191b9fa_361d_435d_ba34_7b361fffb07d]
......省略部分......
2022-06-14 10:16:43.512 [job-0] INFO  HdfsWriter$Job - start rename file [hdfs://192.168.8.111:9000/__af1d80fc_c721_4973_a54f_18d97902156f/person.txt__8191b9fa_361d_435d_ba34_7b361fffb07d] to file [hdfs://192.168.8.111:9000/person.txt__8191b9fa_361d_435d_ba34_7b361fffb07d].
......省略部分......
2022-06-14 10:16:43.915 [job-0] INFO  HdfsWriter$Job - start delete tmp dir [hdfs://192.168.8.111:9000/__af1d80fc_c721_4973_a54f_18d97902156f] .
......省略部分......
2022-06-14 10:16:44.034 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2022-06-14 10:16:31
任务结束时刻                    : 2022-06-14 10:16:44
任务总计耗时                    :                 12s
任务平均流量                    :                0B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   2
读写失败总数                    :                   0

[root@bigdata001 datax]#

hdfs_209">8. 查看hdfs的文件

会在该文件名后添加随机的后缀,作为每个线程写入的实际文件名

[root@bigdata001 ~]# hadoop fs -cat /person.txt__8191b9fa_361d_435d_ba34_7b361fffb07d
1|yi
2|er
[root@bigdata001 ~]#

http://www.niftyadmin.cn/n/11190.html

相关文章

MCE | 癌症诊断和靶向治疗的“遍地开花”

据研究报道&#xff0c;很多癌细胞分泌的外泌体 (Exosome) 比正常细胞分泌的多 10 倍以上。外泌体参与了癌症的发生、进展、转移和耐药性&#xff0c;并通过转运蛋白和核酸&#xff0c;建立与肿瘤微环境的联系。例如&#xff0c;外泌体可导致免疫逃逸&#xff0c;癌细胞的免疫逃…

spring boot 过滤器拦截器与aop

目录 一、过滤器 (Filter) 1.1 什么是过滤器 1.2 springboot配置过滤器 方式一&#xff1a;使用WebFilter 二、拦截器(Interceptor) 2.1 什么是拦截器 2.2 使用拦截器方法 三、拦截器&过滤器与spring aop的区别 3.1 区别 3.2添加aop 适用场景&#xff1a; 拦截…

轻量级模型设计与部署总结

前言一些关键字定义及理解 计算量 FLOPs内存访问代价 MACGPU 内存带宽Latency and Throughput英伟达 GPU 架构 CNN 架构的理解手动设计高效 CNN 架构建议 一些结论&#xff1a; 一些建议轻量级网络模型部署总结轻量级网络论文解析文章 参考资料 文章同步发于 github 仓库 和 知…

深入浅出PyTorc——进阶训练技巧

1. 自定义损失函数 1.1 以函数方式定义 手动写出损失的公式并用函数进行存储&#xff0c;方便调用。 def my_loss(output, target):loss torch.mean((output - target)**2)return loss 1.2 以类方式定义 1.2.1 损失函数的继承关系 &#xff08;1&#xff09;Loss函数部分继…

【web前端期末大作业】HTML+CSS宠物狗静态网页设计

&#x1f389;精彩专栏推荐 &#x1f4ad;文末获取联系 ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业&#xff1a; 【&#x1f4da;毕设项目精品实战案例 (10…

LQ0241 身份证号校验【程序填空】

题目来源&#xff1a;蓝桥杯2012初赛 C 题目描述 本题为代码补全填空题&#xff0c;请将题目中给出的源代码补全&#xff0c;并复制到右侧代码框中&#xff0c;选择对应的编译语言&#xff08;C/Java&#xff09;后进行提交。若题目中给出的源代码语言不唯一&#xff0c;则只需…

第40讲:MySQL索引的语法以及基本使用

文章目录1.索引的使用语法2.索引的基本使用2.1.准备一张数据表2.2.按照如下需求为表中的字段创建索引2.3.查看创建的索引2.4.删除索引3.验证使用索引前后的执行效率1.索引的使用语法 1&#xff09;创建索引 创建索引时&#xff0c;如果不指定索引的类型&#xff0c;默认就是常…

基于STM32G431嵌入式学习笔记——五、NVIC中断(以串口UART中断为例)

一、基础知识 1.专业术语 2.NVIC简介 ①在这里要注意&#xff0c;中断控制是分级处理的 ②是否请求中断是中断源控制的。 ③是否响应中断是响应方控制的。 ④以外部中断为例&#xff0c;外部中断请求顺序就是首先从请求的外部设备中选出优先级最高的一个设备待中断&#xff0c…