Files
PythonApp/BOMCompare/BOMCompare for Merge V1.py

656 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import pandas as pd
import tkinter as tk
from tkinter import filedialog
from datetime import datetime
import os
from typing import Dict, List, Tuple, Optional
class BOMComparator:
    """Compare two BOM (bill of materials) Excel workbooks and report differences.

    The workflow implemented by :meth:`run_comparison` is: pick two files,
    detect the sheets that look like BOM tables, compare the first valid sheet
    of each file row-by-row (rows are matched on a key column such as
    ``Partnumber``), and write an Excel report next to the second file.

    Columns carrying check/verification information are excluded from the
    cell-level comparison (see :meth:`should_compare_column`).
    """

    def __init__(self):
        self.file1_path = ""
        self.file2_path = ""
        self.file1_sheets = []
        self.file2_sheets = []
        self.common_sheets = []
        # Maps "<sheet1>_vs_<sheet2>" to the dict built by compare_dataframes().
        self.differences = {}
        self.file1_name = ""
        self.file2_name = ""
        # Column names that are never compared.
        self.columns_to_exclude = ['检查信息', '检查状态', '校验信息']

    # ------------------------------------------------------------------
    # File / sheet discovery
    # ------------------------------------------------------------------
    def select_file(self, title: str) -> str:
        """Ask the user to pick a file through a Tk dialog.

        Args:
            title: Title shown on the file dialog.

        Returns:
            The selected path, or an empty value when the dialog is cancelled.
        """
        root = tk.Tk()
        root.withdraw()  # Only the dialog should be visible, not the root window.
        try:
            return filedialog.askopenfilename(
                title=title,
                filetypes=[("Excel files", "*.xlsx"), ("All files", "*.*")],
            )
        finally:
            # Always tear the hidden root down, even if the dialog raises.
            root.destroy()

    def find_valid_sheets(self, file_path: str) -> List[str]:
        """Return the names of the sheets in *file_path* that look like BOM tables.

        A sheet qualifies when at least two of the known BOM header names are
        present and the first rows contain more than one data row.

        Args:
            file_path: Path of the Excel workbook to inspect.

        Returns:
            Sheet names in workbook order; empty when the file cannot be read.
        """
        required_columns = {'Partnumber', 'Purchase_Code', 'MF_PN', 'Description'}
        valid_sheets = []
        try:
            # The context manager closes the workbook handle; parsing through
            # the open ExcelFile avoids reopening the file for every sheet.
            with pd.ExcelFile(file_path) as xl_file:
                for sheet_name in xl_file.sheet_names:
                    try:
                        df = xl_file.parse(sheet_name=sheet_name, nrows=10)
                    except Exception:
                        # Best effort: a sheet that cannot be parsed is simply
                        # not a candidate; keep scanning the remaining sheets.
                        continue
                    # Headers are stripped the same way load_bom_data() does,
                    # so detection and loading agree on the column names.
                    header_names = {str(col).strip() for col in df.columns}
                    # NOTE(review): "> 1" requires two data rows (the header is
                    # already consumed by pandas) — confirm this is intended.
                    if len(header_names & required_columns) >= 2 and len(df) > 1:
                        valid_sheets.append(sheet_name)
        except Exception as e:
            print(f"读取文件 {file_path} 时出错: {e}")
        return valid_sheets

    def get_common_sheets(self) -> List[str]:
        """Return the sheets of file 1 whose standardized name also exists in file 2."""
        if not self.file1_sheets or not self.file2_sheets:
            return []
        file2_names = {self.standardize_sheet_name(sheet) for sheet in self.file2_sheets}
        return [
            sheet for sheet in self.file1_sheets
            if self.standardize_sheet_name(sheet) in file2_names
        ]

    def standardize_sheet_name(self, sheet_name: str) -> str:
        """Normalize a sheet name (trim, lower-case, spaces/dashes to ``_``) for matching."""
        return str(sheet_name).strip().lower().replace(' ', '_').replace('-', '_')

    def load_bom_data(self, file_path: str, sheet_name: str) -> pd.DataFrame:
        """Load one sheet, dropping fully empty rows/columns and trimming headers.

        Args:
            file_path: Path of the Excel workbook.
            sheet_name: Name of the sheet to load.

        Returns:
            The cleaned DataFrame, or an empty DataFrame when loading fails.
        """
        try:
            df = pd.read_excel(file_path, sheet_name=sheet_name)
            df = df.dropna(how='all').dropna(axis=1, how='all')
            # str() first: the .str accessor breaks on non-string headers
            # (e.g. numeric column titles).
            df.columns = [str(col).strip() for col in df.columns]
            return df
        except Exception as e:
            print(f"加载sheet {sheet_name} 时出错: {e}")
            return pd.DataFrame()

    # ------------------------------------------------------------------
    # Column selection
    # ------------------------------------------------------------------
    def should_compare_column(self, column_name: str) -> bool:
        """Return False for check/verification/remark columns, True otherwise."""
        if column_name in self.columns_to_exclude:
            return False
        exclude_keywords = ['检查', '校验', '状态', '备注', 'comment', 'check']
        column_lower = str(column_name).lower()
        return not any(keyword in column_lower for keyword in exclude_keywords)

    @staticmethod
    def _ordered_common_columns(df1: pd.DataFrame, df2: pd.DataFrame) -> list:
        """Return the columns present in both frames, in df1's column order."""
        df2_columns = set(df2.columns)
        return [col for col in dict.fromkeys(df1.columns) if col in df2_columns]

    def get_columns_to_compare(self, df1: pd.DataFrame, df2: pd.DataFrame) -> List[str]:
        """Return the shared columns that take part in the comparison.

        The result follows df1's column order so reports are reproducible
        between runs.
        """
        return [
            col for col in self._ordered_common_columns(df1, df2)
            if self.should_compare_column(col)
        ]

    # ------------------------------------------------------------------
    # Comparison
    # ------------------------------------------------------------------
    @staticmethod
    def _values_differ(val1, val2) -> bool:
        """Return True when two cell values should be reported as different.

        Two missing values are equal; numbers are compared by value so that
        ``1`` and ``1.0`` (an int column upcast to float by a blank cell) are
        not reported as a change; everything else is compared as text.
        """
        missing1, missing2 = pd.isna(val1), pd.isna(val2)
        if missing1 or missing2:
            return not (missing1 and missing2)
        is_number, is_bool = pd.api.types.is_number, pd.api.types.is_bool
        both_numeric = (
            is_number(val1) and is_number(val2)
            and not is_bool(val1) and not is_bool(val2)
        )
        if both_numeric:
            return bool(val1 != val2)
        return str(val1) != str(val2)

    def compare_dataframes(self, df1: pd.DataFrame, df2: pd.DataFrame, sheet_name1: str, sheet_name2: str) -> Dict:
        """Compare two BOM tables row by row.

        Rows are matched on the key column(s) chosen by
        :meth:`identify_key_columns`. When a key occurs more than once in a
        file, only its first row is compared.

        Args:
            df1: Table from the first file (the "old" side).
            df2: Table from the second file (the "new" side).
            sheet_name1: Sheet name of df1, used for labelling only.
            sheet_name2: Sheet name of df2, used for labelling only.

        Returns:
            A dict with ``added_rows``, ``removed_rows``, ``modified_rows``,
            ``columns_comparison``, ``summary`` and ``original_dfs``. An
            ``error`` entry is added when no key column can be found or the
            comparison fails.
        """
        differences = {
            'sheet_names': f"{sheet_name1} vs {sheet_name2}",
            'added_rows': [],
            'removed_rows': [],
            'modified_rows': [],
            'columns_comparison': {},
            'summary': {
                'total_rows_df1': len(df1),
                'total_rows_df2': len(df2),
                'added_count': 0,
                'removed_count': 0,
                'modified_count': 0,
            },
            'original_dfs': {
                'df1': df1.copy(),
                'df2': df2.copy(),
            },
        }

        key_columns = self.identify_key_columns(df1, df2)
        if not key_columns:
            differences['error'] = "无法确定用于对比的关键列"
            return differences

        try:
            # identify_key_columns() tolerates a few duplicated keys. Keep the
            # first row per key so that .loc[key] always yields a single row
            # (extract_row_data() also picks the first match).
            df1_indexed = df1.drop_duplicates(subset=key_columns, keep='first').set_index(key_columns)
            df2_indexed = df2.drop_duplicates(subset=key_columns, keep='first').set_index(key_columns)

            columns_to_compare = self.get_columns_to_compare(df1, df2)
            # Key columns moved into the index and are not compared as values.
            value_columns = [
                col for col in columns_to_compare
                if col in df1_indexed.columns and col in df2_indexed.columns
            ]

            # Rows only present in the second file.
            new_indexes = df2_indexed.index.difference(df1_indexed.index)
            if len(new_indexes) > 0:
                differences['added_rows'] = df2_indexed.loc[new_indexes].reset_index().to_dict('records')
                differences['summary']['added_count'] = len(new_indexes)

            # Rows only present in the first file.
            removed_indexes = df1_indexed.index.difference(df2_indexed.index)
            if len(removed_indexes) > 0:
                differences['removed_rows'] = df1_indexed.loc[removed_indexes].reset_index().to_dict('records')
                differences['summary']['removed_count'] = len(removed_indexes)

            # Rows present in both files: compare cell by cell.
            common_indexes = df1_indexed.index.intersection(df2_indexed.index)
            for idx in common_indexes:
                row1 = df1_indexed.loc[idx]
                row2 = df2_indexed.loc[idx]
                modified_cols = {
                    col: {'old_value': row1[col], 'new_value': row2[col]}
                    for col in value_columns
                    if self._values_differ(row1[col], row2[col])
                }
                if not modified_cols:
                    continue
                if isinstance(idx, tuple):
                    key_values = dict(zip(key_columns, idx))
                else:
                    key_values = {key_columns[0]: idx}
                differences['modified_rows'].append({
                    'key_values': key_values,
                    'modified_columns': modified_cols,
                    'full_row_data': self.get_full_row_data_for_display(df1, df2, idx, key_columns),
                })
                differences['summary']['modified_count'] += 1

            # Column-level statistics (all columns, in file order).
            common_columns = self._ordered_common_columns(df1, df2)
            compared = set(columns_to_compare)
            df1_column_set, df2_column_set = set(df1.columns), set(df2.columns)
            differences['columns_comparison'] = {
                'common_columns': common_columns,
                'compared_columns': list(columns_to_compare),
                'excluded_columns': [col for col in common_columns if col not in compared],
                'file1_only_columns': [
                    col for col in dict.fromkeys(df1.columns) if col not in df2_column_set
                ],
                'file2_only_columns': [
                    col for col in dict.fromkeys(df2.columns) if col not in df1_column_set
                ],
            }
        except Exception as e:
            differences['error'] = f"对比过程中出错: {str(e)}"
        return differences

    def get_full_row_data_for_display(self, df1: pd.DataFrame, df2: pd.DataFrame, idx, key_columns: List[str]) -> Dict:
        """Build the display values of one matched row for the report.

        Args:
            df1: Table from the first file.
            df2: Table from the second file.
            idx: Key value (scalar) or tuple of key values identifying the row.
            key_columns: Names of the key columns *idx* refers to.

        Returns:
            A dict mapping each display column to its formatted value, plus
            ``_from_file1`` / ``_from_file2`` holding the raw row dicts.
        """
        row1_data = self.extract_row_data(df1, idx, key_columns)
        row2_data = self.extract_row_data(df2, idx, key_columns)

        display_columns = ['Purchase_Code', 'MF_PN', 'Description', 'Part Type', 'MF_NAME', 'PCB_Footprint', '合计']
        display_data = {
            col: self.format_value_display(row1_data.get(col, ''), row2_data.get(col, ''))
            for col in display_columns
            if self.should_compare_column(col)
        }
        # Keep the raw rows so consumers can trace every value to its file.
        display_data['_from_file1'] = row1_data
        display_data['_from_file2'] = row2_data
        return display_data

    def extract_row_data(self, df: pd.DataFrame, idx, key_columns: List[str]) -> Dict:
        """Return the first row of *df* whose key column(s) equal *idx*.

        Args:
            df: Table to search.
            idx: Key value, or tuple of key values for a multi-column key.
            key_columns: Key column names, aligned with *idx*.

        Returns:
            ``{column: value}`` for the first matching row; an empty dict when
            nothing matches or the lookup cannot be performed.
        """
        key_values = idx if isinstance(idx, tuple) else (idx,)
        try:
            mask = pd.Series(True, index=df.index)
            for key, value in zip(key_columns, key_values):
                mask &= df[key] == value
            matching_rows = df[mask]
        except (KeyError, TypeError, ValueError) as e:
            # Best effort: the report can still be built without this row.
            print(f"提取行数据时出错: {e}")
            return {}
        if matching_rows.empty:
            return {}
        first_row = matching_rows.iloc[0]
        return {col: first_row[col] for col in df.columns}

    def format_value_display(self, value1, value2):
        """Format an old/new value pair for display.

        Returns the only non-empty value when one side is missing or empty,
        ``"old -> new"`` when the values differ, and the old value otherwise.
        """
        if pd.isna(value1) or value1 == '':
            return value2
        if pd.isna(value2) or value2 == '':
            return value1
        if self._values_differ(value1, value2):
            return f"{value1} -> {value2}"
        return value1

    def get_modified_columns_summary(self, modified_columns: Dict) -> str:
        """Return a short, comma-separated summary of the modified column names.

        At most three names are listed; longer lists are abbreviated and
        suffixed with the total count.
        """
        if not modified_columns:
            return "无修改"
        modified_list = list(modified_columns.keys())
        if len(modified_list) <= 3:
            return ", ".join(modified_list)
        return ", ".join(modified_list[:3]) + f"...等{len(modified_list)}"

    @staticmethod
    def _duplicate_rate(series: pd.Series) -> float:
        """Return the fraction of duplicated values in *series* (0.0 when empty)."""
        if len(series) == 0:
            return 0.0
        return float(series.duplicated().sum()) / len(series)

    def identify_key_columns(self, df1: pd.DataFrame, df2: pd.DataFrame) -> List[str]:
        """Choose the column(s) used to match rows between the two tables.

        Preference order: a single well-known column with fewer than 10%
        duplicated values in both tables, then a well-known column pair, then
        the first two shared columns (in df1's order).

        Returns:
            The key column names, or an empty list when the tables share no
            column.
        """
        for key in ('Partnumber', 'Purchase_Code', 'MF_PN'):
            if key in df1.columns and key in df2.columns:
                # A few duplicates are tolerated; compare_dataframes() keeps
                # the first row of each duplicated key.
                if self._duplicate_rate(df1[key]) < 0.1 and self._duplicate_rate(df2[key]) < 0.1:
                    return [key]

        for key_combo in (['Partnumber', 'MF_PN'], ['Purchase_Code', 'MF_PN']):
            if all(col in df1.columns and col in df2.columns for col in key_combo):
                return key_combo

        # Last resort: first two shared columns, in file order so the choice
        # is the same on every run.
        return self._ordered_common_columns(df1, df2)[:2]

    # ------------------------------------------------------------------
    # Naming helpers
    # ------------------------------------------------------------------
    def generate_output_filename(self) -> str:
        """Build the report file name from the first valid sheet of each file.

        Falls back to a generic, timestamped name when either sheet list is
        empty.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        if not self.file1_sheets or not self.file2_sheets:
            return f"BOM差异报告_{timestamp}.xlsx"
        clean_sheet1 = self.clean_filename(str(self.file1_sheets[0]))
        clean_sheet2 = self.clean_filename(str(self.file2_sheets[0]))
        return f"{clean_sheet1}_vs_{clean_sheet2}_差异报告_{timestamp}.xlsx"

    def clean_filename(self, filename: str) -> str:
        """Replace characters that are invalid in Windows file names (and
        whitespace) with ``_`` and cap the result at 50 characters."""
        replace_with_underscore = str.maketrans({char: '_' for char in '<>:"/\\|?* \t\n'})
        return str(filename).translate(replace_with_underscore)[:50]

    def clean_sheet_name(self, sheet_name: str, max_length: int = 25) -> str:
        """Replace characters Excel forbids in sheet names and truncate.

        Args:
            sheet_name: Proposed sheet name.
            max_length: Maximum length kept. Excel allows 31 characters; the
                default leaves room for the suffixes appended by the report
                writer.
        """
        replace_with_underscore = str.maketrans({char: '_' for char in '[]:*?/\\'})
        return str(sheet_name).translate(replace_with_underscore)[:max_length]

    def get_output_directory(self) -> str:
        """Return the directory of the second file, where the report is written."""
        return os.path.dirname(self.file2_path)

    # ------------------------------------------------------------------
    # Report generation
    # ------------------------------------------------------------------
    def generate_difference_report(self) -> str:
        """Write the Excel difference report.

        Returns:
            The report path on success, an empty string when writing fails,
            or a message string when there is nothing to report.
        """
        if not self.differences:
            return "没有发现差异"

        output_path = os.path.join(self.get_output_directory(), self.generate_output_filename())
        try:
            with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
                self._write_overview_sheet(writer)
                for diff_key, differences in self.differences.items():
                    self._write_comparison_sheets(writer, diff_key, differences)
            return output_path
        except Exception as e:
            print(f"生成报告时出错: {e}")
            return ""

    def _write_overview_sheet(self, writer) -> None:
        """Write one overview line per successful comparison."""
        overview_rows = []
        for diff_key, differences in self.differences.items():
            if 'error' in differences:
                continue
            summary = differences['summary']
            excluded_columns = differences.get('columns_comparison', {}).get('excluded_columns', [])
            overview_rows.append([
                differences.get('sheet_names', diff_key),
                summary['total_rows_df1'],
                summary['total_rows_df2'],
                summary['added_count'],
                summary['removed_count'],
                summary['modified_count'],
                len(excluded_columns),
            ])
        if overview_rows:
            overview_df = pd.DataFrame(overview_rows, columns=[
                '工作表对比', '文件1行数', '文件2行数', '新增行数', '删除行数', '修改行数', '排除列数'
            ])
            overview_df.to_excel(writer, sheet_name='对比摘要', index=False)

    def _write_comparison_sheets(self, writer, diff_key: str, differences: Dict) -> None:
        """Write the summary/added/removed/modified sheets of one comparison."""
        # The key already has the "<a>_vs_<b>" shape; it only needs cleaning.
        sheet_key = self.clean_sheet_name(diff_key)

        if 'error' in differences:
            error_df = pd.DataFrame([['错误信息', differences['error']]])
            error_df.to_excel(writer, sheet_name=f"{sheet_key}_错误", index=False, header=False)
            return

        summary = differences['summary']
        columns_comparison = differences['columns_comparison']
        summary_rows = [
            ["对比项", "数量"],
            ["文件1总行数", summary['total_rows_df1']],
            ["文件2总行数", summary['total_rows_df2']],
            ["新增行数", summary['added_count']],
            ["删除行数", summary['removed_count']],
            ["修改行数", summary['modified_count']],
            ["共同列数", len(columns_comparison['common_columns'])],
            ["实际对比列数", len(columns_comparison['compared_columns'])],
            ["排除列数", len(columns_comparison['excluded_columns'])],
            ["文件1特有列", len(columns_comparison['file1_only_columns'])],
            ["文件2特有列", len(columns_comparison['file2_only_columns'])],
        ]
        excluded_cols = columns_comparison.get('excluded_columns', [])
        if excluded_cols:
            summary_rows.append(["", ""])
            summary_rows.append(["排除的列", "(检查信息类列不参与对比)"])
            summary_rows.extend(["", f"- {col}"] for col in excluded_cols)
        pd.DataFrame(summary_rows).to_excel(
            writer, sheet_name=f"{sheet_key}_汇总", index=False, header=False
        )

        if differences['added_rows']:
            pd.DataFrame(differences['added_rows']).to_excel(
                writer, sheet_name=f"{sheet_key}_新增行", index=False
            )
        if differences['removed_rows']:
            pd.DataFrame(differences['removed_rows']).to_excel(
                writer, sheet_name=f"{sheet_key}_删除行", index=False
            )
        if differences['modified_rows']:
            modified_df = self._build_modified_rows_frame(differences['modified_rows'])
            modified_df.to_excel(writer, sheet_name=f"{sheet_key}_修改行", index=False)

    def _build_modified_rows_frame(self, modified_rows: List[Dict]) -> pd.DataFrame:
        """Flatten the modified-row records into a table for the report.

        Column order: key columns, the modified-column summary, the display
        columns, then one ``详细_<column>`` column per changed value.
        """
        records = []
        key_names = []
        for mod_row in modified_rows:
            for name in mod_row['key_values']:
                if name not in key_names:
                    key_names.append(name)
            record = {
                **mod_row['key_values'],
                '修改列': self.get_modified_columns_summary(mod_row['modified_columns']),
            }
            # Display columns; entries starting with "_" hold raw row data
            # and are not part of the table.
            for col, value in mod_row.get('full_row_data', {}).items():
                if not str(col).startswith('_') and self.should_compare_column(col):
                    record[col] = value
            # Old -> new detail for every changed column.
            for col, values in mod_row['modified_columns'].items():
                if self.should_compare_column(col):
                    record[f'详细_{col}'] = f"{values['old_value']} -> {values['new_value']}"
            records.append(record)

        modified_df = pd.DataFrame(records)
        leading_columns = key_names + ['修改列']
        detail_columns = [col for col in modified_df.columns if str(col).startswith('详细_')]
        other_columns = [
            col for col in modified_df.columns
            if col not in leading_columns and col not in detail_columns
        ]
        column_order = [
            col for col in leading_columns + other_columns + detail_columns
            if col in modified_df.columns
        ]
        return modified_df[column_order]

    # ------------------------------------------------------------------
    # Interactive workflow
    # ------------------------------------------------------------------
    def run_comparison(self):
        """Run the interactive workflow: select files, compare, write the report.

        Only the first valid sheet of each file is compared. Progress and
        results are printed to the console.
        """
        print("=== BOM文件差异对比工具 ===")
        print("注意:检查信息类列(如'检查信息')将不参与修改行对比")

        # 1. Select the first file.
        print("\n步骤1: 选择第一份Excel文件")
        self.file1_path = self.select_file("选择第一份BOM Excel文件")
        if not self.file1_path:
            print("未选择文件,程序退出")
            return
        self.file1_name = os.path.basename(self.file1_path)

        # 2. Select the second file.
        print("\n步骤2: 选择第二份Excel文件")
        self.file2_path = self.select_file("选择第二份BOM Excel文件")
        if not self.file2_path:
            print("未选择文件,程序退出")
            return
        self.file2_name = os.path.basename(self.file2_path)
        print(f"\n文件1: {self.file1_name}")
        print(f"文件2: {self.file2_name}")

        # 3. Detect the sheets that contain BOM data.
        print("\n步骤3: 查找有效的工作表...")
        self.file1_sheets = self.find_valid_sheets(self.file1_path)
        self.file2_sheets = self.find_valid_sheets(self.file2_path)
        print(f"文件1的有效工作表: {self.file1_sheets}")
        print(f"文件2的有效工作表: {self.file2_sheets}")
        if not self.file1_sheets or not self.file2_sheets:
            print("至少有一个文件没有有效的工作表,无法进行对比")
            return

        # 4. Compare the first valid sheet of each file.
        print("\n步骤4: 进行差异对比...")
        self.differences = {}
        sheet1 = self.file1_sheets[0]
        sheet2 = self.file2_sheets[0]
        print(f"正在对比: {sheet1} (文件1) vs {sheet2} (文件2)")
        df1 = self.load_bom_data(self.file1_path, sheet1)
        df2 = self.load_bom_data(self.file2_path, sheet2)
        if df1.empty:
            print(f" ⚠ 文件1的工作表 {sheet1} 数据加载失败")
            return
        if df2.empty:
            print(f" ⚠ 文件2的工作表 {sheet2} 数据加载失败")
            return

        differences = self.compare_dataframes(df1, df2, sheet1, sheet2)
        self.differences[f"{sheet1}_vs_{sheet2}"] = differences
        if 'error' in differences:
            print(f" ⚠ 对比过程中出错: {differences['error']}")
        else:
            summary = differences['summary']
            excluded_count = len(differences.get('columns_comparison', {}).get('excluded_columns', []))
            print(" √ 完成对比:")
            print(f" 文件1行数: {summary['total_rows_df1']}")
            print(f" 文件2行数: {summary['total_rows_df2']}")
            print(f" 新增行数: {summary['added_count']}")
            print(f" 删除行数: {summary['removed_count']}")
            print(f" 修改行数: {summary['modified_count']}")
            print(f" 排除列数: {excluded_count} (检查信息类列不参与对比)")

        # 5. Write the report next to the second file.
        print("\n步骤5: 生成差异报告...")
        output_file = self.generate_difference_report()
        if output_file and os.path.exists(output_file):
            print("\n=== 对比完成 ===")
            print(f"差异报告已生成: {os.path.basename(output_file)}")
            print(f"输出目录: {self.get_output_directory()}")
        else:
            print("未成功生成差异报告")
def main():
    """Program entry point: run one interactive BOM comparison."""
    BOMComparator().run_comparison()
    # Wait for Enter so the console output stays readable before exiting.
    input("\n按Enter键退出...")
# Run the tool only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()