欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 新闻 > 会展 > postgresql|数据库|利用sqlparse和psycopg2库批量按顺序执行SQL语句(psyconpg2新优化版本)

postgresql|数据库|利用sqlparse和psycopg2库批量按顺序执行SQL语句(psyconpg2新优化版本)

2025/1/13 3:12:45 来源:https://blog.csdn.net/alwaysbefine/article/details/145044986  浏览:    关键词:postgresql|数据库|利用sqlparse和psycopg2库批量按顺序执行SQL语句(psyconpg2新优化版本)

一、

旧版批量执行SQL脚本的python文件缺点,优点,以及更新内容

书接上回,postgresql|数据库开发|python的psycopg2库按指定顺序批量执行SQL文件(可离线化部署)_python sql psycopg2-CSDN博客

这个python脚本写了很久了,最近开始实际使用,发现了很多问题,问题主要集中在以下几点:

1、

SQL语句解析不太标准,遇到;分号就也当SQL语句执行,导致很多不必要的错误,并且很多时候不能有效区分多行SQL,因此,本文计划使用sqlparse库来做更准确的SQL语句解析

2、

批量的SQL文件跑完后,并没有一个比较详细的总结报告,遇到问题不太好排查,因此,本文对此做了优化,当一个目录内的SQL文件都执行完毕后,给一个相对详细的报告

确保即使某些 SQL 文件执行失败,程序也会继续尝试执行其余的文件,并最终给出一个详细的执行报告,帮助用户了解哪些文件和语句被执行以及哪些文件遇到了问题。这样,用户不仅能知道哪些文件未能成功处理,还能清楚地看到哪些文件及其内部的语句已经成功应用到了数据库中。

3、

SQL文件排序使用正则表达式处理,以改善SQL文件排序有时候不正确的问题

4、

所有的指定目录下的SQL文件都执行,但,执行失败的SQL文件仍留在原文件夹,需要将失败的SQL文件拿出来,手动处理有问题的SQL文件

5、

SQL语句执行输出太多,成功的SQL语句不应该输出到控制台,导致脚本执行情况并不是一目了然,因此,对脚本输出进行优化,以方便SQL脚本的调试

优点:

1、

以SQL文件为单位,每一个SQL文件内的SQL语句要么全部执行成功,如果中间有任何错误就回滚,对于数据库的数据安全是有一定的保障的

2、

该工具执行迅速,效率非常高,但对于锁表的情况,现在暂时没有什么想法

二、

优化后的python脚本源码

