df = pd.DataFrame(result.fetchall(), columns=result.keys()) 这样只有一列,而且每行都是一个迭代元素
使用 # 显式指定要选择的列 query = db.session.query(model).with_entities(*model.__table__.columns)python导出csv文件,乱码问题
# 将 DataFrame 导出为 CSV 格式df.to_csv(output, encoding='utf-8-sig', index=False) # 使用 'utf-8-sig' 包含 BOM
@app.route('/data_export/',methods=('GET','POST'))
def data_export():# 处理导出数据的逻辑data_type = request.args.get('type', 'district_price')if data_type in table_models:model = table_models[data_type]# query = model.query# 显式指定要选择的列query = db.session.query(model).with_entities(*model.__table__.columns)# df = pd.read_sql(str(query.statement), db.session.bind)result = db.session.execute(query)column_names = result.keys()print(f"Column names: {column_names}") # 调试输出列名# 获取所有行rows = result.fetchall()print(f"Rows: {rows[:5]}") # 调试输出前5行数据# df = pd.DataFrame(result.fetchall(), columns=result.keys())df = pd.DataFrame(rows, columns=column_names)# 将结果转换为 Pandas DataFrame# 使用 io.BytesIO 来创建一个内存中的文件对象output = io.BytesIO()# 将 DataFrame 导出为 CSV 格式df.to_csv(output,encoding='utf-8-sig', index=False)# print(output.getvalue().decode('utf-8')) # 检查输出的 CSV 文件内容output.seek(0)# print(output.getvalue().decode('utf-8')) # 检查输出的 CSV 文件内容return send_file(output,as_attachment=True,download_name=f'{data_type}_export.csv',mimetype='text/csv')else:# flash('无效的数据类型', 'danger')return redirect(url_for('manage'))