"""Interactive tool that draws per-test scatter plots from an Excel test report.

The report is expected to contain a sheet named "Merged All Tests" or
"All Tests" with at least the columns "Test Name New", "SN", "Measurement"
and "Test Time".  For every test whose name contains a user-supplied keyword,
one PNG is written showing Measurement versus Test Time, grouped by SN, with
limit lines and basic statistics.
"""

import os
import re
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from colorama import Fore, Style, init
from matplotlib.lines import Line2D

# Silence SettingWithCopy warnings so they do not clutter the console output.
pd.options.mode.chained_assignment = None

# Font fallbacks so that CJK characters can be rendered in figures.
plt.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans', 'Arial Unicode MS', 'Microsoft YaHei']
plt.rcParams['axes.unicode_minus'] = False


class TestReportScatterPlotter:
    """Load an Excel test report and export one scatter plot per test item."""

    def __init__(self):
        self.file_path: Optional[str] = None
        self.df: Optional[pd.DataFrame] = None
        self.output_dir: Optional[str] = None
        self.required_columns = ["Test Name New", "SN", "Measurement", "Test Time"]
        # Actual names of the limit columns as found in the sheet (None if absent).
        self.col_lower: Optional[str] = None
        self.col_upper: Optional[str] = None
        # Pre-processed per-test frames, keyed by test name.
        self._processed_data_cache: Dict[Any, pd.DataFrame] = {}

    def _print_stage(self, msg: str) -> None:
        """Print a banner announcing the current processing stage."""
        print(f"\n{'=' * 30}\n{msg}\n{'=' * 30}")

    def _print_progress(self, current: int, total: int, prefix: str = "进度") -> None:
        """Redraw a single-line text progress bar.

        Args:
            current: Number of finished items.
            total: Total number of items; nothing is printed when ``<= 0``.
            prefix: Label shown in front of the bar.
        """
        if total <= 0:
            return
        percent = (current / total) * 100
        bar_len = 30
        filled = int(bar_len * current / total)
        bar = "█" * filled + "-" * (bar_len - filled)
        # "\r" returns to the start of the line so the bar is overwritten in place.
        sys.stdout.write(f"\r{prefix}: [{bar}] {current}/{total} ({percent:.1f}%)")
        sys.stdout.flush()
        if current == total:
            print()  # finish the line once the bar is complete

    def get_file_path(self) -> None:
        """Prompt until the user enters the path of an existing file.

        Surrounding quotes (as added by drag-and-drop in many terminals) are
        stripped and ``~`` is expanded.  The resolved absolute path is stored
        in ``self.file_path``.
        """
        self._print_stage("输入文件路径")
        while True:
            print(f"{Fore.WHITE}请输入测试报告文件路径(.xlsx): ")
            file_path = input("> ").strip().strip("\"'").strip()
            if not file_path:
                continue

            path_obj = Path(file_path).expanduser()
            # is_file() rather than exists(): a directory is not a usable report.
            if path_obj.is_file():
                self.file_path = str(path_obj.resolve())
                print(f"已选择文件: {self.file_path}")
                break
            print(f"文件不存在: {file_path},请重新输入")

    def _find_column_case_insensitive(self, candidates: List[str]) -> Optional[str]:
        """Return the first DataFrame column matching a candidate name.

        Matching ignores case and surrounding whitespace.

        Args:
            candidates: Candidate column names in priority order.

        Returns:
            The column label as it appears in ``self.df``, or ``None`` when no
            data is loaded or nothing matches.
        """
        if self.df is None:
            return None

        # str() guards against non-string headers (e.g. numeric column labels).
        columns_lower = {str(col).lower().strip(): col for col in self.df.columns}
        for candidate in candidates:
            key = candidate.lower().strip()
            if key in columns_lower:
                return columns_lower[key]
        return None

    def load_data(self) -> None:
        """Read the report sheet into ``self.df`` and detect the limit columns.

        The first sheet found among "Merged All Tests" and "All Tests" is used.

        Raises:
            ValueError: No file path set, unsupported extension, no matching
                sheet, or the sheet is empty.
            FileNotFoundError: The file does not exist.
            RuntimeError: The workbook cannot be opened or the sheet cannot
                be read.
            KeyError: A required column is missing.
        """
        self._print_stage("加载数据")
        start_time = time.time()

        if not self.file_path:
            raise ValueError("文件路径未设置")

        file_path = str(self.file_path)
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"文件不存在: {self.file_path}")

        lowered = file_path.lower()
        if not lowered.endswith(('.xls', '.xlsx')):
            raise ValueError("输入文件不是有效的 Excel 文件(应为 .xls 或 .xlsx 格式)")

        # openpyxl only reads .xlsx; for legacy .xls let pandas choose the engine.
        engine = 'openpyxl' if lowered.endswith('.xlsx') else None

        try:
            excel_file = pd.ExcelFile(file_path, engine=engine)
        except Exception as e:
            raise RuntimeError(
                f"无法打开 Excel 文件,请确认该文件未被损坏或占用。错误: {type(e).__name__}: {e}"
            ) from e

        # Sheets to look for, in priority order.
        target_sheets = ["Merged All Tests", "All Tests"]

        # One handle serves both the sheet listing and the read, and the
        # context manager guarantees that it is closed afterwards.
        with excel_file:
            sheet_names = excel_file.sheet_names
            selected_sheet = next(
                (sheet for sheet in target_sheets if sheet in sheet_names), None
            )
            if selected_sheet is None:
                raise ValueError(
                    f"未找到指定的工作表: {' 或 '.join(target_sheets)}。"
                    f"当前文件包含的工作表有: {sheet_names}"
                )

            try:
                self.df = excel_file.parse(sheet_name=selected_sheet)
            except Exception as e:
                raise RuntimeError(
                    f"读取 Excel 失败,工作表: '{selected_sheet}'。错误: {type(e).__name__}: {e}"
                ) from e

        # Frames cached from a previously loaded report are no longer valid.
        self._processed_data_cache.clear()

        if self.df.empty:
            raise ValueError("工作表为空,无法处理")

        missing_columns = [col for col in self.required_columns if col not in self.df.columns]
        if missing_columns:
            raise KeyError(f"缺少必要列: {missing_columns}")

        # Remember the limit columns (if any) under their actual header names.
        self.col_lower = self._find_column_case_insensitive([
            "Lower Limit", "lower limit", "lower_limit", "ll", "lower"
        ])
        self.col_upper = self._find_column_case_insensitive([
            "Upper Limit", "upper limit", "upper_limit", "ul", "upper"
        ])

        loading_time = time.time() - start_time
        print(f"数据加载完成: {len(self.df)} 行 × {self.df.shape[1]} 列")
        print(f"耗时: {loading_time:.2f}s")
        print(f"检测到下限列: {self.col_lower or '无'}")
        print(f"检测到上限列: {self.col_upper or '无'}")

    def get_keyword(self) -> Tuple[pd.DataFrame, str, List[str]]:
        """Ask for a keyword and select the rows whose test name contains it.

        The keyword is matched literally and case-insensitively against the
        "Test Name New" column.

        Returns:
            A tuple ``(filtered_df, keyword, unique_tests)`` where
            ``filtered_df`` is a copy of the matching rows and
            ``unique_tests`` lists the distinct matching test names.

        Raises:
            ValueError: No row matches the keyword.
        """
        self._print_stage("筛选关键词")
        while True:
            keyword = input("请输入筛选关键词(匹配 'Test Name New'): ").strip()
            if keyword:
                break
            print("关键词不能为空,请重新输入")

        # regex=False: characters such as "(", "+" or "[" are common in test
        # names and must not be interpreted as regular-expression syntax.
        mask = self.df["Test Name New"].astype(str).str.contains(
            keyword, case=False, na=False, regex=False
        )
        filtered_df = self.df.loc[mask].copy()

        if filtered_df.empty:
            raise ValueError(f"没有找到包含关键词 '{keyword}' 的测试项")

        unique_tests = filtered_df["Test Name New"].unique().tolist()
        print(f"匹配到 {len(filtered_df)} 行数据,涉及 {len(unique_tests)} 个不同测试项")
        return filtered_df, keyword, unique_tests

    def create_output_dir(self) -> None:
        """Create a timestamped ``scatter_plots_*`` directory next to the report.

        Raises:
            ValueError: ``self.file_path`` has not been set.
        """
        self._print_stage("创建输出目录")
        if not self.file_path:
            raise ValueError("文件路径未设置")

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        base_dir = os.path.dirname(self.file_path)
        self.output_dir = os.path.join(base_dir, f"scatter_plots_{timestamp}")
        os.makedirs(self.output_dir, exist_ok=True)
        print(f"输出目录: {self.output_dir}")

    @staticmethod
    def _safe_filename(name: str) -> str:
        """Reduce *name* to alphanumerics, spaces, ``_`` and ``-``.

        Returns ``"Unknown_Test"`` when nothing is left after filtering.
        """
        safe = "".join(c for c in str(name) if c.isalnum() or c in (" ", "_", "-")).strip()
        return safe or "Unknown_Test"

    def _extract_limits(self, df_one_test: pd.DataFrame) -> Tuple[
            Optional[float], Optional[float], List[float], List[float]]:
        """Collect the numeric lower/upper limits of one test item.

        Args:
            df_one_test: Rows belonging to a single test.

        Returns:
            ``(lower_plot, upper_plot, lower_set, upper_set)``: the sorted
            distinct limit values found, plus the value to draw for each side
            (smallest lower limit, largest upper limit; ``None`` if absent).
        """
        lower_plot = upper_plot = None
        lower_set: List[float] = []
        upper_set: List[float] = []

        if self.col_lower and self.col_lower in df_one_test.columns:
            lower_vals = self._clean_and_convert_series(
                df_one_test[self.col_lower], 'numeric'
            ).dropna().unique()
            lower_set = sorted(lower_vals.tolist())
            if lower_set:
                # Draw the widest window when several limits coexist.
                lower_plot = min(lower_set)

        if self.col_upper and self.col_upper in df_one_test.columns:
            upper_vals = self._clean_and_convert_series(
                df_one_test[self.col_upper], 'numeric'
            ).dropna().unique()
            upper_set = sorted(upper_vals.tolist())
            if upper_set:
                upper_plot = max(upper_set)

        return lower_plot, upper_plot, lower_set, upper_set

    @staticmethod
    def _clean_and_convert_series(series: pd.Series, target_type: str = 'numeric') -> pd.Series:
        """Convert a raw column to numeric or datetime values.

        Args:
            series: Column to convert.
            target_type: ``'numeric'`` or ``'datetime'``; any other value
                returns the series unchanged.

        Returns:
            The converted series; values that cannot be parsed become NaN/NaT.
            An empty series is returned as is.
        """
        if series.empty:
            return series

        if target_type == 'numeric':
            if pd.api.types.is_numeric_dtype(series):
                return series.astype(float)
            # Drop thousands separators and blanks before parsing.
            cleaned = series.astype(str).str.replace(r'[, ]', '', regex=True).str.strip()
            return pd.to_numeric(cleaned, errors='coerce')

        if target_type == 'datetime':
            return TestReportScatterPlotter._convert_to_datetime(series)

        return series

    @staticmethod
    def _convert_to_datetime(series: pd.Series) -> pd.Series:
        """Parse a column of mixed timestamp representations.

        Numeric values are interpreted by magnitude: ``>= 1e11`` as epoch
        milliseconds, ``[1e9, 1e11)`` as epoch seconds and ``(20000, 60000)``
        as Excel serial day numbers.  Everything else is parsed as text,
        trying ``YYYY-MM-DD HH-MM-SS`` first and the pandas default parser
        last.  Unparseable entries become NaT.
        """
        if pd.api.types.is_datetime64_any_dtype(series):
            return series

        numeric_series = pd.to_numeric(series, errors='coerce')
        string_series = series.astype(str).str.strip()
        result = pd.Series(pd.NaT, index=series.index, dtype='datetime64[ns]')

        # Numeric timestamps, classified by magnitude.
        masks = {
            'ms': numeric_series >= 1e11,
            's': (numeric_series >= 1e9) & (numeric_series < 1e11),
            'excel': (numeric_series > 20000) & (numeric_series < 60000)
        }

        for mask_type, mask in masks.items():
            if not mask.any():
                continue
            if mask_type == 'ms':
                result.loc[mask] = pd.to_datetime(numeric_series.loc[mask], unit='ms')
            elif mask_type == 's':
                result.loc[mask] = pd.to_datetime(numeric_series.loc[mask], unit='s')
            elif mask_type == 'excel':
                # Excel serial dates count days from 1899-12-30.
                origin = pd.Timestamp('1899-12-30')
                result.loc[mask] = origin + pd.to_timedelta(numeric_series.loc[mask], unit='D')

        # Textual timestamps for whatever is still unresolved.
        remaining_mask = result.isna()
        if remaining_mask.any():
            remaining_strings = string_series.loc[remaining_mask]

            # Known explicit formats take priority over generic parsing.
            format_patterns = [
                (r'^\d{4}-\d{2}-\d{2} \d{2}-\d{2}-\d{2}$', '%Y-%m-%d %H-%M-%S'),
            ]

            for pattern, date_format in format_patterns:
                format_mask = remaining_strings.str.match(pattern)
                if format_mask.any():
                    matched = remaining_strings.loc[format_mask]
                    result.loc[matched.index] = pd.to_datetime(
                        matched, format=date_format, errors='coerce'
                    )

            # Generic parsing as the last resort.
            still_na_mask = result.isna() & remaining_mask
            if still_na_mask.any():
                result.loc[still_na_mask] = pd.to_datetime(
                    string_series.loc[still_na_mask], errors='coerce'
                )

        return result

    def _preprocess_test_data(self, test_data: pd.DataFrame) -> pd.DataFrame:
        """Add parsed columns and drop rows that cannot be plotted.

        Args:
            test_data: Rows of one test; the frame itself is left untouched.

        Returns:
            A new frame with ``Measurement_num`` and ``TestTime_dt`` columns,
            restricted to rows where both parsed successfully and sorted by
            ``TestTime_dt``.
        """
        # assign() returns a new frame, so the caller's data is not mutated.
        converted = test_data.assign(
            Measurement_num=self._clean_and_convert_series(test_data['Measurement'], 'numeric'),
            TestTime_dt=self._clean_and_convert_series(test_data['Test Time'], 'datetime'),
        )
        valid_data = converted.dropna(subset=['Measurement_num', 'TestTime_dt'])
        return valid_data.sort_values('TestTime_dt')

    def _calculate_statistics(self, y_data: pd.Series) -> Dict[str, float]:
        """Return count, mean, median, min, max, std, Q1 and Q3 of *y_data*."""
        stats = {
            'count': len(y_data),
            'mean': y_data.mean(),
            'median': y_data.median(),
            'min': y_data.min(),
            'max': y_data.max(),
            'std': y_data.std(),
            'q1': y_data.quantile(0.25),
            'q3': y_data.quantile(0.75)
        }
        return stats

    def _add_statistics_textbox(self, ax, stats: Dict[str, float],
                                x_pos: float = 1.02, y_pos: float = 0.98) -> None:
        """Draw a box listing the statistics to the right of the axes.

        Args:
            ax: Target matplotlib axes.
            stats: Mapping as produced by ``_calculate_statistics``.
            x_pos: Horizontal anchor in axes coordinates.
            y_pos: Vertical anchor (top of the box) in axes coordinates.
        """
        # English labels keep the box readable even without a CJK font.
        stats_text = (
            f"Count: {stats['count']}\n"
            f"Mean: {stats['mean']:.4f}\n"
            f"Median: {stats['median']:.4f}\n"
            f"Min: {stats['min']:.4f}\n"
            f"Max: {stats['max']:.4f}\n"
            f"Std: {stats['std']:.4f}\n"
            f"Q1: {stats['q1']:.4f}\n"
            f"Q3: {stats['q3']:.4f}"
        )

        props = dict(boxstyle='round', facecolor='wheat', alpha=0.8)
        ax.text(x_pos, y_pos, stats_text, transform=ax.transAxes, fontsize=8,
                verticalalignment='top', horizontalalignment='left',
                bbox=props, fontfamily='monospace')

    def _add_statistics_lines(self, ax, stats: Dict[str, float], x_min: float, x_max: float) -> None:
        """Draw horizontal mean, median, Q1 and Q3 lines between *x_min* and *x_max*."""
        ax.hlines(y=stats['mean'], xmin=x_min, xmax=x_max,
                  colors='orange', linestyles='-', linewidth=1.5, alpha=0.7, label='Mean')
        ax.hlines(y=stats['median'], xmin=x_min, xmax=x_max,
                  colors='purple', linestyles='-.', linewidth=1.5, alpha=0.7, label='Median')
        ax.hlines(y=stats['q1'], xmin=x_min, xmax=x_max,
                  colors='gray', linestyles=':', linewidth=1.0, alpha=0.7, label='Q1')
        ax.hlines(y=stats['q3'], xmin=x_min, xmax=x_max,
                  colors='gray', linestyles=':', linewidth=1.0, alpha=0.7, label='Q3')

    def _configure_plot(self, ax, test_data: pd.DataFrame, test_name: str,
                        lower_plot: Optional[float], upper_plot: Optional[float]) -> None:
        """Decorate the axes: limit and statistic lines, y-range, labels, legend.

        Args:
            ax: Axes that already contain the scatter points.
            test_data: Pre-processed rows (``Measurement_num``, ``TestTime_dt``).
            test_name: Test name shown in the title.
            lower_plot: Lower limit to draw, or ``None``.
            upper_plot: Upper limit to draw, or ``None``.
        """
        y_data = test_data['Measurement_num']
        stats = self._calculate_statistics(y_data)

        # The statistic lines span the observed time range.
        x_min = test_data['TestTime_dt'].min()
        x_max = test_data['TestTime_dt'].max()

        # Values that must remain visible on the y-axis.
        y_candidates = [y_data.min(), y_data.max()]

        # axhline() gets no label, so proxy artists carry the legend entries.
        custom_lines = []
        if lower_plot is not None:
            y_candidates.append(lower_plot)
            ax.axhline(y=lower_plot, color='green', linestyle='--', linewidth=1.2)
            custom_lines.append(Line2D([0], [0], color='green', linestyle='--', label="Lower Limit"))

        if upper_plot is not None:
            y_candidates.append(upper_plot)
            ax.axhline(y=upper_plot, color='red', linestyle='--', linewidth=1.2)
            custom_lines.append(Line2D([0], [0], color='red', linestyle='--', label="Upper Limit"))

        self._add_statistics_lines(ax, stats, x_min, x_max)

        valid_candidates = [y for y in y_candidates if pd.notna(y)]
        if valid_candidates:
            y_min_plot = min(valid_candidates)
            y_max_plot = max(valid_candidates)
            y_range = y_max_plot - y_min_plot
            if y_range == 0:
                # All values identical: open an artificial window around them.
                y_range = abs(y_max_plot) * 0.1 if y_max_plot != 0 else 1.0
                y_min_plot = y_min_plot - y_range / 2
                y_max_plot = y_max_plot + y_range / 2
            ax.set_ylim(y_min_plot - 0.1 * y_range, y_max_plot + 0.1 * y_range)

        self._add_statistics_textbox(ax, stats)

        ax.set_title(f"Scatter Plot - {test_name}\n"
                     f"Mean: {stats['mean']:.4f}, Median: {stats['median']:.4f}, "
                     f"Range: [{stats['min']:.4f}, {stats['max']:.4f}]",
                     fontsize=10)
        ax.set_xlabel("Test Time")
        ax.set_ylabel("Measurement Value")
        ax.grid(True, alpha=0.3)
        ax.tick_params(axis='x', rotation=45)

        # Legend outside the axes on the right; two columns when crowded.
        handles, labels = ax.get_legend_handles_labels()
        if custom_lines:
            handles.extend(custom_lines)
            labels.extend([line.get_label() for line in custom_lines])

        if handles:
            ncol = 2 if len(handles) > 10 else 1
            ax.legend(handles=handles, labels=labels, title="Legend", fontsize=8,
                      loc='center left', bbox_to_anchor=(1.05, 0.5),
                      ncol=ncol, frameon=True, fancybox=True, shadow=True)

    def _save_plot(self, fig, test_name: str) -> None:
        """Write *fig* as a PNG into ``self.output_dir`` and close it.

        The file is named ``<safe test name>_<timestamp>.png``; a numeric
        suffix is appended if that name is already taken.  The figure is
        closed even when saving fails.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        safe_name = self._safe_filename(test_name)
        stem = f"{safe_name}_{timestamp}"
        output_path = os.path.join(self.output_dir, f"{stem}.png")

        # Distinct test names can sanitize to the same file name within the
        # same second; never overwrite a plot that was already written.
        counter = 1
        while os.path.exists(output_path):
            output_path = os.path.join(self.output_dir, f"{stem}_{counter}.png")
            counter += 1

        try:
            # bbox_inches='tight' keeps the legend placed outside the axes.
            fig.savefig(output_path, dpi=300, bbox_inches='tight')
        finally:
            plt.close(fig)
        print(f"已保存: {output_path}")

    def plot_scatter(self, filtered_df: pd.DataFrame, unique_tests: List[str]) -> None:
        """Create and save one scatter plot for every test in *unique_tests*.

        Args:
            filtered_df: Rows selected by ``get_keyword``.
            unique_tests: Distinct test names to plot.

        Raises:
            ValueError: The output directory has not been created yet.
        """
        self._print_stage("生成散点图")
        if not self.output_dir:
            raise ValueError("输出目录未设置")

        total_tests = len(unique_tests)
        start_time = time.time()

        for i, test_name in enumerate(unique_tests, 1):
            self._print_progress(i, total_tests, "测试项绘图")

            # Keyed by the name itself rather than its hash to rule out collisions.
            test_data = self._processed_data_cache.get(test_name)
            if test_data is None:
                rows = filtered_df[filtered_df["Test Name New"] == test_name]
                test_data = self._preprocess_test_data(rows)
                self._processed_data_cache[test_name] = test_data

            if test_data.empty:
                print(f"\n跳过 '{test_name}' - 无有效的 Measurement/Test Time 数据")
                continue

            lower_plot, upper_plot, lower_set, upper_set = self._extract_limits(test_data)

            limit_info = []
            if lower_set:
                limit_info.append(f"Lower unique={len(lower_set)}, used={lower_plot}")
            else:
                limit_info.append("Lower N/A")
            if upper_set:
                limit_info.append(f"Upper unique={len(upper_set)}, used={upper_plot}")
            else:
                limit_info.append("Upper N/A")

            y_data = test_data['Measurement_num']
            stats = self._calculate_statistics(y_data)
            stat_info = (
                f"数据点: {stats['count']}, "
                f"均值: {stats['mean']:.4f}, "
                f"中位数: {stats['median']:.4f}, "
                f"范围: [{stats['min']:.4f}, {stats['max']:.4f}]"
            )
            print(f"\n→ 绘制: '{test_name}' | {stat_info} | 限值: {', '.join(limit_info)}")

            has_sn = "SN" in test_data.columns
            sn_count = len(test_data["SN"].unique()) if has_sn else 1

            # Large canvas so the legend and statistics box fit on the right;
            # widen further (by at most 6 units) when there are many SNs.
            base_width = 14
            base_height = 9
            if sn_count > 5:
                fig_width = base_width + min(sn_count / 5, 6)
            else:
                fig_width = base_width

            fig, ax = plt.subplots(figsize=(fig_width, base_height))
            try:
                if has_sn:
                    # dropna=False keeps rows without an SN on the plot; they
                    # are part of the statistics and must not vanish silently.
                    groups = list(test_data.groupby("SN", dropna=False))
                else:
                    groups = [("Unknown_SN", test_data)]

                for j, (sn, group) in enumerate(groups, 1):
                    ax.scatter(group['TestTime_dt'], group['Measurement_num'],
                               label=str(sn), alpha=0.7, s=25)
                    if j % 10 == 0 or j == len(groups):
                        self._print_progress(j, len(groups), "SN分组绘制")

                self._configure_plot(ax, test_data, test_name, lower_plot, upper_plot)

                # Reserve space on the right for the statistics box and legend.
                fig.tight_layout()
                fig.subplots_adjust(right=0.8 if sn_count <= 10 else 0.7)
            except Exception:
                # Do not leak the figure when drawing fails.
                plt.close(fig)
                raise

            self._save_plot(fig, test_name)

        total_time = time.time() - start_time
        print(f"\n全部绘图完成,总耗时: {total_time:.2f}s")
        print(f"所有图表已保存到: {self.output_dir}")

    def run(self) -> None:
        """Run the interactive workflow; exit with status 1 on any error."""
        try:
            self.get_file_path()
            self.load_data()
            filtered_df, keyword, unique_tests = self.get_keyword()
            self.create_output_dir()
            self.plot_scatter(filtered_df, unique_tests)
        except Exception as e:
            print(f"\n❌ 发生错误: {type(e).__name__}: {str(e)}")
            import traceback
            traceback.print_exc()
            sys.exit(1)


if __name__ == "__main__":
    # Enable ANSI colour handling (needed on legacy Windows consoles).
    init()
    plotter = TestReportScatterPlotter()
    plotter.run()