import os
import re
import sys
import json
import shutil
import psycopg2
from psycopg2 import sql, OperationalError, ProgrammingError
import sqlparsedef print_colored_text(text, color_code):"""打印带有颜色的文本"""print(f"\033[{color_code}m{text}\033[0m")# 定义颜色代码
COLORS = {'BLACK': 30,'RED': 31,'GREEN': 32,'YELLOW': 33,'BLUE': 34,'MAGENTA': 35,'CYAN': 36,'WHITE': 37
}def find_sql_files(path):"""查找指定路径下的所有 .sql 文件,并按文件名中第一个出现的数字升序排序返回列表"""sql_files = [os.path.join(root, file)for root, _, files in os.walk(path)for file in filesif file.endswith('.sql')]def extract_first_number(filename):"""从文件名中提取第一个出现的数字序列并转换为整数,用于排序"""match = re.search(r'\d+', os.path.basename(filename))return int(match.group()) if match else float('inf')  # 如果没有匹配到数字,则排到最后# 使用 sorted 函数并指定 key 参数来实现升序排序,仅依据第一个数字sorted_sql_files = sorted(sql_files, key=extract_first_number)return sorted_sql_files
def execute_sql_file(db_config, sql_file_path, script_dir):"""执行指定的 SQL 文件,并在遇到错误时移动文件"""conn = Nonecursor = Nonesuccessful_statements = []  # 新增: 记录成功的SQL语句try:conn = psycopg2.connect(**db_config)cursor = conn.cursor()with open(sql_file_path, 'r', encoding='utf-8') as file:parsed_statements = sqlparse.split(file.read())for raw_stmt in parsed_statements:stmt = raw_stmt.strip()if not stmt:  # 忽略空语句continuestatements = sqlparse.parse(stmt)for statement in statements:if statement.tokens:  # 确保有非空的SQL语句stmt_str = str(statement).strip(';').strip()if stmt_str:  # 忽略空语句# print(stmt_str)# print_colored_text('<<<<<<<=======<<<<<++++++++<<<<<<上面这条语句将要执行了===----========*****', COLORS['CYAN'])try:cursor.execute(stmt_str)successful_statements.append(stmt_str)  # 添加到成功语句列表#print_colored_text('执行成功!', COLORS['GREEN'])except (Exception, psycopg2.DatabaseError) as e:print(f"执行失败的语句是:")print(f"{stmt_str}")print("=========这是第一个分隔符===================")print("报错详细信息是:")print(e)print("===========这是第二个分隔符===================")print(f"执行的文件名称是:")print(f"{sql_file_path}")conn.rollback()  # 回滚事务move_failed_file(sql_file_path, script_dir)return False, successful_statementsconn.commit()print_colored_text(f"Executed SQL file successfully: {sql_file_path}", COLORS['RED'])return True, successful_statementsexcept (OperationalError, ProgrammingError) as error:print(f"Database error occurred while executing SQL file {sql_file_path}: {error}")if conn:conn.rollback()return False, successful_statementsfinally:if cursor:cursor.close()if conn:conn.close()def move_failed_file(src, dst_dir):"""将失败的SQL文件移动到指定的目标目录"""try:os.makedirs(dst_dir, exist_ok=True)dst = os.path.join(dst_dir, os.path.basename(src))print(f"Moving failed SQL file to {dst}")shutil.move(src, dst)except Exception as e:print(f"Failed to move the file {src}: {e}")def main():if len(sys.argv) != 3:print("Usage: python script.py <user> <path_to_search>")sys.exit(1)user = sys.argv[1]search_path = sys.argv[2]script_dir = os.path.dirname(os.path.abspath(__file__))  # 获取脚本所在目录try:with open('test.json', 'r', encoding='utf-8') as f:params = json.load(f)db_config = params.get('db_config', {})required_keys = ['dbname', 'user', 'password', 'host', 'port']if not all(key in db_config for key in required_keys):print("Error: Missing required database configuration in test.json")sys.exit(1)db_config['user'] = userexcept FileNotFoundError:print("Error: test.json not found")sys.exit(1)except json.JSONDecodeError:print("Error: test.json is not a valid JSON file")sys.exit(1)sql_files = list(find_sql_files(search_path))print(f"Found {len(sql_files)} SQL files: {sql_files}")print_colored_text(f'|||||二十秒后开始执行{sql_files},以打印出来的顺序依次执行SQL文件|||||||||||', COLORS['MAGENTA'])failed_files = []executed_statements = []successful_files = []  # 新增: 记录成功的SQL文件if sql_files:for sql_file in sql_files:print_colored_text(f'这个文件将要执行: {sql_file}\n\n', COLORS['YELLOW'])success, statements = execute_sql_file(db_config, sql_file, script_dir)if not success:print(f"Failed to execute SQL file: {sql_file}. Continuing with the next file...")failed_files.append(sql_file)else:executed_statements.extend(statements)successful_files.append(sql_file)  # 添加到成功文件列表# 打印总结信息if failed_files:print_colored_text("\nThe following SQL files failed to execute:", COLORS['RED'])for failed_file in failed_files:print(f"- {failed_file}")else:print_colored_text("\nAll SQL files executed successfully.", COLORS['GREEN'])
#        print_colored_text("\nThe following SQL statements were executed successfully:", COLORS['GREEN'])
#        for stmt in executed_statements:
#            print(f"- {stmt}")print_colored_text("\nThe following SQL files were executed successfully:", COLORS['GREEN'])for successful_file in successful_files:print(f"- {successful_file}")else:print("No SQL files found in the specified path.")if __name__ == "__main__":main()

