import pandas as pd
import os
import glob
import re
from datetime import datetime
import tkinter as tk
from tkinter import filedialog
from collections import defaultdict


class BOMConsolidator:
    """Consolidate multiple Excel BOM workbooks into one merged report.

    Materials are keyed by ``Partnumber`` (falling back to ``MF_PN`` when
    Partnumber is blank).  Per-file quantities are collected side by side,
    and cross-file inconsistencies (attribute mismatches, Reference count
    vs. Quantity mismatches) are recorded per material.
    """

    def __init__(self):
        # Material key -> merged record: descriptive fields, per-file
        # quantities ('quantity_data') and detected issues ('inconsistencies').
        self.master_data = defaultdict(dict)
        # Columns every BOM sheet must provide *after* clean_column_names().
        self.required_columns = ['Partnumber', 'Purchase_Code', 'MF_PN', 'Description',
                                 'Part_Type', 'MF_NAME', 'PCB_Footprint', 'Quantity', 'Reference']
        # filename -> {material key -> quantity parsed from that file}
        self.file_quantities = {}
        self.consolidated_report = None  # DataFrame built by generate_report()
        self.inconsistency_count = 0     # total inconsistency entries recorded
        self.processed_files = 0         # files successfully processed
        self.processed_rows = 0          # data rows consumed across all files
        self.output_folder = ""          # directory the report is written to

    def find_valid_sheet(self, file_path):
        """Locate the sheet and header row containing a valid BOM table.

        Scans every sheet top-down for the first row that contains all of
        the marker headers 'Item', 'Partnumber', 'Purchase_Code', 'MF_PN'.

        Returns:
            (sheet_name, header_row_index), or (None, None) if no sheet
            qualifies.
        """
        # Open the workbook once and parse sheets from the same handle
        # (the original re-opened the file for every sheet and never
        # closed the ExcelFile handle).
        with pd.ExcelFile(file_path) as workbook:
            for sheet_name in workbook.sheet_names:
                df = workbook.parse(sheet_name=sheet_name, header=None)
                for i in range(len(df)):
                    headers = df.iloc[i].values
                    if all(col in headers for col in ['Item', 'Partnumber', 'Purchase_Code', 'MF_PN']):
                        return sheet_name, i
        return None, None

    def clean_column_names(self, df):
        """Normalize column names: strip, spaces -> '_', drop other symbols."""
        df.columns = df.columns.str.strip().str.replace(r'\s+', '_', regex=True)
        df.columns = df.columns.str.replace(r'[^a-zA-Z0-9_]', '', regex=True)
        return df

    def process_file(self, file_path):
        """Process one BOM workbook.

        Returns:
            True when a valid sheet with all required columns was found and
            all of its rows were merged; False otherwise.
        """
        filename = os.path.basename(file_path)
        print(f"处理文件: {filename}...")

        sheet_name, header_row = self.find_valid_sheet(file_path)
        # 'is None' rather than truthiness: an empty-string sheet name,
        # however unlikely, must not be treated as "not found".
        if sheet_name is None:
            print(f" ! 未找到有效BOM表: {filename}")
            return False

        df = pd.read_excel(file_path, sheet_name=sheet_name, header=header_row)
        df = self.clean_column_names(df)

        # Verify all required (cleaned) columns are present.
        missing_cols = [col for col in self.required_columns if col not in df.columns]
        if missing_cols:
            print(f" ! 缺少必要列: {', '.join(missing_cols)}")
            return False

        print(f" √ 找到有效Sheet: {sheet_name} (共{len(df)}行)")
        self.file_quantities[filename] = {}
        self.processed_files += 1

        # Merge every data row into master_data.
        for _, row in df.iterrows():
            self.process_row(row, filename)
            self.processed_rows += 1

        return True

    def process_row(self, row, filename):
        """Merge one BOM row into master_data under its material key."""
        # Merge key: Partnumber, falling back to MF_PN when blank.
        key = row['Partnumber'] if pd.notna(row['Partnumber']) and row['Partnumber'] != '' else row['MF_PN']
        if pd.isna(key) or key == '':
            return  # no usable key -> skip the row

        # First sighting of this material: record its descriptive fields.
        if key not in self.master_data:
            self.master_data[key] = {
                'Partnumber': row['Partnumber'],
                'Purchase_Code': row['Purchase_Code'],
                'MF_PN': row['MF_PN'],
                'Description': row.get('Description', ''),
                'Part_Type': row.get('Part_Type', ''),
                'MF_NAME': row.get('MF_NAME', ''),
                'PCB_Footprint': row.get('PCB_Footprint', ''),
                'quantity_data': {},    # per-file quantities
                'inconsistencies': []   # accumulated issue descriptions
            }

        current_data = self.master_data[key]
        issues_before = len(current_data['inconsistencies'])

        # Cross-file consistency check of descriptive fields.
        fields_to_check = ['Purchase_Code', 'MF_PN', 'Part_Type', 'MF_NAME', 'PCB_Footprint']
        for field in fields_to_check:
            # Columns were normalized by clean_column_names(), so the cleaned
            # name (e.g. 'Part_Type') is looked up directly.  The old
            # 'Part Type' remapping could never match a cleaned column, so
            # Part_Type mismatches were silently ignored.
            current_val = str(current_data[field])
            new_val = str(row.get(field, ''))

            # Ignore blank values and pandas NA string forms.
            if new_val in ['', 'nan', 'NaN', 'NaT']:
                continue

            if current_val != new_val:
                current_data['inconsistencies'].append(
                    f"{field}不一致: {current_val} ≠ {new_val} (文件: {filename})"
                )

        # Count non-empty designators in the Reference column.
        ref_count = 0
        if pd.notna(row['Reference']) and row['Reference'] != '':
            ref_list = str(row['Reference']).split(',')
            ref_count = len([ref for ref in ref_list if ref.strip() != ''])

        # Parse Quantity once; an unparseable value is recorded as 0 and
        # skips the Reference-count check (matches original behavior).
        try:
            qty_val = int(row['Quantity'])
            if ref_count != qty_val:
                current_data['inconsistencies'].append(
                    f"Reference数量不符: {ref_count}个位置 ≠ Quantity={qty_val} (文件: {filename})"
                )
        except (ValueError, TypeError):
            qty_val = 0

        # Record this file's quantity for the material.
        self.file_quantities[filename][key] = qty_val
        current_data['quantity_data'][filename] = qty_val

        # Count only the inconsistencies added by THIS row.  The original
        # re-incremented the counter on every later row of a material once
        # it had any issue, inflating the reported entry count.
        self.inconsistency_count += len(current_data['inconsistencies']) - issues_before

    def generate_report(self):
        """Build and save the consolidated Excel report.

        Returns:
            A dict of summary statistics (output path, file/material/row
            counts, inconsistency count), or None when there is no data.
        """
        if not self.master_data:
            print("无有效数据可生成报告")
            return None

        print(f"\n生成合并报告,共{len(self.master_data)}种物料...")

        # One output row per material; one quantity column per source file.
        report_data = []
        file_columns = sorted(self.file_quantities.keys())

        for key, data in self.master_data.items():
            row = {
                'Partnumber': data['Partnumber'],
                'Purchase_Code': data['Purchase_Code'],
                'MF_PN': data['MF_PN'],
                'Description': data['Description'],
                'Part Type': data['Part_Type'],
                'MF_NAME': data['MF_NAME'],
                'PCB_Footprint': data['PCB_Footprint'],
                '检查信息': '; '.join(data['inconsistencies'])
            }

            # Per-file quantities plus a grand total.
            total = 0
            for file in file_columns:
                qty = data['quantity_data'].get(file, 0)
                row[file] = qty
                total += qty
            row['合计'] = total

            report_data.append(row)

        self.consolidated_report = pd.DataFrame(report_data)

        # Timestamped filename avoids clobbering earlier reports.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_path = os.path.join(self.output_folder, f"BOM合并报告_{timestamp}.xlsx")

        self.consolidated_report.to_excel(output_path, index=False)

        stats = {
            'output_path': output_path,
            'file_count': self.processed_files,
            'material_count': len(self.master_data),
            'inconsistency_count': self.inconsistency_count,
            'processed_rows': self.processed_rows
        }

        return stats
