欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 房产 > 建筑 > datax使用实例及优化

datax使用实例及优化

2024/11/30 8:40:08 来源:https://blog.csdn.net/v15220/article/details/139783180  浏览:    关键词:datax使用实例及优化

一、实例


1.从MySQL读取数据到hdfs
    

查看模板
    python bin/datax.py -r mysqlreader -w hdfswriter
    覆写模板
  

 {"job": {"content": [{"reader": {"name": "mysqlreader", "parameter": {"column": ["id","name"], "connection": [{"jdbcUrl": ["jdbc:mysql://node1:3306/test"], "table": ["student"]}], "password": "1234", "username": "root", "where": "id<=1003"}}, "writer": {"name": "hdfswriter", "parameter": {"column": [{"name":"id","type":"int"},{"name":"name","type":"string"}], "compress": "", "defaultFS": "hdfs://node1:8020", "fieldDelimiter": "\t", "fileName": "student.txt", "fileType": "text", "path": "/", "writeMode": "append"}}}], "setting": {"speed": {"channel": "1"}}}
}

2.从hdfs读取数据到MySQL
    

查看模板
    python bin/datax.py -r hdfsreader -w mysqlwriter
    覆写模板
    

{"job": {"content": [{"reader": {"name": "hdfsreader", "parameter": {"column": ["*"],/*    "column": [{"index":0 ##第一列"type":"int"},{"index":1 ##第二列"type":"string"}],                    "defaultFS": "hdfs://node1:8020", "encoding": "UTF-8", "fieldDelimiter": "\t", "fileType": "text", "path": "/student.txt__425cbff6_c4f7_45e7_8caf_ad5bc51479ee"}}, "writer": {"name": "mysqlwriter", "parameter": {"column": ["id","name"], "connection": [{"jdbcUrl": "jdbc:mysql://node1:3306/test", "table": ["student"]}], "password": "1234", "preSql": [], "session": [], "username": "root", "writeMode": "insert"}}}], "setting": {"speed": {"channel": "1"}}}
}


3.从oracle读取数据到mysql


本人在windows上装了Oracle,即服务器在Windows上,linux允许datax作业读取数据到mysql时需要连接到windows上的oracle服务
(linux连接windows上的服务),需要先在windows上的防火墙设置开放1521端口;操作步骤 设置-网络和Internet-高级网络设置-Windows防火墙-高级设置-
(右键)入站规则-新建规则-端口-端口号(多个可用逗号隔开)-允许连接-全选-名称-完成
oracle 服务器链接为 jdbc:oracle:thin:@192.168.173.1:1521:orcl
    查看模板
    python bin/datax.py -r oraclereader -w mysqlwriter
    覆写模板
 

   {"job": {"content": [{"reader": {"name": "oraclereader", "parameter": {"column": ["empno", "ename", "job", "mgr", "hiredate", "sal", "comm", "deptno"], "connection": [{"jdbcUrl": ["jdbc:oracle:thin:@192.168.173.1:1521:orcl"], "table": ["emp"]}], "password": "1", "username": "scott"}}, "writer": {"name": "mysqlwriter", "parameter": {"column": ["empno", "ename", "job", "mgr", "hiredate", "sal", "comm", "deptno"], "connection": [{"jdbcUrl": "jdbc:mysql://node1:3306/test", "table": ["emp"]}], "password": "1234", "username": "root", "writeMode": "insert"}}}], "setting": {"speed": {"channel": "1"}}}
}

4.从MySQL读取数据到oracle

查看模板
    python bin/datax.py -r mysqlreader -w oraclewriter
    覆写模板
    

{"job": {"content": [{"reader": {"name": "mysqlreader", "parameter": {"column": ["id","name"], "connection": [{"jdbcUrl": ["jdbc:mysql://node1:3306/test"], "table": ["student"]}], "password": "1234", "username": "root", "where": "id>=1003"}}, "writer": {"name": "oraclewriter", "parameter": {"column": ["sno","sname"], "connection": [{"jdbcUrl": "jdbc:oracle:thin:@192.168.173.1:1521:orcl", "table": ["student"]}], "password": "1", "username": "scott"}}}], "setting": {"speed": {"channel": "1"}}}
}

5.从Oracle读取数据到hdfs

查看模板
    python bin/datax.py -r oraclereader -w hdfswriter
    覆写模板
    

{"job": {"content": [{"reader": {"name": "oraclereader", "parameter": {"column": ["empno", "ename", "job", "mgr", "hiredate", "sal", "comm", "deptno"], "connection": [{"jdbcUrl": ["jdbc:oracle:thin:@192.168.173.1:1521:orcl"], "table": ["emp"]}], "password": "1", "username": "scott"}}, "writer": {"name": "hdfswriter", "parameter": {"column": [{"name":"empno", "type":"int"},{"name":"ename", "type":"string"},{"name":"job","type":"string"},{"name":"mgr", "type":"string"},{"name":"hiredate","type":"date"},{"name":"sal","type":"double"},{                            "name":"comm","type":"double"},{"name":"deptno""type":"int"}], "compress": "", "defaultFS": "hdfs://node1:8020", "fieldDelimiter": "\t", "fileName": "emp.txt", "fileType": "text", "path": "/", "writeMode": "append"}}}], "setting": {"speed": {"channel": "1"}}}
}

6.从hdfs读取数据到Oracle

查看模板
    python bin/datax.py -r hdfsreader -w oraclewriter
    覆写模板
  

  {"job": {"content": [{"reader": {"name": "hdfsreader", "parameter": {"column": [{"index":0, "type":"string"},{"index":1, "type":"string"},{"index":2,"type":"string"},{"index":3, "type":"string"},{"index":4,"type":"date"},{"index":5,"type":"string"},{                            "index":6,"type":"string"},{"index":7"type":"string"}], "defaultFS": "hdfs://node1:8020", "encoding": "UTF-8", "fieldDelimiter": "\t", "fileType": "text", "path": "/emp.txt__c7a80f98_674d_448d_8fbd_b9de44b4d30c","nullFormat":"\\N"}}, "writer": {"name": "oraclewriter", "parameter": {"column": ["empno", "ename", "job", "mgr", "hiredate", "sal", "comm", "deptno"], "connection": [{"jdbcUrl": ["jdbc:oracle:thin:@192.168.173.1:1521:orcl"], "table": ["empc1"]}], "password": "1", "username": "scott"}}}], "setting": {"speed": {"channel": "1"}}}
}

如果过程出现提示脏数据
2024-06-18 00:30:32.273 [0-0-0-reader] ERROR StdoutPluginCollector - 脏数据: 
{"byteSize":6,"index":5,"rawData":"3000.0","type":4}],"type":"reader","message":"类型转换错误, 无法将[] 转换为[DOUBLE]"}
或{"byteSize":0,"index":6,"rawData":"","type":5}],"type":"reader","message":"No enum constant com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderUtil.Type.INT"}
可以把类型改为"string"重试;

二.优化

1. 提升每个 channel 的速度

分两种,一种是控制每秒同步的记 录数,另外一种是每秒同步的字节数,默认的速度限制是 1MB/s

2. 提升 DataX Job 内 Channel 并发

注意:对单表如果没有安装主键切分,那么配置通道个数不会提升速度,效果与1个通道一样。所以在编写作业json文件时用到数据库作为reader时(reader)建议加上splidPk参数

并发数 = taskGroup 的数量 * 每个 TaskGroup 并发执行的 Task 数 (默认为 5)。 提升 job 内 Channel 并发有三种配置方式:

2.1配置全局 Byte 限速以及单 Channel Byte 限速

Channel 个数 = 全局 Byte 限速 / 单 Channel Byte 限速

core.transport.channel.speed.* 配置在conf/core.json文件下

job.setting.speed.* 是指我们写的作业json文件中的setting

{ "core": { "transport": { "channel": { "speed": { "byte": 1048576 } } } }, "job": { "setting": { "speed": { "byte" : 5242880 } }, ... } }

core.transport.channel.speed.byte=1048576,job.setting.speed.byte=5242880,所以 Channel 个数 = 全局 Byte 限速 / 单 Channel Byte 限速=5242880/1048576=5 个

2.2配置全局 Record 限速以及单 Channel Record 限速

Channel 个数 = 全局 Record 限速 / 单 Channel Record 限速

{ "core": { "transport": { "channel": { "speed": { "record": 100 } } } }, "job": { "setting": { "speed": record" : 500 } }, ... } }

core.transport.channel.speed.record=100 , job.setting.speed.record=500, 所 以 配 置 全 局 Record 限速以及单 Channel Record 限速,Channel 个数 = 全局 Record 限速 / 单 Channel Record 限速=500/100=5

2.3 直接配置 Channel 个数(优先级较前两种低)

{ "job": { "setting": { "speed": { "channel" : 5 } }, ... } }

直接配置 job.setting.speed.channel=5,所以 job 内 Channel 并发=5 个

3.调整数据库写入时(writer)批量提交行数

  • 添加参数batchSize(默认1024)
    • 描述:一次性批量提交的记录数大小,该值可以极大减少DataX与Mysql的网络交互次数,并提升整体吞吐量。但是该值设置过大可能会造成DataX运行进程OOM情况。随着通道数的增加(通道数<32),速度呈线性比增加

4. 提高 JVM 堆内存

       为了防止 OOM 等错 误,调大 JVM 的堆内存。 建议将内存设置为 4G 或者 8G。 调整 JVM xms xmx 参数的两种方式:一种是直接更改 datax.py 脚本;另一种是在启动 的时候,加上对应的参数,如下:

python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" hdfs-mysql.json

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com