欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 财经 > 产业 > 记录使用python解析sql文件异步批量插入数据

记录使用python解析sql文件异步批量插入数据

2024/10/23 23:33:40 来源:https://blog.csdn.net/qq_42031243/article/details/140817531  浏览:    关键词:记录使用python解析sql文件异步批量插入数据
起因

最近对接了某机构数据,他们数据使用的是oracle,而我方数据库则是mysql,他们那边给我们使用的测试数据是以oracle的形式,不能直接执行sql插入到mysql中,或者是多个表数据提取一些需要的数据字段合并到一个表中,所以需要对该sql进行一些处理

当然也可以在我们内部服务器上搭建oracle数据库,然后将这个sql在oracle数据中执行,然后再使用数据同步工具,比如 Navicat中的 工具-数据传输功能,将oracle里面的表结构以及数据同步复制传输到mysql中,或者使用 Kettle 进行数据同步

使用Navicat 问题

想要使用Navicat就sql同步到mysql数据库中有两个步骤,

1 是先将sql文件执行导入到oracle里面,
2 将oracle数据表通过工具-数据传输功能,将数据传输到mysql中

但是在Navicat中,是按sql文件行读取导入的,这就出现一种情况,sql文件很大,有百万条数据,每条sql导入都要执行一次,这就很慢了,而且使用Navicat工具将数据同步到mysql中的时候,也很慢, 而且使用Navicat 只能将表同步,而不能将多个表的字段提取出来合并到一个新表中

如果使用Kettle ,那就需要编写各种流处理操作,也是需要先将sql导入到数据库中才能进行处理,这样会很慢

使用python实现

最后使用了python实现读取sql文件进行批量导入,将导入过程分成3步

1 异步读取文件,解析sql语句,替换sql语句中的一些特殊符号以及内容,将sql数据通过异步队列发送给异步任务

2 异步执行批量导入任务读取异步队列中的结果数据进行批量导入操作,如果出现异常,则将异常数据发送给异常处理

3 异常处理可以将异常数据写入到另一个记录文件中进行保存,也可以尝试就将批量数据结构挨条进行导入执行

import asyncio
import timeimport aiofiles
import aiomysql# 连接数据库
async def connect_db():conn = await aiomysql.connect(host='10.10.6.131', port=3306,user='root', password='root',db='test')return conn# 解析sql文件
async def parse_file(queue: asyncio.Queue):async with aiofiles.open('test.sql', 'r', encoding="utf-8") as f:while True:data = await f.readlines(10000)if not data:breakdata_list = []for i in data:if 'INSERT INTO "XXX"."PERFORMANCE_HISTORY" VALUES (' in i:d = i.replace('INSERT INTO "FDC"."PERFORMANCE_HISTORY" VALUES (', '')d = d.replace(");", "")d = d.replace("\n", "")d = d.replace("\t", "")d = d.replace(", 'SYYYY-MM-DD HH24:MI:SS')", "")d = d.replace("TO_DATE(", "")d = d.split(", ")d = [None if "NULL" in i else str(i.replace("'", "")) for i in d]data_list.append(d)if len(data_list) > 0:await queue.put(data_list)print(queue.qsize(), time.time())await queue.put(None)async def batch_insert_data(conn: aiomysql.Connection, queue: asyncio.Queue, qu2: asyncio.Queue):sql = "INSERT INTO performance(id, name, date) values (%s, %s, %s)"async with conn.cursor() as cursor:while 1:data = await queue.get()if data is None:breaktry:await cursor.executemany(sql, data)await conn.commit()except Exception as e:print("批量插入失败", data)await qu2.put(data)await qu2.put(None)async def insert_error_data(conn: aiomysql.Connection, queue: asyncio.Queue):sql = "INSERT INTO t_dm_f_performance(id, name, date) values (%s, %s, %s)"async with conn.cursor() as cursor:while 1:data = await queue.get()if data is None:breakfor i in data:try:await cursor.execute(sql, i)await conn.commit()except Exception as e:passasync def main():queue = asyncio.Queue(maxsize=1000)error_queue = asyncio.Queue(maxsize=1000)conn1 = await connect_db()conn2 = await connect_db()await asyncio.gather(parse_file(queue), batch_insert_data(conn1, queue, error_queue), insert_error_data(conn2, error_queue))await conn1.close()await conn2.close()if __name__ == '__main__':asyncio.run(main())

最终实现了批量插入功能,比起先将sql导入到oracle,再同步到mysql快了不少

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com