【python实战】-- 选择解压汇总mode进行数据汇总20250525更新(篇幅2)
系列文章目录
文章目录
- 系列文章目录
- 前言
- 一、数据处理功能集合下
- 总结
前言
一、数据处理功能集合下
书接上回
数据处理功能集合上
'''
该模板用于多功能数据汇总处理:
1、用于解压压缩包,输入指定路径,即可解压多级压缩文件;
2、镜筒反射率、LAB文件汇总;
3、镜片反射率数据汇总,区分S1、S2;
4、统计Lab,L值、a值、b值的NG占比;(新增)
5、按指定字符或正则表达式分类文件;(新增)
6、增加了standered版本,可以自动判定OK和NG
7、优化了df.iloc[]左闭右开问题
8、测试了标准代入的问题
9、整理了各项功能的运行顺序
10、新增了读取多个xlsx文件,统计需要的数据后汇总到指定文件
'''
import os
import sys
import shutil
import zipfile
import pandas as pd
import xlrd
import xlwt
import csv
from xlutils.copy import copy
from openpyxl import Workbook
from openpyxl import load_workbook
from os.path import dirname
from decimal import Decimal
from openpyxl.utils.dataframe import dataframe_to_rows
# 读写2007 excel
import openpyxl
from openpyxl.styles import numbers
from openpyxl.styles import Alignment
import glob
import tkinter as tk
from tkinter import messagebox
from tkinter import simpledialog
import re
from shapely.geometry import Polygon, Point
from openpyxl.styles import PatternFill
from openpyxl.utils import get_column_letter
from datetime import datetime,timedeltadef del_old_zip(file_path):os.remove(file_path)
def decompress(file_path,root):z = zipfile.ZipFile(f"{file_path}","r")z.extractall(path=f"{root}")for names in z.namelist():if names.endswith(file_flag):z.close()return 1z.close()return 0
def start_dir_make(root,dirname):os.chdir(root)os.mkdir(dirname)return os.path.join(root,dirname)
def rem_dir_extra(root,father_dir_name):try:for item in os.listdir(os.path.join(root,father_dir_name)):if not os.path.isdir(os.path.join(root,father_dir_name,item)):continueif item == father_dir_name and len(os.listdir(os.path.join(root,father_dir_name))) == 1:os.chdir(root)os.rename(father_dir_name,father_dir_name + '-old')shutil.move(os.path.join(root,father_dir_name + '-old', item),os.path.join(root))os.rmdir(os.path.join(root,father_dir_name + '-old'))rem_dir_extra(root,item)else:rem_dir_extra(os.path.join(root,father_dir_name),item)except Exception as e:print("清除文件夹出错"+str(e))
def get_allfile_msg(file_dir):for root, dirs, files in os.walk(file_dir):return root, dirs, [file for file in files if file.endswith('.xls') or file.endswith('.xlsx') or file.endswith('.csv')]
def get_allfile_url(root, files):allFile_url = []for file_name in files:file_url = root + "/" + file_nameallFile_url.append(file_url)return allFile_url
def get_file_name(path, suffix = ['.xlsx', '.xls','.csv']): #'.xlsx', '.xls',tmp_lst = []for root,dirs,files in os.walk(path):for file in files:tmp_lst.append(os.path.join(root, file))return tmp_lst
def extract_last_part_of_path(path):return os.path.basename(path)
def read_csv_file(file_path):#参数:error_bad_lines=False跳过错误的行 delimiter=',',encoding = 'gbk',header = 0, engine='python' sep = r"\s+\s{0}" encoding = "iso-8859-1"return pd.read_csv(file_path,encoding = 'latin1',sep = r"\s+\s{0}",dtype=object,quotechar="'",delimiter=',',doublequote=True,engine="python",header = 1) #第2行作为表头
def read_csv_file1(file_path):#参数:error_bad_lines=False跳过错误的行 delimiter=',',encoding = 'gbk',header = 0, engine='python' sep = r"\s+\s{0}" encoding = "iso-8859-1"return pd.read_csv(file_path,encoding = 'latin1',sep = r"\s+\s{0}",dtype=object,quotechar="'",delimiter=',',doublequote=True,engine="python",header = None,skiprows=lambda x:x not in [2,3,4]) #第2行作为表头
def count_and_list_folders(target_dir):try:if not os.path.exists(target_dir):raise FileNotFoundError(f"目录不存在:{target_dir}")if not os.path.isdir(target_dir):raise NotADirectoryError(f"路径不是目录:{target_dir}")folders = [entry.name for entry in os.scandir(target_dir) if entry.is_dir()] return foldersexcept Exception as e:print(f"错误:{str(e)}")
def format_excel_date(cell_value):"""智能格式化Excel单元格中的日期"""# 处理空值直接返回空字符串(可根据需求改为None)if pd.isna(cell_value):return "" # 场景1:Pandas原生日期类型(Timestamp)if isinstance(cell_value, pd.Timestamp):return cell_value.strftime("%Y-%m-%d") # 场景2:数值类型(可能是Excel日期序列数)if isinstance(cell_value, (int, float)):try:# Windows Excel基准日期(1899-12-30)base_date = pd.Timestamp("1899-12-30")dt = base_date + pd.Timedelta(days=cell_value)# 验证是否为合理日期(避免将普通数字误转为日期)if dt.year > 1900: # 过滤明显不合理年份return dt.strftime("%Y-%m-%d")except:pass # 场景3:字符串类型日期if isinstance(cell_value, str):try:# 自动解析常见日期格式dt = pd.to_datetime(cell_value, dayfirst=False, yearfirst=False)return dt.strftime("%Y-%m-%d")except:pass # 场景4:其他无法识别的类型原样返回return cell_value
if __name__ == '__main__':red_fill = PatternFill(start_color='FF0000',end_color='FF0000',fill_type='solid')sort = input("请选择数据处理模式:\n 1-解压 7-镜筒,9-镜片 lab-统计lab异常占比 c-分类汇总文件 s-oppo/vivo数据 \n") #df.iloc[row,col] 为左闭右开区间if sort =="s":customer = input("请输入汇总选项:1-OPPO 2-VIVO 3-新品全流程")print("特殊件号说明:\n 1、39634A-400,镜筒39634A-701,镜片39656A \n 2、39802A-400,镜筒39731A-701,镜片39731A \n 3、39750B-400,镜筒39750B-701,镜片是39750A") #--------------------------------------读取原始数据-------------------------------------------sheet_name_film = '4月'file_path = r'D:\Users\gxcaoty\Desktop\DJD\data'file_uspm = file_path + '\\' + '反射率数据统计表4月可视化.xlsx' #反射率file_film_2 = file_path + '\\' +'手机镜片烘烤膜层牢固度粘拉记录表2.xlsx' #膜层粘拉2号楼file_film_6 = file_path + '\\' +'手机镜片烘烤膜层牢固度粘拉记录表6.xlsx' #膜层粘拉6号楼file_xn_2 = file_path + '\\' +'4月性能抽检报表.xlsm' #性能2号楼file_xn_6 = file_path + '\\' +'4月份汇总.xlsm' #性能6号楼file_wj_2 = file_path + '\\' +'4月份外观OQC日检报表.xlsx' #外观2号楼file_wj_6 = file_path + '\\' +'25年6#4月份外观OQC日检报表.xlsx' #外观6号楼file_ttl = file_path + '\\' +'OQC推脱力报表.xlsx' #推脱力file_jpwj_2 = file_path + '\\' + '5月部品OQC镜片抽检报表2#.xlsx'file_jpwj_6 = file_path + '\\' + '5月部品OQC镜片抽检报表6#.xlsx'df_uspm = pd.read_excel(file_uspm,sheet_name='测试记录',header=0,nrows=50000,usecols=['测试日期','机种','件号','判定','异常项目'])df_film_2 = pd.read_excel(file_film_2,sheet_name=sheet_name_film,header=1,nrows=1000,usecols=['检验日期','件号'])df_film_6 = pd.read_excel(file_film_6,sheet_name=sheet_name_film,header=1,nrows=1000,usecols=['检验日期','件号']) #usecols='A:Z'df_xn_2 = pd.read_excel(file_xn_2,sheet_name='MTF',header=0,nrows=5000,usecols=['日期','件号','抽检数量','转点确认不良数'])df_xn_6 = pd.read_excel(file_xn_6,sheet_name='MTF',header=0,nrows=5000,usecols=['日期','件号','抽检数量','转点确认不良数'])df_wj_2 = pd.read_excel(file_wj_2,sheet_name='明细',header=1,nrows=50000,usecols=['日期','件号','送检数','不良数'])df_wj_6 = pd.read_excel(file_wj_6,sheet_name='明细',header=1,nrows=50000,usecols=['日期','件号','送检数','不良数'])df_ttl = pd.read_excel(file_ttl,sheet_name='详细列表',header=0,nrows=50000,usecols=['测试日期','件号','平均值'])df_bz_2 = pd.read_excel(file_xn_2,sheet_name='包装跌落',header=0,nrows=5000,usecols=['日期','件号','测试数','变化量不良数'])df_bz_6 = pd.read_excel(file_xn_6,sheet_name='包装跌落',header=0,nrows=5000,usecols=['日期','件号','测试数','变化量不良数'])print("1-USPM(a) 2-膜层粘拉(b) 3-性能(c) 4-外观(d) 5-推脱力(e) 6-包装跌落(f)") #----------------------------------------OPPO输出文件------------------------------------------------------oppo_file = "二工厂oppo件号数据.xlsx"wb_oppo = load_workbook(oppo_file)ws_oppo_bp = wb_oppo['部品']ws_oppo_jt = wb_oppo["外观"]df_oppo_bp = pd.read_excel(oppo_file,sheet_name="部品",header=None)df_oppo_jt = pd.read_excel(oppo_file,sheet_name="外观",header=None)cols_oppo_bp = df_oppo_bp.iloc[0].notna().sum() #最大列数rows_oppo_bp = df_oppo_bp.shape[0]cols_oppo_jt = df_oppo_jt.iloc[0].notna().sum() #最大列数rows_oppo_jt = df_oppo_jt.shape[0]print(f"oppo部品最大行数:{rows_oppo_bp},oppo部品最大列数:{cols_oppo_bp}",f"oppo镜头最大行数:{rows_oppo_jt},oppo镜头最大列数:{cols_oppo_jt}")#----------------------------------------VIVO输出文件------------------------------------------------------vivo_file = "二工厂vivo件号数据.xlsx"wb_vivo = load_workbook(vivo_file)ws_vivo = wb_vivo['Sheet1']df_vivo = pd.read_excel(vivo_file,sheet_name="Sheet1",header=None)cols_vivo = df_vivo.iloc[0].notna().sum() #最大列数rows_vivo = df_vivo.shape[0]print(f"vivo最大行数:{rows_vivo},vivo最大列数:{cols_vivo}") #-----------------------------------------新品全流程品质管控指标输出文件--------------------------------------xp_file = "新品全流程品质管控指标终版.xlsx"wb_xp = load_workbook(xp_file)#jh = [jh for jh in G_target if jh in G_target]#filtered_sheets = [name for name in wb_xp.sheetnames if jh in name]ws_xp_853A = wb_xp['小米-39853A-A区']ws_xp_750B = wb_xp['三星-39750B-A区']df_xp_853A = pd.read_excel(xp_file,sheet_name="小米-39853A-A区",header=None)cols_xp_853A = df_xp_853A.iloc[0].notna().sum() #最大列数rows_xp_853A = df_xp_853A.shape[0]df_xp_750B = pd.read_excel(xp_file,sheet_name="三星-39750B-A区",header=None)cols_xp_750B = df_xp_750B.iloc[0].notna().sum() #最大列数rows_xp_750B = df_xp_750B.shape[0] print(f"新品最大行数:{rows_xp_853A},新品最大列数:{cols_xp_853A}") #----------------------------------------------------------------------------------------------------------- if customer == "1":print('OPPO专项数据汇总(周四/周)-----------解读中,请稍后--------------------------')print('解读内容为:sheet(部品)镜片&镜筒膜色、反射率、膜层粘拉 ; sheet(外观)性能不良率、外观不良率、推脱力平均值、包装跌落不良率')customer = "1"specified_date = ['2025-04-23','2025-04-24','2025-04-25','2025-04-26','2025-04-27']G_target = ['39634A','39368A','39580A','39731A','39802A']for date in specified_date:for target in G_target:# 关闭所有 DataFrame 的默认显示#sys.stdout = open(os.devnull, "w")pd.set_option("display.notebook_repr_html", False) # 禁用 HTML 渲染:ml-citation{ref="6" data="citationList"}pd.set_option("display.max_rows", 0) # 设置最大显示行数为 0(完全隐藏):ml-citation{ref="3,6" data="citationList"}pd.set_option('display.show_dimensions', False) # 关闭维度显示#类型1 39634A-400 39634A-701 39656A-90 #类型2 39802A-400 39731A-701 39731A-90 #类型3 39750B-400 39750B-701 39750A-90#类型2 39853A-400 39656A-701 39656A-90if target == "39634A": # 镜头是39634 ,镜筒是39634 镜片是39526target_1 ="39634A" #镜筒target_2 = "39526A" #镜片elif target == "39802A":target_1 = "39731A" #镜筒target_2 = "39731A" #镜片elif target == "39750B":target_1 = "39750B" #镜筒target_2 = "39750A" #镜片elif target == "39853A":target_1 = "39656A" #镜筒target_2 == "39656A" #镜片else :target_1 = target_2 = targetmask1 = df_uspm["机种"].astype(str).str.contains(target_1,na=False) mask1_2 = df_uspm["机种"].astype(str).str.contains(target_2,na=False)matches1 = df_uspm[mask1]matches1_2 = df_uspm[mask1_2]mask2_2 = df_film_2["件号"].astype(str).str.contains(target_2,na=False) mask2_6 = df_film_6["件号"].astype(str).str.contains(target_2,na=False) matches2_2 = df_film_2[mask2_2]matches2_6 = df_film_6[mask2_6]mask3_2 = df_xn_2["件号"].astype(str).str.contains(target,na=False) mask3_6 = df_xn_6["件号"].astype(str).str.contains(target,na=False) matches3_2 = df_xn_2[mask3_2]matches3_6 = df_xn_6[mask3_6]mask4_2 = df_wj_2["件号"].astype(str).str.contains(target,na=False) mask4_6 = df_wj_6["件号"].astype(str).str.contains(target,na=False) matches4_2 = df_wj_2[mask4_2]matches4_6 = df_wj_6[mask4_6]mask5 = df_ttl["件号"].astype(str).str.contains(target,na=False) matches5 = df_ttl[mask5]mask6_2 = df_bz_2["件号"].astype(str).str.contains(target,na=False) mask6_6 = df_bz_6["件号"].astype(str).str.contains(target,na=False) matches6_2 = df_bz_2[mask6_2]matches6_6 = df_bz_6[mask6_6]#反射率数据:if not matches1.empty or not matches1_2.empty: #not matches1.empty ordf_uspm["测试日期"] = pd.to_datetime(df_uspm["测试日期"]).dt.strftime("%Y-%m-%d") #可去除df_uspm["异常项目"]=df_uspm["异常项目"].fillna("").astype(str) #可去除 condition_70 = (df_uspm["测试日期"]==date) & (df_uspm["机种"].str.contains(target_1,case=False,na=False)) & (pd.to_numeric(df_uspm["件号"],errors="coerce") < 800) condition_90 = (df_uspm["测试日期"]==date) & (df_uspm["机种"].str.contains(target_2,case=False,na=False)) & (pd.to_numeric(df_uspm["件号"],errors="coerce") > 800)condition_count_70 = df_uspm.loc[condition_70].shape[0] #镜筒总批次 mask_a.sum()condition_count_90 = df_uspm.loc[condition_90].shape[0] #镜片总批次ok_70 = condition_70 & (df_uspm["判定"]=="OK") ok_90 = condition_90 & (df_uspm["判定"]=="OK") ok_70_count = df_uspm.loc[ok_70].shape[0] #镜筒OK批次ok_90_count = df_uspm.loc[ok_90].shape[0] #镜片OK批次lab_count_70 = df_uspm<