import pandas as pd import tkinter as tk from tkinter import filedialog import os from datetime import datetime import numpy as np class DataProcessor: def __init__(self): self.data = None self.filename = None self.file_path = None self.file_dir = None self.processing_start_time = None def select_file(self): """手动选择数据文件""" print("🔍 打开文件选择对话框...") root = tk.Tk() root.withdraw() self.file_path = filedialog.askopenfilename( title="选择数据文件", filetypes=[("Excel files", "*.xlsx"), ("CSV files", "*.csv"), ("All files", "*.*")] ) if self.file_path: self.filename = os.path.basename(self.file_path) self.file_dir = os.path.dirname(self.file_path) print(f"✅ 已选择文件: {self.filename}") print(f"📁 文件所在目录: {self.file_dir}") return True else: print("❌ 未选择文件") return False def _load_data(self): """加载数据文件""" print("📥 开始加载数据文件...") try: if self.file_path.endswith('.csv'): self.data = pd.read_csv(self.file_path) print("✅ 成功加载CSV文件") elif self.file_path.endswith('.xlsx'): self.data = pd.read_excel(self.file_path) print("✅ 成功加载Excel文件") else: raise ValueError("不支持的文件格式") print(f"📊 数据文件形状: {self.data.shape}") print(f"📋 数据列名: {list(self.data.columns)[:10]}...") # 显示数据预览 print("\n📋 数据预览(前3行):") print(self.data.head(3)) # 显示列数据类型 print("\n📊 列数据类型:") for col in self.data.columns[:10]: print(f" {col}: {self.data[col].dtype}") except Exception as e: print(f"❌ 加载数据文件时出错: {e}") raise def _validate_data(self): """验证数据完整性""" print("🔍 验证数据完整性...") # 检查必要的测量列 required_measure_columns = ['PAD ID', 'Component ID', 'Height(mil)', 'Volume(%)', 'Area(%)'] missing_measure_columns = [col for col in required_measure_columns if col not in self.data.columns] if missing_measure_columns: error_msg = f"❌ 数据文件中缺少必要的测量列: {missing_measure_columns}" print(error_msg) raise ValueError(error_msg) # 检查上下限列 required_limit_columns = ['Height_Low(mil)', 'Height_High(mil)', 'Vol_Min(%)', 'Vol_Max(%)', 'Area_Min(%)', 'Area_Max(%)'] missing_limit_columns = [col for col in required_limit_columns if col not in self.data.columns] if missing_limit_columns: error_msg = f"❌ 数据文件中缺少必要的上下限列: {missing_limit_columns}" print(error_msg) raise ValueError(error_msg) print("✅ 数据验证通过") # 检查数据是否存在空值 all_columns = required_measure_columns + required_limit_columns null_counts = self.data[all_columns].isnull().sum() if null_counts.any(): print(f"⚠️ 数据中存在空值:") for col, count in null_counts[null_counts > 0].items(): print(f" {col}: {count} 个空值") else: print("✅ 所有必需列都没有空值") # 显示数据统计信息 print("\n📊 数据统计信息:") for col in required_measure_columns: if col in self.data.columns: # 检查列的数据类型,针对不同类型使用不同的格式化方式 if pd.api.types.is_numeric_dtype(self.data[col]): valid_count = self.data[col].count() if valid_count > 0: min_val = self.data[col].min() max_val = self.data[col].max() print(f" {col}: {valid_count} 个有效值, 范围 {min_val:.4f} - {max_val:.4f}") else: print(f" {col}: 0 个有效值") else: # 非数值型列:显示唯一值和示例 unique_count = self.data[col].nunique() sample_values = self.data[col].dropna().head(3).tolist() print( f" {col}: {self.data[col].count()} 个有效值, {unique_count} 个唯一值, 示例: {sample_values}") # 检查并转换数据类型 print("\n🔄 数据类型检查与转换:") numeric_columns = ['Height(mil)', 'Volume(%)', 'Area(%)', 'Height_Low(mil)', 'Height_High(mil)', 'Vol_Min(%)', 'Vol_Max(%)', 'Area_Min(%)', 'Area_Max(%)'] for col in numeric_columns: if col in self.data.columns: if not pd.api.types.is_numeric_dtype(self.data[col]): try: # 尝试转换为数值类型 original_count = self.data[col].count() self.data[col] = pd.to_numeric(self.data[col], errors='coerce') converted_count = self.data[col].count() lost_data = original_count - converted_count if lost_data > 0: print(f" ⚠️ {col}: 转换后丢失 {lost_data} 个非数值数据") else: print(f" ✅ {col}: 成功转换为数值类型") except Exception as e: print(f" ❌ {col}: 类型转换失败 - {e}") else: valid_count = self.data[col].count() print(f" ✅ {col}: 已经是数值类型, {valid_count} 个有效值") def _print_progress(self, message, level=1): """打印进度信息,支持分级显示""" indent = " " * level timestamp = datetime.now().strftime("%H:%M:%S") print(f"{timestamp} {indent}{message}") def generate_report(self): """生成统计报告""" if self.data is None: raise ValueError("请先选择数据文件") try: self.processing_start_time = datetime.now() print(f"\n🚀 开始生成报告 - {self.processing_start_time.strftime('%Y-%m-%d %H:%M:%S')}") # 验证数据 self._validate_data() self._print_progress("开始数据处理...", 1) # 创建分组键 self._print_progress("创建分组键...", 2) # 确保PAD ID和Component ID都是字符串类型 self.data['PAD ID'] = self.data['PAD ID'].astype(str) self.data['Component ID'] = self.data['Component ID'].astype(str) self.data['Group_Key'] = self.data['PAD ID'] + '_' + self.data['Component ID'] group_count = self.data['Group_Key'].nunique() self._print_progress(f"共发现 {group_count} 个分组", 2) # 显示分组信息 group_info = self.data['Group_Key'].value_counts() self._print_progress(f"分组数据量统计:", 2) for i, (group, count) in enumerate(group_info.head(5).items()): self._print_progress(f" {group}: {count} 个数据点", 3) if len(group_info) > 5: self._print_progress(f" ... 还有 {len(group_info) - 5} 个分组", 3) # 检查数值列是否存在NaN值 numeric_columns = ['Height(mil)', 'Volume(%)', 'Area(%)'] for col in numeric_columns: if col in self.data.columns: nan_count = self.data[col].isna().sum() if nan_count > 0: self._print_progress(f"⚠️ {col} 有 {nan_count} 个空值,将在统计计算中排除", 3) # 计算统计信息 self._print_progress("计算基本统计信息...", 2) # 确保数值列没有无穷大值 for col in numeric_columns: if col in self.data.columns: inf_count = np.isinf(self.data[col]).sum() if inf_count > 0: self._print_progress(f"⚠️ {col} 有 {inf_count} 个无穷大值,将替换为NaN", 3) self.data[col] = self.data[col].replace([np.inf, -np.inf], np.nan) stats = self.data.groupby('Group_Key').agg({ 'Height(mil)': ['min', 'max', 'mean', 'std'], 'Volume(%)': ['min', 'max', 'mean', 'std'], 'Area(%)': ['min', 'max', 'mean', 'std'] }).round(4) # 重命名列 stats.columns = [ 'Height_Measured_Min(mil)', 'Height_Measured_Max(mil)', 'Height_Mean(mil)', 'Height_Std(mil)', 'Volume_Measured_Min(%)', 'Volume_Measured_Max(%)', 'Volume_Mean(%)', 'Volume_Std(%)', 'Area_Measured_Min(%)', 'Area_Measured_Max(%)', 'Area_Mean(%)', 'Area_Std(%)' ] self._print_progress("基本统计信息计算完成", 2) # 获取上下限信息 self._print_progress("获取预设上下限信息...", 2) limits = self.data.groupby('Group_Key').agg({ 'Height_Low(mil)': 'first', 'Height_High(mil)': 'first', 'Vol_Min(%)': 'first', 'Vol_Max(%)': 'first', 'Area_Min(%)': 'first', 'Area_Max(%)': 'first' }).round(4) # 合并统计信息和上下限信息 stats = pd.concat([stats, limits], axis=1) self._print_progress("上下限信息获取完成", 2) # 计算CPK self._print_progress("开始计算CPK值...", 2) stats = self._calculate_cpk(stats) # 分析CPK结果 cpk_analysis = self._analyze_cpk_results(stats) self._print_progress("CPK分析完成", 2) self._print_cpk_summary(cpk_analysis) # 生成HTML报告 self._print_progress("生成HTML报告...", 2) report_path = self._create_html_report(stats, cpk_analysis) self._print_progress("HTML报告生成完成", 2) # 计算处理时间 processing_time = datetime.now() - self.processing_start_time self._print_progress(f"总处理时间: {processing_time.total_seconds():.2f} 秒", 1) return report_path except Exception as e: print(f"❌ 生成报告过程中出错: {e}") import traceback print(f"详细错误信息:") traceback.print_exc() raise def _analyze_cpk_results(self, stats): """分析CPK结果""" cpk_analysis = { 'total_groups': len(stats), 'cpk_status': {'Height': {}, 'Volume': {}, 'Area': {}}, 'problematic_groups': [] } for feature in ['Height', 'Volume', 'Area']: cpk_col = f'{feature}_Cpk' if cpk_col not in stats.columns: continue valid_cpk = stats[cpk_col].dropna() total_valid = len(valid_cpk) cpk_analysis['cpk_status'][feature] = { 'total': total_valid, 'excellent': len(valid_cpk[valid_cpk >= 1.33]) if total_valid > 0 else 0, 'acceptable': len(valid_cpk[(valid_cpk >= 1.0) & (valid_cpk < 1.33)]) if total_valid > 0 else 0, 'poor': len(valid_cpk[valid_cpk < 1.0]) if total_valid > 0 else 0, 'invalid': len(stats) - total_valid } # 识别有问题的分组(任意特征的CPK < 1.0) for group_key, row in stats.iterrows(): problems = [] for feature in ['Height', 'Volume', 'Area']: cpk_col = f'{feature}_Cpk' if cpk_col in stats.columns and not pd.isna(row[cpk_col]): if row[cpk_col] < 1.0: problems.append(f"{feature}: {row[cpk_col]:.4f}") if problems: cpk_analysis['problematic_groups'].append({ 'group_key': group_key, 'problems': problems }) return cpk_analysis def _print_cpk_summary(self, cpk_analysis): """打印CPK结果摘要""" print("\n📈 CPK分析结果摘要:") print("=" * 60) for feature, status in cpk_analysis['cpk_status'].items(): total = status['total'] if total == 0: print(f"\n{feature}: 无有效CPK数据") continue print(f"\n{feature}:") excellent_pct = (status['excellent'] / total * 100) if total > 0 else 0 acceptable_pct = (status['acceptable'] / total * 100) if total > 0 else 0 poor_pct = (status['poor'] / total * 100) if total > 0 else 0 print(f" ✅ 优秀 (CPK ≥ 1.33): {status['excellent']}/{total} ({excellent_pct:.1f}%)") print(f" ⚠️ 合格 (1.0 ≤ CPK < 1.33): {status['acceptable']}/{total} ({acceptable_pct:.1f}%)") print(f" ❌ 不合格 (CPK < 1.0): {status['poor']}/{total} ({poor_pct:.1f}%)") print(f" ❓ 无法计算: {status['invalid']}") if cpk_analysis['problematic_groups']: print(f"\n⚠️ 发现 {len(cpk_analysis['problematic_groups'])} 个有问题分组:") for i, group in enumerate(cpk_analysis['problematic_groups'][:10]): print(f" {i + 1}. {group['group_key']}: {', '.join(group['problems'])}") if len(cpk_analysis['problematic_groups']) > 10: print(f" ... 还有 {len(cpk_analysis['problematic_groups']) - 10} 个问题分组") else: print("\n✅ 所有分组的CPK都在合格范围内") print("=" * 60) def _calculate_cpk(self, stats): """计算CPK值""" self._print_progress("详细计算CPK值...", 3) def calculate_single_cpk(mean, std, usl, lsl): """计算单个特征的CPK""" if pd.isna(mean) or pd.isna(std) or std == 0: return np.nan if pd.isna(usl) or pd.isna(lsl): return np.nan try: cpu = (usl - mean) / (3 * std) if usl != float('inf') else float('inf') cpl = (mean - lsl) / (3 * std) if lsl != float('-inf') else float('inf') if cpu == float('inf') and cpl == float('inf'): return np.nan elif cpu == float('inf'): return cpl elif cpl == float('inf'): return cpu else: return min(cpu, cpl) except (ZeroDivisionError, TypeError): return np.nan # 计算每个特征的CPK cpk_results = [] total_groups = len(stats) for idx, row in stats.iterrows(): if len(cpk_results) % 100 == 0 and total_groups > 100: self._print_progress(f"计算第 {len(cpk_results) + 1} 个分组的CPK...", 4) # Height CPK height_cpk = calculate_single_cpk( row.get('Height_Mean(mil)', np.nan), row.get('Height_Std(mil)', np.nan), row.get('Height_High(mil)', np.nan), row.get('Height_Low(mil)', np.nan) ) # Volume CPK volume_cpk = calculate_single_cpk( row.get('Volume_Mean(%)', np.nan), row.get('Volume_Std(%)', np.nan), row.get('Vol_Max(%)', np.nan), row.get('Vol_Min(%)', np.nan) ) # Area CPK area_cpk = calculate_single_cpk( row.get('Area_Mean(%)', np.nan), row.get('Area_Std(%)', np.nan), row.get('Area_Max(%)', np.nan), row.get('Area_Min(%)', np.nan) ) cpk_results.append({ 'Height_Cpk': round(height_cpk, 4) if not pd.isna(height_cpk) else np.nan, 'Volume_Cpk': round(volume_cpk, 4) if not pd.isna(volume_cpk) else np.nan, 'Area_Cpk': round(area_cpk, 4) if not pd.isna(area_cpk) else np.nan }) # 将CPK结果添加到统计数据中 cpk_df = pd.DataFrame(cpk_results, index=stats.index) stats = pd.concat([stats, cpk_df], axis=1) self._print_progress(f"所有 {len(stats)} 个分组CPK计算完成", 3) return stats def _get_cpk_status_class(self, cpk_value): """根据CPK值返回状态类别""" if pd.isna(cpk_value): return 'cpk-invalid' elif cpk_value >= 1.33: return 'cpk-excellent' elif cpk_value >= 1.0: return 'cpk-acceptable' else: return 'cpk-poor' def _create_html_report(self, stats, cpk_analysis): """创建完整的HTML报告""" self._print_progress("构建HTML报告内容...", 3) total_groups = len(stats) # 完整的HTML模板 html_content = f""" 数据统计报告 - {self.filename}