该python脚本依赖于python3环境,libpq.so,pyscopg库,sqlparse库,这些库什么的都可以离线安装,相关文件都已放到百度网盘内了

通过网盘分享的文件:批量执行SQL语句项目
链接: https://pan.baidu.com/s/1zCAL78hp2-92NdjIHCjM0w?pwd=gkw1 提取码: gkw1 

 python3.zip 文件里都是rpm包,适用于centos7,解压后,怎么安装就不在这多说了
其中,里面的sqlpaser.tar.gz 需要解压到/usr/local/lib/python3.6目录下,如果没有python3.6目录,建立即可

psycopg2.gz这个文件里的内容解压到/usr/lib64/python3.6目录下

解压完毕后,需要执行python3 -V 命令,以激活sqlparse库

最终目录如下所示即可:

[root@centos7 ~]# ls /usr/lib64/python3.6/site-packages/
psycopg2  psycopg2-2.8.6-py3.6.egg-info  __pycache__  README.txt
[root@centos7 ~]# ls /usr/local/lib/python3.6/site-packages/
sqlparse  sqlparse-0.4.4.dist-info

三、

重点代码解析

1、

代码编程结构

本次代码编写仍然是延续上个版本,主要功能封装为方法,在main方法统一集中调用

主要是一个main主方法+4个功能方法,分别是控制台颜色渲染方法,SQL文件搜寻方法,SQL文件调用psyconpg2,sqlparse库逐行执行SQL语句方法,移动执行失败SQL文件到脚本当前目录方法,整体调用main方法

2、

控制台颜色渲染方法

该方法主要是使用了字典,形参调用形式,例如该方法的调用:

print_colored_text(f"Executed SQL file successfully: {sql_file_path}", COLORS['RED'])

没什么好说的,主要就是字典的应用是难点 

3、

SQL文件搜寻方法

def find_sql_files(path):"""查找指定路径下的所有 .sql 文件,并按文件名中第一个出现的数字升序排序返回列表"""sql_files = [os.path.join(root, file)for root, _, files in os.walk(path)for file in filesif file.endswith('.sql')]def extract_first_number(filename):"""从文件名中提取第一个出现的数字序列并转换为整数,用于排序"""match = re.search(r'\d+', os.path.basename(filename))return int(match.group()) if match else float('inf')  # 如果没有匹配到数字,则排到最后# 使用 sorted 函数并指定 key 参数来实现升序排序,仅依据第一个数字sorted_sql_files = sorted(sql_files, key=extract_first_number)return sorted_sql_files

这一段代码难点主要在方法嵌套,首先迭代寻找指定的路径下所有的SQL文件,然后通过子方法,调用正则表达式重新排序,主要是以数字开始的文件

这里需要注意,搜寻到的SQL文件是返回一个列表

4、

SQL文件调用psyconpg2,sqlparse库逐行执行SQL语句方法

该方法主要是业务逻辑实现,主要逻辑是以单个SQL文件为整体事务,如果某个SQL文件有错误,事务回滚,并调用移动执行失败SQL文件到脚本当前目录方法;如果该SQL文件顺利执行完成,则提交事务

这样做的目的是保护数据库,避免不必要的数据混乱

无论如何,当一个SQL文件执行完毕,都会将数据库连接关闭,游标关闭

编写的时候考虑了一下,还是需要捕获错误并将详细错误信息打印,并收集失败的SQL文件和成功的SQL文件,在main方法内将要调用这两个列表,list

5、

移动执行失败SQL文件到脚本当前目录方法

该方法没什么特别的,主要是调用此方法的形式,在main方法内,这里比较绕,当时编写的时候考虑了很久

6、

main方法

