Files
PythonApp/main.py

251 lines
8.6 KiB
Python
Raw Normal View History

2026-02-02 15:19:30 +08:00
import pandas as pd
import os
import glob
import re
from datetime import datetime
import tkinter as tk
from tkinter import filedialog
from collections import defaultdict
class BOMConsolidator:
def __init__(self):
self.master_data = defaultdict(dict)
self.required_columns = ['Partnumber', 'Purchase_Code', 'MF_PN', 'Description',
'Part_Type', 'MF_NAME', 'PCB_Footprint', 'Quantity', 'Reference']
self.file_quantities = {}
self.consolidated_report = None
self.inconsistency_count = 0
self.processed_files = 0
self.processed_rows = 0
self.output_folder = ""
def find_valid_sheet(self, file_path):
"""定位包含有效BOM的Sheet"""
xl = pd.ExcelFile(file_path)
for sheet_name in xl.sheet_names:
df = pd.read_excel(file_path, sheet_name=sheet_name, header=None)
for i in range(len(df)):
headers = df.iloc[i].values
if all(col in headers for col in ['Item', 'Partnumber', 'Purchase_Code', 'MF_PN']):
return sheet_name, i
return None, None
def clean_column_names(self, df):
"""清洗列名并标准化"""
df.columns = df.columns.str.strip().str.replace(r'\s+', '_', regex=True)
df.columns = df.columns.str.replace(r'[^a-zA-Z0-9_]', '', regex=True)
return df
def process_file(self, file_path):
"""处理单个BOM文件"""
filename = os.path.basename(file_path)
print(f"处理文件: {filename}...")
sheet_name, header_row = self.find_valid_sheet(file_path)
if not sheet_name:
print(f" ! 未找到有效BOM表: {filename}")
return False
df = pd.read_excel(file_path, sheet_name=sheet_name, header=header_row)
df = self.clean_column_names(df)
# 验证必要字段
missing_cols = [col for col in self.required_columns if col not in df.columns]
if missing_cols:
print(f" ! 缺少必要列: {', '.join(missing_cols)}")
return False
print(f" √ 找到有效Sheet: {sheet_name} (共{len(df)}行)")
self.file_quantities[filename] = {}
self.processed_files += 1
# 处理每行数据
for _, row in df.iterrows():
self.process_row(row, filename)
self.processed_rows += 1
return True
def process_row(self, row, filename):
"""处理单行数据"""
# 确定合并主键
key = row['Partnumber'] if pd.notna(row['Partnumber']) and row['Partnumber'] != '' else row['MF_PN']
if pd.isna(key) or key == '':
return
# 首次记录该物料
if key not in self.master_data:
self.master_data[key] = {
'Partnumber': row['Partnumber'],
'Purchase_Code': row['Purchase_Code'],
'MF_PN': row['MF_PN'],
'Description': row.get('Description', ''),
'Part_Type': row.get('Part_Type', ''),
'MF_NAME': row.get('MF_NAME', ''),
'PCB_Footprint': row.get('PCB_Footprint', ''),
'quantity_data': {}, # 存储每个文件的数量
'inconsistencies': [] # 存储不一致信息
}
# 检查字段一致性
current_data = self.master_data[key]
fields_to_check = ['Purchase_Code', 'MF_PN', 'Part_Type', 'MF_NAME', 'PCB_Footprint']
for field in fields_to_check:
# 处理字段名称差异
db_field = 'Part Type' if field == 'Part_Type' else field
current_val = str(current_data[field])
new_val = str(row.get(db_field, ''))
# 忽略空值和'nan'字符串
if new_val in ['', 'nan', 'NaN', 'NaT']:
continue
# 比较当前值和新值
if current_val != new_val:
current_data['inconsistencies'].append(
f"{field}不一致: {current_val}{new_val} (文件: {filename})"
)
# 检查Reference数量和Quantity是否匹配
ref_count = 0
if pd.notna(row['Reference']) and row['Reference'] != '':
ref_list = str(row['Reference']).split(',')
ref_count = len([ref for ref in ref_list if ref.strip() != ''])
try:
quantity = int(row['Quantity'])
if ref_count != quantity:
current_data['inconsistencies'].append(
f"Reference数量不符: {ref_count}个位置 ≠ Quantity={quantity} (文件: {filename})"
)
except (ValueError, TypeError):
pass
# 记录当前文件的数量
try:
qty_val = int(row['Quantity'])
self.file_quantities[filename][key] = qty_val
current_data['quantity_data'][filename] = qty_val
except (ValueError, TypeError):
self.file_quantities[filename][key] = 0
current_data['quantity_data'][filename] = 0
# 更新不一致计数
if current_data['inconsistencies']:
self.inconsistency_count += 1
def generate_report(self):
"""生成合并报告"""
if not self.master_data:
print("无有效数据可生成报告")
return None
print(f"\n生成合并报告,共{len(self.master_data)}种物料...")
# 准备报告数据结构
report_data = []
file_columns = sorted(self.file_quantities.keys())
for key, data in self.master_data.items():
row = {
'Partnumber': data['Partnumber'],
'Purchase_Code': data['Purchase_Code'],
'MF_PN': data['MF_PN'],
'Description': data['Description'],
'Part Type': data['Part_Type'],
'MF_NAME': data['MF_NAME'],
'PCB_Footprint': data['PCB_Footprint'],
'检查信息': '; '.join(data['inconsistencies'])
}
# 添加各文件数量
total = 0
for file in file_columns:
qty = data['quantity_data'].get(file, 0)
row[file] = qty
total += qty
row['合计'] = total
report_data.append(row)
# 创建DataFrame
self.consolidated_report = pd.DataFrame(report_data)
# 生成带时间戳的文件名
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_path = os.path.join(self.output_folder, f"BOM合并报告_{timestamp}.xlsx")
# 保存报告
self.consolidated_report.to_excel(output_path, index=False)
# 返回统计信息和路径
stats = {
'output_path': output_path,
'file_count': self.processed_files,
'material_count': len(self.master_data),
'inconsistency_count': self.inconsistency_count,
'processed_rows': self.processed_rows
}
return stats
def select_folder():
"""弹出文件夹选择对话框"""
root = tk.Tk()
root.withdraw()
folder_selected = filedialog.askdirectory(title='选择BOM文件所在文件夹')
return folder_selected
def main():
# 初始化合并器
bom_processor = BOMConsolidator()
# 选择文件夹
folder_path = select_folder()
if not folder_path:
print("未选择文件夹,程序退出")
return
bom_processor.output_folder = folder_path
# 获取所有Excel文件
bom_files = glob.glob(os.path.join(folder_path, "*.xlsx"))
if not bom_files:
print("文件夹中没有Excel文件")
return
print(f"找到 {len(bom_files)} 个Excel文件开始处理...")
# 处理文件
processed_count = 0
for file_path in bom_files:
success = bom_processor.process_file(file_path)
if success:
processed_count += 1
# 生成报告
if bom_processor.master_data:
stats = bom_processor.generate_report()
# 打印汇总信息
print("\n" + "=" * 40)
print("BOM合并完成! 汇总信息:")
print(f"处理文件夹: {folder_path}")
print(f"扫描文件数: {len(bom_files)}")
print(f"成功处理文件数: {processed_count}")
print(f"处理行数: {stats['processed_rows']}")
print(f"合并物料种类数: {stats['material_count']}")
print(f"检测到不一致条目数: {stats['inconsistency_count']}")
print(f"报告已保存至: {stats['output_path']}")
print("=" * 40)
else:
print("没有有效数据生成报告")
if __name__ == "__main__":
main()