def select_folder():
    """Show a folder-picker dialog and return the chosen path ('' on cancel)."""
    dialog_root = tk.Tk()
    # Hide the empty Tk root window so only the directory dialog appears.
    dialog_root.withdraw()
    chosen_dir = filedialog.askdirectory(title='选择BOM文件所在文件夹')
    return chosen_dir
def main():
    """Entry point: pick a folder, consolidate every BOM workbook in it."""
    consolidator = BOMConsolidator()

    # Ask the user where the BOM files live; bail out if the dialog is cancelled.
    selected_dir = select_folder()
    if not selected_dir:
        print("未选择文件夹,程序退出")
        return

    consolidator.output_folder = selected_dir

    # Collect every .xlsx workbook in the chosen folder.
    excel_paths = glob.glob(os.path.join(selected_dir, "*.xlsx"))
    if not excel_paths:
        print("文件夹中没有Excel文件")
        return

    print(f"找到 {len(excel_paths)} 个Excel文件,开始处理...")

    # Feed each workbook to the consolidator, counting successes.
    ok_count = sum(1 for path in excel_paths if consolidator.process_file(path))

    if not consolidator.master_data:
        print("没有有效数据生成报告")
        return

    stats = consolidator.generate_report()

    # Final summary banner.
    print("\n" + "=" * 40)
    print("BOM合并完成! 汇总信息:")
    print(f"处理文件夹: {selected_dir}")
    print(f"扫描文件数: {len(excel_paths)}")
    print(f"成功处理文件数: {ok_count}")
    print(f"处理行数: {stats['processed_rows']}")
    print(f"合并物料种类数: {stats['material_count']}")
    print(f"检测到不一致条目数: {stats['inconsistency_count']}")
    print(f"报告已保存至: {stats['output_path']}")
    print("=" * 40)
# Script entry point: run the full consolidation workflow when executed directly.
if __name__ == "__main__":
    main()