import pandas as pd
from bs4 import BeautifulSoup
import os
import re
import sys
from datetime import datetime
from colorama import Fore, Style, init
from concurrent.futures import ThreadPoolExecutor, as_completed

init(autoreset=True)


class ProgressTracker:
    """Terminal progress display controller (enhanced)."""

    def __init__(self):
        self.processed = 0
        self.total = 0
        self.start_time = datetime.now()

    def begin(self, total_files, sn_file_counts=None):
        """Initialize progress tracking and show the file distribution (sn_file_counts is optional)."""
        self.total = total_files
        self.start_time = datetime.now()
        # Build the file-distribution summary
        dist_info = []
        if sn_file_counts:
            for sn, count in sn_file_counts.items():
                dist_info.append(f"{sn[:31]}: {count} HTML file(s).")
            print(f"{dist_info}")
        # Formatted output
        stats_line = f"{Fore.CYAN}▶ Starting {self.total} task(s)"
        if dist_info:
            # Join outside the f-string: a '\n' inside an f-string expression is a
            # SyntaxError on Python versions before 3.12
            dist_text = '\n'.join(dist_info)
            dist_line = f"{Fore.MAGENTA}⚫ SN file distribution:\n{dist_text}"
            print(f"\n{stats_line.ljust(80)}")
            print(f"{dist_line.ljust(580)}{Style.RESET_ALL}")
        else:
            print(f"\n{stats_line.ljust(80)}")

    def update(self, success=True, prefix=''):
        """Update the progress line."""
        self.processed += 1
        time_used = self._format_timedelta(datetime.now() - self.start_time)
        percent = self.processed / self.total * 100
        status_icon = f"{Fore.GREEN}✓" if success else f"{Fore.RED}✗"
        status_text = (
            f"{status_icon} {self.processed}/{self.total} "
            f"[{(percent / 5):.0f}|{'▉' * int(percent / 5)}{' ' * (20 - int(percent / 5))}|]"
        )
        sys_info = [
            f"{prefix}{status_text.ljust(40)}",
            f"Progress: {percent:.1f}%".ljust(15),
            f"Elapsed: {time_used}".ljust(15),
            f"Speed: {self.processed / (datetime.now() - self.start_time).total_seconds():.1f} tasks/s"
        ]
        print('\x1b[2K\r' + ' ‖ '.join(sys_info), end='', flush=True)

    def end(self, prefix=''):
        """Finish progress tracking."""
        print(f"\n{Fore.GREEN}✔ {prefix} finished! Total time: "
              f"{(datetime.now() - self.start_time).total_seconds():.1f}s\n")

    def _format_timedelta(self, delta):
        """Format a timedelta as HH:MM:SS."""
        seconds = delta.total_seconds()
        return f"{int(seconds // 3600):02}:{int((seconds % 3600) // 60):02}:{int(seconds % 60):02}"


class HTMLReportProcessor:
    """Core HTML report processor (enhanced: parallel + thread-safe)."""

    def __init__(self):
        # All shared data is merged in the main thread only, so workers never write concurrently
        self.sn_data_map = {}
        self.progress = ProgressTracker()
        # Source files per SN
        self.sn_source_files = {}
        # FAIL count per SN
        self.sn_fail_counts = {}
        # File distribution per SN
        self.sn_file_counts = {}

    @staticmethod
    def _clean_test_name(raw_name):
        """Strip the RoundX_Y_ prefix from a test name."""
        return re.sub(r'^Round\d+_\d+_', '', raw_name)

    def _extract_sn(self, soup, filename):
        """Extract the SN with a two-stage mechanism (tightened regex)."""
        try:
            sn_regex = r'F[A-Z0-9]{15}(?:[A-Z0-9]{5,})?'  # supports both the short and the long format
            if soup is not None:
                # Mechanism 1: extract from the HTML content
                sn_tag = soup.find('h3', string=re.compile(r'Serial Number:', re.I))
                if sn_tag:
                    # Word boundaries guard against partial matches
                    content_match = re.search(rf'\b({sn_regex})\b', sn_tag.get_text(), flags=re.I)
                    if content_match:
                        return content_match.group(1)
                    else:
                        return "UNKNOWN_SN"
                else:
                    # No matching tag: fall back to searching the rest of the document
                    html_text = soup.get_text(" ", strip=True)
                    content_match = re.search(rf'\b({sn_regex})\b', html_text, flags=re.I)
                    return content_match.group(1) if content_match else "UNKNOWN_SN"
            else:
                # Mechanism 2: extract from the file name
                content_match = re.search(rf'\b({sn_regex})\b', filename, flags=re.I)
                return content_match.group(1) if content_match else "UNKNOWN_SN"
        except Exception as e:
            print(f"SN extraction failed: {filename} - {str(e)}")
            return "ERROR_SN"

    def process_files(self, source_dir):
        """Process every file in the directory (parallel version)."""
        all_files = self._scan_files(source_dir)
        # Pre-scan the files to collect the SN distribution
        self._collect_sn_distribution(all_files)
        # Hand the SN distribution to the progress tracker
        self.progress.begin(len(all_files), self.sn_file_counts)
        # Process files in parallel: the main thread merges data and prints progress
        max_workers = self._calc_max_workers(env_var="OVERRIDE_WORKERS")
        print(f"{Fore.CYAN}▶ Thread pool size (HTML parsing): {max_workers}")
        futures = []
        results = []
        errors = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            for f in all_files:
                futures.append(executor.submit(self._process_single_file, f))
            for future in as_completed(futures):
                try:
                    res = future.result()
                except Exception as e:
                    # Catch anything the worker did not handle itself
                    res = {'success': False, 'error': f"Unexpected error: {type(e).__name__}: {e}"}
                # Update progress (main thread)
                self.progress.update(res.get('success', False), prefix='HTML parsing: ')
                # Collect successes and failures
                if res.get('success'):
                    results.append(res)
                else:
                    errors.append(res.get('error'))
        self.progress.end(prefix='HTML parsing')
        # Print error details to help locate problems
        if errors:
            print(f"\n{Fore.RED}✗ The following files failed ({len(errors)} in total):")
            for err in errors[:50]:
                print(f" - {err}")
            if len(errors) > 50:
                print(f" ... {len(errors) - 50} more omitted")
        # Merge the results into the shared data structures (main thread)
        for res in results:
            sn = res['sn']
            headers = res['headers']
            rows = res['rows']
            filename = res['filename']
            fail_count = res['file_fail_count']
            self._store_data(sn, headers, rows)
            if sn not in self.sn_source_files:
                self.sn_source_files[sn] = set()
            self.sn_source_files[sn].add(filename)
            self.sn_fail_counts[sn] = self.sn_fail_counts.get(sn, 0) + fail_count
        # Attach per-report statistics
        self._add_report_statistics()
        return self.sn_data_map

    def _calc_max_workers(self, env_var="OVERRIDE_WORKERS"):
        """Size the thread pool from the machine's CPU count; an environment variable can override it."""
        override = os.getenv(env_var)
        if override and override.isdigit():
            return max(1, int(override))
        # I/O plus moderate CPU work, so scale up a little
        cpu = os.cpu_count() or 2
        return max(4, min(32, cpu * 2))

    def _collect_sn_distribution(self, file_list):
        """Pre-scan the files and collect the SN distribution."""
        print(f"{Fore.YELLOW}⌛ Scanning file distribution...")
        for file_path in file_list:
            filename = os.path.basename(file_path)
            try:
                # Extract the SN from the file name only (no HTML parsing)
                sn = self._extract_sn_from_filename(filename)
                # Update the per-SN file count
                self.sn_file_counts[sn] = self.sn_file_counts.get(sn, 0) + 1
            except Exception as e:
                print(f"\n{Fore.RED}⚠ Failed to process: {filename} - {str(e)}")
        print(f"{Fore.GREEN}✔ SN distribution scan complete!")

    def _extract_sn_from_filename(self, filename):
        """Extract the SN from the file name only (optimized)."""
        sn_regex = r'F[A-Z0-9]{15}(?:[A-Z0-9]{5,})?'
        content_match = re.search(rf'\b({sn_regex})\b', filename, flags=re.I)
        return content_match.group(1) if content_match else "UNKNOWN_SN"

    def _add_report_statistics(self):
        """Attach report statistics to every SN."""
        for sn, data_info in self.sn_data_map.items():
            # Number of source files
            source_count = len(self.sn_source_files.get(sn, []))
            data_info['report_stats'] = {
                'source_files_count': source_count,
                # Note: this holds the parse-stage FAIL count; Excel generation
                # later overrides it with the length of df_fail
                'fail_count': self.sn_fail_counts.get(sn, 0)
            }

    def _scan_files(self, source_dir):
        """Scan the target directory for HTML files."""
        all_files = []
        for root_dir, _, files in os.walk(source_dir):
            all_files.extend(
                [os.path.join(root_dir, f) for f in files if f.lower().endswith(('.html', '.htm'))]
            )
        return all_files

    def _process_single_file(self, file_path):
        """Process a single file (thread-safe: returns a result instead of touching shared state)."""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                html_content = f.read()
            soup = BeautifulSoup(html_content, 'html.parser')
            filename = os.path.basename(file_path)
            sn = self._extract_sn(soup, filename)
            table = soup.find('table', border="1") or soup.find('table')
            if not table:
                raise ValueError("No valid data table found")
            # Pass sn in and return file_fail_count so the count stays tied to the same SN as the data
            headers, rows, file_fail_count = self._process_table(table, sn, filename, html_content)
            return {
                'success': True,
                'sn': sn,
                'headers': headers,
                'rows': rows,
                'file_fail_count': file_fail_count,
                'filename': filename
            }
        except Exception as e:
            return {
                'success': False,
                'error': f"{os.path.basename(file_path)} - {type(e).__name__}: {str(e)}"
            }

    def _find_status_index(self, headers):
        """Locate the status column index from the table headers."""
        if not headers:
            return None
        candidates = ('status', 'result', 'test status')
        for idx, h in enumerate(headers):
            h_norm = str(h).strip().lower()
            if h_norm in candidates or re.search(r'status|result', h_norm, flags=re.I):
                return idx
        return None

    def _process_table(self, table, sn, filename, html_content):
        """Process the data table (fixed FAIL counting and status-column detection; arguments are passed in to avoid concurrency issues)."""
        # More robust header-row detection
        header_tr = table.find('tr', bgcolor='#eeeeee')
        if not header_tr:
            # Try the first row that contains a <th>
            for tr in table.find_all('tr'):
                if tr.find('th'):
                    header_tr = tr
                    break
        if not header_tr:
            # Fall back to the first row
            header_tr = table.find('tr')
        headers = [th.get_text(strip=True) for th in header_tr.find_all(['th', 'td'])]
        if len(headers) > 11:
            headers = headers[:11]
        # Insert the extra columns
        try:
            test_name_idx = headers.index('Test Name')
        except ValueError:
            # No literal 'Test Name' header: fall back to a fuzzy match
            test_name_idx = next(
                (i for i, h in enumerate(headers) if re.search(r'test\s*name', h, flags=re.I)), 1)
        headers.insert(test_name_idx + 1, 'Test Name New')
        headers.append('Test Time')
        headers.append('Source File name')
        # Detect the status column index dynamically
        status_col_idx = self._find_status_index(headers)
        # Initialize the base timestamp and the global accumulator
        base_timestamp = None
        global_elapsed_accumulator = 0.0
        elapsed_append = 0
        file_fail_count = 0  # FAIL count for the current file
        # Use the Start Time in the report header as the initial baseline
        start_time_match = re.search(r"Start Time:\s*(.+?)(?:\s*<|$)", html_content, re.IGNORECASE)
        if start_time_match:
            start_time_str = start_time_match.group(1).strip()
            # Clean the string: remove any HTML tags
            start_time_str = re.sub(r'<[^>]+>', '', start_time_str).strip()
            try:
                # Parse the Start Time string into a datetime object
                dt = datetime.strptime(start_time_str, "%A, %B %d, %Y %I:%M:%S %p")
                base_timestamp = dt.timestamp()
                global_elapsed_accumulator = base_timestamp
                print(f"{Fore.GREEN}✔ Using Start Time as the time baseline: {start_time_str} -> {base_timestamp}")
            except Exception as e:
                print(f"{Fore.RED}⚠ Failed to parse Start Time: {start_time_str} - {e}")
                # Try another likely date format
                try:
                    # Drop the weekday and retry
                    dt = datetime.strptime(start_time_str.split(', ', 1)[1], "%B %d, %Y %I:%M:%S %p")
                    base_timestamp = dt.timestamp()
                    global_elapsed_accumulator = base_timestamp
                    print(f"{Fore.GREEN}✔ Using simplified-format Start Time as the time baseline: {start_time_str} -> {base_timestamp}")
                except Exception as e2:
                    print(f"{Fore.RED}⚠ Second attempt to parse Start Time failed: {start_time_str} - {e2}")
        rows = []
        # Skip the two header rows (matching the original logic), but skip the header_tr row more safely
        all_trs = table.find_all('tr')
        start_index = 2 if len(all_trs) >= 3 else 1
        for row in all_trs[start_index:]:
            cols = [td.get_text(strip=True) for td in row.find_all(['td', 'th'])]
            if len(cols) < 2:
                continue
            original_test_name = cols[1].strip()
            if not original_test_name:
                print(f"\rFile{Fore.RED}:{filename} contains an empty Test name!")
            # Count FAIL entries
            if status_col_idx is not None and len(cols) > status_col_idx:
                status_val = cols[status_col_idx].strip().upper()
                if 'FAIL' in status_val:
                    file_fail_count += 1
            elapsed_time_str = cols[9].strip() if len(cols) > 9 else "0"
            # A Test_Time row updates the time baseline
            if original_test_name == "Test_Time":
                measurement_str = cols[7] if len(cols) > 7 else ""
                try:
                    dt = datetime.strptime(measurement_str, "%m/%d/%Y %I:%M:%S %p")
                    timestamp = dt.timestamp()
                    base_timestamp = timestamp - float(elapsed_time_str)
                    global_elapsed_accumulator = base_timestamp
                    print(f"{Fore.GREEN}✔ Time baseline updated from Test_Time: {measurement_str} -> {base_timestamp}")
                except Exception as e:
                    print(f"{Fore.RED}⚠ Failed to parse Test_Time: {measurement_str} - {e}")
                    # If Test_Time cannot be parsed, keep using Start Time as the baseline
            # Compute the timestamp
            try:
                elapsed_append = global_elapsed_accumulator + float(elapsed_time_str)
            except ValueError:
                elapsed_append = global_elapsed_accumulator
            # Insert the cleaned test name, then append the timestamp and the source file name
            cols.insert(test_name_idx + 1, self._clean_test_name(cols[test_name_idx]))
            cols.append(elapsed_append)
            cols.append(filename)
            rows.append(cols)
        # Return to the caller, which accumulates per SN key
        return headers, rows, file_fail_count

    def _store_data(self, sn, headers, rows):
        """Store the parsed data (called from the main thread)."""
        if sn not in self.sn_data_map:
            self.sn_data_map[sn] = {'headers': headers, 'data': []}
        self.sn_data_map[sn]['data'].extend(rows)


class ExcelReportGenerator:
    """Excel report generator (parallel version)."""

    def __init__(self, output_dir, max_workers=None):
        self.output_dir = output_dir
        self.progress = ProgressTracker()
        self.max_workers = max_workers or self._calc_max_workers(env_var="EXCEL_WORKERS")

    def generate_reports(self, sn_data_map):
        """Generate all Excel reports in parallel."""
        total_reports = len(sn_data_map)
        errors = []
        successes = []
        print(f"\n{Fore.CYAN}▶ Generating {total_reports} Excel report(s) in parallel, thread pool size: {self.max_workers}")
        self.progress.begin(total_reports)
        futures = []
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            for sn, data_info in sn_data_map.items():
                futures.append(executor.submit(self._generate_one_report, sn, data_info))
            for future in as_completed(futures):
                try:
                    res = future.result()
                except Exception as e:
                    res = {'success': False, 'error': f"Unexpected error: {type(e).__name__}: {e}"}
                # Progress updates and output happen in the main thread
                self.progress.update(res.get('success', False), prefix='Excel generation: ')
                if res.get('success'):
                    successes.append(res)
                else:
                    errors.append(res.get('error'))
        self.progress.end(prefix='Excel generation')
        # Print the summary
        for s in successes[:50]:
            print(f"{Fore.GREEN}✓ Generated | File: {os.path.basename(s['output_file'])} | SN: {s['sn']} | "
                  f"Records: {s['records']} | Source HTML: {s['source_files_count']} | Total FAIL: {s['fail_count']}")
        if len(successes) > 50:
            print(f"{Fore.GREEN}... {len(successes) - 50} more successes omitted")
        if errors:
            print(f"\n{Fore.RED}✗ The following reports failed ({len(errors)} in total):")
            for err in errors[:50]:
                print(f" - {err}")
            if len(errors) > 50:
                print(f" ... {len(errors) - 50} more omitted")
        print(f"\n{Fore.CYAN}Output directory: {self.output_dir}")

    def _calc_max_workers(self, env_var="EXCEL_WORKERS"):
        """Size the thread pool from the machine's CPU count; the EXCEL_WORKERS environment variable can override it."""
        override = os.getenv(env_var)
        if override and override.isdigit():
            return max(1, int(override))
        cpu = os.cpu_count() or 2
        # Writing Excel is mostly I/O: allow some concurrency, but not enough to thrash the disk
        return max(2, min(16, cpu * 2))

    def _generate_one_report(self, sn, data_info):
        """Worker thread: generate the Excel report for one SN (thread-safe, no printing)."""
        try:
            base_name = f"{sn}_Report"
            output_file = os.path.join(self.output_dir, f"{base_name}.xlsx")
            df_all = self._prepare_dataframe(data_info)
            # Detect the status column dynamically and collect failing rows (robust matching, 'contains FAIL')
            status_col = self._detect_status_column(df_all)
            if status_col:
                fail_mask = df_all[status_col].astype(str).str.strip().str.upper().str.contains('FAIL')
                df_fail = df_all[fail_mask]
                fail_count = int(fail_mask.sum())
            else:
                df_fail = pd.DataFrame(columns=df_all.columns)
                fail_count = 0
            # Rename the file if there are FAIL items
            if fail_count > 0:
                new_name = f"{base_name}_Fail-item-{fail_count}.xlsx"
                output_file = os.path.join(self.output_dir, new_name)
            # Report statistics
            report_stats = data_info.get('report_stats', {})
            source_files_count = report_stats.get('source_files_count', 0)
            # Write the Excel file
            self._save_excel(df_all, df_fail, output_file, sn, source_files_count, fail_count)
            return {
                'success': True,
                'sn': sn,
                'output_file': output_file,
                'records': len(df_all),
                'source_files_count': source_files_count,
                'fail_count': fail_count,
            }
        except Exception as e:
            return {
                'success': False,
                'error': f"SN: {sn} - {type(e).__name__}: {str(e)}"
            }

    def _detect_status_column(self, df):
        """Auto-detect the status column name (enhanced: fuzzy, case-insensitive matching)."""
        for col in df.columns:
            col_str = str(col)
            if re.search(r'\b(status|result)\b', col_str, flags=re.I) or col_str.strip().lower() in (
                    'status', 'result', 'test status'):
                return col
        return None

    def _save_excel(self, df_all, df_fail, output_file, sn, source_files_count, fail_count):
        """Save the Excel file with the All Tests and FAIL list sheets plus a statistics sheet."""
        try:
            with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
                # Statistics sheet (uses the df_fail count directly)
                stats_data = {
                    'Item': ['SN', 'Source HTML files', 'Total FAIL count', 'Generated at'],
                    'Value': [sn, source_files_count, fail_count, datetime.now().strftime("%Y-%m-%d %H:%M:%S")]
                }
                df_stats = pd.DataFrame(stats_data)
                df_stats.to_excel(writer, sheet_name='Report Stats', index=False)
                # Original sheets
                df_all.to_excel(writer, sheet_name='All Tests', index=False)
                df_fail.to_excel(writer, sheet_name='FAIL list', index=False)
                # Column widths
                workbook = writer.book
                if 'Report Stats' in workbook.sheetnames:
                    worksheet = workbook['Report Stats']
                    worksheet.column_dimensions['A'].width = 20
                    worksheet.column_dimensions['B'].width = 30
        except Exception as e:
            raise RuntimeError(f"Failed to save Excel file: {str(e)}")

    def _prepare_dataframe(self, data_info):
        """Build the DataFrame (keeping the column order from parsing)."""
        df = pd.DataFrame(data_info['data'], columns=data_info['headers'])
        return df


class ReportProcessor:
    """Main report processor (console version)."""

    def __init__(self):
        pass

    def process_reports(self):
        """Run the full workflow."""
        source_dir = self._get_directory_from_console()
        if not source_dir:
            print(f"{Fore.RED}❌ No directory selected, exiting")
            return
        output_dir = self._create_output_dir(source_dir)
        processed_data = self._process_html_files(source_dir)
        self._generate_excel_reports(output_dir, processed_data)

    def _get_directory_from_console(self):
        """Read the directory path from the console."""
        while True:
            print(f"\n{Fore.CYAN}=== HTML Report Processor ===")
            print(f"{Fore.WHITE}Enter the directory that contains the HTML files:")
            path = input("> ").strip()
            if not path:
                print(f"{Fore.YELLOW}⚠ The path cannot be empty, please try again")
                continue
            # Strip surrounding quotes from the path
            path = path.strip('"\'')
            if not os.path.exists(path):
                print(f"{Fore.RED}❌ The path does not exist, please try again")
                continue
            if not os.path.isdir(path):
                print(f"{Fore.RED}❌ Please enter a directory path, not a file path")
                continue
            return path

    def _create_output_dir(self, source_dir):
        """Create the output directory."""
        output_dir = os.path.join(source_dir, f"HTML_Analysis_{datetime.now().strftime('%Y%m%d%H%M%S')}")
        os.makedirs(output_dir, exist_ok=True)
        print(f"{Fore.GREEN}✔ Output directory created: {output_dir}")
        return output_dir

    def _process_html_files(self, source_dir):
        """Process the HTML files (in parallel)."""
        processor = HTMLReportProcessor()
        return processor.process_files(source_dir)

    def _generate_excel_reports(self, output_dir, data):
        """Generate the Excel reports in parallel."""
        generator = ExcelReportGenerator(output_dir)
        generator.generate_reports(data)


if __name__ == "__main__":
    try:
        processor = ReportProcessor()
        processor.process_reports()
        # Pause at the end so the user can review the output
        print(f"\n{Fore.CYAN}=== Program finished ===")
        # input("Press Enter to exit...")
    except KeyboardInterrupt:
        print(f"\n{Fore.YELLOW}⚠ Interrupted by user")
    except Exception as e:
        print(f"\n{Fore.RED}❌ Program error: {type(e).__name__}: {str(e)}")
        import traceback
        traceback.print_exc()
        input("traceback: press Enter to exit...")
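
# Usage note (a sketch, not part of the original workflow): the thread-pool sizes
# computed by _calc_max_workers can be overridden through the OVERRIDE_WORKERS
# (HTML parsing) and EXCEL_WORKERS (Excel writing) environment variables before the
# script is launched. For example, from a POSIX shell (the file name
# "report_processor.py" is only an assumed placeholder for wherever this script is saved):
#
#   OVERRIDE_WORKERS=8 EXCEL_WORKERS=4 python report_processor.py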