当前位置: 首页 > ai >正文

处理省市区excel数据加工成SQL

原始数据相关内容链接

处理excel数据加工成SQL的脚本

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Excel行政区域数据转SQL脚本
- 支持特殊行政单位处理(如省直辖县级行政单位)
- 支持批量处理
"""import pandas as pd
import os
import redef process_excel_to_sql(excel_path, output_sql_path=None):"""将Excel行政区域数据转换为SQL插入语句Args:excel_path: Excel文件路径output_sql_path: 输出SQL文件路径,如果为None则打印到控制台"""try:# 读取Excel文件df = pd.read_excel(excel_path)print(f"读取Excel文件: {excel_path}")print(f"数据行数: {len(df)}")print(f"列名: {list(df.columns)}")# 检查必要的列是否存在required_columns = ['省份', '地级市', '县区', '级别']if not all(col in df.columns for col in required_columns):raise ValueError(f"Excel文件缺少必要的列: {required_columns}")# 获取省份名称province_name = df['省份'].iloc[0]print(f"正在处理: {province_name}")# 生成SQL语句sql_lines = []sql_lines.append("-- " + "=" * 60)sql_lines.append(f"-- {province_name}行政区域数据插入SQL")sql_lines.append("-- 适用表结构:sys_area (id, area_name, parent_id, area_level, sort_order)")sql_lines.append("-- " + "=" * 60)sql_lines.append("")# 设置省级变量province_clean = re.sub(r'[省市区县州盟地]', '', province_name)province_var = f"@{province_clean}_province_id"sql_lines.append("-- 设置变量")sql_lines.append(f"SET {province_var} = NULL;")# 处理市级数据 - 分离正常地级市和省直辖县级行政单位normal_cities = df[(df['级别'] == '地级市') & (df['地级市'] != '省直辖县级行政单位')]special_admin_units = df[(df['地级市'] == '省直辖县级行政单位') &(df['县区'].notna()) & (df['县区'] != '')]# 收集所有城市名称,为每个城市创建变量all_city_names = []if not normal_cities.empty:all_city_names.extend(normal_cities['地级市'].tolist())if not special_admin_units.empty:all_city_names.extend(special_admin_units['县区'].tolist())# 为每个城市生成变量名city_vars = {}for city_name in all_city_names:# 生成变量名,去除特殊字符var_clean = re.sub(r'[市区县州盟地族苗土家自治]', '', city_name)var_name = f"@{var_clean}_city_id"city_vars[city_name] = var_namesql_lines.append(f"SET {var_name} = NULL;")# 直辖市的情况is_municipality = len(all_city_names) == 1 and special_admin_units.emptyif is_municipality:city_var = f"@{province_clean}_city_id"sql_lines.append(f"SET {city_var} = NULL;")sql_lines.append("")# 1. 插入省级sql_lines.append("-- 1. 插入省级")sql_lines.append(f"INSERT INTO sys_area (area_name, parent_id, area_level, sort_order) VALUES ('{province_name}', 0, 1, 1);")sql_lines.append(f"SET {province_var} = LAST_INSERT_ID();")sql_lines.append("")# 2. 处理地级市all_cities_count = len(all_city_names)if all_cities_count > 0:sql_lines.append("-- 2. 插入地级市")city_sort = 1# 插入正常地级市if not normal_cities.empty:sql_lines.append("-- 2.1 正常地级市")for _, city_row in normal_cities.iterrows():city_name = city_row['地级市']city_var_name = city_vars[city_name]sql_lines.append(f"INSERT INTO sys_area (area_name, parent_id, area_level, sort_order) VALUES ('{city_name}', {province_var}, 2, {city_sort});")sql_lines.append(f"SET {city_var_name} = LAST_INSERT_ID();")# 如果是直辖市,同时设置通用变量if is_municipality:sql_lines.append(f"SET {city_var} = LAST_INSERT_ID();")city_sort += 1sql_lines.append("")# 处理省直辖县级行政单位,将其作为地级市if not special_admin_units.empty:sql_lines.append("-- 2.2 省直辖县级行政单位(作为地级市处理)")for _, special_row in special_admin_units.iterrows():special_city_name = special_row['县区']  # 县区列作为市名city_var_name = city_vars[special_city_name]sql_lines.append(f"INSERT INTO sys_area (area_name, parent_id, area_level, sort_order) VALUES ('{special_city_name}', {province_var}, 2, {city_sort});")sql_lines.append(f"SET {city_var_name} = LAST_INSERT_ID();")city_sort += 1sql_lines.append("")print(f"发现省直辖县级行政单位 {len(special_admin_units)} 个,已作为地级市处理")sql_lines.append(f"-- 总计地级市数量: {all_cities_count}个")sql_lines.append("")# 3. 处理区县# 正常区县(不包括省直辖县级行政单位下的)districts = df[(df['级别'] == '区县') & (df['地级市'] != '省直辖县级行政单位')]if not districts.empty:sql_lines.append("-- 3. 插入区县")if is_municipality:# 直辖市情况sql_lines.append("-- 直辖市区县")district_sort = 1for _, district_row in districts.iterrows():district_name = district_row['县区']sql_lines.append(f"INSERT INTO sys_area (area_name, parent_id, area_level, sort_order) VALUES ('{district_name}', {city_var}, 3, {district_sort});")district_sort += 1else:# 普通省份,需要按城市分组处理区县,使用变量避免子查询问题grouped = districts.groupby('地级市')for city_name, city_districts in grouped:if city_name != '省直辖县级行政单位' and city_name in city_vars:sql_lines.append(f"-- {city_name}下的区县")city_var_name = city_vars[city_name]district_sort = 1for _, district_row in city_districts.iterrows():district_name = district_row['县区']sql_lines.append(f"INSERT INTO sys_area (area_name, parent_id, area_level, sort_order) VALUES ('{district_name}', {city_var_name}, 3, {district_sort});")district_sort += 1sql_lines.append("")sql_lines.append("")# 统计信息province_count = 1city_count = all_cities_countdistrict_count = len(districts)sql_lines.append("")sql_lines.append("-- " + "=" * 60)sql_lines.append(f"-- 统计:省级{province_count}个,地级市{city_count}个,区县{district_count}个")if not special_admin_units.empty:sql_lines.append(f"-- 其中省直辖县级行政单位{len(special_admin_units)}个已作为地级市处理")sql_lines.append("-- 修复:使用变量替代子查询,避免MySQL错误")sql_lines.append("-- " + "=" * 60)# 输出结果sql_content = "\n".join(sql_lines)if output_sql_path:try:# 确保目录存在output_dir = os.path.dirname(output_sql_path)if output_dir and not os.path.exists(output_dir):os.makedirs(output_dir, exist_ok=True)print(f"创建目录: {output_dir}")# 写入文件with open(output_sql_path, 'w', encoding='utf-8') as f:f.write(sql_content)print(f"✓ SQL文件已保存到: {output_sql_path}")except PermissionError:print(f"❌ 权限错误:无法写入文件 {output_sql_path}")print("可能的解决方案:")print("1. 检查目录是否存在写权限")print("2. 尝试以管理员身份运行")print("3. 选择其他目录保存文件")print("4. 直接复制下面的SQL内容:")print("-" * 40)print(sql_content)print("-" * 40)except Exception as e:print(f"❌ 保存文件时出错: {e}")print("将在控制台显示SQL内容:")print("-" * 40)print(sql_content)print("-" * 40)else:print("\n生成的SQL语句:")print("-" * 60)print(sql_content)print("-" * 60)return sql_contentexcept Exception as e:print(f"❌ 处理Excel文件时出错: {e}")return Nonedef batch_process_excel_files(folder_path):"""批量处理文件夹中的所有Excel文件Args:folder_path: 包含Excel文件的文件夹路径"""if not os.path.exists(folder_path):print(f"❌ 错误:文件夹 {folder_path} 不存在")returnif not os.path.isdir(folder_path):print(f"❌ 错误:{folder_path} 不是一个目录")returnexcel_files = [f for f in os.listdir(folder_path) if f.endswith(('.xlsx', '.xls'))]if not excel_files:print("❌ 文件夹中没有找到Excel文件")returnprint(f"✓ 找到{len(excel_files)}个Excel文件:")for file in excel_files:print(f"  - {file}")# 创建sql子目录sql_folder = os.path.join(folder_path, 'sql')try:if not os.path.exists(sql_folder):os.makedirs(sql_folder, exist_ok=True)print(f"✓ 创建SQL输出目录: {sql_folder}")except PermissionError:print(f"⚠️ 无法创建SQL目录 {sql_folder},将保存到原目录")sql_folder = folder_pathexcept Exception as e:print(f"⚠️ 创建目录时出错: {e},将保存到原目录")sql_folder = folder_pathsuccess_count = 0fail_count = 0for excel_file in excel_files:excel_path = os.path.join(folder_path, excel_file)sql_file = excel_file.replace('.xlsx', '.sql').replace('.xls', '.sql')sql_path = os.path.join(sql_folder, sql_file)print(f"\n{'=' * 50}")print(f"处理文件: {excel_file}")print(f"{'=' * 50}")try:result = process_excel_to_sql(excel_path, sql_path)if result:print(f"✓ 成功处理: {excel_file}")success_count += 1else:print(f"❌ 处理失败: {excel_file}")fail_count += 1except Exception as e:print(f"❌ 处理失败: {excel_file} - {e}")fail_count += 1# 尝试只输出到控制台try:print("尝试只在控制台显示结果:")result = process_excel_to_sql(excel_path, None)if result:print("✓ 控制台显示成功")except Exception as e2:print(f"❌ 完全失败: {e2}")print(f"\n{'=' * 50}")print(f"批量处理完成")print(f"成功: {success_count} 个")print(f"失败: {fail_count} 个")print(f"{'=' * 50}")def validate_file_path(file_path):"""验证文件路径"""if not file_path:return False, "文件路径不能为空"# 去除可能的引号file_path = file_path.strip('"').strip("'")if not os.path.exists(file_path):return False, f"文件不存在: {file_path}"if not file_path.endswith(('.xlsx', '.xls')):return False, f"文件格式不正确,需要.xlsx或.xls文件: {file_path}"return True, file_pathdef validate_folder_path(folder_path):"""验证文件夹路径"""if not folder_path:folder_path = "./"# 去除可能的引号folder_path = folder_path.strip('"').strip("'")if not os.path.exists(folder_path):return False, f"目录不存在: {folder_path}"if not os.path.isdir(folder_path):return False, f"路径不是目录: {folder_path}"return True, folder_pathif __name__ == "__main__":print("=" * 60)print("Excel行政区域数据转SQL工具 v2.0")print("✅ 支持特殊行政单位处理(如省直辖县级行政单位)")print("✅ 修复MySQL子查询错误,使用变量替代")print("✅ 完善的权限错误处理")print("=" * 60)print()while True:print("选择处理方式:")print("1. 处理单个Excel文件")print("2. 批量处理文件夹中的所有Excel文件")print("3. 查看使用说明")print("4. 退出")choice = input("\n请选择 (1/2/3/4): ").strip()if choice == "1":print("\n" + "-" * 40)print("单文件处理模式")print("-" * 40)while True:file_path = input("请输入Excel文件路径: ").strip()is_valid, result = validate_file_path(file_path)if is_valid:file_path = resultbreakelse:print(f"❌ {result}")retry = input("是否重新输入? (y/n): ").strip().lower()if retry != 'y':breakif 'file_path' in locals() and os.path.exists(file_path):save_to_file = input("是否保存到SQL文件? (y/n): ").strip().lower()if save_to_file == 'y':sql_path = input("请输入SQL文件保存路径 (直接回车使用默认路径): ").strip()if not sql_path:base_name = os.path.splitext(file_path)[0]sql_path = base_name + '.sql'print(f"使用默认路径: {sql_path}")try:process_excel_to_sql(file_path, sql_path)except Exception as e:print(f"❌ 处理失败: {e}")show_console = input("是否在控制台显示结果? (y/n): ").strip().lower()if show_console == 'y':try:process_excel_to_sql(file_path, None)except Exception as e2:print(f"❌ 完全失败: {e2}")else:try:process_excel_to_sql(file_path, None)except Exception as e:print(f"❌ 处理失败: {e}")elif choice == "2":print("\n" + "-" * 40)print("批量处理模式")print("-" * 40)while True:folder_path = input("请输入文件夹路径 (直接回车使用当前目录): ").strip()is_valid, result = validate_folder_path(folder_path)if is_valid:folder_path = resultprint(f"使用目录: {os.path.abspath(folder_path)}")breakelse:print(f"❌ {result}")retry = input("是否重新输入? (y/n): ").strip().lower()if retry != 'y':breakif 'folder_path' in locals() and os.path.exists(folder_path):batch_process_excel_files(folder_path)elif choice == "3":print("\n" + "=" * 60)print("使用说明")print("=" * 60)print("1. Excel文件格式要求:")print("   - 必须包含列:省份、地级市、县区、级别")print("   - 支持.xlsx和.xls格式")print()print("2. 特殊处理说明:")print("   - 省直辖县级行政单位会被自动识别")print("   - 仙桃市、潜江市等会作为地级市处理")print("   - 直辖市会自动识别并正确处理")print()print("3. 生成的SQL特点:")print("   - 使用MySQL变量,避免子查询错误")print("   - 支持完整的三级行政区域结构")print("   - 包含详细的注释和统计信息")print()print("4. 常见问题:")print("   - 权限错误:尝试以管理员身份运行或选择其他目录")print("   - 文件格式错误:确保Excel文件包含必要的列")print("   - MySQL错误:新版本已修复子查询问题")print("=" * 60)elif choice == "4":print("\n感谢使用Excel转SQL工具!")breakelse:print("❌ 无效的选择,请输入 1、2、3 或 4")print()  # 添加空行,便于下次选择
http://www.xdnf.cn/news/19851.html

相关文章:

  • 常用的几种测试工具:selenium,jmeter,jenkins
  • 【IO】进程间通信(IPC)练习
  • Unity 的游戏循环机制
  • 66车载诊断架构 --- 从架构系统角度怎么确保整车DTC的完整性?
  • (二)文件管理-基础命令-pwd命令的使用
  • 计算机视觉(六):腐蚀操作
  • 电脑城老板不会告诉你的装机秘籍:建造者模式让你的代码高配起飞!
  • 基于深度学习的医疗器械生产备案凭证识别技术,实现从图像到结构化数据的智能转化
  • pytorch初级
  • 八、算法设计与分析
  • 新后端漏洞(上)- Python unpickle 造成任意命令执行漏洞
  • 惠普HP Color LaserJet Pro MFP M277dw打印有横条维修案例1
  • RoPE位置编码缩放因子的最优解:频率维度与位置敏感度的精妙权衡
  • SpringBoot项目package报错 PKIX path building failed 终极解决方案:Nexus私服证书导入JDK证书库
  • C++对象构造与析构
  • 2.插值法
  • Spring Boot 实现数据库表变更监听的 Redis 消息队列方案
  • 技术方案之Mysql部署架构
  • uni app 的app 端调用tts 进行文字转语音
  • GDAL 下载安装
  • C题目训练【三连击】
  • Vue3 + Ant Design Vue 实现多选下拉组件(支持分组、搜索与标签省略)
  • Ollama大模型 本地部署+使用教程
  • 【FastDDS】Layer DDS之Domain ( 05-Creating a DomainParticipant)
  • lesson53:CSS五种定位方式全解析:从基础到实战应用
  • GEO服务商推荐:移山科技以划时代高精尖技术引领AI搜索优化新纪元
  • C++ 5
  • 使用 Acme.sh 获取和管理免费 SSL 证书
  • 性能测试-jmeter8-脚本录制
  • 网络通信与协议栈 -- TCP协议与编程