import numbers
import os
from datetime import datetime
from typing import Dict, List, Tuple, Optional

import pandas as pd

try:
    import tkinter as tk
    from tkinter import filedialog
except ImportError:
    # Some Python builds (containers, CI images) ship without Tk. The
    # comparison logic is still usable there; only the file dialog is not.
    tk = None
    filedialog = None


class BOMComparator:
    """Compare two BOM (bill of materials) Excel workbooks and report the differences.

    The comparison matches rows by a key column (or key column pair), then
    reports rows that were added, removed, or modified. Columns holding
    check/validation information are excluded from the cell-level comparison.
    """

    def __init__(self):
        self.file1_path = ""
        self.file2_path = ""
        self.file1_sheets = []
        self.file2_sheets = []
        self.common_sheets = []
        self.differences = {}
        self.file1_name = ""
        self.file2_name = ""
        # Column names that are never compared.
        self.columns_to_exclude = ['检查信息', '检查状态', '校验信息']

    def select_file(self, title: str) -> str:
        """Open a file dialog and return the chosen path ("" when cancelled).

        Raises:
            RuntimeError: if tkinter is not available in this Python build.
        """
        if tk is None or filedialog is None:
            raise RuntimeError("tkinter is not available; cannot open a file dialog")

        root = tk.Tk()
        root.withdraw()
        try:
            file_path = filedialog.askopenfilename(
                title=title,
                filetypes=[("Excel files", "*.xlsx"), ("All files", "*.*")]
            )
        finally:
            # Always tear down the hidden root window, even if the dialog fails.
            root.destroy()
        # Some platforms return an empty tuple instead of "" on cancel.
        return str(file_path) if file_path else ""

    def find_valid_sheets(self, file_path: str) -> List[str]:
        """Return the names of sheets in *file_path* that look like BOM sheets.

        A sheet qualifies when at least two of the expected BOM header names
        are present and the 10-row preview holds more than one data row.
        """
        required_columns = {'Partnumber', 'Purchase_Code', 'MF_PN', 'Description'}
        valid_sheets = []
        try:
            # Open the workbook once and close it deterministically.
            with pd.ExcelFile(file_path) as xl_file:
                for sheet_name in xl_file.sheet_names:
                    try:
                        preview = xl_file.parse(sheet_name, nrows=10)
                    except Exception:
                        # Best effort: unreadable sheets are simply not BOM sheets.
                        continue
                    # Strip headers the same way load_bom_data does.
                    headers = {str(col).strip() for col in preview.columns}
                    if len(headers & required_columns) < 2:
                        continue
                    # NOTE(review): the header row is already consumed by pandas, so
                    # this requires at least two data rows — confirm that is intended.
                    if len(preview) > 1:
                        valid_sheets.append(sheet_name)
        except Exception as e:
            print(f"读取文件 {file_path} 时出错: {e}")
        return valid_sheets

    def get_common_sheets(self) -> List[str]:
        """Return file-1 sheet names whose standardized name also exists in file 2."""
        if not self.file1_sheets or not self.file2_sheets:
            return []
        file2_names = {self.standardize_sheet_name(sheet) for sheet in self.file2_sheets}
        return [
            sheet for sheet in self.file1_sheets
            if self.standardize_sheet_name(sheet) in file2_names
        ]

    def standardize_sheet_name(self, sheet_name: str) -> str:
        """Normalize a sheet name (trim, lowercase, spaces/hyphens to underscores)."""
        return str(sheet_name).strip().lower().replace(' ', '_').replace('-', '_')

    def load_bom_data(self, file_path: str, sheet_name: str) -> pd.DataFrame:
        """Load one sheet, dropping fully empty rows/columns and trimming headers.

        Returns an empty DataFrame (after printing the error) when loading fails.
        """
        try:
            df = pd.read_excel(file_path, sheet_name=sheet_name)
            df = df.dropna(how='all').dropna(axis=1, how='all')
            # Stringify first: the .str accessor breaks on non-string headers.
            df.columns = [str(col).strip() for col in df.columns]
            return df
        except Exception as e:
            print(f"加载sheet {sheet_name} 时出错: {e}")
            return pd.DataFrame()

    def should_compare_column(self, column_name: str) -> bool:
        """Return False for check/validation/status/comment columns, True otherwise."""
        if column_name in self.columns_to_exclude:
            return False
        exclude_keywords = ['检查', '校验', '状态', '备注', 'comment', 'check']
        column_lower = str(column_name).lower()
        return not any(keyword in column_lower for keyword in exclude_keywords)

    def get_columns_to_compare(self, df1: pd.DataFrame, df2: pd.DataFrame) -> List[str]:
        """Return columns present in both frames that should be compared.

        The result follows the column order of *df1* so output is deterministic.
        """
        df2_columns = set(df2.columns)
        return [
            col for col in df1.columns
            if col in df2_columns and self.should_compare_column(col)
        ]

    @staticmethod
    def _values_differ(old_value, new_value) -> bool:
        """Return True when two cell values should be reported as different.

        Two missing values are equal; numbers are compared numerically so that
        ``1`` and ``1.0`` (an int column upcast to float) are not flagged; all
        other values are compared by their string form.
        """
        old_missing = bool(pd.isna(old_value))
        new_missing = bool(pd.isna(new_value))
        if old_missing and new_missing:
            return False
        if old_missing or new_missing:
            return True
        both_numeric = all(
            isinstance(value, numbers.Number) and not isinstance(value, bool)
            for value in (old_value, new_value)
        )
        if both_numeric:
            return old_value != new_value
        return str(old_value) != str(new_value)

    def compare_dataframes(self, df1: pd.DataFrame, df2: pd.DataFrame,
                           sheet_name1: str, sheet_name2: str) -> Dict:
        """Compare two BOM frames and return a dict describing the differences.

        The result holds 'added_rows', 'removed_rows', 'modified_rows',
        'columns_comparison', 'summary' and 'original_dfs'. On failure an
        'error' entry is added instead of raising.
        """
        differences = {
            'sheet_names': f"{sheet_name1} vs {sheet_name2}",
            'added_rows': [],
            'removed_rows': [],
            'modified_rows': [],
            'columns_comparison': {},
            'summary': {
                'total_rows_df1': len(df1),
                'total_rows_df2': len(df2),
                'added_count': 0,
                'removed_count': 0,
                'modified_count': 0
            },
            'original_dfs': {
                'df1': df1.copy(),
                'df2': df2.copy()
            }
        }

        key_columns = self.identify_key_columns(df1, df2)
        if not key_columns:
            differences['error'] = "无法确定用于对比的关键列"
            return differences

        try:
            df1_indexed = df1.set_index(key_columns)
            df2_indexed = df2.set_index(key_columns)

            columns_to_compare = self.get_columns_to_compare(df1, df2)
            # Key columns became the index, so only the remaining ones hold values.
            value_columns = [col for col in columns_to_compare if col not in key_columns]

            # Added rows: keys present only in file 2. A boolean mask keeps every
            # matching row in file order, so the count equals the rows listed.
            added_mask = ~df2_indexed.index.isin(df1_indexed.index)
            if added_mask.any():
                differences['added_rows'] = (
                    df2_indexed[added_mask].reset_index().to_dict('records')
                )
                differences['summary']['added_count'] = int(added_mask.sum())

            # Removed rows: keys present only in file 1.
            removed_mask = ~df1_indexed.index.isin(df2_indexed.index)
            if removed_mask.any():
                differences['removed_rows'] = (
                    df1_indexed[removed_mask].reset_index().to_dict('records')
                )
                differences['summary']['removed_count'] = int(removed_mask.sum())

            # Keys may repeat (identify_key_columns tolerates a few duplicates).
            # Compare the first occurrence of each key so that every lookup
            # yields exactly one row.
            df1_unique = df1_indexed[~df1_indexed.index.duplicated(keep='first')]
            df2_unique = df2_indexed[~df2_indexed.index.duplicated(keep='first')]
            common_indexes = df1_unique.index.intersection(df2_unique.index)
            rows1 = df1_unique.loc[common_indexes]
            rows2 = df2_unique.loc[common_indexes]

            for position, idx in enumerate(common_indexes):
                row1 = rows1.iloc[position]
                row2 = rows2.iloc[position]

                modified_cols = {
                    col: {'old_value': row1[col], 'new_value': row2[col]}
                    for col in value_columns
                    if self._values_differ(row1[col], row2[col])
                }
                if not modified_cols:
                    continue

                if isinstance(idx, tuple):
                    key_values = dict(zip(key_columns, idx))
                else:
                    key_values = {key_columns[0]: idx}

                # Build the display data from the rows already in hand instead
                # of re-scanning both frames for every modified row.
                full_row_data = self._build_display_data(
                    {**key_values, **dict(row1.items())},
                    {**key_values, **dict(row2.items())},
                )
                differences['modified_rows'].append({
                    'key_values': key_values,
                    'modified_columns': modified_cols,
                    'full_row_data': full_row_data
                })
                differences['summary']['modified_count'] += 1

            # Column-level statistics, ordered as the columns appear in the files.
            df1_column_set = set(df1.columns)
            df2_column_set = set(df2.columns)
            compared_set = set(columns_to_compare)
            common_columns = [col for col in df1.columns if col in df2_column_set]
            differences['columns_comparison'] = {
                'common_columns': common_columns,
                'compared_columns': list(columns_to_compare),
                'excluded_columns': [
                    col for col in common_columns if col not in compared_set
                ],
                'file1_only_columns': [
                    col for col in df1.columns if col not in df2_column_set
                ],
                'file2_only_columns': [
                    col for col in df2.columns if col not in df1_column_set
                ]
            }
        except Exception as e:
            differences['error'] = f"对比过程中出错: {str(e)}"

        return differences

    def _build_display_data(self, row1_data: Dict, row2_data: Dict) -> Dict:
        """Merge two row dicts into the per-column values shown in the report."""
        display_columns = ['Purchase_Code', 'MF_PN', 'Description', 'Part Type',
                           'MF_NAME', 'PCB_Footprint', '合计']
        display_data = {
            col: self.format_value_display(row1_data.get(col, ''), row2_data.get(col, ''))
            for col in display_columns
            if self.should_compare_column(col)
        }
        # Keep the raw rows; keys starting with '_' are skipped by the report.
        display_data['_from_file1'] = row1_data
        display_data['_from_file2'] = row2_data
        return display_data

    def get_full_row_data_for_display(self, df1: pd.DataFrame, df2: pd.DataFrame,
                                      idx, key_columns: List[str]) -> Dict:
        """Look up the row keyed by *idx* in both frames and build its display data."""
        row1_data = self.extract_row_data(df1, idx, key_columns)
        row2_data = self.extract_row_data(df2, idx, key_columns)
        return self._build_display_data(row1_data, row2_data)

    def extract_row_data(self, df: pd.DataFrame, idx, key_columns: List[str]) -> Dict:
        """Return the first row of *df* whose key columns equal *idx* as a dict.

        *idx* is a tuple for composite keys and a scalar for a single key.
        Returns an empty dict when no row matches or the lookup fails.
        """
        key_values = idx if isinstance(idx, tuple) else (idx,)
        try:
            mask = None
            for key, value in zip(key_columns, key_values):
                matches = df[key] == value
                mask = matches if mask is None else mask & matches
            if mask is None or not mask.any():
                return {}
            matched_row = df[mask].iloc[0]
            return {col: matched_row[col] for col in df.columns}
        except (KeyError, IndexError, TypeError, ValueError):
            # Best effort: a failed lookup only leaves the display data empty.
            return {}

    def format_value_display(self, value1, value2):
        """Format a value pair: "old -> new" when they differ, else the single value."""
        if pd.isna(value1) or value1 == '':
            return value2
        if pd.isna(value2) or value2 == '':
            return value1
        if self._values_differ(value1, value2):
            return f"{value1} -> {value2}"
        return value1

    def get_modified_columns_summary(self, modified_columns: Dict) -> str:
        """Summarize modified column names, abbreviating after the first three."""
        if not modified_columns:
            return "无修改"
        modified_list = list(modified_columns.keys())
        if len(modified_list) <= 3:
            return ", ".join(modified_list)
        return ", ".join(modified_list[:3]) + f"...等{len(modified_list)}列"

    def identify_key_columns(self, df1: pd.DataFrame, df2: pd.DataFrame) -> List[str]:
        """Pick the column(s) used to match rows between the two frames.

        Preference order: a single well-known key with under 10% duplicates in
        both frames, then a well-known key pair, then the first two shared
        columns (in file-1 order). Returns [] when nothing is shared.
        """
        for key in ['Partnumber', 'Purchase_Code', 'MF_PN']:
            if key in df1.columns and key in df2.columns:
                # mean() of the duplicate flags is the duplicate rate; it is NaN
                # (and so fails the test) for an empty frame.
                df1_dup_rate = df1[key].duplicated().mean()
                df2_dup_rate = df2[key].duplicated().mean()
                if df1_dup_rate < 0.1 and df2_dup_rate < 0.1:
                    return [key]

        for key_combo in [['Partnumber', 'MF_PN'], ['Purchase_Code', 'MF_PN']]:
            if all(col in df1.columns and col in df2.columns for col in key_combo):
                return key_combo

        df2_columns = set(df2.columns)
        common_cols = [col for col in df1.columns if col in df2_columns]
        return common_cols[:2]

    def generate_output_filename(self) -> str:
        """Build the report filename from the first valid sheet of each file."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        if not self.file1_sheets or not self.file2_sheets:
            return f"BOM差异报告_{timestamp}.xlsx"
        clean_sheet1 = self.clean_filename(str(self.file1_sheets[0]))
        clean_sheet2 = self.clean_filename(str(self.file2_sheets[0]))
        return f"{clean_sheet1}_vs_{clean_sheet2}_差异报告_{timestamp}.xlsx"

    def clean_filename(self, filename: str) -> str:
        """Replace characters unsafe in file names with '_' and cap at 50 chars."""
        # Windows-forbidden characters plus whitespace that is awkward in names.
        replacements = str.maketrans({char: '_' for char in '<>:"/\\|?* \t\n'})
        return str(filename).translate(replacements)[:50]

    def clean_sheet_name(self, sheet_name: str, max_length: int = 25) -> str:
        """Replace characters Excel forbids in sheet names and cap the length.

        Excel allows 31 characters; the default of 25 leaves room for the
        suffixes appended by the report writer.
        """
        replacements = str.maketrans({char: '_' for char in '[]:*?/\\'})
        return str(sheet_name).translate(replacements)[:max_length]

    def get_output_directory(self) -> str:
        """Return the directory containing the second input file."""
        return os.path.dirname(self.file2_path)

    def _write_overview_sheet(self, writer) -> None:
        """Write the one-row-per-comparison overview sheet (skips failed comparisons)."""
        overview_rows = []
        for diff_key, differences in self.differences.items():
            if 'error' in differences:
                continue
            summary = differences['summary']
            excluded = differences.get('columns_comparison', {}).get('excluded_columns', [])
            overview_rows.append([
                differences.get('sheet_names', diff_key),
                summary['total_rows_df1'],
                summary['total_rows_df2'],
                summary['added_count'],
                summary['removed_count'],
                summary['modified_count'],
                len(excluded)
            ])
        if overview_rows:
            pd.DataFrame(overview_rows, columns=[
                '工作表对比', '文件1行数', '文件2行数', '新增行数',
                '删除行数', '修改行数', '排除列数'
            ]).to_excel(writer, sheet_name='对比摘要', index=False)

    def _build_modified_frame(self, modified_rows: List[Dict]) -> pd.DataFrame:
        """Flatten modified-row records into a frame with key columns first."""
        records = []
        for mod_row in modified_rows:
            record = {
                **mod_row['key_values'],
                '修改列': self.get_modified_columns_summary(mod_row['modified_columns'])
            }
            for col, value in mod_row.get('full_row_data', {}).items():
                # Keys starting with '_' carry raw row data, not display values.
                if not col.startswith('_') and self.should_compare_column(col):
                    record[col] = value
            for col, values in mod_row['modified_columns'].items():
                if self.should_compare_column(col):
                    record[f'详细_{col}'] = f"{values['old_value']} -> {values['new_value']}"
            records.append(record)

        frame = pd.DataFrame(records)
        # Order: key columns, change summary, display columns, then per-column details.
        column_order = list(modified_rows[-1]['key_values'].keys()) + ['修改列']
        column_order.extend(
            col for col in frame.columns
            if col not in column_order and not col.startswith('详细_')
        )
        column_order.extend(col for col in frame.columns if col.startswith('详细_'))
        return frame[[col for col in column_order if col in frame.columns]]

    def _write_comparison_sheets(self, writer, diff_key: str, differences: Dict) -> None:
        """Write the summary/added/removed/modified sheets for one comparison."""
        sheet_key = self.clean_sheet_name(diff_key)

        if 'error' in differences:
            pd.DataFrame([['错误信息', differences['error']]]).to_excel(
                writer, sheet_name=f"{sheet_key}_错误", index=False, header=False
            )
            return

        summary = differences['summary']
        columns_comparison = differences['columns_comparison']
        summary_rows = [
            ["对比项", "数量"],
            ["文件1总行数", summary['total_rows_df1']],
            ["文件2总行数", summary['total_rows_df2']],
            ["新增行数", summary['added_count']],
            ["删除行数", summary['removed_count']],
            ["修改行数", summary['modified_count']],
            ["共同列数", len(columns_comparison['common_columns'])],
            ["实际对比列数", len(columns_comparison['compared_columns'])],
            ["排除列数", len(columns_comparison['excluded_columns'])],
            ["文件1特有列", len(columns_comparison['file1_only_columns'])],
            ["文件2特有列", len(columns_comparison['file2_only_columns'])],
        ]
        excluded_cols = columns_comparison.get('excluded_columns', [])
        if excluded_cols:
            summary_rows.append(["", ""])
            summary_rows.append(["排除的列", "(检查信息类列不参与对比)"])
            summary_rows.extend(["", f"- {col}"] for col in excluded_cols)
        pd.DataFrame(summary_rows).to_excel(
            writer, sheet_name=f"{sheet_key}_汇总", index=False, header=False
        )

        if differences['added_rows']:
            pd.DataFrame(differences['added_rows']).to_excel(
                writer, sheet_name=f"{sheet_key}_新增行", index=False
            )
        if differences['removed_rows']:
            pd.DataFrame(differences['removed_rows']).to_excel(
                writer, sheet_name=f"{sheet_key}_删除行", index=False
            )
        if differences['modified_rows']:
            self._build_modified_frame(differences['modified_rows']).to_excel(
                writer, sheet_name=f"{sheet_key}_修改行", index=False
            )

    def generate_difference_report(self) -> str:
        """Write the difference report workbook next to the second input file.

        Returns the output path, "" when writing fails, or a short message
        when there is nothing to report.
        """
        if not self.differences:
            return "没有发现差异"

        output_path = os.path.join(
            self.get_output_directory(), self.generate_output_filename()
        )
        try:
            with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
                self._write_overview_sheet(writer)
                for diff_key, differences in self.differences.items():
                    self._write_comparison_sheets(writer, diff_key, differences)
            return output_path
        except Exception as e:
            print(f"生成报告时出错: {e}")
            return ""

    def run_comparison(self):
        """Run the interactive flow: pick two files, compare, write the report."""
        print("=== BOM文件差异对比工具 ===")
        print("注意:检查信息类列(如'检查信息')将不参与修改行对比")

        # Step 1: first workbook.
        print("\n步骤1: 选择第一份Excel文件")
        self.file1_path = self.select_file("选择第一份BOM Excel文件")
        if not self.file1_path:
            print("未选择文件,程序退出")
            return
        self.file1_name = os.path.basename(self.file1_path)

        # Step 2: second workbook.
        print("\n步骤2: 选择第二份Excel文件")
        self.file2_path = self.select_file("选择第二份BOM Excel文件")
        if not self.file2_path:
            print("未选择文件,程序退出")
            return
        self.file2_name = os.path.basename(self.file2_path)

        print(f"\n文件1: {self.file1_name}")
        print(f"文件2: {self.file2_name}")

        # Step 3: locate BOM sheets in both workbooks.
        print("\n步骤3: 查找有效的工作表...")
        self.file1_sheets = self.find_valid_sheets(self.file1_path)
        self.file2_sheets = self.find_valid_sheets(self.file2_path)
        print(f"文件1的有效工作表: {self.file1_sheets}")
        print(f"文件2的有效工作表: {self.file2_sheets}")
        if not self.file1_sheets or not self.file2_sheets:
            print("至少有一个文件没有有效的工作表,无法进行对比")
            return

        # Step 4: compare the first valid sheet of each workbook.
        print("\n步骤4: 进行差异对比...")
        self.differences = {}
        sheet1 = self.file1_sheets[0]
        sheet2 = self.file2_sheets[0]
        print(f"正在对比: {sheet1} (文件1) vs {sheet2} (文件2)")

        df1 = self.load_bom_data(self.file1_path, sheet1)
        df2 = self.load_bom_data(self.file2_path, sheet2)
        if df1.empty:
            print(f" ⚠ 文件1的工作表 {sheet1} 数据加载失败")
            return
        if df2.empty:
            print(f" ⚠ 文件2的工作表 {sheet2} 数据加载失败")
            return

        differences = self.compare_dataframes(df1, df2, sheet1, sheet2)
        self.differences[f"{sheet1}_vs_{sheet2}"] = differences

        if 'error' in differences:
            print(f" ⚠ 对比过程中出错: {differences['error']}")
        else:
            summary = differences['summary']
            columns_comparison = differences.get('columns_comparison', {})
            excluded_count = len(columns_comparison.get('excluded_columns', []))
            print(" √ 完成对比:")
            print(f" 文件1行数: {summary['total_rows_df1']}")
            print(f" 文件2行数: {summary['total_rows_df2']}")
            print(f" 新增行数: {summary['added_count']}")
            print(f" 删除行数: {summary['removed_count']}")
            print(f" 修改行数: {summary['modified_count']}")
            print(f" 排除列数: {excluded_count} (检查信息类列不参与对比)")

        # Step 5: write the report.
        print("\n步骤5: 生成差异报告...")
        output_file = self.generate_difference_report()
        if output_file and os.path.exists(output_file):
            print("\n=== 对比完成 ===")
            print(f"差异报告已生成: {os.path.basename(output_file)}")
            print(f"输出目录: {self.get_output_directory()}")
        else:
            print("未成功生成差异报告")


def main():
    """Run the comparison tool and wait for Enter before exiting."""
    comparator = BOMComparator()
    comparator.run_comparison()
    input("\n按Enter键退出...")


if __name__ == "__main__":
    main()