import pandas as pd
import os
import glob
import re
from datetime import datetime
import tkinter as tk
from tkinter import filedialog
from collections import defaultdict


class BOMConsolidator:
    """Consolidate multiple Excel BOM files into a single merged report.

    Each material is keyed by Partnumber (falling back to MF_PN).  Per-file
    quantities are collected, cross-file field mismatches and
    Reference/Quantity mismatches are recorded as "inconsistencies", and
    generate_report() writes the merged result to a timestamped .xlsx file.
    """

    def __init__(self):
        # material key -> consolidated record (see process_row for layout)
        self.master_data = defaultdict(dict)
        # columns every BOM sheet must provide after clean_column_names()
        self.required_columns = ['Partnumber', 'Purchase_Code', 'MF_PN',
                                 'Description', 'Part_Type', 'MF_NAME',
                                 'PCB_Footprint', 'Quantity', 'Reference']
        # filename -> {material key -> quantity found in that file}
        self.file_quantities = {}
        self.consolidated_report = None
        self.inconsistency_count = 0
        self.processed_files = 0
        self.processed_rows = 0
        self.output_folder = ""

    def find_valid_sheet(self, file_path):
        """Locate the sheet and header row containing a valid BOM table.

        Scans every sheet row by row until one row contains all of the
        marker columns.  Returns (sheet_name, header_row_index), or
        (None, None) when no sheet qualifies.
        """
        marker_cols = ('Item', 'Partnumber', 'Purchase_Code', 'MF_PN')
        xl = pd.ExcelFile(file_path)
        for sheet_name in xl.sheet_names:
            df = pd.read_excel(file_path, sheet_name=sheet_name, header=None)
            for i in range(len(df)):
                headers = df.iloc[i].values
                if all(col in headers for col in marker_cols):
                    return sheet_name, i
        return None, None

    def clean_column_names(self, df):
        """Normalize column names: strip, spaces -> underscores, drop symbols."""
        df.columns = df.columns.str.strip().str.replace(r'\s+', '_', regex=True)
        df.columns = df.columns.str.replace(r'[^a-zA-Z0-9_]', '', regex=True)
        return df

    def process_file(self, file_path):
        """Read one BOM workbook and merge its rows into master_data.

        Returns True when the file contained a valid, complete BOM sheet.
        """
        filename = os.path.basename(file_path)
        # BUG FIX: message contained a literal "(unknown)" placeholder
        # instead of the computed filename.
        print(f"处理文件: {filename}...")

        sheet_name, header_row = self.find_valid_sheet(file_path)
        # BUG FIX: compare against None explicitly; `not sheet_name` would
        # also reject a legitimately falsy sheet name.
        if sheet_name is None:
            print(f" ! 未找到有效BOM表: {filename}")
            return False

        df = pd.read_excel(file_path, sheet_name=sheet_name, header=header_row)
        df = self.clean_column_names(df)

        # Verify all required columns survived the cleaning step.
        missing_cols = [col for col in self.required_columns
                        if col not in df.columns]
        if missing_cols:
            print(f" ! 缺少必要列: {', '.join(missing_cols)}")
            return False

        print(f" √ 找到有效Sheet: {sheet_name} (共{len(df)}行)")
        self.file_quantities[filename] = {}
        self.processed_files += 1

        # Merge every data row under its material key.
        for _, row in df.iterrows():
            self.process_row(row, filename)
            self.processed_rows += 1
        return True

    def process_row(self, row, filename):
        """Merge a single BOM row into master_data under its material key."""
        # Prefer Partnumber as the merge key; fall back to MF_PN.
        key = (row['Partnumber']
               if pd.notna(row['Partnumber']) and row['Partnumber'] != ''
               else row['MF_PN'])
        if pd.isna(key) or key == '':
            return  # no usable key -> skip the row

        # First sighting of this material: capture its descriptive fields.
        if key not in self.master_data:
            self.master_data[key] = {
                'Partnumber': row['Partnumber'],
                'Purchase_Code': row['Purchase_Code'],
                'MF_PN': row['MF_PN'],
                'Description': row.get('Description', ''),
                'Part_Type': row.get('Part_Type', ''),
                'MF_NAME': row.get('MF_NAME', ''),
                'PCB_Footprint': row.get('PCB_Footprint', ''),
                'quantity_data': {},     # per-file quantities
                'inconsistencies': []    # human-readable mismatch notes
            }

        current_data = self.master_data[key]
        new_issues = 0

        # Cross-file consistency check on the descriptive fields.
        # BUG FIX: the original looked up 'Part Type' (with a space) for the
        # Part_Type field, but clean_column_names() rewrites spaces to
        # underscores, so that column was never actually compared.  Use the
        # cleaned field name directly.
        fields_to_check = ['Purchase_Code', 'MF_PN', 'Part_Type',
                           'MF_NAME', 'PCB_Footprint']
        for field in fields_to_check:
            current_val = str(current_data[field])
            new_val = str(row.get(field, ''))
            # Ignore blanks and pandas missing-value placeholders.
            if new_val in ['', 'nan', 'NaN', 'NaT']:
                continue
            if current_val != new_val:
                current_data['inconsistencies'].append(
                    f"{field}不一致: {current_val} ≠ {new_val} (文件: {filename})"
                )
                new_issues += 1

        # Check that the Reference designator count matches Quantity.
        ref_count = 0
        if pd.notna(row['Reference']) and row['Reference'] != '':
            ref_list = str(row['Reference']).split(',')
            ref_count = len([ref for ref in ref_list if ref.strip() != ''])
        try:
            quantity = int(row['Quantity'])
            if ref_count != quantity:
                current_data['inconsistencies'].append(
                    f"Reference数量不符: {ref_count}个位置 ≠ Quantity={quantity} (文件: {filename})"
                )
                new_issues += 1
        except (ValueError, TypeError):
            pass  # non-numeric Quantity: recorded as 0 below

        # Record this file's quantity for the material (0 if unparseable).
        try:
            qty_val = int(row['Quantity'])
        except (ValueError, TypeError):
            qty_val = 0
        self.file_quantities.setdefault(filename, {})[key] = qty_val
        current_data['quantity_data'][filename] = qty_val

        # BUG FIX: the original incremented the counter whenever the
        # material's cumulative inconsistency list was non-empty, so one bad
        # material inflated the count on every later row.  Count only rows
        # that introduced new inconsistencies.
        if new_issues:
            self.inconsistency_count += 1

    def generate_report(self):
        """Build the consolidated DataFrame and save it as an .xlsx report.

        Returns a stats dict (output path and counters), or None when no
        data was collected.
        """
        if not self.master_data:
            print("无有效数据可生成报告")
            return None

        print(f"\n生成合并报告,共{len(self.master_data)}种物料...")

        report_data = []
        file_columns = sorted(self.file_quantities.keys())

        for key, data in self.master_data.items():
            row = {
                'Partnumber': data['Partnumber'],
                'Purchase_Code': data['Purchase_Code'],
                'MF_PN': data['MF_PN'],
                'Description': data['Description'],
                'Part Type': data['Part_Type'],
                'MF_NAME': data['MF_NAME'],
                'PCB_Footprint': data['PCB_Footprint'],
                '检查信息': '; '.join(data['inconsistencies'])
            }
            # One quantity column per source file, plus a grand total.
            total = 0
            for file in file_columns:
                qty = data['quantity_data'].get(file, 0)
                row[file] = qty
                total += qty
            row['合计'] = total
            report_data.append(row)

        self.consolidated_report = pd.DataFrame(report_data)

        # Timestamped filename avoids clobbering earlier reports.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_path = os.path.join(self.output_folder,
                                   f"BOM合并报告_{timestamp}.xlsx")
        self.consolidated_report.to_excel(output_path, index=False)

        return {
            'output_path': output_path,
            'file_count': self.processed_files,
            'material_count': len(self.master_data),
            'inconsistency_count': self.inconsistency_count,
            'processed_rows': self.processed_rows
        }