📊 数据统计报告 - {self.filename}

生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

输入文件: {self.filename}

📈 报告摘要

总分组数量: {total_groups}

处理时间: {(datetime.now() - self.processing_start_time).total_seconds():.2f} 秒

""" # 添加CPK状态卡片 for feature, status in cpk_analysis['cpk_status'].items(): total = status['total'] + status['invalid'] if total == 0: continue html_content += f"""

{feature} CPK状态

{status['excellent'] + status['acceptable']}/{total}

合格率: {(status['excellent'] + status['acceptable']) / total * 100:.1f}%

优秀: {status['excellent']} 合格: {status['acceptable']} 不合格: {status['poor']} 无效: {status['invalid']}
""" html_content += f"""
{f'

⚠️ 发现 {len(cpk_analysis["problematic_groups"])} 个问题分组

以下分组的CPK值低于1.0,需要重点关注

' if cpk_analysis['problematic_groups'] else ''}

📋 详细统计数据

预设上下限 实测值 CPK ≥ 1.33 1.0 ≤ CPK < 1.33 CPK < 1.0
""" # 生成表格行数据的辅助函数 def format_value(value): if pd.isna(value): return 'N/A' elif isinstance(value, (int, float)): return f"{value:.4f}" else: return str(value) # 用于检查列是否存在的辅助函数 def safe_get_value(row, column_name): """安全获取列值,如果列不存在返回N/A""" if column_name in row.index: return row[column_name] else: return np.nan for group_key, row in stats.iterrows(): # 检查是否为问题分组 is_problematic = any(problem['group_key'] == group_key for problem in cpk_analysis['problematic_groups']) row_class = 'class="problematic-row"' if is_problematic else '' html_content += f""" """ # 为每个特征生成列 for feature in ['Height', 'Volume', 'Area']: cpk_value = safe_get_value(row, f'{feature}_Cpk') cpk_class = self._get_cpk_status_class(cpk_value) # 为不同特征设置正确的列名 if feature == 'Height': lower_limit_col = 'Height_Low(mil)' upper_limit_col = 'Height_High(mil)' measured_min_col = 'Height_Measured_Min(mil)' measured_max_col = 'Height_Measured_Max(mil)' mean_col = 'Height_Mean(mil)' std_col = 'Height_Std(mil)' else: lower_limit_col = f"{'Vol' if feature == 'Volume' else 'Area'}_Min(%)" # 修正:Volume使用Vol_Min(%),Area使用Area_Min(%) upper_limit_col = f"{'Vol' if feature == 'Volume' else 'Area'}_Max(%)" # 修正:Volume使用Vol_Max(%),Area使用Area_Max(%) measured_min_col = f'{feature}_Measured_Min(%)' measured_max_col = f'{feature}_Measured_Max(%)' mean_col = f'{feature}_Mean(%)' std_col = f'{feature}_Std(%)' html_content += f""" """ html_content += """ """ html_content += """
分组标识 Height(mil) Volume(%) Area(%)
预设下限 预设上限 实测最小值 实测最大值 平均值 标准差 CPK 预设下限 预设上限 实测最小值 实测最大值 平均值 标准差 CPK 预设下限 预设上限 实测最小值 实测最大值 平均值 标准差 CPK
{group_key}{' ⚠️' if is_problematic else ''} {format_value(safe_get_value(row, lower_limit_col))} {format_value(safe_get_value(row, upper_limit_col))} {format_value(safe_get_value(row, measured_min_col))} {format_value(safe_get_value(row, measured_max_col))} {format_value(safe_get_value(row, mean_col))} {format_value(safe_get_value(row, std_col))} {format_value(cpk_value)}

📊 CPK状态分布

""" # 添加简单的CPK分布图表 for feature, status in cpk_analysis['cpk_status'].items(): total = status['total'] + status['invalid'] if total == 0: continue html_content += f"""

{feature} CPK分布

优秀 {status['excellent']} | 合格 {status['acceptable']} | 不合格 {status['poor']} | 无效 {status['invalid']}
""" html_content += """
""" # 保存报告 timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') report_filename = f"{os.path.splitext(self.filename)[0]}_report_{timestamp}.html" report_path = os.path.join(self.file_dir, report_filename) self._print_progress(f"保存报告到: {report_path}", 3) with open(report_path, 'w', encoding='utf-8') as f: f.write(html_content) return report_path def main(): """主函数""" print("=" * 60) print("🚀 数据统计报告生成程序 - Volume上下限修复版") print("=" * 60) processor = DataProcessor() try: if processor.select_file(): processor._load_data() report_path = processor.generate_report() print("\n" + "=" * 60) print("✅ 程序执行完成") print(f"📄 统计报告生成成功: {report_path}") print("=" * 60) else: print("❌ 未选择文件,程序退出") except Exception as e: print(f"\n❌ 程序执行失败: {e}") import traceback traceback.print_exc() if __name__ == "__main__": main()