Files
PythonApp/main.py

251 lines
8.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import pandas as pd
import os
import glob
import re
from datetime import datetime
import tkinter as tk
from tkinter import filedialog
from collections import defaultdict
class BOMConsolidator:
def __init__(self):
self.master_data = defaultdict(dict)
self.required_columns = ['Partnumber', 'Purchase_Code', 'MF_PN', 'Description',
'Part_Type', 'MF_NAME', 'PCB_Footprint', 'Quantity', 'Reference']
self.file_quantities = {}
self.consolidated_report = None
self.inconsistency_count = 0
self.processed_files = 0
self.processed_rows = 0
self.output_folder = ""
def find_valid_sheet(self, file_path):
"""定位包含有效BOM的Sheet"""
xl = pd.ExcelFile(file_path)
for sheet_name in xl.sheet_names:
df = pd.read_excel(file_path, sheet_name=sheet_name, header=None)
for i in range(len(df)):
headers = df.iloc[i].values
if all(col in headers for col in ['Item', 'Partnumber', 'Purchase_Code', 'MF_PN']):
return sheet_name, i
return None, None
def clean_column_names(self, df):
"""清洗列名并标准化"""
df.columns = df.columns.str.strip().str.replace(r'\s+', '_', regex=True)
df.columns = df.columns.str.replace(r'[^a-zA-Z0-9_]', '', regex=True)
return df
def process_file(self, file_path):
"""处理单个BOM文件"""
filename = os.path.basename(file_path)
print(f"处理文件: {filename}...")
sheet_name, header_row = self.find_valid_sheet(file_path)
if not sheet_name:
print(f" ! 未找到有效BOM表: {filename}")
return False
df = pd.read_excel(file_path, sheet_name=sheet_name, header=header_row)
df = self.clean_column_names(df)
# 验证必要字段
missing_cols = [col for col in self.required_columns if col not in df.columns]
if missing_cols:
print(f" ! 缺少必要列: {', '.join(missing_cols)}")
return False
print(f" √ 找到有效Sheet: {sheet_name} (共{len(df)}行)")
self.file_quantities[filename] = {}
self.processed_files += 1
# 处理每行数据
for _, row in df.iterrows():
self.process_row(row, filename)
self.processed_rows += 1
return True
def process_row(self, row, filename):
"""处理单行数据"""
# 确定合并主键
key = row['Partnumber'] if pd.notna(row['Partnumber']) and row['Partnumber'] != '' else row['MF_PN']
if pd.isna(key) or key == '':
return
# 首次记录该物料
if key not in self.master_data:
self.master_data[key] = {
'Partnumber': row['Partnumber'],
'Purchase_Code': row['Purchase_Code'],
'MF_PN': row['MF_PN'],
'Description': row.get('Description', ''),
'Part_Type': row.get('Part_Type', ''),
'MF_NAME': row.get('MF_NAME', ''),
'PCB_Footprint': row.get('PCB_Footprint', ''),
'quantity_data': {}, # 存储每个文件的数量
'inconsistencies': [] # 存储不一致信息
}
# 检查字段一致性
current_data = self.master_data[key]
fields_to_check = ['Purchase_Code', 'MF_PN', 'Part_Type', 'MF_NAME', 'PCB_Footprint']
for field in fields_to_check:
# 处理字段名称差异
db_field = 'Part Type' if field == 'Part_Type' else field
current_val = str(current_data[field])
new_val = str(row.get(db_field, ''))
# 忽略空值和'nan'字符串
if new_val in ['', 'nan', 'NaN', 'NaT']:
continue
# 比较当前值和新值
if current_val != new_val:
current_data['inconsistencies'].append(
f"{field}不一致: {current_val}{new_val} (文件: {filename})"
)
# 检查Reference数量和Quantity是否匹配
ref_count = 0
if pd.notna(row['Reference']) and row['Reference'] != '':
ref_list = str(row['Reference']).split(',')
ref_count = len([ref for ref in ref_list if ref.strip() != ''])
try:
quantity = int(row['Quantity'])
if ref_count != quantity:
current_data['inconsistencies'].append(
f"Reference数量不符: {ref_count}个位置 ≠ Quantity={quantity} (文件: {filename})"
)
except (ValueError, TypeError):
pass
# 记录当前文件的数量
try:
qty_val = int(row['Quantity'])
self.file_quantities[filename][key] = qty_val
current_data['quantity_data'][filename] = qty_val
except (ValueError, TypeError):
self.file_quantities[filename][key] = 0
current_data['quantity_data'][filename] = 0
# 更新不一致计数
if current_data['inconsistencies']:
self.inconsistency_count += 1
def generate_report(self):
"""生成合并报告"""
if not self.master_data:
print("无有效数据可生成报告")
return None
print(f"\n生成合并报告,共{len(self.master_data)}种物料...")
# 准备报告数据结构
report_data = []
file_columns = sorted(self.file_quantities.keys())
for key, data in self.master_data.items():
row = {
'Partnumber': data['Partnumber'],
'Purchase_Code': data['Purchase_Code'],
'MF_PN': data['MF_PN'],
'Description': data['Description'],
'Part Type': data['Part_Type'],
'MF_NAME': data['MF_NAME'],
'PCB_Footprint': data['PCB_Footprint'],
'检查信息': '; '.join(data['inconsistencies'])
}
# 添加各文件数量
total = 0
for file in file_columns:
qty = data['quantity_data'].get(file, 0)
row[file] = qty
total += qty
row['合计'] = total
report_data.append(row)
# 创建DataFrame
self.consolidated_report = pd.DataFrame(report_data)
# 生成带时间戳的文件名
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_path = os.path.join(self.output_folder, f"BOM合并报告_{timestamp}.xlsx")
# 保存报告
self.consolidated_report.to_excel(output_path, index=False)
# 返回统计信息和路径
stats = {
'output_path': output_path,
'file_count': self.processed_files,
'material_count': len(self.master_data),
'inconsistency_count': self.inconsistency_count,
'processed_rows': self.processed_rows
}
return stats
def select_folder():
"""弹出文件夹选择对话框"""
root = tk.Tk()
root.withdraw()
folder_selected = filedialog.askdirectory(title='选择BOM文件所在文件夹')
return folder_selected
def main():
# 初始化合并器
bom_processor = BOMConsolidator()
# 选择文件夹
folder_path = select_folder()
if not folder_path:
print("未选择文件夹,程序退出")
return
bom_processor.output_folder = folder_path
# 获取所有Excel文件
bom_files = glob.glob(os.path.join(folder_path, "*.xlsx"))
if not bom_files:
print("文件夹中没有Excel文件")
return
print(f"找到 {len(bom_files)} 个Excel文件开始处理...")
# 处理文件
processed_count = 0
for file_path in bom_files:
success = bom_processor.process_file(file_path)
if success:
processed_count += 1
# 生成报告
if bom_processor.master_data:
stats = bom_processor.generate_report()
# 打印汇总信息
print("\n" + "=" * 40)
print("BOM合并完成! 汇总信息:")
print(f"处理文件夹: {folder_path}")
print(f"扫描文件数: {len(bom_files)}")
print(f"成功处理文件数: {processed_count}")
print(f"处理行数: {stats['processed_rows']}")
print(f"合并物料种类数: {stats['material_count']}")
print(f"检测到不一致条目数: {stats['inconsistency_count']}")
print(f"报告已保存至: {stats['output_path']}")
print("=" * 40)
else:
print("没有有效数据生成报告")
if __name__ == "__main__":
main()