import configparser
import os
import logging
import threading
import time
import boto3
from ftplib import FTP_TLS
from botocore.exceptions import NoCredentialsError
from concurrent.futures import ThreadPoolExecutor

# Configure logging
logging.basicConfig(filename='upload_and_import.log', level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')


def download_from_box_ftps(host, user, password, remote_dir, filename, local_dir):
    """Download a single file from Box over FTPS into local_dir; return the local path or None."""
    local_filename = os.path.join(local_dir, filename)
    try:
        with FTP_TLS(host) as ftps:
            ftps.auth()
            ftps.login(user=user, passwd=password)
            ftps.prot_p()  # switch the data connection to TLS
            ftps.cwd(remote_dir)
            with open(local_filename, 'wb') as fobj:
                ftps.retrbinary(f'RETR {filename}', fobj.write)
        logging.info(f"Successfully downloaded {filename} from Box.")
        return local_filename
    except Exception as e:
        logging.error(f"Failed to download {filename} from Box: {e}")
        return None


def upload_file_s3(file_name, bucket, object_name=None):
    """Upload a local file to S3; return True on success."""
    s3_client = boto3.client('s3')
    if object_name is None:
        object_name = os.path.basename(file_name)
    try:
        s3_client.upload_file(file_name, bucket, object_name)
        logging.info(f"Successfully uploaded {file_name} to S3 as {object_name}.")
        return True
    except FileNotFoundError:
        logging.error(f"The file {file_name} was not found.")
        return False
    except NoCredentialsError:
        logging.error("Credentials not available for S3 upload.")
        return False


def import_to_redshift(s3_bucket, s3_key, redshift_connection_info):
    """Run a COPY command to load the S3 object into Redshift."""
    try:
        # Build the Redshift COPY command; this assumes the target table already
        # exists and the file is a gzip-compressed CSV with a header row.
        copy_command = f"COPY your_table_name FROM 's3://{s3_bucket}/{s3_key}' " \
                       f"CREDENTIALS 'aws_access_key_id={redshift_connection_info['aws_access_key_id']};" \
                       f"aws_secret_access_key={redshift_connection_info['aws_secret_access_key']}' " \
                       f"GZIP CSV IGNOREHEADER 1"
        # An actual Redshift connection is needed to execute the command; psycopg2 is assumed here.
        import psycopg2
        conn = psycopg2.connect(
            host=redshift_connection_info['host'],
            port=redshift_connection_info['port'],
            database=redshift_connection_info['database'],
            user=redshift_connection_info['user'],
            password=redshift_connection_info['password']
        )
        cur = conn.cursor()
        cur.execute(copy_command)
        conn.commit()
        cur.close()
        conn.close()
        logging.info(f"Successfully imported {s3_key} from S3 to Redshift.")
    except Exception as e:
        logging.error(f"Failed to import {s3_key} from S3 to Redshift: {e}")


def process_file(file, box_config, s3_config, redshift_config, local_dir):
    """Download one file from Box, upload it to S3, load it into Redshift, then clean up."""
    local_file = download_from_box_ftps(box_config['host'], box_config['user'], box_config['password'],
                                        box_config['remote_dir'], file, local_dir)
    if local_file:
        upload_success = upload_file_s3(local_file, s3_config['bucket'],
                                        os.path.join(s3_config['s3_dir'], file))
        if upload_success:
            import_to_redshift(s3_config['bucket'], os.path.join(s3_config['s3_dir'], file), redshift_config)
            os.remove(local_file)


def main():
    config = configparser.ConfigParser()
    config.read('config.ini')
    box_config = dict(config.items('BOX'))
    s3_config = dict(config.items('S3'))
    redshift_config = dict(config.items('REDSHIFT'))

    local_dir = 'temp_downloads'
    if not os.path.exists(local_dir):
        os.makedirs(local_dir)

    with ThreadPoolExecutor() as executor:
        box_files = []
        try:
            # List the remote Box directory once to find the files to process.
            with FTP_TLS(box_config['host']) as ftps:
                ftps.auth()
                ftps.login(user=box_config['user'], passwd=box_config['password'])
                ftps.cwd(box_config['remote_dir'])
                box_files = ftps.nlst()
        except Exception as e:
            logging.error(f"Failed to list files in Box directory: {e}")

        for file in box_files:
            if file.endswith('.gz'):
                executor.submit(process_file, file, box_config, s3_config, redshift_config, local_dir)


if __name__ == "__main__":
    main()
Configuration loading: the Box, S3, and Redshift settings are read from config.ini via configparser.
Logging: the logging module records run status, including success and failure messages for the download, upload, and import steps.
Concurrent processing: ThreadPoolExecutor from the concurrent.futures module runs each file's download, upload, and import in a separate thread (see the sketch after this list for collecting results from executor.submit).
Exception handling: exceptions that may occur during download, upload, and import are caught, handled, and written to the log.
Temporary directory management: downloaded files are stored in the temp_downloads directory and deleted after a successful upload.
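Since executor.submit returns a Future, per-task results and exceptions can also be collected explicitly. The following is a minimal sketch of that pattern, separate from the script above; the max_workers value and the stub process_file are illustrative assumptions, not taken from the original code:

import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

logging.basicConfig(level=logging.INFO)

def process_file(file):
    # Stand-in for the real download -> upload -> import chain.
    return f"processed {file}"

files = ["a.gz", "b.gz", "notes.txt"]

# max_workers=4 is an illustrative cap; the script above uses the executor default.
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(process_file, f): f for f in files if f.endswith(".gz")}
    for future in as_completed(futures):
        name = futures[future]
        try:
            logging.info("%s -> %s", name, future.result())
        except Exception as exc:
            logging.error("Task for %s failed: %s", name, exc)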
Example config.ini
[BOX]
host = YOUR_BOX_FTP_HOST
user = YOUR_USERNAME
password = YOUR_PASSWORD
remote_dir = /REMOTE_DIRECTORY_ON_BOX/

[S3]
bucket = TARGET_S3_BUCKET_NAME
s3_dir = YOUR_S3_DIRECTORY

[REDSHIFT]
host = YOUR_REDSHIFT_HOST
port = YOUR_REDSHIFT_PORT
database = YOUR_REDSHIFT_DATABASE
user = YOUR_REDSHIFT_USER
password = YOUR_REDSHIFT_PASSWORD
aws_access_key_id = YOUR_AWS_ACCESS_KEY_ID
aws_secret_access_key = YOUR_AWS_SECRET_ACCESS_KEY
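As a quick check of how the script consumes this file, each section becomes a plain dictionary via configparser, exactly as main() does. A minimal sketch, assuming config.ini is in the working directory (note that configparser returns every value as a string):

import configparser

config = configparser.ConfigParser()
config.read('config.ini')

# Each section maps to a plain dict; these are the same lookups main() performs.
box_config = dict(config.items('BOX'))
s3_config = dict(config.items('S3'))
redshift_config = dict(config.items('REDSHIFT'))

print(s3_config['bucket'])        # TARGET_S3_BUCKET_NAME
print(redshift_config['port'])    # values are strings, so the port is not an int here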