整体逻辑封装都在此方法内

    user = sys.argv[1]search_path = sys.argv[2]script_dir = os.path.dirname(os.path.abspath(__file__))  # 获取脚本所在目录

__file__ 是一个非常有用的内置变量,它帮助你在编写脚本时能够动态地确定文件的位置,而不需要硬编码路径。这对于提高代码的可移植性和维护性非常有帮助。如果你在开发过程中遇到需要根据脚本位置来定位其他文件的情况,说实话这个变量我是差点给忘记掉的

那么,为什么要有user这个变量呢?考虑到很多时候并不是有高权限的数据库账号

其它的就没什么好说的了

四、

数据库信息文件json

有需要实践的同学按实际情况填写此json文件

{"db_config": {"dbname": "postgres","user": "postgres","password": "xxxxx","host": "192.168.xxx.xx","port": "1543"}
}

四、

执行方式和执行结果示例

[root@centos7 ~]# python3 test7.py postgres ./
Found 2 SQL files: ['./12323.sql', './33333.sql']
|||||二十秒后开始执行['./12323.sql', './33333.sql'],以打印出来的顺序依次执行SQL文件|||||||||||
这个文件将要执行: ./12323.sql执行失败的语句是:
-- 插入部门数据 ;
INSERT INTO 
dept VALUES (10, '战略部', '咸阳')
=========这是第一个分隔符===================
报错详细信息是:
duplicate key value violates unique constraint "dept_pkey"
DETAIL:  Key (deptno)=(10) already exists.===========这是第二个分隔符===================
执行的文件名称是:
./12323.sql
Moving failed SQL file to /root/12323.sql
Failed to execute SQL file: ./12323.sql. Continuing with the next file...
这个文件将要执行: ./33333.sql执行失败的语句是:
-- 插入部门数据 ;
INSERT INTO 
dept VALUES (10, '战略部', '咸阳')
=========这是第一个分隔符===================
报错详细信息是:
duplicate key value violates unique constraint "dept_pkey"
DETAIL:  Key (deptno)=(10) already exists.===========这是第二个分隔符===================
执行的文件名称是:
./33333.sql
Moving failed SQL file to /root/33333.sql
Failed to execute SQL file: ./33333.sql. Continuing with the next file...The following SQL files failed to execute:
- ./12323.sql
- ./33333.sqlThe following SQL files were executed successfully:

全失败的:

有失败有成功的情况:

[root@centos7 ~]# python3 test7.py postgres ./
Found 2 SQL files: ['./12323.sql', './33333.sql']
|||||二十秒后开始执行['./12323.sql', './33333.sql'],以打印出来的顺序依次执行SQL文件|||||||||||
这个文件将要执行: ./12323.sqlExecuted SQL file successfully: ./12323.sql
这个文件将要执行: ./33333.sql执行失败的语句是:
-- 插入部门数据 ;
INSERT INTO 
dept VALUES (10, '战略部', '咸阳')
=========这是第一个分隔符===================
报错详细信息是:
duplicate key value violates unique constraint "dept_pkey"
DETAIL:  Key (deptno)=(10) already exists.===========这是第二个分隔符===================
执行的文件名称是:
./33333.sql
Moving failed SQL file to /root/33333.sql
Failed to execute SQL file: ./33333.sql. Continuing with the next file...The following SQL files failed to execute:
- ./33333.sqlThe following SQL files were executed successfully:
- ./12323.sql

全部成功的情况:

[root@centos7 ~]# python3 test7.py postgres ./
Found 1 SQL files: ['./12323.sql']
|||||二十秒后开始执行['./12323.sql'],以打印出来的顺序依次执行SQL文件|||||||||||
这个文件将要执行: ./12323.sql


Executed SQL file successfully: ./12323.sql

All SQL files executed successfully.

The following SQL files were executed successfully:
- ./12323.sql

 

🆗,这个SQL文件批量执行的小工具就介绍到这了,欢迎各位大拿指正错误!!!!!

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com