Files
PythonApp/htmlProcess/htmlReportProcess_cmd_p/htmlReportProcess_cmd_pV1.py

1583 lines
65 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import pandas as pd
from bs4 import BeautifulSoup
import os
import re
import sys
from datetime import datetime
import pytz # 需要安装 pytz 库
from colorama import Fore, Style, init
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
import threading
from collections import defaultdict
import logging
init(autoreset=True)  # colorama: automatically reset terminal color codes after each print
class LogManager:
    """Log manager: writes run-time logs and a final statistics summary to a text file."""

    def __init__(self, output_dir):
        # The log file lives next to the generated reports.
        self.output_dir = output_dir
        self.log_file = os.path.join(output_dir, "processing_log.txt")
        self.setup_logging()

    def setup_logging(self):
        """Configure logging to both the log file and stdout.

        NOTE(review): basicConfig mutates the *root* logger (process-wide side
        effect) and is a no-op once the root logger already has handlers —
        confirm a second LogManager instance is never expected to re-route logs.
        """
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(self.log_file, encoding='utf-8'),
                logging.StreamHandler(sys.stdout)
            ]
        )
        self.logger = logging.getLogger(__name__)

    def log_info(self, message):
        """Log an INFO-level message."""
        self.logger.info(message)

    def log_warning(self, message):
        """Log a WARNING-level message."""
        self.logger.warning(message)

    def log_error(self, message):
        """Log an ERROR-level message."""
        self.logger.error(message)

    @staticmethod
    def _cell_sort_key(value):
        """Sort key: numeric cell ids numerically, before non-numeric ids.

        BUG FIX: the original `int(x) if x.isdigit() else x` key raises
        TypeError as soon as numeric and non-numeric cells (e.g.
        'UNKNOWN_CELL') coexist, because int and str do not compare.
        """
        return (0, int(value), '') if value.isdigit() else (1, 0, value)

    @staticmethod
    def _format_fail_items(fail_details, top_n=10):
        """Render a {test_name: fail_count} dict as display strings.

        The top_n most frequent items get one entry each; remaining items are
        condensed into a single '|'-separated trailing entry.

        BUG FIX: the original compared len() of an already-truncated [:10]
        slice against 10, so the overflow branch was dead code (and its entry
        carried a stray ')').
        """
        if not fail_details:
            return []
        ranked = sorted(fail_details.items(), key=lambda x: x[1], reverse=True)
        items = [f"{name}({count}次)" for name, count in ranked[:top_n]]
        if len(ranked) > top_n:
            items.append('|'.join(f"{name}({count}次)" for name, count in ranked[top_n:]))
        return items

    def log_statistics(self, statistics_data):
        """Append the end-of-run statistics summary to the log file.

        statistics_data: dict produced by StatisticsCollector.finalize_statistics().
        Nested same-quote f-strings and backslashes inside f-string expressions
        were removed here — they are SyntaxErrors before Python 3.12.
        """
        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write("\n" + "=" * 80 + "\n")
            f.write("处理统计汇总\n")
            f.write("=" * 80 + "\n")

            # --- overall counters ---
            f.write("\n=== 总体统计 ===\n")
            f.write(f"处理时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"总SN数量: {statistics_data.get('total_sn_count', 0)}\n")
            f.write(f"总处理文件数: {statistics_data.get('total_files_processed', 0)}\n")
            f.write(f"成功处理文件数: {statistics_data.get('success_files', 0)}\n")
            f.write(f"失败处理文件数: {statistics_data.get('failed_files', 0)}\n")
            f.write(f"生成报告数: {statistics_data.get('generated_reports', 0)}\n")
            f.write(f"失败报告数: {statistics_data.get('failed_reports', 0)}\n")

            # --- per-cell statistics ---
            if statistics_data.get('cell_statistics'):
                f.write("\n=== Cell统计 ===\n")
                cell_stats = statistics_data['cell_statistics']
                f.write(f"涉及Cell总数: {len(cell_stats)}\n")
                f.write("=" * 30 + "\n")
                for cell in sorted(cell_stats.keys(), key=self._cell_sort_key):
                    stats = cell_stats[cell]
                    sn_list = ','.join(sorted(stats.get('sn_set', set())))
                    fail_items = self._format_fail_items(stats.get('fail_details', {}))
                    f.write(f"  Cell {cell}: 文件数={stats.get('file_count', 0)}, "
                            f"SN{sn_list}, FAIL数={stats.get('fail_count', 0)}\n")
                    if fail_items:
                        joined = '\n    ->'.join(fail_items)
                        f.write(f"    FAIL项:\n    ->{joined}\n")

            # --- per-SN statistics ---
            if statistics_data.get('sn_statistics'):
                f.write("\n=== SN统计 ===\n")
                f.write("=" * 30 + "\n")
                sn_stats = statistics_data['sn_statistics']
                for sn in sorted(sn_stats.keys()):
                    stats = sn_stats[sn]
                    cells = stats.get('cells', [])
                    # BUG FIX: the original computed this sorted list and then
                    # wrote the *unsorted* join instead.
                    cell_list = ','.join(sorted(cells, key=self._cell_sort_key))
                    fail_items = self._format_fail_items(stats.get('fail_details', {}))
                    f.write(f"  SN {sn}: 文件数={stats.get('file_count', 0)}, "
                            f"Cells={cell_list}, FAIL数={stats.get('fail_count', 0)}\n")
                    if fail_items:
                        joined = '\n    ->'.join(fail_items)
                        f.write(f"    FAIL项:\n    ->{joined}\n")

            # --- failure details ---
            if statistics_data.get('failure_details'):
                f.write("\n=== 失败项统计 ===\n")
                failure_details = statistics_data['failure_details']
                f.write(f"文件处理失败: {len(failure_details.get('file_failures', []))}\n")
                f.write(f"报告生成失败: {len(failure_details.get('report_failures', []))}\n")
                if failure_details.get('file_failures'):
                    f.write("\n文件处理失败详情:\n")
                    for failure in failure_details['file_failures']:
                        f.write(f"  {failure}\n")
                if failure_details.get('report_failures'):
                    f.write("\n报告生成失败详情:\n")
                    for failure in failure_details['report_failures']:
                        f.write(f"  {failure}\n")

            # --- test elevation counters ---
            if statistics_data.get('test_elevation_stats'):
                f.write("\n=== 测试拔高数量统计 ===\n")
                elevation_stats = statistics_data['test_elevation_stats']
                f.write(f"总测试拔高数量: {elevation_stats.get('total_elevations', 0)}\n")
                f.write(f"平均每文件拔高数: {elevation_stats.get('avg_elevation_per_file', 0):.2f}\n")
                if elevation_stats.get('elevation_by_cell'):
                    f.write("\n各Cell测试拔高数量:\n")
                    for cell, count in elevation_stats['elevation_by_cell'].items():
                        f.write(f"  Cell {cell}: {count}\n")
            f.write("\n" + "=" * 80 + "\n")
class ThreadSafeProgressTracker:
    """Thread-safe progress tracker shared by the parallel processing phases."""

    def __init__(self, total_files, log_manager):
        self.lock = threading.Lock()   # guards all counters below
        self.processed = 0
        self.total = total_files
        self.start_time = datetime.now()
        self.success_count = 0
        self.fail_count = 0
        self.log_manager = log_manager

    def update(self, success=True, infor='', count=1, _display_progress=True):
        """Thread-safely advance the counters by `count`.

        The progress line is refreshed only every 10 processed items (or at
        completion), and only when _display_progress is truthy.
        """
        with self.lock:
            self.processed += count
            if success:
                self.success_count += count
            else:
                self.fail_count += count
            if self.processed % 10 == 0 or self.processed == self.total:
                if _display_progress:  # was `== True`; truthiness suffices
                    self._display_progress(infor)

    def _display_progress(self, infor=''):
        """Render a single-line progress bar to stdout (overwrites the current line)."""
        time_used = datetime.now() - self.start_time
        percent = self.processed / self.total * 100 if self.total > 0 else 0
        elapsed_seconds = time_used.total_seconds()
        speed = self.processed / elapsed_seconds if elapsed_seconds > 0 else 0
        filled = int(percent / 5)
        # BUG FIX: the filled segment was an empty string ('' * n) — almost
        # certainly a block character lost to encoding — so the bar always
        # rendered blank. Use a visible fill character.
        progress_bar = f"[{'#' * filled}{' ' * (20 - filled)}]"
        sys_info = [
            f"进度: {self.processed}/{self.total}",
            f"{percent:.1f}% {progress_bar}",
            f"成功: {self.success_count}",
            f"失败: {self.fail_count}",
            f"速度: {speed:.1f} 文件/秒",
            f"耗时: {self._format_timedelta(time_used)}",
            f"Infor:{infor}"
        ]
        # \x1b[2K clears the line; \r returns to column 0.
        print('\x1b[2K\r' + ' | '.join(sys_info), end='', flush=True)

    def finish(self, phase_name="处理"):
        """Print and log the final summary for this phase."""
        self._display_progress()
        completion_time = (datetime.now() - self.start_time).total_seconds()
        self.log_manager.log_info(f"{phase_name}完成! 总耗时: {completion_time:.1f}")
        self.log_manager.log_info(f"成功: {self.success_count}, 失败: {self.fail_count}")
        print(f"\n{Fore.GREEN}{phase_name}完成! 总耗时: {completion_time:.1f}")
        print(f"{Fore.CYAN}成功: {self.success_count}, 失败: {self.fail_count}")

    def _format_timedelta(self, delta):
        """Format a timedelta as zero-padded HH:MM:SS."""
        seconds = delta.total_seconds()
        return f"{int(seconds // 3600):02}:{int((seconds % 3600) // 60):02}:{int(seconds % 60):02}"
class HTMLFileProcessor:
    """Stateless per-file HTML report parser.

    Static methods only, so work can be dispatched to worker processes
    without pickling any instance state.
    """

    @staticmethod
    def _clean_test_name(raw_name):
        """Strip RoundN_/loopN decorations from a raw test name via regex rules."""
        rules = [
            (r'^Round\d+_\d+_', ''),  # drop leading RoundX_Y_
            (r'_loop\d+$', ''),       # drop trailing _loopN
            (r'_Round\d+$', ''),      # drop trailing _RoundN
        ]
        result = raw_name
        for pattern, replacement in rules:
            result = re.sub(pattern, replacement, result)
        return result

    @staticmethod
    def _extract_test_cycle_time(filename):
        """Extract a "(YYYY-MM-DD HH-MM-SS)" stamp from the file name.

        Returns "YYYY-MM-DD HH:MM:SS", or "UNKNOWN_TIME" when absent or
        unparseable.
        """
        time_match = None  # pre-bound so the fallback path cannot hit NameError
        try:
            time_match = re.search(r'\((\d{4}-\d{2}-\d{2}\s+\d{2}-\d{2}-\d{2})\)', filename)
            if time_match:
                dt = datetime.strptime(time_match.group(1), "%Y-%m-%d %H-%M-%S")
                return dt.strftime("%Y-%m-%d %H:%M:%S")
        except Exception:
            # Fallback: plain string surgery when strptime rejects the value.
            try:
                if time_match:
                    date_part, time_part = time_match.group(1).split()
                    return f"{date_part} {time_part.replace('-', ':')}"
            except Exception:
                pass
        return "UNKNOWN_TIME"

    @staticmethod
    def _extract_sn_and_cell(soup, filename):
        """Extract (serial number, cell id) from the parsed document.

        SN search order: the <h3>Serial Number:</h3> tag, then the whole
        document text, then the file name. The cell id comes from the
        trailing "-<digits>.html" of the file name.
        """
        try:
            sn_regex = r'F[A-Z0-9]{15}(?:[A-Z0-9]{5,})?'
            sn = "UNKNOWN_SN"
            if soup is not None:
                sn_tag = soup.find('h3', string=re.compile(r'Serial Number:', re.I))
                if sn_tag:
                    content_match = re.search(rf'\b({sn_regex})\b', sn_tag.get_text(), flags=re.I)
                    if content_match:
                        sn = content_match.group(1)
                else:
                    html_text = soup.get_text(" ", strip=True)
                    content_match = re.search(rf'\b({sn_regex})\b', html_text, flags=re.I)
                    if content_match:
                        sn = content_match.group(1)
                    else:
                        content_match = re.search(rf'\b({sn_regex})\b', filename, flags=re.I)
                        if content_match:
                            sn = content_match.group(1)
            cell_match = re.search(r'-(\d+)\.html$', filename)
            cell = cell_match.group(1) if cell_match else "UNKNOWN_CELL"
            return sn, cell
        except Exception as e:
            # BUG FIX: the message previously printed the literal "(unknown)"
            # instead of the offending file name.
            print(f"{Fore.RED}⚠ SN/CELL提取失败: {filename} - {str(e)}")
            return "ERROR_SN", "ERROR_CELL"

    @staticmethod
    def _find_status_index(headers):
        """Return the index of the status/result column, or None when absent."""
        if not headers:
            return None
        for idx, h in enumerate(headers):
            h_norm = str(h).strip().lower()
            if h_norm in ('status', 'result', 'test status') or re.search(r'status|result', h_norm, flags=re.I):
                return idx
        return None

    @staticmethod
    def _count_fail_rows(table, status_col_idx):
        """Count FAIL rows in the original table, before extra columns are added.

        The first two rows are treated as headers when the table has >= 3 rows.
        """
        fail_count = 0
        if status_col_idx is None:
            return fail_count
        all_trs = table.find_all('tr')
        start_index = 2 if len(all_trs) >= 3 else 1
        for row in all_trs[start_index:]:
            cols = [td.get_text(strip=True) for td in row.find_all(['td', 'th'])]
            if len(cols) > status_col_idx:
                if 'FAIL' in cols[status_col_idx].strip().upper():
                    fail_count += 1
        return fail_count

    @staticmethod
    def process_single_file(file_path):
        """Parse one HTML report file (worker-process entry point).

        Returns a picklable result dict: success flag plus sn/cell/cycle-time,
        headers, rows and fail_count on success, or file/error on failure.
        """
        # BUG FIX: bind filename before anything can raise; the original
        # except handler referenced it and crashed with NameError whenever
        # open()/read() itself failed.
        filename = os.path.basename(file_path)
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                html_content = f.read()
            soup = BeautifulSoup(html_content, 'html.parser')
            sn, cell = HTMLFileProcessor._extract_sn_and_cell(soup, filename)
            test_cycle_time = HTMLFileProcessor._extract_test_cycle_time(filename)
            # Normalize possible None values to the UNKNOWN_* markers.
            if sn is None:
                sn = "UNKNOWN_SN"
            if cell is None:
                cell = "UNKNOWN_CELL"
            if test_cycle_time is None:
                test_cycle_time = "UNKNOWN_TIME"
            table = soup.find('table', border=1) or soup.find('table')
            if not table:
                return {"success": False, "error": "未找到有效数据表格", "file": filename}
            # Locate the header row: grey header row first, then any row
            # containing a <th>, then the first row.
            header_tr = table.find('tr', bgcolor='#eeeeee')
            if not header_tr:
                for tr in table.find_all('tr'):
                    if tr.find('th'):
                        header_tr = tr
                        break
            if not header_tr:
                header_tr = table.find('tr')
            original_headers = [th.get_text(strip=True) for th in header_tr.find_all(['th', 'td'])]
            if len(original_headers) > 11:
                original_headers = original_headers[:11]
            status_col_idx = HTMLFileProcessor._find_status_index(original_headers)
            # FAIL rows are counted on the untouched table, before new columns.
            file_fail_count = HTMLFileProcessor._count_fail_rows(table, status_col_idx)
            headers, rows = HTMLFileProcessor._process_table_data(
                table, html_content, filename, sn, cell, test_cycle_time, status_col_idx)
            return {
                "success": True,
                "sn": sn,
                "cell": cell,
                "test_cycle_time": test_cycle_time,
                "filename": filename,
                "headers": headers,
                "rows": rows,
                "fail_count": file_fail_count
            }
        except Exception as e:
            return {
                "success": False,
                "error": f"{type(e).__name__}: {str(e)}",
                "file": filename
            }

    @staticmethod
    def _process_table_data(table, html_content, filename, sn, cell, test_cycle_time, status_col_idx):
        """Build (headers, rows) from the report table.

        Injects SN/Cell/TestCycleTime columns, a cleaned test name, an
        absolute per-row timestamp and the source file name.
        """
        header_tr = table.find('tr', bgcolor='#eeeeee')
        if not header_tr:
            for tr in table.find_all('tr'):
                if tr.find('th'):
                    header_tr = tr
                    break
        if not header_tr:
            header_tr = table.find('tr')
        headers = [th.get_text(strip=True) for th in header_tr.find_all(['th', 'td'])]
        if len(headers) > 11:
            headers = headers[:11]
        # Prepend the three synthetic columns.
        headers.insert(0, 'SN')
        headers.insert(1, 'Cell')
        headers.insert(2, 'TestCycleTime')
        test_name_idx = next((i for i, h in enumerate(headers) if re.search(r'test\s*name', h, flags=re.I)), 3)
        if test_name_idx < len(headers):
            headers.insert(test_name_idx + 1, 'Test Name New')
        headers.append('Test Time')
        headers.append('Source File name')
        # Absolute anchor for elapsed-time accumulation (epoch seconds or None).
        base_timestamp = HTMLFileProcessor._extract_base_timestamp(html_content, filename)
        global_elapsed_accumulator = base_timestamp if base_timestamp else 0.0
        rows = []
        all_trs = table.find_all('tr')
        start_index = 2 if len(all_trs) >= 3 else 1
        for row in all_trs[start_index:]:
            cols = [td.get_text(strip=True) for td in row.find_all(['td', 'th'])]
            if len(cols) < 2:
                continue
            original_test_name = cols[1].strip()
            elapsed_time_str = cols[9].strip() if len(cols) > 9 else "0"
            elapsed_append, global_elapsed_accumulator = HTMLFileProcessor._calculate_timestamp(
                cols, original_test_name, elapsed_time_str, global_elapsed_accumulator)
            if elapsed_append is not None and base_timestamp is None:
                # BUG FIX: guard float() — a non-numeric elapsed value used to
                # raise here and fail the whole file.
                try:
                    global_elapsed_accumulator = elapsed_append - float(elapsed_time_str)
                except ValueError:
                    pass
            cols.insert(0, sn)
            cols.insert(1, cell)
            cols.insert(2, test_cycle_time)
            # NOTE(review): the original comment claims "+3 because three new
            # columns were inserted" but the code adds only +1; preserved
            # as-is — confirm the intended column alignment on real reports.
            adjusted_test_name_idx = test_name_idx + 1
            if adjusted_test_name_idx + 1 < len(cols):
                cols.insert(adjusted_test_name_idx,
                            HTMLFileProcessor._clean_test_name(cols[test_name_idx]))
            cols.append(elapsed_append if elapsed_append is not None else 0.0)
            cols.append(filename)
            rows.append(cols)
        return headers, rows

    @staticmethod
    def _extract_base_timestamp(html_content, filename):
        """Parse the report's "Start Time:" into an epoch timestamp.

        Returns None when the field is absent or unparseable.
        """
        start_time_match = re.search(r"Start Time:\s*(.+?)(?:\s*<|$)", html_content, re.IGNORECASE)
        if start_time_match:
            start_time_str = re.sub(r'<[^>]+>', '', start_time_match.group(1)).strip()
            try:
                dt = datetime.strptime(start_time_str, "%A, %B %d, %Y %I:%M:%S %p")
                # NOTE(review): localized to UTC although the original comment
                # mentioned Beijing time — confirm the intended zone.
                dt = pytz.timezone('UTC').localize(dt)
                return dt.timestamp()
            except Exception:
                # Retry without the weekday prefix.
                try:
                    dt = datetime.strptime(start_time_str.split(', ', 1)[1], "%B %d, %Y %I:%M:%S %p")
                    return dt.timestamp()
                except Exception:
                    pass
        return None

    @staticmethod
    def _calculate_timestamp(cols, test_name, elapsed_time_str, base_accumulator):
        """Return (absolute_timestamp, new_base_accumulator) for one row.

        A "Test_Time" row re-anchors the accumulator from its measurement
        column; any other row adds its elapsed seconds to the accumulator.
        """
        if test_name == "Test_Time" and len(cols) > 7:
            try:
                dt = datetime.strptime(cols[7], "%m/%d/%Y %I:%M:%S %p")
                dt = pytz.timezone('UTC').localize(dt)  # NOTE(review): confirm intended zone
                base_accumulator = dt.timestamp()
                return base_accumulator, base_accumulator
            except Exception:
                pass
        try:
            return base_accumulator + float(elapsed_time_str), base_accumulator
        except ValueError:
            return base_accumulator, base_accumulator
class ExcelReportWorker:
    """Builds one Excel report per SN (static methods only, pool-friendly)."""

    @staticmethod
    def generate_single_report(report_data, output_dir):
        """Generate a single .xlsx report for one SN.

        report_data: dict carrying sn / cell / all_cells / test_cycle_time /
        data_info / source_files_count. Returns a result dict with a success
        flag and either the output path or an error message.
        """
        # BUG FIX: pre-bind these so the except handler below cannot hit
        # NameError when report_data is missing a required key.
        sn = cell = all_cells = test_cycle_time = None
        try:
            sn = report_data["sn"]
            cell = report_data.get("cell", "UNKNOWN_CELL")
            all_cells = report_data.get("all_cells", "UNKNOWN_CELLlist")
            test_cycle_time = report_data.get("test_cycle_time", "UNKNOWN_TIME")
            data_info = report_data["data_info"]
            source_files_count = report_data["source_files_count"]
            # Normalize explicit None values to the UNKNOWN_* markers.
            if sn is None:
                sn = "UNKNOWN_SN"
            if cell is None:
                cell = "UNKNOWN_CELL"
            if all_cells is None:
                all_cells = "UNKNOWN_CELLlist"
            if test_cycle_time is None:
                test_cycle_time = "UNKNOWN_TIME"
            # BUG FIX: joining the *string* fallback char-by-char produced
            # "U,N,K,..." file names; keep strings intact.
            if isinstance(all_cells, str):
                cell_list_display = all_cells
            else:
                cell_list_display = ','.join(str(c) for c in all_cells)
            base_name = f"{sn}_C-{cell_list_display}_R-{source_files_count}"
            output_file = os.path.join(output_dir, f"{base_name}.xlsx")
            if not data_info or 'headers' not in data_info or 'data' not in data_info:
                return {
                    "success": False,
                    "sn": sn,
                    "cell": cell,
                    "all_cells": all_cells,
                    "test_cycle_time": test_cycle_time,
                    "error": "数据格式无效或为空"
                }
            df_all = ExcelReportWorker._prepare_dataframe(data_info)
            if df_all.empty:
                return {
                    "success": False,
                    "sn": sn,
                    "cell": cell,
                    "all_cells": all_cells,
                    "test_cycle_time": test_cycle_time,
                    "error": "DataFrame为空无数据可生成"
                }
            status_col = ExcelReportWorker._detect_status_column(df_all)
            # FAIL count recorded during HTML parsing takes precedence.
            fail_count = data_info.get('report_stats', {}).get('fail_count', 0)
            time_stats = ExcelReportWorker._extract_time_statistics(df_all, data_info)
            if status_col and fail_count == 0:
                # Double-check in the DataFrame in case parsing missed FAILs.
                fail_mask = df_all[status_col].astype(str).str.strip().str.upper().str.contains('FAIL')
                fail_count = int(fail_mask.sum())
                df_fail = df_all[fail_mask]
            elif fail_count > 0:
                if status_col:
                    fail_mask = df_all[status_col].astype(str).str.strip().str.upper().str.contains('FAIL')
                    df_fail = df_all[fail_mask]
                else:
                    df_fail = pd.DataFrame(columns=df_all.columns)
            else:
                df_fail = pd.DataFrame(columns=df_all.columns)
            # Reflect the failure count in the file name.
            if fail_count > 0:
                output_file = os.path.join(output_dir, f"{base_name}_Fitem-{fail_count}.xlsx")
            report_stats = data_info.get('report_stats', {})
            # NOTE(review): source_files_count is overwritten here from
            # report_stats (0 when absent) — confirm this shadowing is wanted.
            source_files_count = report_stats.get('source_files_count', 0)
            cell_info = report_stats.get('cell_info', {})
            ExcelReportWorker._save_excel(df_all, df_fail, output_file, sn, cell, test_cycle_time,
                                          source_files_count, fail_count, cell_info, time_stats)
            return {
                "success": True,
                "sn": sn,
                "cell": cell,
                "all_cells": all_cells,
                "test_cycle_time": test_cycle_time,
                "output_file": output_file,
                "record_count": len(df_all),
                "source_files_count": source_files_count,
                "fail_count": fail_count
            }
        except Exception as e:
            return {
                "success": False,
                "sn": sn if sn is not None else "UNKNOWN_SN",
                "cell": cell if cell is not None else "UNKNOWN_CELL",
                "all_cells": all_cells if all_cells is not None else "UNKNOWN_CELL",
                "test_cycle_time": test_cycle_time if test_cycle_time is not None else "UNKNOWN_TIME",
                "error": f"{type(e).__name__}: {str(e)}"
            }

    @staticmethod
    def _extract_time_statistics(df_all, data_info):
        """Collect TestCycleTime statistics: distinct times, distribution, range."""
        time_stats = {
            'all_times': [],
            'time_count': 0,
            'time_distribution': {},
            'records_by_time': {},
            'time_range': {}
        }
        if 'TestCycleTime' in df_all.columns:
            time_values = df_all['TestCycleTime'].dropna().unique()
            time_stats['all_times'] = sorted(list(time_values))
            time_stats['time_count'] = len(time_values)
            # Record counts per distinct time value.
            time_stats['time_distribution'] = df_all['TestCycleTime'].value_counts().to_dict()
            for time_val in time_values:
                time_stats['records_by_time'][time_val] = len(df_all[df_all['TestCycleTime'] == time_val])
            # Derive the covered time range from parseable stamps only.
            try:
                datetime_objects = []
                for time_str in time_values:
                    if time_str != "UNKNOWN_TIME":
                        try:
                            datetime_objects.append(datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S"))
                        except (ValueError, TypeError):
                            pass
                if datetime_objects:
                    min_time = min(datetime_objects)
                    max_time = max(datetime_objects)
                    time_stats['time_range'] = {
                        'start': min_time.strftime("%Y-%m-%d %H:%M:%S"),
                        'end': max_time.strftime("%Y-%m-%d %H:%M:%S"),
                        'duration_hours': round((max_time - min_time).total_seconds() / 3600, 2)
                    }
            except Exception:
                pass
        # Merge any extra times carried in report_stats.
        report_stats = data_info.get('report_stats', {})
        time_info = report_stats.get('time_info', {})
        if time_info.get('all_times'):
            combined = sorted(set(time_stats['all_times']).union(time_info.get('all_times', [])))
            time_stats['all_times'] = combined
            time_stats['time_count'] = len(combined)
        return time_stats

    @staticmethod
    def _detect_status_column(df):
        """Return the first column whose name looks like a status/result column."""
        for col in df.columns:
            col_str = str(col)
            if re.search(r'\b(status|result)\b', col_str, flags=re.I) or col_str.strip().lower() in (
                    'status', 'result', 'test status'):
                return col
        return None

    @staticmethod
    def _save_excel(df_all, df_fail, output_file, sn, cell, test_cycle_time, source_files_count, fail_count,
                    cell_info=None, time_stats=None):
        """Write the report workbook: stats sheet, All Tests (chunked when huge),
        FAIL list, and an optional TestCycleTime detail sheet.

        Raises RuntimeError (chained) on any underlying write failure.
        """
        try:
            def _sanitize_df(df):
                # Clip object columns to Excel's 32767-char cell limit and blank NaN.
                df = df.copy()
                obj_cols = df.select_dtypes(include=['object']).columns
                for c in obj_cols:
                    df[c] = df[c].astype(str).str.slice(0, 32767)
                df[obj_cols] = df[obj_cols].fillna('')
                return df

            df_all = _sanitize_df(df_all)
            df_fail = _sanitize_df(df_fail) if df_fail is not None and not df_fail.empty else df_fail
            with pd.ExcelWriter(
                    output_file,
                    engine='xlsxwriter',
                    engine_kwargs={'options': {
                        'strings_to_urls': False,      # keep long strings from becoming hyperlinks
                        'strings_to_formulas': False
                    }}
            ) as writer:
                stats_data = ExcelReportWorker._prepare_stats_data(
                    sn, cell, test_cycle_time, source_files_count, fail_count,
                    cell_info, time_stats
                )
                pd.DataFrame(stats_data).to_excel(writer, sheet_name='Report Stats', index=False)
                # All Tests: split across sheets when above Excel's row limit.
                MAX_ROWS = 1_048_576
                total_rows = len(df_all)
                if total_rows == 0:
                    # Still create an empty sheet so the workbook looks complete.
                    pd.DataFrame(columns=df_all.columns).to_excel(writer, sheet_name='All Tests', index=False)
                elif total_rows <= MAX_ROWS - 1:  # one row reserved for the header
                    df_all.to_excel(writer, sheet_name='All Tests', index=False)
                else:
                    for start in range(0, total_rows, MAX_ROWS - 1):
                        end = min(start + (MAX_ROWS - 1), total_rows)
                        sheet_name = f'All Tests_{start // (MAX_ROWS - 1) + 1}'
                        df_all.iloc[start:end].to_excel(writer, sheet_name=sheet_name, index=False)
                # FAIL list only when there is something to show.
                if fail_count > 0 and df_fail is not None and not df_fail.empty:
                    df_fail.to_excel(writer, sheet_name='FAIL list', index=False)
                if time_stats and time_stats['all_times']:
                    ExcelReportWorker._create_time_details_sheet(writer, time_stats)
                # Column widths only on the small stats sheet; formatting huge
                # data sheets would be prohibitively slow.
                ws = writer.sheets.get('Report Stats')
                if ws is not None:
                    ws.set_column(0, 0, 25)
                    ws.set_column(1, 1, 40)
        except Exception as e:
            raise RuntimeError(f"Excel文件保存失败: {str(e)}") from e

    @staticmethod
    def _prepare_stats_data(sn, cell, test_cycle_time, source_files_count, fail_count,
                            cell_info, time_stats):
        """Build the two-column 'Report Stats' sheet (label column '统计项', value column '')."""
        stats_data = {
            '统计项': [],
            '': []
        }
        base_stats = {
            'SN号': sn,
            'Cell编号': cell,
            '主要测试周期时间': test_cycle_time,
            '来源HTML文件数': source_files_count,
            '总FAIL数量': fail_count,
            '生成时间': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        for key, value in base_stats.items():
            stats_data['统计项'].append(key)
            stats_data[''].append(value)
        if time_stats:
            stats_data['统计项'].append('测试周期时间总数')
            stats_data[''].append(time_stats.get('time_count', 0))
            # First 10 distinct cycle times, then a condensed suffix.
            all_times = time_stats.get('all_times', [])
            time_list_display = ', '.join(str(t) for t in all_times[:10])
            if len(all_times) > 10:
                time_list_display += f'...等{len(all_times)}个时间段'
            stats_data['统计项'].append('测试周期时间列表')
            stats_data[''].append(time_list_display)
            time_range = time_stats.get('time_range', {})
            if time_range:
                stats_data['统计项'].append('测试时间范围')
                # NOTE(review): start/end are concatenated with no separator —
                # a delimiter character was likely lost; confirm the intended format.
                stats_data[''].append(f"{time_range.get('start', '')}{time_range.get('end', '')}")
                stats_data['统计项'].append('测试持续时长(小时)')
                stats_data[''].append(time_range.get('duration_hours', 0))
            time_distribution = time_stats.get('time_distribution', {})
            if time_distribution:
                top_times = sorted(time_distribution.items(), key=lambda x: x[1], reverse=True)[:5]
                dist_display = ', '.join([f"{time}({count})" for time, count in top_times])
                if len(time_distribution) > 5:
                    dist_display += f'...等{len(time_distribution)}个分布'
                stats_data['统计项'].append('记录数时间分布(前5)')
                stats_data[''].append(dist_display)
        if cell_info:
            all_cells = cell_info.get('all_cells', [])
            cell_count = cell_info.get('cell_count', 0)
            stats_data['统计项'].extend(['Cell数量', 'Cell列表'])
            cell_list_display = ','.join(str(c) for c in all_cells[:10])
            if len(all_cells) > 10:
                cell_list_display += f'...等{len(all_cells)}'
            stats_data[''].extend([cell_count, cell_list_display])
        return stats_data

    @staticmethod
    def _create_time_details_sheet(writer, time_stats):
        """Write the per-TestCycleTime breakdown sheet."""
        all_times = time_stats.get('all_times', [])
        time_distribution = time_stats.get('time_distribution', {})
        records_by_time = time_stats.get('records_by_time', {})
        total_records = sum(time_distribution.values())  # hoisted out of the loop
        time_details_data = []
        for time_val in all_times:
            record_count = records_by_time.get(time_val, time_distribution.get(time_val, 0))
            pct = round(record_count / total_records * 100, 2) if total_records else 0
            time_details_data.append({
                '测试周期时间': time_val,
                '记录数量': record_count,
                '占比(%)': pct
            })
        if time_details_data:
            df_time_details = pd.DataFrame(time_details_data)
            df_time_details.to_excel(writer, sheet_name='TestCycleTime Details', index=False)
            ws = writer.sheets.get('TestCycleTime Details')
            if ws is not None:
                ws.set_column(0, 0, 25)  # cycle-time column
                ws.set_column(1, 1, 15)  # record count
                ws.set_column(2, 2, 15)  # percentage

    @staticmethod
    def _prepare_dataframe(data_info):
        """Build the All-Tests DataFrame from collected headers + rows.

        The original popped and re-inserted the column after 'Test Name' at
        the same index — a no-op — so the header order is used unchanged.
        """
        if not data_info['data']:
            return pd.DataFrame()
        return pd.DataFrame(data_info['data'], columns=data_info['headers'])
class StatisticsCollector:
    """Aggregates per-file and per-report processing statistics."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Reset all counters and detail maps to their initial state."""
        self.total_files_processed = 0
        self.success_files = 0
        self.failed_files = 0
        self.generated_reports = 0
        self.failed_reports = 0
        self.total_sn_count = 0
        # Per-cell detail: files seen, distinct SNs, FAIL totals, per-test FAIL counts.
        self.cell_statistics = defaultdict(lambda: {
            'file_count': 0,
            'sn_set': set(),
            'fail_count': 0,
            'elevation_count': 0,
            'fail_details': defaultdict(int)  # test name -> failure count
        })
        # Per-SN detail, mirror of the cell map.
        self.sn_statistics = defaultdict(lambda: {
            'file_count': 0,
            'cells': set(),
            'fail_count': 0,
            'elevation_count': 0,
            'fail_details': defaultdict(int)
        })
        self.failure_details = {
            'file_failures': [],
            'report_failures': []
        }
        self.test_elevation_stats = {
            'total_elevations': 0,
            'elevation_by_cell': defaultdict(int),
            'avg_elevation_per_file': 0
        }

    def add_file_processing_result(self, result):
        """Fold one HTMLFileProcessor.process_single_file result dict in."""
        self.total_files_processed += 1
        if result["success"]:
            self.success_files += 1
            sn = result.get("sn", "UNKNOWN_SN")
            cell = result.get("cell", "UNKNOWN_CELL")
            fail_count = result.get("fail_count", 0)
            rows = result.get("rows", [])
            self.sn_statistics[sn]['file_count'] += 1
            self.sn_statistics[sn]['cells'].add(cell)
            self.sn_statistics[sn]['fail_count'] += fail_count
            self.cell_statistics[cell]['file_count'] += 1
            self.cell_statistics[cell]['sn_set'].add(sn)
            self.cell_statistics[cell]['fail_count'] += fail_count
            self._collect_fail_details(sn, cell, rows, result.get("headers", []))
        else:
            self.failed_files += 1
            self.failure_details['file_failures'].append(
                f"{result.get('file', '未知文件')}: {result.get('error', '未知错误')}"
            )

    def _collect_fail_details(self, sn, cell, rows, headers):
        """Record per-test FAIL counts at both SN and Cell level (best effort)."""
        try:
            status_idx = -1
            test_name_idx = -1
            test_name_new_idx = -1
            for i, header in enumerate(headers):
                header_lower = str(header).lower()
                if 'status' in header_lower or 'result' in header_lower:
                    status_idx = i
                elif 'test name new' in header_lower:
                    test_name_new_idx = i
                elif 'test name' in header_lower:
                    test_name_idx = i
            # Prefer the cleaned 'Test Name New' column when present.
            test_name_col_idx = test_name_new_idx if test_name_new_idx != -1 else test_name_idx
            if status_idx == -1 or test_name_col_idx == -1:
                return
            for row in rows:
                if len(row) > max(status_idx, test_name_col_idx):
                    status_val = str(row[status_idx]).strip().upper()
                    test_name = str(row[test_name_col_idx]).strip()
                    if 'FAIL' in status_val and test_name:
                        self.sn_statistics[sn]['fail_details'][test_name] += 1
                        self.cell_statistics[cell]['fail_details'][test_name] += 1
        except Exception:
            # Deliberately best-effort: detail collection must never abort processing.
            pass

    def add_report_generation_result(self, result):
        """Fold one ExcelReportWorker.generate_single_report result dict in."""
        if result["success"]:
            self.generated_reports += 1
        else:
            self.failed_reports += 1
            sn = result.get("sn", "UNKNOWN_SN")
            cell = result.get("cell", "UNKNOWN_CELL")
            error = result.get("error", "未知错误")
            self.failure_details['report_failures'].append(
                f"SN {sn} (Cell {cell}): {error}"
            )

    def add_test_elevation_data(self, cell, elevation_count):
        """Accumulate test-elevation counts for one cell (ignores <= 0)."""
        if elevation_count > 0:
            self.test_elevation_stats['total_elevations'] += elevation_count
            self.test_elevation_stats['elevation_by_cell'][cell] += elevation_count

    def finalize_statistics(self):
        """Compute derived figures and return a plain-dict snapshot."""
        self.total_sn_count = len(self.sn_statistics)
        if self.success_files > 0:
            self.test_elevation_stats['avg_elevation_per_file'] = (
                self.test_elevation_stats['total_elevations'] / self.success_files
            )
        return {
            'total_sn_count': self.total_sn_count,
            'total_files_processed': self.total_files_processed,
            'success_files': self.success_files,
            'failed_files': self.failed_files,
            'generated_reports': self.generated_reports,
            'failed_reports': self.failed_reports,
            # Converted to plain dicts for symmetry with sn_statistics
            # (the original leaked the inner defaultdicts here).
            'cell_statistics': {cell: {
                'file_count': stats['file_count'],
                'sn_set': stats['sn_set'],
                'fail_count': stats['fail_count'],
                'elevation_count': stats['elevation_count'],
                'fail_details': dict(stats['fail_details'])
            } for cell, stats in self.cell_statistics.items()},
            'sn_statistics': {sn: {
                'file_count': stats['file_count'],
                'cells': list(stats['cells']),
                'fail_count': stats['fail_count'],
                'fail_details': dict(stats['fail_details'])
            } for sn, stats in self.sn_statistics.items()},
            'failure_details': self.failure_details,
            'test_elevation_stats': self.test_elevation_stats
        }
class ParallelHTMLReportProcessor:
"""并行HTML报告处理器"""
def __init__(self, log_manager, statistics_collector):
    """Wire up collaborators and the per-SN accumulation maps."""
    # Injected collaborators.
    self.log_manager = log_manager
    self.statistics_collector = statistics_collector
    # Per-SN accumulators, all keyed by serial number.
    self.sn_data_map = {}                        # sn -> merged table data
    self.sn_source_files = defaultdict(set)      # sn -> source file names
    self.sn_fail_counts = defaultdict(int)       # sn -> total FAIL rows
    self.sn_file_counts = defaultdict(int)       # sn -> number of files
    self.sn_cell_info = defaultdict(set)         # sn -> cell ids seen
    self.sn_test_cycle_times = defaultdict(set)  # sn -> cycle-time stamps
def process_files(self, source_dir, max_workers=None):
    """Scan source_dir and parse every HTML file in a process pool.

    Successful results are folded into self.sn_data_map (via
    _store_result_data) and into the statistics collector; failures are
    logged and echoed on the progress line. Returns self.sn_data_map.
    """
    all_files = self._scan_files(source_dir)
    if not all_files:
        self.log_manager.log_warning("未找到HTML文件")
        print(f"{Fore.YELLOW}⚠ 未找到HTML文件")
        return self.sn_data_map
    # Pre-scan the SN distribution from file names only (cheap, no parsing).
    self.log_manager.log_info(f"开始扫描文件分布,共{len(all_files)}个文件")
    print(f"{Fore.YELLOW}⌛ 正在扫描文件分布...")
    self._collect_sn_distribution(all_files)
    self._display_file_distribution()
    # Default worker count: one per CPU, capped by the number of files.
    if max_workers is None:
        max_workers = min(mp.cpu_count(), len(all_files))
    self.log_manager.log_info(f"开始并行处理 {len(all_files)} 个文件 (使用 {max_workers} 个进程)")
    print(f"{Fore.CYAN}▶ 开始并行处理 {len(all_files)} 个文件 (使用 {max_workers} 个进程)")
    progress_tracker = ThreadSafeProgressTracker(len(all_files), self.log_manager)
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # Submit every file; keys keep submission order.
        future_to_file = {executor.submit(HTMLFileProcessor.process_single_file, file_path): file_path
                          for file_path in all_files}
        infor = ''
        for future in future_to_file:
            result = future.result()  # NOTE: blocks in submission order, not completion order
            success = result["success"]
            self.statistics_collector.add_file_processing_result(result)
            if success:
                self._store_result_data(result)
            if not success:
                error_msg = f"处理失败: {result['file']} - {result['error']}"
                self.log_manager.log_error(error_msg)
                # Accumulated error text is echoed on the progress line.
                infor = infor + f"{Fore.RED}{error_msg}|"
            progress_tracker.update(success, infor)
    progress_tracker.finish(phase_name="HTML文件处理")
    self._add_report_statistics()
    return self.sn_data_map
def _scan_files(self, source_dir):
    """Recursively collect every .html/.htm file under source_dir."""
    html_files = []
    for folder, _dirs, names in os.walk(source_dir):
        for name in names:
            if name.lower().endswith(('.html', '.htm')):
                html_files.append(os.path.join(folder, name))
    return html_files
def _collect_sn_distribution(self, file_list):
"""预扫描SN分布"""
for file_path in file_list:
filename = os.path.basename(file_path)
sn, cell = self._extract_sn_and_cell_from_filename(filename)
test_cycle_time = HTMLFileProcessor._extract_test_cycle_time(filename)
self.sn_file_counts[sn] += 1
self.sn_cell_info[sn].add(cell)
self.sn_test_cycle_times[sn].add(test_cycle_time)
def _extract_sn_and_cell_from_filename(self, filename):
"""从文件名提取SN和cell编号"""
sn_regex = r'F[A-Z0-9]{15}(?:[A-Z0-9]{5,})?'
sn_match = re.search(rf'\b({sn_regex})\b', filename, flags=re.I)
cell_match = re.search(r'-(\d+)\.html$', filename)
sn = sn_match.group(1) if sn_match else "UNKNOWN_SN"
cell = cell_match.group(1) if cell_match else "UNKNOWN_CELL"
return sn, cell
def _display_file_distribution(self):
"""显示文件分布包含cell编号和测试周期时间信息"""
dist_info = []
for sn, count in list(self.sn_file_counts.items())[:10]:
# 获取该SN对应的cell信息
cells = list(self.sn_cell_info.get(sn, set()))
cell_display = ', '.join(sorted(cells)[:3]) if cells else "未知"
if len(cells) > 3:
cell_display += f"...等{len(cells)}"
# 获取该SN对应的测试周期时间信息
times = list(self.sn_test_cycle_times.get(sn, set()))
time_display = ', '.join(sorted(times)[:11]) if times else "未知"
if len(times) > 11:
time_display += f"...等{len(times)}"
dist_info.append(f"{sn[:31]:<32}: {count}个文件, Cells: {cell_display:>3}, 时间: {time_display}")
if len(self.sn_file_counts) > 10:
dist_info.append(f"... 还有 {len(self.sn_file_counts) - 10} 个SN")
self.log_manager.log_info(f"{Fore.MAGENTA}⚫SN文件分布: \n{chr(10).join(dist_info)}")
# print(f"{Fore.MAGENTA}⚫ SN文件分布:\n{Fore.CYAN}{chr(10).join(dist_info)}")
def _store_result_data(self, result):
"""存储处理结果"""
sn = result["sn"]
cell = result.get("cell", "UNKNOWN_CELL")
test_cycle_time = result.get("test_cycle_time", "UNKNOWN_TIME")
filename = result["filename"]
fail_count = result.get("fail_count", 0)
# 记录文件来源和cell信息
self.sn_source_files[sn].add(filename)
self.sn_cell_info[sn].add(cell)
self.sn_test_cycle_times[sn].add(test_cycle_time)
# 存储数据
if sn not in self.sn_data_map:
self.sn_data_map[sn] = {'headers': result["headers"], 'data': []}
self.sn_data_map[sn]['data'].extend(result["rows"])
# 累加FAIL数量
self.sn_fail_counts[sn] += fail_count
# 添加测试拔高统计这里使用fail_count作为拔高数量示例您可以根据实际需求调整
self.statistics_collector.add_test_elevation_data(cell, fail_count)
def _add_report_statistics(self):
"""添加报告统计信息包含cell和测试周期时间信息"""
for sn, data_info in self.sn_data_map.items():
source_count = len(self.sn_source_files.get(sn, []))
cell_set = self.sn_cell_info.get(sn, set())
time_set = self.sn_test_cycle_times.get(sn, set())
cell_list = list(cell_set)
time_list = list(time_set)
primary_cell = cell_list[0] if cell_list else "UNKNOWN_CELL"
primary_time = time_list[0] if time_list else "UNKNOWN_TIME"
data_info['report_stats'] = {
'source_files_count': source_count,
'fail_count': self.sn_fail_counts.get(sn, 0),
'cell_info': {
'primary_cell': primary_cell,
'all_cells': cell_list,
'cell_count': len(cell_set)
},
'time_info': {
'primary_time': primary_time,
'all_times': time_list,
'time_count': len(time_set)
}
}
class ParallelExcelReportGenerator:
    """Generate one Excel report per SN in parallel via a process pool.

    Each task carries the SN plus its cell and test-cycle-time context; the
    actual workbook writing is delegated to ``ExcelReportWorker``.
    """
    def __init__(self, output_dir, log_manager, statistics_collector):
        # Directory where the generated report files are written.
        self.output_dir = output_dir
        self.log_manager = log_manager
        self.statistics_collector = statistics_collector
    def generate_reports(self, sn_data_map, max_workers=None):
        """Generate Excel reports for every SN in *sn_data_map* in parallel.

        Args:
            sn_data_map: SN -> {'headers', 'data', 'report_stats'} mapping
                produced by the HTML-processing phase.
            max_workers: Worker process count; defaults to
                ``min(cpu_count, report count)``.

        Returns:
            tuple[list, list]: (success_reports, failed_reports) result dicts.
        """
        total_reports = len(sn_data_map)
        if total_reports == 0:
            self.log_manager.log_warning("没有数据可生成报告")
            print(f"{Fore.YELLOW}⚠ 没有数据可生成报告")
            return [], []
        # Choose the worker-process count.
        if max_workers is None:
            max_workers = min(mp.cpu_count(), total_reports)
        self.log_manager.log_info(f"开始并行生成Excel报告 (共{total_reports}个,使用 {max_workers} 个进程)")
        print(f"{Fore.CYAN}▶ 开始并行生成Excel报告 (共{total_reports}个,使用 {max_workers} 个进程)")
        # Progress tracker shared across result handling.
        progress_tracker = ThreadSafeProgressTracker(total_reports, self.log_manager)
        # Build one task per SN, carrying its cell and test-cycle-time context.
        report_tasks = []
        for sn, data_info in sn_data_map.items():
            report_stats = data_info.get('report_stats', {})
            cell_info = report_stats.get('cell_info', {})
            time_info = report_stats.get('time_info', {})
            # BUGFIX: defaults must match the consumed types — an int count
            # (was {}) and a list of cells (was the string 'UNKNOWN_CELLlist',
            # which a join over it would split character by character).
            source_files_count = report_stats.get('source_files_count', 0)
            primary_cell = cell_info.get('primary_cell', 'UNKNOWN_CELL')
            all_cells = cell_info.get('all_cells', ['UNKNOWN_CELL'])
            primary_time = time_info.get('primary_time', 'UNKNOWN_TIME')
            report_tasks.append({
                "sn": sn,
                "cell": primary_cell,
                "all_cells": all_cells,
                "test_cycle_time": primary_time,
                "data_info": data_info,
                "source_files_count": source_files_count
            })
        success_reports = []
        failed_reports = []
        # Generate the reports in a process pool.
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            # Submit one task per report.
            future_to_report = {
                executor.submit(ExcelReportWorker.generate_single_report, task, self.output_dir): task
                for task in report_tasks
            }
            # Collect results in submission order (not completion order).
            for future in future_to_report:
                result = future.result()
                # Feed the statistics collector with every outcome.
                self.statistics_collector.add_report_generation_result(result)
                if result["success"]:
                    success_reports.append(result)
                    progress_tracker.update(success=True, count=1, _display_progress=False)
                    self._show_success_info(result)
                else:
                    failed_reports.append(result)
                    progress_tracker.update(success=False, count=1)
                    # Safely display the error, including cell/time context.
                    self._show_error_info(result)
        progress_tracker.finish(phase_name="Excel报告生成")
        # Final summary including cell and test-cycle-time coverage.
        self._show_final_stats(success_reports, failed_reports)
        return success_reports, failed_reports
    def _show_success_info(self, result):
        """Log one successful report, guarding against malformed result dicts."""
        try:
            # Truncate long SNs for aligned display.
            sn_display = str(result.get('sn', 'UNKNOWN_SN'))[:32]
            if result.get('sn') and len(str(result['sn'])) > 32:
                sn_display += "..."
            # BUGFIX: default is a list so the join is per cell, not per character.
            cell_list_display = ','.join(str(c) for c in result.get('all_cells', ['UNKNOWN_CELL']))
            cell_display = str(cell_list_display)
            time_display = str(result.get('test_cycle_time', 'UNKNOWN_TIME'))[:20]
            output_file = result.get('output_file', '')
            file_name = os.path.basename(output_file) if output_file else '未知文件'
            success_info = [
                f"{Fore.GREEN}✓ 生成成功",
                f"SN: {sn_display:<32}",
                f"Cell: {cell_display:<8}",
                f"时间: {time_display}",
                f"文件: {file_name:<60}",
                f"记录数: {result.get('record_count', 0):>5}",
                f"来源文件: {result.get('source_files_count', 0):>2}",
                f"FAIL数量: {result.get('fail_count', 0):>3}"
            ]
            info_msg = ' | '.join(success_info)
            self.log_manager.log_info(info_msg)
        except Exception:
            # Fall back to a minimal message if formatting itself fails.
            msg = f"✓ 报告生成成功 (SN: {result.get('sn', 'UNKNOWN_SN')}, Cell: {result.get('cell', 'UNKNOWN_CELL')}, Time: {result.get('test_cycle_time', 'UNKNOWN_TIME')})"
            self.log_manager.log_info(msg)
            print(f"{Fore.GREEN}{msg}")
    def _show_error_info(self, result):
        """Log one failed report, guarding against malformed result dicts."""
        try:
            sn_display = str(result.get('sn', 'UNKNOWN_SN'))[:32]
            if result.get('sn') and len(str(result['sn'])) > 32:
                sn_display += "..."
            cell_display = str(result.get('cell', 'UNKNOWN_CELL'))
            time_display = str(result.get('test_cycle_time', 'UNKNOWN_TIME'))[:20]
            # Truncate long error messages.
            error_msg = str(result.get('error', '未知错误'))[:50]
            if len(str(result.get('error', ''))) > 50:
                error_msg += "..."
            error_info = [
                f"{Fore.RED}✗ 生成失败",
                f"SN: {sn_display}",
                f"Cell: {cell_display}",
                f"时间: {time_display}",
                f"错误: {error_msg}"
            ]
            info_msg = ' | '.join(error_info)
            self.log_manager.log_error(info_msg)
            # \x1b[2K\r clears the current progress line before printing.
            print('\x1b[2K\r' + info_msg.ljust(100))
        except Exception:
            # Fall back to a minimal message if formatting itself fails.
            msg = f"✗ 报告生成失败 (SN: {result.get('sn', 'UNKNOWN_SN')}, Cell: {result.get('cell', 'UNKNOWN_CELL')}, Time: {result.get('test_cycle_time', 'UNKNOWN_TIME')})"
            self.log_manager.log_error(msg)
            print(f"{Fore.RED}{msg}")
    def _show_final_stats(self, success_reports, failed_reports):
        """Log the final totals, including distinct cell and cycle-time counts."""
        try:
            total_records = sum(report.get('record_count', 0) for report in success_reports)
            total_sources = sum(report.get('source_files_count', 0) for report in success_reports)
            total_fails = sum(report.get('fail_count', 0) for report in success_reports)
            # Distinct cells across successes and failures.
            unique_cells = set(report.get('cell', 'UNKNOWN_CELL') for report in success_reports + failed_reports)
            # Distinct test-cycle times across successes and failures.
            unique_times = set(
                report.get('test_cycle_time', 'UNKNOWN_TIME') for report in success_reports + failed_reports)
            stats_msg = (
                f"\n=== 最终统计 ===\n"
                f"成功生成报告: {len(success_reports)}\n"
                f"失败报告: {len(failed_reports)}\n"
                f"总记录数: {total_records}\n"
                f"总来源文件: {total_sources}\n"
                f"总FAIL数量: {total_fails}\n"
                f"涉及Cell数量: {len(unique_cells)}\n"
                f"涉及测试周期时间数量: {len(unique_times)}\n"
                f"输出目录: {self.output_dir}"
            )
            self.log_manager.log_info(stats_msg)
            print(f"\n{Fore.CYAN}{stats_msg}")
            if failed_reports:
                failure_details = "\n失败报告详情:\n"
                for report in failed_reports:
                    sn = report.get('sn', 'UNKNOWN_SN')
                    cell = report.get('cell', 'UNKNOWN_CELL')
                    time = report.get('test_cycle_time', 'UNKNOWN_TIME')
                    error = report.get('error', '未知错误')
                    failure_details += f"  {sn} (Cell {cell}, Time {time}): {error}\n"
                self.log_manager.log_warning(failure_details)
                print(f"\n{Fore.YELLOW}{failure_details}")
        except Exception as e:
            error_msg = f"统计信息显示出错: {e}"
            self.log_manager.log_error(error_msg)
            print(f"{Fore.RED}{error_msg}")
class ReportProcessor:
    """Top-level orchestrator: interactive directory prompt, then the
    HTML-parsing phase followed by the Excel-generation phase."""
    def __init__(self):
        # The LogManager is created only once the output directory is known.
        self.log_manager = None
        self.statistics_collector = StatisticsCollector()
    def process_reports(self, html_max_workers=None, excel_max_workers=None):
        """Run the full pipeline: prompt for a directory, parse HTML, write Excel.

        Args:
            html_max_workers: Process count for the HTML-parsing phase (None = auto).
            excel_max_workers: Process count for the Excel-generation phase (None = auto).
        """
        source_dir = self._get_directory_from_console()
        if not source_dir:
            print(f"{Fore.RED}❌ 未选择目录,程序退出")
            return
        output_dir = self._create_output_dir(source_dir)
        # Initialise logging (writes processing_log.txt in output_dir).
        self.log_manager = LogManager(output_dir)
        self.log_manager.log_info(f"开始处理报告,源目录: {source_dir}, 输出目录: {output_dir}")
        try:
            # Phase 1: parse HTML files in parallel.
            self.log_manager.log_info("=== 阶段1: HTML文件处理 ===")
            print(f"\n{Fore.CYAN}=== 阶段1: HTML文件处理 ===")
            html_processor = ParallelHTMLReportProcessor(self.log_manager, self.statistics_collector)
            processed_data = html_processor.process_files(source_dir, html_max_workers)
            if not processed_data:
                self.log_manager.log_warning("没有处理任何数据,程序结束")
                print(f"{Fore.YELLOW}⚠ 没有处理任何数据,程序结束")
                return
            # Phase 2: generate Excel reports in parallel.
            self.log_manager.log_info("=== 阶段2: Excel报告生成 ===")
            print(f"\n{Fore.CYAN}=== 阶段2: Excel报告生成 ===")
            excel_generator = ParallelExcelReportGenerator(output_dir, self.log_manager, self.statistics_collector)
            success_reports, failed_reports = excel_generator.generate_reports(
                processed_data, excel_max_workers)
            # Persist the aggregated statistics into the log file.
            statistics_data = self.statistics_collector.finalize_statistics()
            self.log_manager.log_statistics(statistics_data)
            # Show the overall outcome.
            self._show_overall_result(len(processed_data), (success_reports), (failed_reports))
        except Exception as e:
            error_msg = f"程序执行出错: {type(e).__name__}: {str(e)}"
            self.log_manager.log_error(error_msg)
            print(f"\n{Fore.RED}{error_msg}")
            import traceback
            traceback.print_exc()
    def _get_directory_from_console(self):
        """Prompt repeatedly until the user enters an existing directory path."""
        while True:
            print(f"\n{Fore.CYAN}=== 并行HTML报告处理程序 ===")
            print(f"{Fore.WHITE}请输入包含HTML文件的目录路径:")
            path = input("> ").strip()
            if not path:
                print(f"{Fore.YELLOW}⚠ 路径不能为空,请重新输入")
                continue
            # Strip surrounding quotes commonly pasted from file managers.
            path = path.strip('"\'')
            if not os.path.exists(path):
                print(f"{Fore.RED}❌ 路径不存在,请重新输入")
                continue
            if not os.path.isdir(path):
                print(f"{Fore.RED}❌ 请输入目录路径,而不是文件路径")
                continue
            return path
    def _create_output_dir(self, source_dir):
        """Create and return a timestamped output directory under *source_dir*."""
        output_dir = os.path.join(source_dir, f"Html文件分析_带Cell编号_{datetime.now().strftime('%Y%m%d%H%M%S')}")
        os.makedirs(output_dir, exist_ok=True)
        # Logging is only possible once log_manager has been initialised.
        if self.log_manager:
            self.log_manager.log_info(f"输出目录创建成功: {output_dir}")
        print(f"{Fore.GREEN}✔ 输出目录创建成功: {output_dir}")
        return output_dir
    def _show_overall_result(self, total_sn, success_reports, failed_reports):
        """Print and log the overall run outcome (SN count, successes, failures)."""
        result_msg = (
            f"\n=== 程序执行完成 ===\n"
            f"✓ 处理完成!\n"
            f"总SN数量: {total_sn}\n"
            f"成功报告: {len(success_reports)}\n"
            f"失败报告: {len(failed_reports)}"
        )
        if self.log_manager:
            self.log_manager.log_info(result_msg)
        print(f"\n{Fore.CYAN}{result_msg}")
        if len(failed_reports) == 0:
            completion_msg = "🎉 所有报告生成成功!"
            self.log_manager.log_info(completion_msg)
            print(f"{Fore.GREEN}{completion_msg}")
        else:
            warning_msg = f"⚠ 有 {len(failed_reports)} 个报告生成失败,请查看上述错误信息"
            self.log_manager.log_warning(warning_msg)
            print(f"{Fore.YELLOW}{warning_msg}")
# Replaces the previous `if __name__ == "__main__":` block at the end of the file.
def main():
    """Program entry point: run the full report-processing pipeline."""
    try:
        # None lets each phase pick min(cpu_count, task count) workers itself.
        processor = ReportProcessor()
        processor.process_reports(
            html_max_workers=None,
            excel_max_workers=None,
        )
    except KeyboardInterrupt:
        print(f"\n{Fore.YELLOW}⚠ 用户中断程序")
    except Exception as e:
        print(f"\n{Fore.RED}❌ 程序执行出错: {type(e).__name__}: {str(e)}")
        import traceback
        traceback.print_exc()
if __name__ == "__main__":
    # NOTE: the redundant local `import multiprocessing as mp` was removed —
    # the module already imports it at the top, and the local re-import only
    # shadowed that binding.
    # freeze_support() is required so frozen (e.g. PyInstaller) child
    # processes do not re-execute the main script when the pool spawns workers.
    mp.freeze_support()
    # Extra guard: only the main process runs the interactive entry point;
    # spawned children import this module but must not call main() or block
    # on the final input() prompt.
    if mp.current_process().name == 'MainProcess':
        main()
        input("输入任意结束程序......")