def select_folder():
    """Show a folder-picker dialog and return the chosen path ('' if none)."""
    root = tk.Tk()
    root.withdraw()  # hide the empty root window; we only want the dialog
    folder_selected = filedialog.askdirectory(title='选择BOM文件所在文件夹')
    return folder_selected


def main():
    """Entry point: pick a folder, merge all .xlsx BOMs in it, print stats."""
    bom_processor = BOMConsolidator()

    folder_path = select_folder()
    if not folder_path:
        print("未选择文件夹,程序退出")
        return
    bom_processor.output_folder = folder_path

    # Collect all Excel workbooks in the chosen folder.
    bom_files = glob.glob(os.path.join(folder_path, "*.xlsx"))
    if not bom_files:
        print("文件夹中没有Excel文件")
        return

    print(f"找到 {len(bom_files)} 个Excel文件,开始处理...")

    processed_count = 0
    for file_path in bom_files:
        if bom_processor.process_file(file_path):
            processed_count += 1

    if bom_processor.master_data:
        stats = bom_processor.generate_report()

        print("\n" + "=" * 40)
        print("BOM合并完成! 汇总信息:")
        print(f"处理文件夹: {folder_path}")
        print(f"扫描文件数: {len(bom_files)}")
        print(f"成功处理文件数: {processed_count}")
        print(f"处理行数: {stats['processed_rows']}")
        print(f"合并物料种类数: {stats['material_count']}")
        print(f"检测到不一致条目数: {stats['inconsistency_count']}")
        print(f"报告已保存至: {stats['output_path']}")
        print("=" * 40)
    else:
        print("没有有效数据生成报告")


if __name__ == "__main__":
    main()