当前位置: 首页 > news >正文

Python实现从Parquet文件生成Redshift表并存储SQL语句

Python实现解析存储Parquet文件的多个本地路径,获取其中的数据Schema,再根据这些Schema,得到创建上传数据到Amazon Redshift数据库表的SQL语句,并在Amazon Redshift里创建对应的表,并存储创建表的SQL语句,以便将Parquet文件上传到S3,并利用COPY命令导入Amazon Redshift数据库表。

import pandas as pd
import pyarrow.parquet as pq
import psycopg2
import boto3
from botocore.exceptions import ClientError
import json
from typing import List, Dict, Any
import osclass ParquetToRedshift:def __init__(self, db_config: Dict[str, Any], s3_config: Dict[str, Any]):"""初始化Redshift和S3配置Args:db_config: Redshift数据库连接配置s3_config: S3存储配置"""self.db_config = db_configself.s3_config = s3_configself.s3_client = boto3.client('s3',aws_access_key_id=s3_config.get('aws_access_key_id'),aws_secret_access_key=s3_config.get('aws_secret_access_key'),region_name=s3_config.get('region_name'))def get_parquet_schema(self, parquet_paths: List[str]) -> Dict[str, str]:"""从多个Parquet文件中提取合并的Schema信息Args:parquet_paths: Parquet文件路径列表Returns:字段名到数据类型的映射字典"""merged_schema = {}for path in parquet_paths:if not os.path.exists(path):print(f"警告: 文件 {path} 不存在,跳过")continuetry:table = pq.read_table(path)schema = table.schemafor field in schema:field_name = field.namefield_type = str(field.type)# 如果字段已存在但类型不同,需要处理类型冲突if field_name in merged_schema and merged_schema[field_name] != field_type:print(f"警告: 字段 {field_name} 类型冲突: {merged_schema[field_name]} vs {field_type}")# 这里可以选择更复杂的类型解析策略,如选择更通用的类型merged_schema[field_name] = field_typeexcept Exception as e:print(f"处理文件 {path} 时出错: {str(e)}")continuereturn merged_schemadef map_parquet_type_to_redshift(self, parquet_type: str) -> str:"""将Parquet数据类型映射到Redshift数据类型Args:parquet_type: Parquet数据类型字符串Returns:Redshift数据类型字符串"""type_mapping = {'int8': 'SMALLINT','int16': 'SMALLINT','int32': 'INTEGER','int64': 'BIGINT','uint8': 'SMALLINT','uint16': 'INTEGER','uint32': 'BIGINT','uint64': 'BIGINT',  # Redshift没有无符号类型,需要小心处理'float': 'REAL','double': 'DOUBLE PRECISION','bool': 'BOOLEAN','string': 'VARCHAR(256)',  # 可根据需要调整长度'timestamp[ms]': 'TIMESTAMP','timestamp[us]': 'TIMESTAMP','timestamp[ns]': 'TIMESTAMP','date32[day]': 'DATE','binary': 'VARBYTE(256)',  # 可根据需要调整长度}# 处理可空类型if parquet_type.startswith('null'):return 'VARCHAR(256)'  # 默认处理为字符串# 查找映射for key, value in type_mapping.items():if key in parquet_type.lower():return value# 默认处理为字符串print(f"警告: 未映射的类型 {parquet_type},使用VARCHAR(256)作为默认")return 'VARCHAR(256)'def generate_create_table_sql(self, schema: Dict[str, str], table_name: str, distribution_key: str = None, sort_keys: List[str] = None) -> str:"""生成Redshift建表SQL语句Args:schema: 字段名到数据类型的映射table_name: 目标表名distribution_key: 分布键(可选)sort_keys: 排序键列表(可选)Returns:CREATE TABLE SQL语句"""if not schema:raise ValueError("Schema不能为空")# 构建列定义columns = []for field_name, parquet_type in schema.items():redshift_type = self.map_parquet_type_to_redshift(parquet_type)columns.append(f'"{field_name}" {redshift_type}')# 构建基本CREATE TABLE语句sql = f'CREATE TABLE IF NOT EXISTS {table_name} (\n'sql += ',\n'.join(columns)sql += '\n)'# 添加分布样式if distribution_key:sql += f'\nDISTKEY({distribution_key})'# 添加排序键if sort_keys:sql += f'\nSORTKEY({", ".join(sort_keys)})'return sqldef execute_sql(self, sql: str) -> bool:"""在Redshift中执行SQL语句Args:sql: 要执行的SQL语句Returns:执行是否成功"""try:conn = psycopg2.connect(dbname=self.db_config['dbname'],user=self.db_config['user'],password=self.db_config['password'],host=self.db_config['host'],port=self.db_config['port'])cursor = conn.cursor()cursor.execute(sql)conn.commit()cursor.close()conn.close()return Trueexcept Exception as e:print(f"执行SQL时出错: {str(e)}")return Falsedef upload_to_s3(self, local_path: str, s3_key: str) -> bool:"""上传文件到S3Args:local_path: 本地文件路径s3_key: S3对象键Returns:上传是否成功"""try:self.s3_client.upload_file(local_path, self.s3_config['bucket_name'], s3_key)return Trueexcept ClientError as e:print(f"上传到S3时出错: {str(e)}")return Falsedef generate_copy_sql(self, table_name: str, s3_path: str, iam_role: str = None) -> str:"""生成Redshift COPY命令Args:table_name: 目标表名s3_path: S3文件路径iam_role: Redshift有权限访问S3的IAM角色(可选,如果使用凭据则不需要)Returns:COPY SQL语句"""if iam_role:credentials = f"IAM_ROLE '{iam_role}'"else:credentials = f"CREDENTIALS 'aws_access_key_id={self.s3_config["aws_access_key_id"]};aws_secret_access_key={self.s3_config["aws_secret_access_key"]}'"return f"""COPY {table_name}FROM '{s3_path}'{credentials}FORMAT AS PARQUET"""def save_sql_to_file(self, sql: str, file_path: str):"""将SQL语句保存到文件Args:sql: SQL语句file_path: 文件保存路径"""with open(file_path, 'w') as f:f.write(sql)def process_parquet_to_redshift(self, parquet_paths: List[str], table_name: str, sql_save_path: str = None, distribution_key: str = None,sort_keys: List[str] = None):"""处理Parquet文件到Redshift的完整流程Args:parquet_paths: Parquet文件路径列表table_name: 目标表名sql_save_path: SQL保存路径(可选)distribution_key: 分布键(可选)sort_keys: 排序键列表(可选)Returns:处理是否成功"""# 1. 获取Schemaprint("正在提取Parquet文件Schema...")schema = self.get_parquet_schema(parquet_paths)if not schema:print("错误: 无法从Parquet文件中提取Schema")return False# 2. 生成建表SQLprint("正在生成Redshift建表SQL...")create_sql = self.generate_create_table_sql(schema, table_name, distribution_key, sort_keys)print(f"生成的SQL:\n{create_sql}")# 3. 在Redshift中创建表print("正在Redshift中创建表...")if not self.execute_sql(create_sql):print("错误: 创建表失败")return False# 4. 保存SQL到文件(可选)if sql_save_path:self.save_sql_to_file(create_sql, sql_save_path)print(f"SQL已保存到: {sql_save_path}")# 5. 上传Parquet文件到S3(这里只上传第一个文件作为示例)if parquet_paths and self.s3_config.get('bucket_name'):print("正在上传Parquet文件到S3...")s3_key = f"parquet_data/{table_name}/{os.path.basename(parquet_paths[0])}"if self.upload_to_s3(parquet_paths[0], s3_key):s3_path = f"s3://{self.s3_config['bucket_name']}/{s3_key}"# 6. 生成COPY命令copy_sql = self.generate_copy_sql(table_name, s3_path, self.s3_config.get('iam_role'))print(f"COPY命令:\n{copy_sql}")# 保存COPY命令(可选)if sql_save_path:copy_save_path = sql_save_path.replace('.sql', '_copy.sql')self.save_sql_to_file(copy_sql, copy_save_path)print(f"COPY SQL已保存到: {copy_save_path}")else:print("警告: 上传文件到S3失败")print("处理完成!")return True# 使用示例
if __name__ == "__main__":# 配置参数db_config = {'dbname': 'your_redshift_db','user': 'your_username','password': 'your_password','host': 'your_redshift_cluster.region.redshift.amazonaws.com','port': '5439'}s3_config = {'bucket_name': 'your-bucket-name','aws_access_key_id': 'your_access_key','aws_secret_access_key': 'your_secret_key','region_name': 'us-east-1','iam_role': 'arn:aws:iam::account-id:role/your-redshift-role'  # 可选}# Parquet文件路径parquet_paths = ['/path/to/your/file1.parquet','/path/to/your/file2.parquet']# 目标表名table_name = 'your_table_name'# SQL保存路径sql_save_path = '/path/to/save/create_table.sql'# 创建处理器实例processor = ParquetToRedshift(db_config, s3_config)# 执行处理流程processor.process_parquet_to_redshift(parquet_paths=parquet_paths,table_name=table_name,sql_save_path=sql_save_path,distribution_key='your_distribution_key',  # 可选sort_keys=['your_sort_key1', 'your_sort_key2']  # 可选)

