Python脚本开发文件初始化

This commit is contained in:
2026-02-02 15:19:30 +08:00
parent 86c4718368
commit 5c846eae94
25 changed files with 8746 additions and 0 deletions

16
BOMCompare/.gitignore vendored Normal file
View File

@@ -0,0 +1,16 @@
/build/*
/build
/dist/*
/dist
/source/*
/source
BOMCompare for Merge V2.py
BOMCompareForJP2.py
BOMConsolidator.py
BOMConsolidatorV2.py
# BOMConsolidator.py

View File

@@ -0,0 +1,655 @@
import pandas as pd
import tkinter as tk
from tkinter import filedialog
from datetime import datetime
import os
from typing import Dict, List, Tuple, Optional
class BOMComparator:
"""BOM file difference comparator: loads two BOM Excel workbooks and reports row/column diffs."""
def __init__(self):
# Paths and display names of the two files being compared
self.file1_path = ""
self.file2_path = ""
# Sheet names in each file that look like BOM data (see find_valid_sheets)
self.file1_sheets = []
self.file2_sheets = []
self.common_sheets = []
# Mapping "sheet1_vs_sheet2" -> diff dict produced by compare_dataframes()
self.differences = {}
self.file1_name = ""
self.file2_name = ""
self.columns_to_exclude = ['检查信息', '检查状态', '校验信息'] # audit/check column names excluded from comparison
def select_file(self, title: str) -> str:
    """Open a file-picker dialog and return the chosen path ("" when cancelled)."""
    picker_root = tk.Tk()
    picker_root.withdraw()  # hide the empty Tk main window behind the dialog
    chosen = filedialog.askopenfilename(
        title=title,
        filetypes=[("Excel files", "*.xlsx"), ("All files", "*.*")],
    )
    picker_root.destroy()
    return chosen
def find_valid_sheets(self, file_path: str) -> List[str]:
"""Return the names of sheets that look like BOM data (>= 2 known key columns and > 1 data row)."""
valid_sheets = []
try:
xl_file = pd.ExcelFile(file_path)
for sheet_name in xl_file.sheet_names:
try:
# Peek at the first 10 rows to decide whether the sheet holds BOM data
df = pd.read_excel(file_path, sheet_name=sheet_name, nrows=10)
# Look for BOM-related columns (layout based on the reference attachment)
required_columns = ['Partnumber', 'Purchase_Code', 'MF_PN', 'Description']
found_columns = [col for col in df.columns if col in required_columns]
if len(found_columns) >= 2: # at least two of the key columns must be present
# Require actual data rows, not just a header
if len(df) > 1:
valid_sheets.append(sheet_name)
except Exception as e:
# Unreadable or malformed sheets are simply skipped
continue
except Exception as e:
print(f"读取文件 {file_path} 时出错: {e}")
return valid_sheets
def get_common_sheets(self) -> List[str]:
"""获取两个文件的共同工作表"""
if not self.file1_sheets or not self.file2_sheets:
return []
# 标准化工作表名称(去除空格和特殊字符)
file1_clean = [self.standardize_sheet_name(sheet) for sheet in self.file1_sheets]
file2_clean = [self.standardize_sheet_name(sheet) for sheet in self.file2_sheets]
# 找出共同的工作表
common_sheets = []
for sheet1 in self.file1_sheets:
clean_sheet1 = self.standardize_sheet_name(sheet1)
for sheet2 in self.file2_sheets:
clean_sheet2 = self.standardize_sheet_name(sheet2)
if clean_sheet1 == clean_sheet2:
common_sheets.append(sheet1)
break
return common_sheets
def standardize_sheet_name(self, sheet_name: str) -> str:
    """Normalize a sheet name for comparison: trim, lowercase, unify separators to '_'."""
    normalized = str(sheet_name).strip().lower()
    for separator in (' ', '-'):
        normalized = normalized.replace(separator, '_')
    return normalized
def load_bom_data(self, file_path: str, sheet_name: str) -> pd.DataFrame:
"""Load one BOM sheet; returns an empty DataFrame on any read error."""
try:
df = pd.read_excel(file_path, sheet_name=sheet_name)
# Drop fully-empty rows and fully-empty columns
df = df.dropna(how='all').dropna(axis=1, how='all')
# Normalize column names (strip surrounding whitespace)
df.columns = df.columns.str.strip()
return df
except Exception as e:
print(f"加载sheet {sheet_name} 时出错: {e}")
return pd.DataFrame()
def should_compare_column(self, column_name: str) -> bool:
"""判断是否应该对比该列(排除检查信息类列)"""
exclude_keywords = ['检查', '校验', '状态', '备注', 'comment', 'check']
column_lower = str(column_name).lower()
# 检查是否在排除列表中
if column_name in self.columns_to_exclude:
return False
# 检查是否包含排除关键词
for keyword in exclude_keywords:
if keyword in column_lower:
return False
return True
def get_columns_to_compare(self, df1: pd.DataFrame, df2: pd.DataFrame) -> List[str]:
"""获取需要对比的列名(排除检查信息类列)"""
common_columns = list(set(df1.columns).intersection(set(df2.columns)))
# 过滤掉不需要对比的列
columns_to_compare = [col for col in common_columns if self.should_compare_column(col)]
return columns_to_compare
def compare_dataframes(self, df1: pd.DataFrame, df2: pd.DataFrame, sheet_name1: str, sheet_name2: str) -> Dict:
"""Diff two BOM DataFrames (audit/check columns excluded).

Returns a dict with keys: sheet_names, added_rows, removed_rows,
modified_rows, columns_comparison, summary, original_dfs, and
optionally 'error' when the comparison could not run.
"""
differences = {
'sheet_names': f"{sheet_name1} vs {sheet_name2}",
'added_rows': [],
'removed_rows': [],
'modified_rows': [],
'columns_comparison': {},
'summary': {
'total_rows_df1': len(df1),
'total_rows_df2': len(df2),
'added_count': 0,
'removed_count': 0,
'modified_count': 0
},
'original_dfs': {
'df1': df1.copy(),
'df2': df2.copy()
}
}
# Determine the key column(s) used to match rows between the two frames
key_columns = self.identify_key_columns(df1, df2)
if not key_columns:
differences['error'] = "无法确定用于对比的关键列"
return differences
try:
# Index both frames by the chosen key column(s)
df1_indexed = df1.set_index(key_columns)
df2_indexed = df2.set_index(key_columns)
# Columns eligible for comparison (audit/check columns excluded)
columns_to_compare = self.get_columns_to_compare(df1, df2)
# Keys present only in df2 -> added rows
new_indexes = df2_indexed.index.difference(df1_indexed.index)
if len(new_indexes) > 0:
differences['added_rows'] = df2_indexed.loc[new_indexes].reset_index().to_dict('records')
differences['summary']['added_count'] = len(new_indexes)
# Keys present only in df1 -> removed rows
removed_indexes = df1_indexed.index.difference(df2_indexed.index)
if len(removed_indexes) > 0:
differences['removed_rows'] = df1_indexed.loc[removed_indexes].reset_index().to_dict('records')
differences['summary']['removed_count'] = len(removed_indexes)
# Keys in both -> compare cell by cell (excluded columns skipped)
common_indexes = df1_indexed.index.intersection(df2_indexed.index)
for idx in common_indexes:
# NOTE(review): .loc on a duplicated key returns a DataFrame, not a
# Series - this assumes the key values are unique in both frames; confirm.
row1 = df1_indexed.loc[idx]
row2 = df2_indexed.loc[idx]
# Check each comparable column for changed values
modified_cols = {}
for col in columns_to_compare:
if col in df1_indexed.columns and col in df2_indexed.columns:
val1 = row1[col]
val2 = row2[col]
# NaN == NaN counts as equal; otherwise compare string forms
if pd.isna(val1) and pd.isna(val2):
continue
elif pd.isna(val1) or pd.isna(val2) or str(val1) != str(val2):
modified_cols[col] = {
'old_value': val1,
'new_value': val2
}
if modified_cols:
# Fetch the full row so the report can show all display columns
full_row_data = self.get_full_row_data_for_display(df1, df2, idx, key_columns)
differences['modified_rows'].append({
'key_values': dict(zip(key_columns, idx)) if isinstance(idx, tuple) else {key_columns[0]: idx},
'modified_columns': modified_cols,
'full_row_data': full_row_data
})
differences['summary']['modified_count'] += 1
# Column-level comparison (all columns, for the statistics section)
common_columns = set(df1.columns).intersection(set(df2.columns))
df1_only_columns = set(df1.columns).difference(set(df2.columns))
df2_only_columns = set(df2.columns).difference(set(df1.columns))
# Columns that actually took part in the comparison
compared_columns = set(columns_to_compare)
excluded_columns = common_columns - compared_columns
differences['columns_comparison'] = {
'common_columns': list(common_columns),
'compared_columns': list(compared_columns),
'excluded_columns': list(excluded_columns),
'file1_only_columns': list(df1_only_columns),
'file2_only_columns': list(df2_only_columns)
}
except Exception as e:
differences['error'] = f"对比过程中出错: {str(e)}"
return differences
def get_full_row_data_for_display(self, df1: pd.DataFrame, df2: pd.DataFrame, idx, key_columns: List[str]) -> Dict:
"""Collect one matched row's values from both files, formatted for the report."""
display_data = {}
# Pull the matching row from each file
row1_data = self.extract_row_data(df1, idx, key_columns)
row2_data = self.extract_row_data(df2, idx, key_columns)
# Columns shown in the report (audit/check-style columns filtered below)
display_columns = ['Purchase_Code', 'MF_PN', 'Description', 'Part Type', 'MF_NAME', 'PCB_Footprint', '合计']
# Drop audit/check-style columns
display_columns = [col for col in display_columns if self.should_compare_column(col)]
for col in display_columns:
val1 = row1_data.get(col, '')
val2 = row2_data.get(col, '')
# Render "old -> new" when the values differ, otherwise the single value
if pd.isna(val1) or val1 == '':
display_value = val2
elif pd.isna(val2) or val2 == '':
display_value = val1
elif str(val1) != str(val2):
display_value = f"{val1} -> {val2}"
else:
display_value = val1
display_data[col] = display_value
# Keep the raw per-file rows for downstream use (underscore keys are
# filtered out when the report sheet is built)
display_data['_from_file1'] = row1_data
display_data['_from_file2'] = row2_data
return display_data
def extract_row_data(self, df: pd.DataFrame, idx, key_columns: List[str]) -> Dict:
"""Extract the first row of df whose key_columns match idx ({} when not found)."""
row_data = {}
try:
if isinstance(idx, tuple):
# Composite (multi-column) key: AND together one equality mask per key column
mask = pd.Series(True, index=df.index)
for i, key in enumerate(key_columns):
mask = mask & (df[key] == idx[i])
if mask.any():
original_row = df[mask].iloc[0]
for col in df.columns:
row_data[col] = original_row[col]
else:
# Single-column key
matching_rows = df[df[key_columns[0]] == idx]
if len(matching_rows) > 0:
original_row = matching_rows.iloc[0]
for col in df.columns:
row_data[col] = original_row[col]
except Exception as e:
# Best-effort lookup: any failure yields an empty dict
pass
return row_data
def format_value_display(self, value1, value2):
    """Render a cell for the report: "old -> new" when changed, otherwise the value itself."""
    if pd.isna(value1) or value1 == '':
        return value2
    if pd.isna(value2) or value2 == '':
        return value1
    if str(value1) == str(value2):
        return value1
    return f"{value1} -> {value2}"
def get_modified_columns_summary(self, modified_columns: Dict) -> str:
    """Summarize which columns changed; long lists show the first three plus a count."""
    if not modified_columns:
        return "无修改"
    names = list(modified_columns)
    if len(names) > 3:
        # Truncate to the first three and append the total count suffix.
        return ", ".join(names[:3]) + f"...等{len(names)}"
    return ", ".join(names)
def identify_key_columns(self, df1: pd.DataFrame, df2: pd.DataFrame) -> List[str]:
    """Pick the column(s) used to match rows between the two BOMs.

    Preference order:
      1. a single well-known key column with < 10% duplicate values in both frames;
      2. a known two-column key combination;
      3. the first two shared columns (arbitrary set order);
      4. [] when nothing is shared.

    Fixes: the original divided by len(df) and raised ZeroDivisionError when a
    sheet contained headers only; empty frames now fall through to the
    combination/fallback steps instead of crashing.
    """
    potential_keys = ['Partnumber', 'Purchase_Code', 'MF_PN']
    for key in potential_keys:
        if key in df1.columns and key in df2.columns:
            # Guard against empty frames before computing duplicate rates.
            if len(df1) == 0 or len(df2) == 0:
                continue
            df1_dup_rate = df1[key].duplicated().sum() / len(df1)
            df2_dup_rate = df2[key].duplicated().sum() / len(df2)
            if df1_dup_rate < 0.1 and df2_dup_rate < 0.1:  # tolerate a few duplicates
                return [key]
    # No single key qualified: try known key combinations.
    for key_combo in [['Partnumber', 'MF_PN'], ['Purchase_Code', 'MF_PN']]:
        if all(col in df1.columns for col in key_combo) and all(col in df2.columns for col in key_combo):
            return key_combo
    # Last resort: up to the first two shared columns (set order is arbitrary).
    common_cols = list(set(df1.columns).intersection(set(df2.columns)))
    if common_cols:
        return common_cols[:2]
    return []
def generate_output_filename(self) -> str:
"""Build the report file name from the first valid sheet of each input file."""
if not self.file1_sheets or not self.file2_sheets:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
return f"BOM差异报告_{timestamp}.xlsx"
# Use the first valid sheet of each file in the name
file1_sheet_name = str(self.file1_sheets[0]) if self.file1_sheets else "File1"
file2_sheet_name = str(self.file2_sheets[0]) if self.file2_sheets else "File2"
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Strip characters that are illegal in file names
clean_sheet1 = self.clean_filename(file1_sheet_name)
clean_sheet2 = self.clean_filename(file2_sheet_name)
filename = f"{clean_sheet1}_vs_{clean_sheet2}_差异报告_{timestamp}.xlsx"
return filename
def clean_filename(self, filename: str) -> str:
    """Replace characters Windows forbids in file names (plus whitespace) with '_', cap at 50 chars."""
    text = str(filename)
    # One translate() pass instead of a chain of replace() calls.
    replacements = str.maketrans({ch: '_' for ch in '<>:"/\\|?* \t\n'})
    return text.translate(replacements)[:50]
def clean_sheet_name(self, sheet_name: str, max_length: int = 25) -> str:
    """Sanitize a worksheet title: map Excel-forbidden characters to '_' and cap the length.

    max_length defaults to 25 so suffixes appended later stay within
    Excel's 31-character sheet-name limit.
    """
    title = str(sheet_name)
    # Single-pass replacement of every character Excel rejects in sheet names.
    title = title.translate(str.maketrans({ch: '_' for ch in '[]:*?/\\'}))
    return title[:max_length]
def get_output_directory(self) -> str:
"""获取输出目录(第二个文件所在目录)"""
return os.path.dirname(self.file2_path)
def generate_difference_report(self) -> str:
"""Write the collected differences to an Excel workbook.

Returns the output path on success, the literal string "没有发现差异"
when there is nothing to report, or "" when writing failed.
"""
if not self.differences:
return "没有发现差异"
# Build the output file name/path (the report goes next to file 2)
output_filename = self.generate_output_filename()
output_directory = self.get_output_directory()
output_path = os.path.join(output_directory, output_filename)
try:
with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
# Overall summary sheet across all comparisons
summary_data = []
for diff_key, differences in self.differences.items():
if 'error' not in differences:
columns_comparison = differences.get('columns_comparison', {})
excluded_count = len(columns_comparison.get('excluded_columns', []))
summary_data.append([
differences.get('sheet_names', diff_key),
differences['summary']['total_rows_df1'],
differences['summary']['total_rows_df2'],
differences['summary']['added_count'],
differences['summary']['removed_count'],
differences['summary']['modified_count'],
excluded_count
])
if summary_data:
summary_df = pd.DataFrame(summary_data, columns=[
'工作表对比', '文件1行数', '文件2行数', '新增行数', '删除行数', '修改行数', '排除列数'
])
summary_df.to_excel(writer, sheet_name='对比摘要', index=False)
# One group of detail sheets per comparison
for diff_key, differences in self.differences.items():
sheet_key = self.clean_sheet_name(diff_key.replace('vs', '_vs_'))
if 'error' in differences:
# Write an error sheet instead of details
error_df = pd.DataFrame([['错误信息', differences['error']]])
error_df.to_excel(writer, sheet_name=f"{sheet_key}_错误", index=False, header=False)
continue
# Per-comparison summary including column statistics
summary_data = []
summary_data.append(["对比项", "数量"])
summary_data.append(["文件1总行数", differences['summary']['total_rows_df1']])
summary_data.append(["文件2总行数", differences['summary']['total_rows_df2']])
summary_data.append(["新增行数", differences['summary']['added_count']])
summary_data.append(["删除行数", differences['summary']['removed_count']])
summary_data.append(["修改行数", differences['summary']['modified_count']])
summary_data.append(["共同列数", len(differences['columns_comparison']['common_columns'])])
summary_data.append(["实际对比列数", len(differences['columns_comparison']['compared_columns'])])
summary_data.append(["排除列数", len(differences['columns_comparison']['excluded_columns'])])
summary_data.append(["文件1特有列", len(differences['columns_comparison']['file1_only_columns'])])
summary_data.append(["文件2特有列", len(differences['columns_comparison']['file2_only_columns'])])
# List the excluded columns at the bottom of the summary
excluded_cols = differences['columns_comparison'].get('excluded_columns', [])
if excluded_cols:
summary_data.append(["", ""])
summary_data.append(["排除的列", "(检查信息类列不参与对比)"])
for col in excluded_cols:
summary_data.append(["", f"- {col}"])
pd.DataFrame(summary_data).to_excel(
writer,
sheet_name=f"{sheet_key}_汇总",
index=False,
header=False
)
# Added-row details
if differences['added_rows']:
pd.DataFrame(differences['added_rows']).to_excel(
writer,
sheet_name=f"{sheet_key}_新增行",
index=False
)
# Removed-row details
if differences['removed_rows']:
pd.DataFrame(differences['removed_rows']).to_excel(
writer,
sheet_name=f"{sheet_key}_删除行",
index=False
)
# Modified-row details (audit/check columns excluded)
if differences['modified_rows']:
modified_data = []
for mod_row in differences['modified_rows']:
# Base record: key columns plus a summary of changed columns
record = {
**mod_row['key_values'], # key columns, e.g. Partnumber
'修改列': self.get_modified_columns_summary(mod_row['modified_columns'])
}
# Add the display columns (audit/check columns excluded)
display_data = mod_row.get('full_row_data', {})
# Internal underscore-prefixed keys are not shown
display_columns = list(display_data.keys())
display_columns = [col for col in display_columns if
not col.startswith('_') and self.should_compare_column(col)]
for col in display_columns:
record[col] = display_data.get(col, '')
# Add per-column "old -> new" detail (compared columns only)
for col, values in mod_row['modified_columns'].items():
if self.should_compare_column(col):
record[f'详细_{col}'] = f"{values['old_value']} -> {values['new_value']}"
modified_data.append(record)
if modified_data:
modified_df = pd.DataFrame(modified_data)
# Reorder columns so the key information comes first.
# NOTE(review): mod_row here is the last item of the loop above;
# this works only because every row shares the same key columns - confirm.
column_order = list(mod_row['key_values'].keys()) + ['修改列']
# Then the plain display columns
other_columns = [col for col in modified_df.columns
if col not in column_order and not col.startswith('详细_')]
column_order.extend(other_columns)
# Then the detailed "old -> new" columns
detailed_cols = [col for col in modified_df.columns if col.startswith('详细_')]
column_order.extend(detailed_cols)
# Keep only columns that actually exist in the frame
existing_columns = [col for col in column_order if col in modified_df.columns]
modified_df = modified_df[existing_columns]
modified_df.to_excel(
writer,
sheet_name=f"{sheet_key}_修改行",
index=False
)
return output_path
except Exception as e:
print(f"生成报告时出错: {e}")
return ""
def run_comparison(self):
"""Run the full interactive workflow: pick files, find sheets, diff, write the report."""
print("=== BOM文件差异对比工具 ===")
print("注意:检查信息类列(如'检查信息')将不参与修改行对比")
# Step 1: pick the first Excel file
print("\n步骤1: 选择第一份Excel文件")
self.file1_path = self.select_file("选择第一份BOM Excel文件")
if not self.file1_path:
print("未选择文件,程序退出")
return
self.file1_name = os.path.basename(self.file1_path)
# Step 2: pick the second Excel file
print("\n步骤2: 选择第二份Excel文件")
self.file2_path = self.select_file("选择第二份BOM Excel文件")
if not self.file2_path:
print("未选择文件,程序退出")
return
self.file2_name = os.path.basename(self.file2_path)
print(f"\n文件1: {self.file1_name}")
print(f"文件2: {self.file2_name}")
# Step 3: discover BOM-like sheets in both files
print("\n步骤3: 查找有效的工作表...")
self.file1_sheets = self.find_valid_sheets(self.file1_path)
self.file2_sheets = self.find_valid_sheets(self.file2_path)
print(f"文件1的有效工作表: {self.file1_sheets}")
print(f"文件2的有效工作表: {self.file2_sheets}")
if not self.file1_sheets or not self.file2_sheets:
print("至少有一个文件没有有效的工作表,无法进行对比")
return
# Step 4: run the diff
print("\n步骤4: 进行差异对比...")
self.differences = {}
# Only the first valid sheet of each file is compared
sheet1 = self.file1_sheets[0]
sheet2 = self.file2_sheets[0]
print(f"正在对比: {sheet1} (文件1) vs {sheet2} (文件2)")
df1 = self.load_bom_data(self.file1_path, sheet1)
df2 = self.load_bom_data(self.file2_path, sheet2)
if df1.empty:
print(f" ⚠ 文件1的工作表 {sheet1} 数据加载失败")
return
if df2.empty:
print(f" ⚠ 文件2的工作表 {sheet2} 数据加载失败")
return
differences = self.compare_dataframes(df1, df2, sheet1, sheet2)
comparison_key = f"{sheet1}_vs_{sheet2}"
self.differences[comparison_key] = differences
if 'error' in differences:
print(f" ⚠ 对比过程中出错: {differences['error']}")
else:
columns_comparison = differences.get('columns_comparison', {})
excluded_count = len(columns_comparison.get('excluded_columns', []))
print(f" √ 完成对比:")
print(f" 文件1行数: {differences['summary']['total_rows_df1']}")
print(f" 文件2行数: {differences['summary']['total_rows_df2']}")
print(f" 新增行数: {differences['summary']['added_count']}")
print(f" 删除行数: {differences['summary']['removed_count']}")
print(f" 修改行数: {differences['summary']['modified_count']}")
print(f" 排除列数: {excluded_count} (检查信息类列不参与对比)")
# Step 5: write the difference report
print("\n步骤5: 生成差异报告...")
output_file = self.generate_difference_report()
if output_file and os.path.exists(output_file):
print(f"\n=== 对比完成 ===")
print(f"差异报告已生成: {os.path.basename(output_file)}")
# print(f"文件位置: {output_file}")
print(f"输出目录: {self.get_output_directory()}")
else:
print("未成功生成差异报告")
def main():
    """Entry point: run the interactive BOM comparison workflow."""
    BOMComparator().run_comparison()
    # Keep the console window open until the user confirms.
    input("\n按Enter键退出...")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,635 @@
import os
import pandas as pd
import numpy as np
import tkinter as tk
from tkinter import filedialog
from datetime import datetime
import warnings
import re
from openpyxl import Workbook
from openpyxl.utils.dataframe import dataframe_to_rows
# Silence openpyxl's UserWarnings emitted while reading workbooks
warnings.filterwarnings('ignore', category=UserWarning, module='openpyxl')
class BOMComparator:
"""Generates a PCBA BOM change record by diffing an old and a new BOM workbook."""
def __init__(self):
# Change-record column name -> normalized BOM column name
self.column_mapping = {
'ITEM': 'Partnumber',
'HT PN': 'Partnumber',
'MF PN': 'MF_PN',
'MFG': 'MF_NAME',
'CRD': 'Reference',
'Description': 'Description',
'Qty': 'Quantity',
'焊接方式': '焊接方式',
'Remark': '备注'
}
# BOM columns whose differences are ignored in the change description
self.ignore_columns = ['备注']
self.required_columns = list(self.column_mapping.values())
# Column order of the generated change-record sheet
self.change_columns = [
'ITEM', 'HT PN', 'MF PN', 'MFG', 'CRD', 'Description', 'Qty', 'Remark'
]
# A header row must contain all of these (after normalize_text)
self.mandatory_keywords = ['item', 'partnumber', 'mfpn']
# Collected data-validation issues
self.validation_errors = []
self.stats = {
'old_bom_rows': 0,
'new_bom_rows': 0,
'changed_items': 0,
'added_items': 0,
'removed_items': 0,
'total_errors': 0
}
def normalize_text(self, text):
    """Lower-case *text* keeping only ASCII letters, digits and whitespace ("" for NaN)."""
    if pd.isna(text):
        return ""
    letters_only = re.sub(r'[^a-zA-Z0-9\s]', '', str(text))
    return letters_only.strip().lower()
def find_header_row(self, df):
print(f"扫描前 {min(20, len(df))} 行寻找标题行...")
for i in range(min(20, len(df))):
row_values = [self.normalize_text(cell) for cell in df.iloc[i].values]
contains_all_keywords = True
for keyword in self.mandatory_keywords:
if not any(keyword in cell_value for cell_value in row_values):
contains_all_keywords = False
break
if contains_all_keywords:
print(f"✅ 找到有效标题行 (索引 {i}),包含所有必需关键词")
return i
error_msg = (
"❌ 未找到有效的标题行:所有标题行必须同时包含以下关键词:\n"
f"- Item (或类似表述)\n"
f"- Partnumber (或类似表述)\n"
f"- MF_PN (或类似表述)\n\n"
"在文件的前20行中没有找到同时包含所有关键词的行。"
)
raise ValueError(error_msg)
def find_active_sheet(self, file_path):
"""Find the sheet holding the BOM: try name-matched candidates, then all sheets, then the first.

Returns the sheet name, or None when no sheet has a valid header row.
"""
print(f"扫描文件: {os.path.basename(file_path)}")
xls = pd.ExcelFile(file_path)
candidate_sheets = []
for sheet_name in xls.sheet_names:
# Sheets whose name mentions "bom" or "pcba" are preferred candidates
if any(keyword in sheet_name.lower() for keyword in ["bom", "pcba"]):
candidate_sheets.append(sheet_name)
print(f" 发现候选Sheet: {sheet_name} - 关键词匹配")
# Step 1: check the keyword-matched candidates in order
successful_sheet = None
if candidate_sheets:
for first_candidate in candidate_sheets:
try:
print(f" 优先检查候选Sheet: {first_candidate}")
df_preview = pd.read_excel(
file_path,
sheet_name=first_candidate,
header=None,
nrows=20,
engine='openpyxl'
)
header_row_idx = self.find_header_row(df_preview)
print(f"✅ 在候选Sheet '{first_candidate}' 中找到标题行")
successful_sheet = first_candidate
break
except Exception as e:
# Candidate without a valid header: report and try the next one
print(f" ❌ 优先候选Sheet '{first_candidate}': {str(e)}")
continue
if successful_sheet:
return successful_sheet
# Step 2: no candidate succeeded - fall back to scanning every sheet
# (note: this re-checks any keyword candidates that already failed)
if not successful_sheet:
candidate_sheets = xls.sheet_names
print(" 未找到名称包含'BOM'的Sheet将检查所有Sheet")
# Scan the remaining candidate sheets
for sheet_name in candidate_sheets:
try:
print(f" 检查Sheet: {sheet_name}")
df_preview = pd.read_excel(
file_path,
sheet_name=sheet_name,
header=None,
nrows=20,
engine='openpyxl'
)
try:
header_row_idx = self.find_header_row(df_preview)
print(f"✅ 在Sheet '{sheet_name}' 中找到标题行")
return sheet_name
except ValueError as e:
print(f" ❌ Sheet '{sheet_name}': {str(e)}")
continue
except Exception as e:
print(f" 检查Sheet '{sheet_name}' 时出错: {str(e)}")
continue
# Step 3: everything failed - try the first sheet as a last resort
print("⚠️ 所有候选Sheet检查失败尝试第一个Sheet")
first_sheet = xls.sheet_names[0]
try:
df_preview = pd.read_excel(
file_path,
sheet_name=first_sheet,
header=None,
nrows=20,
engine='openpyxl'
)
header_row_idx = self.find_header_row(df_preview)
print(f"✅ 在备份Sheet '{first_sheet}' 中找到标题行")
return first_sheet
except Exception as e:
print(f"❌ 备份Sheet '{first_sheet}' 也失败: {str(e)}")
return None
def validate_bom(self, bom_df, file_name, sheet_name):
"""Validate BOM data and return a list of anomaly dicts (duplicates, blanks, qty mismatches)."""
errors = []
# 1. Duplicate Partnumber check
dup_partnumbers = bom_df[bom_df.duplicated('Partnumber', keep=False)]
if not dup_partnumbers.empty:
print(f"⚠️ 发现重复的Partnumber: {len(dup_partnumbers)}")
for idx, row in dup_partnumbers.iterrows():
error = {
'文件': file_name,
'Sheet': sheet_name,
# NOTE(review): idx + 2 assumes the DataFrame index equals the sheet
# row offset (header at Excel row 1); off when the header row is lower - confirm
'原始行号': idx + 2,
'异常类型': '重复Partnumber',
'异常描述': f"Partnumber '{row['Partnumber']}' 重复出现"
}
errors.append(error)
# 2. Blank Partnumber check
empty_partnumbers = bom_df[bom_df['Partnumber'].isna() | (bom_df['Partnumber'] == '')]
if not empty_partnumbers.empty:
print(f"⚠️ 发现空Partnumber: {len(empty_partnumbers)}")
for idx, row in empty_partnumbers.iterrows():
error = {
'文件': file_name,
'Sheet': sheet_name,
'原始行号': idx + 2,
'异常类型': '空Partnumber',
'异常描述': "Partnumber为空"
}
errors.append(error)
# 3. Reference-designator count must match Quantity
for idx, row in bom_df.iterrows():
# # (disabled) skip special items such as bare PCBs
# if row.get('Part Type') == 'PCB' or pd.isna(row.get('Reference')):
# continue
refs = str(row['Reference'])
qty = row['Quantity']
try:
# Count non-blank comma-separated reference designators
ref_count = len([r for r in refs.split(',') if r.strip()])
# Quantity must parse as an integer; otherwise force a mismatch
try:
qty_val = int(qty)
except (ValueError, TypeError):
qty_val = -1
# Flag rows where the counts disagree
if ref_count != qty_val:
error = {
'文件': file_name,
'Sheet': sheet_name,
'原始行号': idx + 2,
'异常类型': '数量不一致',
'异常描述': f"位号数量({ref_count}) ≠ Quantity({qty})"
}
errors.append(error)
except Exception as e:
error = {
'文件': file_name,
'Sheet': sheet_name,
'原始行号': idx + 2,
'异常类型': '验证错误',
'异常描述': f"验证异常: {str(e)}"
}
errors.append(error)
return errors
def load_bom(self, file_path):
"""Load and normalize one BOM workbook; returns (DataFrame, active_sheet_name).

Raises ValueError when required columns are missing after alias mapping.
"""
print(f"识别激活Sheet...")
active_sheet = self.find_active_sheet(file_path)
print(f"📊 使用Sheet: {active_sheet}")
df_preview = pd.read_excel(
file_path,
sheet_name=active_sheet,
header=None,
nrows=20
)
header_row_idx = self.find_header_row(df_preview)
print("加载完整BOM数据...")
bom_df = pd.read_excel(
file_path,
sheet_name=active_sheet,
header=header_row_idx,
dtype=str
)
# Heuristic: the first load of a run fills old_bom_rows, the second new_bom_rows
if "old_bom_rows" not in self.stats or self.stats['old_bom_rows'] == 0:
self.stats['old_bom_rows'] = len(bom_df)
else:
self.stats['new_bom_rows'] = len(bom_df)
# Normalize column names (strip surrounding whitespace)
bom_df.columns = [str(col).strip() for col in bom_df.columns]
print(f" 原始列名: {list(bom_df.columns)}")
# Alias map: raw header variants -> canonical column names
column_aliases = {
'Item': 'Item',
'Partnumber': 'Partnumber',
'Part Number': 'Partnumber',
'Purchase_Code': 'Purchase_Code',
'MF_PN': 'MF_PN',
'Description': 'Description',
'Part Type': 'Part Type',
'MF_NAME': 'MF_NAME',
'Manufacturer': 'MF_NAME',
'PCB_Footprint': 'PCB_Footprint',
'Reference': 'Reference',
'References': 'Reference',
'Quantity': 'Quantity',
'Qty': 'Quantity',
'加工方式': '焊接方式',
'焊接方式': '焊接方式',
'Value': 'Value',
'备注': '备注',
'Remark': '备注',
'Comments': '备注'
}
# Apply the alias mapping
bom_df = bom_df.rename(columns={col: alias for col, alias in column_aliases.items()
if col in bom_df.columns})
print(f" 标准化后列名: {list(bom_df.columns)}")
# Ensure every required column exists after renaming
missing_cols = [col for col in self.required_columns if col not in bom_df.columns]
if missing_cols:
raise ValueError(f"❌ 缺少必需列: {', '.join(missing_cols)}")
# Clean data: drop rows with no Item value
# NOTE(review): 'Item' is not in required_columns - this line assumes the
# sheet header provided an Item/ITEM column via the alias map; confirm.
initial_count = len(bom_df)
bom_df = bom_df.replace('', np.nan)
bom_df = bom_df.dropna(subset=['Item'], how='all')
cleaned_count = len(bom_df)
if initial_count > cleaned_count:
print(
f" 清理空行: 移除 {initial_count - cleaned_count} 行 (原 {initial_count} 行 -> 现 {cleaned_count} 行)")
# Run data validation and collect anomalies
file_name = os.path.basename(file_path)
errors = self.validate_bom(bom_df, file_name, active_sheet)
self.validation_errors.extend(errors)
self.stats['total_errors'] += len(errors)
if errors:
print(f"⚠️ 在 '{file_name}' 中发现 {len(errors)} 个数据异常")
return bom_df, active_sheet
def compare_reference_lists(self, old_refs_str, new_refs_str):
    """Describe how two comma-separated reference-designator lists differ ("" when equal)."""

    def to_ref_set(raw):
        # NaN behaves like an empty list; blank entries are dropped.
        if pd.isna(raw):
            return set()
        return {part.strip() for part in str(raw).split(',') if part.strip()}

    old_refs = to_ref_set(old_refs_str)
    new_refs = to_ref_set(new_refs_str)
    if old_refs == new_refs:
        return ""
    added = new_refs - old_refs
    removed = old_refs - new_refs
    messages = []
    if added:
        messages.append(f"增加位号: {','.join(sorted(added))}")
    if removed:
        messages.append(f"删除位号: {','.join(sorted(removed))}")
    return "; ".join(messages)
def compare_boms(self, old_bom, new_bom):
"""Diff two normalized BOM frames by Partnumber.

Returns (change_df, right_start_col) where change_df holds one record per
added/removed/changed part and right_start_col is the 1-based Excel column
where the new-BOM half of the sheet begins.
"""
print("开始比较两份BOM...")
old_bom['Partnumber'] = old_bom['Partnumber'].astype(str).str.strip()
new_bom['Partnumber'] = new_bom['Partnumber'].astype(str).str.strip()
changes = []
old_partnumbers = set(old_bom['Partnumber'].unique())
if len(old_partnumbers) != len(old_bom):
print(f"⚠️ 旧BOM有重复的Partnumber: 总行数{len(old_bom)},唯一物料数{len(old_partnumbers)}")
new_partnumbers = set(new_bom['Partnumber'].unique())
if len(new_partnumbers) != len(new_bom):
print(f"⚠️ 新BOM有重复的Partnumber: 总行数{len(new_bom)},唯一物料数{len(new_partnumbers)}")
all_partnumbers = sorted(old_partnumbers | new_partnumbers)
print(f" 总物料项数量: {len(all_partnumbers)} (旧BOM: {len(old_partnumbers)}, 新BOM: {len(new_partnumbers)})")
for idx, pn in enumerate(all_partnumbers):
# Progress output every 100 parts and at the end
if (idx + 1) % 100 == 0 or (idx + 1) == len(all_partnumbers):
print(f" 处理进度: {idx + 1}/{len(all_partnumbers)} 项物料")
record = {'ITEM_OLD': '', 'ITEM_NEW': ''}
old_row = None
new_row = None
change_desc = ""
# Duplicated Partnumbers: only the first matching row is used
old_match = old_bom[old_bom['Partnumber'] == pn]
if not old_match.empty:
old_row = old_match.iloc[0]
record['ITEM_OLD'] = old_row['Item']
new_match = new_bom[new_bom['Partnumber'] == pn]
if not new_match.empty:
new_row = new_match.iloc[0]
record['ITEM_NEW'] = new_row['Item']
change_type = ""
if old_row is None:
change_type = "新增"
self.stats['added_items'] += 1
change_desc = "新增物料"
elif new_row is None:
change_type = "删除"
self.stats['removed_items'] += 1
change_desc = "删除物料"
else:
change_type = "变更"
# NOTE(review): changed_items is incremented here even when no field
# actually differs (change_type is reset to "" below and the record
# dropped), so this counter can exceed the emitted change records - confirm.
self.stats['changed_items'] += 1
# Fill the left half of the record (old-BOM values)
for change_col, bom_col in self.column_mapping.items():
if change_col == 'ITEM':
continue
old_val = old_row[bom_col] if old_row is not None and bom_col in old_row else ''
record[change_col] = old_val
# Fill the right half of the record (new-BOM values)
for change_col, bom_col in self.column_mapping.items():
if change_col == 'ITEM':
continue
new_val = new_row[bom_col] if new_row is not None and bom_col in new_row else ''
record[f'NEW_{change_col}'] = new_val
if change_type == "变更":
change_details = []
qty_changed = False
if 'Quantity' in old_row.index and 'Quantity' in new_row.index:
old_qty = str(old_row['Quantity'])
new_qty = str(new_row['Quantity'])
if old_qty != new_qty:
change_details.append(f"Qty: {old_qty}{new_qty}")
qty_changed = True
mfpn_changed = False
if 'MF_PN' in old_row.index and 'MF_PN' in new_row.index:
old_mfpn = str(old_row['MF_PN'])
new_mfpn = str(new_row['MF_PN'])
if old_mfpn != new_mfpn:
change_details.append(f"MF PN: {old_mfpn}{new_mfpn}")
mfpn_changed = True
# Reference lists are compared set-wise via the helper
if 'Reference' in old_row.index and 'Reference' in new_row.index:
ref_diff = self.compare_reference_lists(old_row['Reference'], new_row['Reference'])
if ref_diff:
change_details.append(ref_diff)
# Remaining mapped columns (Qty/MF_PN/Reference handled above; ignored columns skipped)
for change_col, bom_col in self.column_mapping.items():
if (change_col == 'ITEM' or
bom_col in ['Quantity', 'MF_PN', 'Reference'] or
bom_col in self.ignore_columns):
continue
old_val = old_row[bom_col] if old_row is not None and bom_col in old_row else ''
new_val = new_row[bom_col] if new_row is not None and bom_col in new_row else ''
if str(old_val) != str(new_val):
change_details.append(f"{change_col}: {old_val}{new_val}")
if change_details:
change_desc = "; ".join(change_details)
else:
# No actual field difference: drop the record below
change_type = ""
record['Design change Type'] = change_type
record['NEW_Remark'] = change_desc
if change_type:
changes.append(record)
# Assemble the output column order: old half | change type | new half
left_columns = ['ITEM_OLD'] + [col for col in self.change_columns if col != 'ITEM']
middle_columns = ['Design change Type']
right_columns = ['ITEM_NEW'] + [f'NEW_{col}' for col in self.change_columns if col != 'ITEM']
# Keep NEW_Remark as the very last column
if 'NEW_Remark' in right_columns:
right_columns.remove('NEW_Remark')
right_columns.append('NEW_Remark')
change_columns = left_columns + middle_columns + right_columns
right_start_col = len(left_columns) + len(middle_columns) + 1
return pd.DataFrame(changes, columns=change_columns), right_start_col
def generate_summary(self):
summary = [
"\n" + "=" * 50,
"BOM 比较处理汇总",
"-" * 50,
f"原始BOM行数: {self.stats['old_bom_rows']}",
f"新BOM行数: {self.stats['new_bom_rows']}",
f"变更物料数量: {self.stats['changed_items']}",
f"新增物料数量: {self.stats['added_items']}",
f"删除物料数量: {self.stats['removed_items']}",
f"变更记录总数: {self.stats['changed_items'] + self.stats['added_items'] + self.stats['removed_items']}",
f"数据异常总数: {self.stats['total_errors']}",
"=" * 50
]
return "\n".join(summary)
def generate_change_record(self):
"""Interactive workflow: pick old/new BOM files, diff them, write the change-record workbook."""
root = tk.Tk()
root.withdraw()
# Reset statistics and collected anomalies for this run
self.stats = {
'old_bom_rows': 0,
'new_bom_rows': 0,
'changed_items': 0,
'added_items': 0,
'removed_items': 0,
'total_errors': 0
}
self.validation_errors = []
try:
# Pick the original BOM file
print("\n" + "=" * 50)
print("步骤 1/4: 选择原始BOM文件")
print("=" * 50)
old_file = filedialog.askopenfilename(
title="选择原始BOM文件",
filetypes=[("Excel Files", "*.xlsx *.xls")]
)
if not old_file:
print("❌ 未选择文件,操作取消")
return
print(f"📂 已选择原始BOM: {old_file}")
old_file_name = os.path.basename(old_file)
# output_dir = os.path.dirname(old_file)
# Pick the changed (new) BOM file
print("\n" + "=" * 50)
print("步骤 2/4: 选择变更后BOM文件")
print("=" * 50)
new_file = filedialog.askopenfilename(
title="选择变更后BOM文件",
filetypes=[("Excel Files", "*.xlsx *.xls")]
)
if not new_file:
print("❌ 未选择文件,操作取消")
return
print(f"📂 已选择新BOM: {new_file}")
new_file_name = os.path.basename(new_file)
# The output workbook is written next to the new BOM
output_dir = os.path.dirname(new_file)
# Load both BOM files
print("\n" + "=" * 50)
print("步骤 3/4: 加载并处理BOM文件")
print("=" * 50)
print(f"🔍 加载原始BOM文件: {old_file_name}")
old_bom, old_bom_activesheetname = self.load_bom(old_file)
print(f"✅ 原始BOM加载完成{len(old_bom)}")
print(f"\n🔍 加载变更后BOM文件: {new_file_name}")
new_bom, new_bom_activesheetname = self.load_bom(new_file)
print(f"✅ 新BOM加载完成{len(new_bom)}")
# Diff the BOMs and build the change records
print("\n" + "=" * 50)
print("步骤 4/4: 比较BOM差异并生成变更记录")
print("=" * 50)
print("🔍 比较BOM差异...")
change_df, right_start_col = self.compare_boms(old_bom, new_bom)
# Build the timestamped output file name
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_file = f"{old_bom_activesheetname} to {new_bom_activesheetname} eBOM_change_record_{timestamp}.xlsx"
output_path = os.path.join(output_dir, output_file)
# Write change records plus (optionally) the anomaly sheet
print(f"\n💾 保存变更记录文件: {output_path}")
wb = Workbook()
# Change-record worksheet
ws_change = wb.active
ws_change.title = "PCBA_BOM_change record"
if change_df.empty:
ws_change.cell(row=1, column=1, value="两份BOM完全相同无变更记录")
print("✅ 两份BOM完全相同无变更记录")
else:
# Rename internal column names to the final sheet headers.
# NOTE(review): both ITEM_OLD and ITEM_NEW map to 'ITEM', producing
# duplicate headers on the sheet - appears intentional for the
# side-by-side layout; confirm.
column_rename = {
'ITEM_OLD': 'ITEM',
'ITEM_NEW': 'ITEM',
**{f'NEW_{col}': col for col in self.change_columns if col != 'ITEM'},
'NEW_Remark': 'Remark'
}
change_df = change_df.rename(columns=column_rename)
# Row 1: source file names above each half of the sheet
ws_change.cell(row=1, column=1, value=old_file_name)
ws_change.cell(row=1, column=right_start_col, value=new_file_name)
# Row 2: column headers
col_names = change_df.columns.tolist()
for col_idx, col_name in enumerate(col_names, 1):
ws_change.cell(row=2, column=col_idx, value=col_name)
# Row 3+: data rows
for r_idx, row in enumerate(dataframe_to_rows(change_df, index=False, header=False), 3):
for c_idx, value in enumerate(row, 1):
ws_change.cell(row=r_idx, column=c_idx, value=value)
# Anomaly worksheet (only when validation found issues)
if self.validation_errors:
print(f"⚠️ 发现 {len(self.validation_errors)} 个数据异常,创建异常记录")
ws_errors = wb.create_sheet(title="BOM异常记录")
# Anomaly sheet headers
error_columns = ['文件', 'Sheet', '原始行号', '异常类型', '异常描述']
for col_idx, col_name in enumerate(error_columns, 1):
ws_errors.cell(row=1, column=col_idx, value=col_name)
# Anomaly rows
for row_idx, error in enumerate(self.validation_errors, 2):
ws_errors.cell(row=row_idx, column=1, value=error['文件'])
ws_errors.cell(row=row_idx, column=2, value=error['Sheet'])
ws_errors.cell(row=row_idx, column=3, value=error['原始行号'])
ws_errors.cell(row=row_idx, column=4, value=error['异常类型'])
ws_errors.cell(row=row_idx, column=5, value=error['异常描述'])
# Persist the workbook
wb.save(output_path)
# Print the run summary
print(self.generate_summary())
print(f"\n✅ 变更记录已保存至: {output_path}")
except Exception as e:
print(f"\n❌ 处理过程中出错: {str(e)}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
    # Console banner, then run the interactive change-record workflow.
    banner = "=" * 60
    print(banner)
    print(" PCBA BOM 变更记录生成工具 ")
    print(banner)
    print("要求: 标题行必须同时包含 'Item', 'Partnumber', 'MF_PN'")
    tool = BOMComparator()
    tool.generate_change_record()
    print("\n" + "=" * 50)
    print(" 处理完成,按任意键退出... ")
    # input()  # intentionally disabled; uncomment to pause the console before exit

View File

@@ -0,0 +1,618 @@
import pandas as pd
import os
import glob
import re
from datetime import datetime
import tkinter as tk
from tkinter import filedialog
from collections import defaultdict
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass
@dataclass
class ProcessedFileInfo:
    """Record of how one source workbook was parsed (shown in the report)."""
    filename: str    # workbook basename
    sheet_name: str  # sheet that held the BOM table
    start_row: int   # 0-based index of the detected header row
    total_rows: int  # rows read below the header row, before cleaning
    valid_rows: int  # rows that yielded a usable BOMRow
@dataclass
class BOMRow:
    """One material line parsed from a BOM sheet."""
    partnumber: str
    purchase_code: str
    mf_pn: str
    description: str
    part_type: str
    mf_name: str
    pcb_footprint: str
    quantity: int
    reference: str       # comma-separated designators, e.g. "R1,R2"
    filename: str = ""   # source workbook, kept for traceability in messages
    sheet_name: str = ""

    @staticmethod
    def _clean_text(value) -> str:
        """Normalize a cell to stripped text.

        Bug fix: previously ``str(nan)`` produced the literal string 'nan'
        (and ``str(None)`` produced 'None'), which polluted the consolidation
        key; missing cells now become ''.
        """
        # `value != value` is the NaN test (NaN is the only value unequal to itself).
        if value is None or (isinstance(value, float) and value != value):
            return ''
        return str(value).strip()

    @staticmethod
    def _clean_quantity(value) -> int:
        """Coerce a quantity cell to int.

        Blank/NaN cells count as 0 (previously ``int(nan)`` raised and the
        whole row was silently dropped); numeric text like '2.0' parses as 2.
        """
        if value is None or (isinstance(value, float) and value != value):
            return 0
        return int(float(value))

    @classmethod
    def from_dataframe_row(cls, row: pd.Series, filename: str = "", sheet_name: str = "") -> Optional['BOMRow']:
        """Build a BOMRow from a cleaned DataFrame row.

        Returns None only when a cell is so malformed it cannot be coerced
        (e.g. non-numeric Quantity text), so callers can skip the row.
        """
        try:
            return cls(
                partnumber=cls._clean_text(row.get('Partnumber', '')),
                purchase_code=cls._clean_text(row.get('Purchase_Code', '')),
                mf_pn=cls._clean_text(row.get('MF_PN', '')),
                description=cls._clean_text(row.get('Description', '')),
                part_type=cls._clean_text(row.get('Part_Type', '')),
                mf_name=cls._clean_text(row.get('MF_NAME', '')),
                pcb_footprint=cls._clean_text(row.get('PCB_Footprint', '')),
                quantity=cls._clean_quantity(row.get('Quantity', 0)),
                reference=cls._clean_text(row.get('Reference', '')),
                filename=filename,
                sheet_name=sheet_name
            )
        except (ValueError, TypeError):
            # A cell could not be coerced at all; drop the row.
            return None

    def get_key(self) -> str:
        """Identity key for consolidation: Partnumber, falling back to MF_PN."""
        return self.partnumber if self.partnumber else self.mf_pn

    def is_valid(self) -> bool:
        """A row is usable only when it carries a non-empty identity key."""
        return bool(self.get_key())
@dataclass
class ConsolidatedMaterial:
    """Aggregated view of one material across all source BOM files."""
    partnumber: str
    purchase_code: str
    mf_pn: str
    description: str
    part_type: str
    mf_name: str
    pcb_footprint: str
    quantity_data: Dict[str, int]  # filename -> quantity contributed by that file
    inconsistencies: List[str]     # human-readable conflict descriptions

    @property
    def total_quantity(self) -> int:
        """Sum of quantities over every contributing file."""
        total = 0
        for qty in self.quantity_data.values():
            total += qty
        return total

    @property
    def has_inconsistencies(self) -> bool:
        """True when at least one conflict was recorded for this material."""
        return bool(self.inconsistencies)
class ConsistencyChecker:
    """Validates that repeated occurrences of a material agree field-by-field."""

    def __init__(self):
        # Fields that must match across files for the same material key.
        self.fields_to_check = ['Purchase_Code', 'MF_PN', 'Part_Type', 'MF_NAME', 'PCB_Footprint']

    def check_field_consistency(self, existing: "ConsolidatedMaterial", new_row: "BOMRow") -> List[str]:
        """Return one message per tracked field where *new_row* disagrees with *existing*."""
        # (label shown in the message, attribute name on both objects)
        tracked = (
            ('Purchase_Code', 'purchase_code'),
            ('MF_PN', 'mf_pn'),
            ('Part_Type', 'part_type'),
            ('MF_NAME', 'mf_name'),
            ('PCB_Footprint', 'pcb_footprint'),
        )
        problems: List[str] = []
        for label, attr in tracked:
            known = getattr(existing, attr)
            incoming = getattr(new_row, attr)
            if not self._should_check_field(known, incoming):
                continue
            if known == incoming:
                continue
            # NOTE(review): the message concatenates old/new values with no
            # separator — a delimiter character may have been lost; confirm.
            problems.append(
                f"{label}不一致: {known}{incoming} (文件: {new_row.filename}, Sheet: {new_row.sheet_name})"
            )
        return problems

    def check_quantity_reference(self, row: "BOMRow") -> Optional[str]:
        """Verify that the number of reference designators matches Quantity."""
        if not row.reference:
            return None
        designators = [part for part in row.reference.split(',') if part.strip()]
        if len(designators) == row.quantity:
            return None
        return (
            f"Reference数量不符: {len(designators)}个位置 ≠ Quantity={row.quantity} "
            f"(文件: {row.filename}, Sheet: {row.sheet_name})"
        )

    def _should_check_field(self, existing_val: str, new_val: str) -> bool:
        """A field is comparable only when the incoming value carries real data."""
        if not new_val:
            return False
        return new_val.lower() not in ('', 'nan', 'none', 'null')
class BOMFileParser:
    """Locates and parses the BOM table inside an Excel workbook."""

    def __init__(self):
        # A row qualifies as the header row only when it contains all of these.
        self.required_headers = ['Item', 'Partnumber', 'Purchase_Code', 'MF_PN']
        # Columns that must survive cleaning for the sheet to be usable.
        self.required_columns = ['Partnumber', 'Purchase_Code', 'MF_PN', 'Description',
                                 'Part_Type', 'MF_NAME', 'PCB_Footprint', 'Quantity', 'Reference']

    def find_valid_sheet(self, file_path: str) -> Tuple[Optional[str], Optional[int]]:
        """Return (sheet_name, header_row_index) of the first qualifying sheet.

        A sheet qualifies when one of its first 10 rows contains every
        required header. Returns (None, None) when no sheet qualifies or the
        workbook cannot be read (the annotation previously claimed
        Optional[Tuple] but (None, None) is what callers unpack).
        """
        try:
            xl = pd.ExcelFile(file_path)
            for sheet_name in xl.sheet_names:
                df = pd.read_excel(file_path, sheet_name=sheet_name, header=None)
                for i in range(min(len(df), 10)):  # headers are expected near the top
                    # Bug fix: matching substrings against str(row_values) is
                    # unreliable — numpy truncates long rows with '...'.
                    # Compare per cell instead.
                    cells = [str(v).strip() for v in df.iloc[i].values]
                    if all(any(req in cell for cell in cells) for req in self.required_headers):
                        filename = os.path.basename(file_path)
                        # Bug fix: the message previously printed the literal
                        # "(unknown)" instead of the filename.
                        print(f"文件{filename}找到有效sheet {sheet_name}|有效数据行从 {i} 开始。")
                        return sheet_name, i
        except Exception as e:
            print(f"读取文件 {file_path} 时出错: {e}")
        return None, None

    def parse_file(self, file_path: str) -> Optional[Tuple[List["BOMRow"], "ProcessedFileInfo"]]:
        """Parse one workbook into (valid BOM rows, file-processing info).

        Returns None when no valid sheet is found, required columns are
        missing after cleaning, or reading fails.
        """
        filename = os.path.basename(file_path)
        sheet_name, header_row = self.find_valid_sheet(file_path)
        if not sheet_name:
            return None
        try:
            df = pd.read_excel(file_path, sheet_name=sheet_name, header=header_row)
            total_rows = len(df)
            df = self._clean_dataframe(df)
            if not self._validate_columns(df):
                return None
            bom_rows = []
            valid_rows = 0
            for _, row_data in df.iterrows():
                bom_row = BOMRow.from_dataframe_row(row_data, filename, sheet_name)
                if bom_row and bom_row.is_valid():
                    bom_rows.append(bom_row)
                    valid_rows += 1
            file_info = ProcessedFileInfo(
                filename=filename,
                sheet_name=sheet_name,
                start_row=header_row,
                total_rows=total_rows,
                valid_rows=valid_rows
            )
            return bom_rows, file_info
        except Exception as e:
            print(f"解析文件 {file_path} 时出错: {e}")
            return None

    def _clean_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
        """Normalize column names (whitespace -> '_', strip punctuation) and drop blank rows."""
        # astype(str) guards against non-string header cells (e.g. numbers).
        df.columns = df.columns.astype(str).str.strip().str.replace(r'\s+', '_', regex=True)
        df.columns = df.columns.str.replace(r'[^a-zA-Z0-9_]', '', regex=True)
        return df.dropna(how='all')

    def _validate_columns(self, df: pd.DataFrame) -> bool:
        """True when every required column survived cleaning."""
        missing_cols = [col for col in self.required_columns if col not in df.columns]
        return len(missing_cols) == 0
class MaterialConsolidator:
    """Merges BOM rows from multiple files into per-material records."""

    def __init__(self):
        self.materials: Dict[str, ConsolidatedMaterial] = {}
        self.consistency_checker = ConsistencyChecker()
        # filename -> {material key -> quantity}; drives the per-file columns.
        self.file_quantities: Dict[str, Dict[str, int]] = defaultdict(dict)
        self.processed_files_info: List[ProcessedFileInfo] = []

    def add_bom_row(self, bom_row: "BOMRow") -> None:
        """Fold one parsed BOM row into the consolidated material table."""
        key = bom_row.get_key()
        material = self.materials.get(key)
        if material is None:
            material = ConsolidatedMaterial(
                partnumber=bom_row.partnumber,
                purchase_code=bom_row.purchase_code,
                mf_pn=bom_row.mf_pn,
                description=bom_row.description,
                part_type=bom_row.part_type,
                mf_name=bom_row.mf_name,
                pcb_footprint=bom_row.pcb_footprint,
                quantity_data={},
                inconsistencies=[]
            )
            self.materials[key] = material
        # Record any field conflicts against what we already know.
        material.inconsistencies.extend(
            self.consistency_checker.check_field_consistency(material, bom_row)
        )
        qty_issue = self.consistency_checker.check_quantity_reference(bom_row)
        if qty_issue is not None:
            material.inconsistencies.append(qty_issue)
        # Latest occurrence within a file wins for that file's quantity.
        material.quantity_data[bom_row.filename] = bom_row.quantity
        self.file_quantities[bom_row.filename][key] = bom_row.quantity

    def add_file_info(self, file_info: "ProcessedFileInfo") -> None:
        """Remember how one source file was parsed (for the report)."""
        self.processed_files_info.append(file_info)

    def get_statistics(self) -> Dict[str, Any]:
        """Summarize the consolidation for reporting."""
        all_materials = self.materials.values()
        return {
            'total_materials': len(self.materials),
            'total_inconsistencies': sum(len(m.inconsistencies) for m in all_materials),
            'materials_with_issues': sum(1 for m in all_materials if m.has_inconsistencies),
            'file_count': len(self.file_quantities),
            'processed_files_info': self.processed_files_info
        }
class ReportGenerator:
    """Writes the consolidated BOM report (three sheets) to an Excel workbook."""

    def __init__(self, output_folder: str):
        self.output_folder = output_folder
        self._ensure_output_directory()

    def _ensure_output_directory(self):
        """Make sure <output_folder>/BOM_Merge_out exists."""
        output_dir = os.path.join(self.output_folder, "BOM_Merge_out")
        os.makedirs(output_dir, exist_ok=True)

    def _create_summary_sheet(self, stats: Dict[str, Any]) -> pd.DataFrame:
        """Build the '汇总信息' sheet: headline counters plus per-file details."""
        summary_data = [
            ["BOM合并检查汇总报告", ""],
            ["生成时间", datetime.now().strftime("%Y-%m-%d %H:%M:%S")],
            ["", ""],
            ["处理统计", ""],
            ["扫描文件总数", stats['total_files']],
            ["成功处理文件数", stats['processed_files']],
            ["处理数据行数", stats['processed_rows']],
            ["", ""],
            ["物料统计", ""],
            ["合并物料种类数", stats['total_materials']],
            ["存在问题的物料数", stats['materials_with_issues']],
            ["不一致问题总数", stats['total_inconsistencies']],
            ["", ""],
            ["数据源文件信息", ""],
            ["有效文件总数", len(stats.get('processed_files_info', []))],
            ["", ""]
        ]
        # One detail group per successfully parsed source file.
        files_info = stats.get('processed_files_info', [])
        for i, file_info in enumerate(files_info, 1):
            summary_data.extend([
                [f"数据源文件 {i}", file_info.filename],
                ["  Sheet名称", file_info.sheet_name],
                ["  起始行", file_info.start_row + 1],  # convert to 1-based index
                ["  总行数", file_info.total_rows],
                ["  有效行数", file_info.valid_rows],
                ["", ""]
            ])
        summary_data.extend([
            ["", ""],
            ["文件信息", ""],
            ["输出文件夹", os.path.join(self.output_folder, "BOM_Merge_out")],
            ["报告文件", stats.get('output_filename', '')],
            ["合并Sheet名称", "BOM_Merge"]
        ])
        return pd.DataFrame(summary_data, columns=["项目", "数值"])

    def _create_data_source_sheet(self, stats: Dict[str, Any]) -> pd.DataFrame:
        """Build the '数据源文件' sheet: one row per parsed source file."""
        files_info = stats.get('processed_files_info', [])
        if not files_info:
            return pd.DataFrame([["无有效数据源文件", ""]], columns=["状态", "说明"])
        data_source_data = []
        for i, file_info in enumerate(files_info, 1):
            data_source_data.append({
                '序号': i,
                '文件名': file_info.filename,
                'Sheet名称': file_info.sheet_name,
                '数据起始行': file_info.start_row + 1,  # convert to 1-based index
                '总行数': file_info.total_rows,
                '有效行数': file_info.valid_rows,
                '处理状态': '成功'
            })
        return pd.DataFrame(data_source_data)

    def _create_merge_sheet(self, consolidator: "MaterialConsolidator") -> pd.DataFrame:
        """Build the 'BOM_Merge' sheet: one row per material, one column per file."""
        report_data = []
        file_columns = sorted(consolidator.file_quantities.keys())
        for material in consolidator.materials.values():
            row = {
                'Partnumber': material.partnumber,
                'Purchase_Code': material.purchase_code,
                'MF_PN': material.mf_pn,
                'Description': material.description,
                'Part Type': material.part_type,
                'MF_NAME': material.mf_name,
                'PCB_Footprint': material.pcb_footprint,
                '检查信息': '; '.join(material.inconsistencies) if material.inconsistencies else '一致'
            }
            # Per-file quantity columns, then the grand total.
            for file in file_columns:
                row[file] = material.quantity_data.get(file, 0)
            row['合计'] = material.total_quantity
            report_data.append(row)
        return pd.DataFrame(report_data)

    @staticmethod
    def _auto_fit_columns(sheet, max_width: int) -> None:
        """Widen each column of *sheet* to its longest cell value, capped at max_width.

        Extracted helper: this loop was previously duplicated for two sheets.
        """
        for col in sheet.columns:
            longest = 0
            column_letter = col[0].column_letter
            for cell in col:
                try:
                    longest = max(longest, len(str(cell.value)))
                except Exception:
                    pass
            sheet.column_dimensions[column_letter].width = min(longest + 2, max_width)

    def generate_consolidated_report(self, consolidator: "MaterialConsolidator", stats: Dict[str, Any]) -> Optional[str]:
        """Write the multi-sheet report; return its path, or None on failure/no data."""
        if not consolidator.materials:
            return None
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_filename = f"BOM合并报告_{timestamp}.xlsx"
        output_path = os.path.join(self.output_folder, "BOM_Merge_out", output_filename)
        # Bug fix: record the filename BEFORE building the summary sheet —
        # previously it was set after writing, so the '报告文件' cell was
        # always empty.
        stats['output_filename'] = output_filename
        try:
            with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
                self._create_summary_sheet(stats).to_excel(writer, sheet_name='汇总信息', index=False)
                self._create_data_source_sheet(stats).to_excel(writer, sheet_name='数据源文件', index=False)
                self._create_merge_sheet(consolidator).to_excel(writer, sheet_name='BOM_Merge', index=False)
                # Column widths: fixed for the summary, auto-fit for the rest.
                workbook = writer.book
                summary_sheet = workbook['汇总信息']
                summary_sheet.column_dimensions['A'].width = 25
                summary_sheet.column_dimensions['B'].width = 40
                self._auto_fit_columns(workbook['数据源文件'], 30)
                self._auto_fit_columns(workbook['BOM_Merge'], 50)
            return output_path
        except Exception as e:
            print(f"保存报告失败: {e}")
            return None
class BOMProcessor:
    """Orchestrates parsing, consolidation and report generation."""

    def __init__(self):
        self.file_parser = BOMFileParser()
        self.material_consolidator = MaterialConsolidator()
        self.report_generator: Optional["ReportGenerator"] = None
        # Running statistics for the final summary.
        self.processed_files = 0
        self.processed_rows = 0
        self.total_files = 0

    def set_output_folder(self, folder_path: str):
        """Create the report generator rooted at *folder_path*."""
        self.report_generator = ReportGenerator(folder_path)

    def process_folder(self, folder_path: str) -> bool:
        """Parse every .xlsx in *folder_path*; True when at least one file succeeds."""
        bom_files = glob.glob(os.path.join(folder_path, "*.xlsx"))
        self.total_files = len(bom_files)
        if not bom_files:
            return False
        successful_files = 0
        for file_path in bom_files:
            if self._process_single_file(file_path):
                successful_files += 1
        self.processed_files = successful_files
        return successful_files > 0

    def _process_single_file(self, file_path: str) -> bool:
        """Parse one workbook and feed its rows into the consolidator."""
        filename = os.path.basename(file_path)
        # Bug fix: these log messages previously printed the literal
        # "(unknown)" instead of the file actually being processed.
        print(f"处理文件: {filename}...")
        result = self.file_parser.parse_file(file_path)
        if not result:
            print(f" ! 无法解析文件: {filename}")
            return False
        bom_rows, file_info = result
        print(f" √ 文件{filename}找到 {len(bom_rows)} 行有效数据 (Sheet: {file_info.sheet_name})")
        self.material_consolidator.add_file_info(file_info)
        for bom_row in bom_rows:
            self.material_consolidator.add_bom_row(bom_row)
            self.processed_rows += 1
        return True

    def generate_report(self) -> Optional[Dict[str, Any]]:
        """Build the Excel report; return the final stats dict, or None on failure."""
        if not self.report_generator:
            return None
        base_stats = self.material_consolidator.get_statistics()
        base_stats.update({
            'processed_files': self.processed_files,
            'total_files': self.total_files,
            'processed_rows': self.processed_rows
        })
        output_path = self.report_generator.generate_consolidated_report(
            self.material_consolidator, base_stats
        )
        if not output_path:
            return None
        base_stats['output_path'] = output_path
        return base_stats
class UserInterface:
    """Console/GUI helpers: folder picking and the final summary printout."""

    @staticmethod
    def select_folder(title: str = "选择文件夹") -> str:
        """Open a native folder-picker dialog and return the chosen path ('' if cancelled)."""
        picker = tk.Tk()
        picker.withdraw()  # hide the empty root window behind the dialog
        chosen = filedialog.askdirectory(title=title)
        picker.destroy()
        return chosen

    @staticmethod
    def print_summary(stats: Dict[str, Any], folder_path: str):
        """Print the end-of-run summary for *stats* to the console."""
        divider = "=" * 60
        print("\n" + divider)
        print("BOM合并检查完成!")
        print(divider)
        print(f"处理文件夹: {folder_path}")
        # Counter lines share one format, so emit them from a table.
        counters = (
            ("扫描文件数", 'total_files'),
            ("成功处理文件数", 'processed_files'),
            ("处理数据行数", 'processed_rows'),
            ("合并物料种类数", 'total_materials'),
            ("存在问题的物料数", 'materials_with_issues'),
            ("不一致问题总数", 'total_inconsistencies'),
        )
        for label, key in counters:
            print(f"{label}: {stats[key]}")
        files_info = stats.get('processed_files_info', [])
        print(f"有效数据源文件数: {len(files_info)}")
        for file_info in files_info:
            print(f" - {file_info.filename} (Sheet: {file_info.sheet_name}, 有效行: {file_info.valid_rows})")
        print(f"报告文件: {stats['output_path']}")
        print(divider)
        output_dir = os.path.join(folder_path, "BOM_Merge_out")
        print(f"输出保存在: {output_dir}")
        print("\n报告包含三个Sheet:")
        print("1. '汇总信息' - 处理统计和汇总信息")
        print("2. '数据源文件' - 有效数据源文件详细信息")
        print("3. 'BOM_Merge' - 合并后的物料数据")
def main():
    """Drive the end-to-end run: pick a folder, parse its BOMs, write the report."""
    processor = BOMProcessor()
    target = UserInterface.select_folder("选择包含BOM文件的文件夹")
    if not target:
        print("未选择文件夹,程序退出")
        return
    processor.set_output_folder(target)
    print(f"开始处理文件夹: {target}")
    if not processor.process_folder(target):
        print("没有找到可处理的BOM文件")
        return
    print("\n生成合并报告...")
    summary = processor.generate_report()
    if not summary:
        print("生成报告失败")
        return
    UserInterface.print_summary(summary, target)
# Entry point: run the interactive consolidation workflow, then block on
# input() so a double-clicked console window stays open until a key press.
if __name__ == "__main__":
    main()
    input("\n按任意键退出...")

14
BOMCompare/README.md Normal file
View File

@@ -0,0 +1,14 @@
# Sample GitLab Project
This sample project shows how a project in GitLab looks for demonstration purposes. It contains issues, merge requests and Markdown files in many branches,
named and filled with lorem ipsum.
You can look around to get an idea how to structure your project and, when done, you can safely delete this project.
[Learn more about creating GitLab projects.](https://docs.gitlab.com/ee/gitlab-basics/create-project.html)
# 基于标准格式的 BOM文件输出 BOM差异信息文件
BOMCompareForJP.py
# 基于标准格式的 BOM文件输出 BOM的合并后的文件方便校对和物料备料情况的分析。
BOMConsolidator.py