Python实现从Parquet文件生成Redshift表并存储SQL语句
Python实现解析存储Parquet文件的多个本地路径,获取其中的数据Schema,再根据这些Schema,得到创建上传数据到Amazon Redshift数据库表的SQL语句,并在Amazon Redshift里创建对应的表,并存储创建表的SQL语句,以便将Parquet文件上传到S3,并利用COPY命令导入Amazon Redshift数据库表。
import pandas as pd
import pyarrow.parquet as pq
import psycopg2
import boto3
from botocore.exceptions import ClientError
import json
from typing import List, Dict, Any
import osclass ParquetToRedshift:def __init__(self, db_config: Dict[str, Any], s3_config: Dict[str, Any]):"""初始化Redshift和S3配置Args:db_config: Redshift数据库连接配置s3_config: S3存储配置"""self.db_config = db_configself.s3_config = s3_configself.s3_client = boto3.client('s3',aws_access_key_id=s3_config.get('aws_access_key_id'),aws_secret_access_key=s3_config.get('aws_secret_access_key'),region_name=s3_config.get('region_name'))def get_parquet_schema(self, parquet_paths: List[str]) -> Dict[str, str]:"""从多个Parquet文件中提取合并的Schema信息Args:parquet_paths: Parquet文件路径列表Returns:字段名到数据类型的映射字典"""merged_schema = {}for path in parquet_paths:if not os.path.exists(path):print(f"警告: 文件 {path} 不存在,跳过")continuetry:table = pq.read_table(path)schema = table.schemafor field in schema:field_name = field.namefield_type = str(field.type)# 如果字段已存在但类型不同,需要处理类型冲突if field_name in merged_schema and merged_schema[field_name] != field_type:print(f"警告: 字段 {field_name} 类型冲突: {merged_schema[field_name]} vs {field_type}")# 这里可以选择更复杂的类型解析策略,如选择更通用的类型merged_schema[field_name] = field_typeexcept Exception as e:print(f"处理文件 {path} 时出错: {str(e)}")continuereturn merged_schemadef map_parquet_type_to_redshift(self, parquet_type: str) -> str:"""将Parquet数据类型映射到Redshift数据类型Args:parquet_type: Parquet数据类型字符串Returns:Redshift数据类型字符串"""type_mapping = {'int8': 'SMALLINT','int16': 'SMALLINT','int32': 'INTEGER','int64': 'BIGINT','uint8': 'SMALLINT','uint16': 'INTEGER','uint32': 'BIGINT','uint64': 'BIGINT', # Redshift没有无符号类型,需要小心处理'float': 'REAL','double': 'DOUBLE PRECISION','bool': 'BOOLEAN','string': 'VARCHAR(256)', # 可根据需要调整长度'timestamp[ms]': 'TIMESTAMP','timestamp[us]': 'TIMESTAMP','timestamp[ns]': 'TIMESTAMP','date32[day]': 'DATE','binary': 'VARBYTE(256)', # 可根据需要调整长度}# 处理可空类型if parquet_type.startswith('null'):return 'VARCHAR(256)' # 默认处理为字符串# 查找映射for key, value in type_mapping.items():if key in parquet_type.lower():return value# 默认处理为字符串print(f"警告: 未映射的类型 {parquet_type},使用VARCHAR(256)作为默认")return 'VARCHAR(256)'def generate_create_table_sql(self, schema: Dict[str, str], table_name: str, distribution_key: str = None, sort_keys: List[str] = None) -> str:"""生成Redshift建表SQL语句Args:schema: 字段名到数据类型的映射table_name: 目标表名distribution_key: 分布键(可选)sort_keys: 排序键列表(可选)Returns:CREATE TABLE SQL语句"""if not schema:raise ValueError("Schema不能为空")# 构建列定义columns = []for field_name, parquet_type in schema.items():redshift_type = self.map_parquet_type_to_redshift(parquet_type)columns.append(f'"{field_name}" {redshift_type}')# 构建基本CREATE TABLE语句sql = f'CREATE TABLE IF NOT EXISTS {table_name} (\n'sql += ',\n'.join(columns)sql += '\n)'# 添加分布样式if distribution_key:sql += f'\nDISTKEY({distribution_key})'# 添加排序键if sort_keys:sql += f'\nSORTKEY({", ".join(sort_keys)})'return sqldef execute_sql(self, sql: str) -> bool:"""在Redshift中执行SQL语句Args:sql: 要执行的SQL语句Returns:执行是否成功"""try:conn = psycopg2.connect(dbname=self.db_config['dbname'],user=self.db_config['user'],password=self.db_config['password'],host=self.db_config['host'],port=self.db_config['port'])cursor = conn.cursor()cursor.execute(sql)conn.commit()cursor.close()conn.close()return Trueexcept Exception as e:print(f"执行SQL时出错: {str(e)}")return Falsedef upload_to_s3(self, local_path: str, s3_key: str) -> bool:"""上传文件到S3Args:local_path: 本地文件路径s3_key: S3对象键Returns:上传是否成功"""try:self.s3_client.upload_file(local_path, self.s3_config['bucket_name'], s3_key)return Trueexcept ClientError as e:print(f"上传到S3时出错: {str(e)}")return Falsedef generate_copy_sql(self, table_name: str, s3_path: str, iam_role: str = None) -> str:"""生成Redshift COPY命令Args:table_name: 目标表名s3_path: S3文件路径iam_role: Redshift有权限访问S3的IAM角色(可选,如果使用凭据则不需要)Returns:COPY SQL语句"""if iam_role:credentials = f"IAM_ROLE '{iam_role}'"else:credentials = f"CREDENTIALS 'aws_access_key_id={self.s3_config["aws_access_key_id"]};aws_secret_access_key={self.s3_config["aws_secret_access_key"]}'"return f"""COPY {table_name}FROM '{s3_path}'{credentials}FORMAT AS PARQUET"""def save_sql_to_file(self, sql: str, file_path: str):"""将SQL语句保存到文件Args:sql: SQL语句file_path: 文件保存路径"""with open(file_path, 'w') as f:f.write(sql)def process_parquet_to_redshift(self, parquet_paths: List[str], table_name: str, sql_save_path: str = None, distribution_key: str = None,sort_keys: List[str] = None):"""处理Parquet文件到Redshift的完整流程Args:parquet_paths: Parquet文件路径列表table_name: 目标表名sql_save_path: SQL保存路径(可选)distribution_key: 分布键(可选)sort_keys: 排序键列表(可选)Returns:处理是否成功"""# 1. 获取Schemaprint("正在提取Parquet文件Schema...")schema = self.get_parquet_schema(parquet_paths)if not schema:print("错误: 无法从Parquet文件中提取Schema")return False# 2. 生成建表SQLprint("正在生成Redshift建表SQL...")create_sql = self.generate_create_table_sql(schema, table_name, distribution_key, sort_keys)print(f"生成的SQL:\n{create_sql}")# 3. 在Redshift中创建表print("正在Redshift中创建表...")if not self.execute_sql(create_sql):print("错误: 创建表失败")return False# 4. 保存SQL到文件(可选)if sql_save_path:self.save_sql_to_file(create_sql, sql_save_path)print(f"SQL已保存到: {sql_save_path}")# 5. 上传Parquet文件到S3(这里只上传第一个文件作为示例)if parquet_paths and self.s3_config.get('bucket_name'):print("正在上传Parquet文件到S3...")s3_key = f"parquet_data/{table_name}/{os.path.basename(parquet_paths[0])}"if self.upload_to_s3(parquet_paths[0], s3_key):s3_path = f"s3://{self.s3_config['bucket_name']}/{s3_key}"# 6. 生成COPY命令copy_sql = self.generate_copy_sql(table_name, s3_path, self.s3_config.get('iam_role'))print(f"COPY命令:\n{copy_sql}")# 保存COPY命令(可选)if sql_save_path:copy_save_path = sql_save_path.replace('.sql', '_copy.sql')self.save_sql_to_file(copy_sql, copy_save_path)print(f"COPY SQL已保存到: {copy_save_path}")else:print("警告: 上传文件到S3失败")print("处理完成!")return True# 使用示例
if __name__ == "__main__":# 配置参数db_config = {'dbname': 'your_redshift_db','user': 'your_username','password': 'your_password','host': 'your_redshift_cluster.region.redshift.amazonaws.com','port': '5439'}s3_config = {'bucket_name': 'your-bucket-name','aws_access_key_id': 'your_access_key','aws_secret_access_key': 'your_secret_key','region_name': 'us-east-1','iam_role': 'arn:aws:iam::account-id:role/your-redshift-role' # 可选}# Parquet文件路径parquet_paths = ['/path/to/your/file1.parquet','/path/to/your/file2.parquet']# 目标表名table_name = 'your_table_name'# SQL保存路径sql_save_path = '/path/to/save/create_table.sql'# 创建处理器实例processor = ParquetToRedshift(db_config, s3_config)# 执行处理流程processor.process_parquet_to_redshift(parquet_paths=parquet_paths,table_name=table_name,sql_save_path=sql_save_path,distribution_key='your_distribution_key', # 可选sort_keys=['your_sort_key1', 'your_sort_key2'] # 可选)
使用说明
-
安装依赖:
pip install pandas pyarrow psycopg2-binary boto3
-
配置参数:
- 设置正确的Redshift数据库连接信息
- 配置S3存储桶和凭据信息
- 指定要处理的Parquet文件路径
-
运行脚本:
python parquet_to_redshift.py
功能说明
- Schema提取:从多个Parquet文件中提取合并的Schema信息
- 类型映射:将Parquet数据类型映射到Redshift数据类型
- SQL生成:生成包含分布键和排序键的Redshift建表语句
- 表创建:在Redshift中执行建表语句
- SQL保存:将生成的SQL语句保存到本地文件
- 文件上传:将Parquet文件上传到S3(可选)
- COPY命令生成:生成用于将数据从S3导入Redshift的COPY命令
注意事项
- 确保Redshift集群和S3存储桶在同一个AWS区域以减少数据传输成本
- 根据实际数据特点调整类型映射逻辑
- 考虑添加错误处理和重试机制以提高鲁棒性
- 对于大型数据集,考虑使用Redshift的自动压缩分析(COMPUPDATE)功能