使用说明

  1. 安装依赖

    pip install pandas pyarrow psycopg2-binary boto3
    
  2. 配置参数

    • 设置正确的Redshift数据库连接信息
    • 配置S3存储桶和凭据信息
    • 指定要处理的Parquet文件路径
  3. 运行脚本

    python parquet_to_redshift.py
    

功能说明

  1. Schema提取:从多个Parquet文件中提取合并的Schema信息
  2. 类型映射:将Parquet数据类型映射到Redshift数据类型
  3. SQL生成:生成包含分布键和排序键的Redshift建表语句
  4. 表创建:在Redshift中执行建表语句
  5. SQL保存:将生成的SQL语句保存到本地文件
  6. 文件上传:将Parquet文件上传到S3(可选)
  7. COPY命令生成:生成用于将数据从S3导入Redshift的COPY命令

注意事项

  1. 确保Redshift集群和S3存储桶在同一个AWS区域以减少数据传输成本
  2. 根据实际数据特点调整类型映射逻辑
  3. 考虑添加错误处理和重试机制以提高鲁棒性
  4. 对于大型数据集,考虑使用Redshift的自动压缩分析(COMPUPDATE)功能
http://www.xdnf.cn/news/1347103.html

相关文章:

  • Eigen 中Sparse 模块的简单介绍和实战使用示例
  • (纯新手教学)计算机视觉(opencv)实战八——四种边缘检测详解:Sobel、Scharr、Laplacian、Canny
  • Day11 数据统计 图形报表
  • RKLLM 模型转换从0开始
  • vagrant怎么在宿主机操作虚拟机里面的系统管理和软件安装
  • 2025软件供应链安全技术路线未来趋势预测
  • vim的使用
  • Retrieval-Augmented Generation(RAG)
  • 为什么访问HTTPS站点时,会发生SSL证书错误
  • Trie 树(字典树)
  • 8月22号打卡
  • FFmpeg及 RTSP、RTMP
  • GitGithub相关(自用,持续更新update 8/23)
  • 文件下载和文件上传漏洞
  • LeetCode第1695题 - 删除子数组的最大得分
  • CSS自定义属性(CSS变量)
  • Jenkins发布spring项目踩坑——nohup java -jar发布后显示成功,但实际jps查询并未运行
  • kubernetes中pod的管理及优化
  • Python打卡Day49 CBAM注意力
  • Apache Ozone 2.0.0集群部署
  • 微信原生下载互联网oss资源保存到本地
  • CCleaner v1.2.3.4 中文解锁注册版,系统优化,隐私保护,极速清理
  • Unreal Engine Class System
  • 图数据库(neo4j)基础: 分类/标签 节点 关系 属性
  • 蓝牙部分解析和代码建构
  • set_disable_timing应用举例
  • OpenCV 图像边缘检测
  • 从“配置化思维”到“前端效率革命”:xiangjsoncraft 如何用 JSON 简化页面开发?
  • k8s 简介及部署方法以及各方面应用
  • 子类(派生类)使用父类(基类)的成员