1054 lines
39 KiB
Python
1054 lines
39 KiB
Python
import os
|
||
import re
|
||
import sys
|
||
import time
|
||
import pandas as pd
|
||
import matplotlib.pyplot as plt
|
||
from datetime import datetime
|
||
from matplotlib.lines import Line2D
|
||
from typing import Optional, Tuple, List, Dict, Any, Union
|
||
from pathlib import Path
|
||
import numpy as np
|
||
import base64
|
||
from io import BytesIO
|
||
from jinja2 import Template
|
||
from colorama import Fore, Style, init
|
||
import multiprocessing as mp
|
||
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
|
||
import psutil
|
||
|
||
# Initialise colorama with autoreset enabled.
init(autoreset=True)

# Turn off pandas' chained-assignment (SettingWithCopy) warning so it does
# not clutter the console output.
pd.set_option("mode.chained_assignment", None)

# Font fallbacks for Chinese labels, and keep the minus sign renderable.
plt.rcParams.update({
    'font.sans-serif': ['SimHei', 'DejaVu Sans', 'Arial Unicode MS', 'Microsoft YaHei'],
    'axes.unicode_minus': False,
})
|
||
|
||
# HTML模板 - 添加了SN独立图的显示
|
||
# Performance tuning settings.
OPTIMIZATION_CONFIG = {
    # Never use more than 8 workers, even on machines with more cores.
    'max_workers': min(mp.cpu_count(), 8),
    # Row count per chunk for chunked reads.
    'chunk_size': 50000,
    # Load several files concurrently with a thread pool.
    'use_threading': True,
    # 70% of the RAM available at import time, in GB. True division keeps
    # the fractional part; floor division would round down to whole GB
    # before scaling and yield 0 whenever less than 1 GB is free.
    'memory_limit_gb': psutil.virtual_memory().available / (1024 ** 3) * 0.7,
}
|
||
|
||
# Jinja2 template for the HTML report. Values that originate from user input
# or workbook contents (keyword, file names/paths, test names, serial numbers,
# folder path) go through the `e` filter so markup in them cannot be injected
# into the page, whether or not the caller enables autoescaping.
HTML_TEMPLATE = """
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>测试报告分析 - {{ keyword|e }}</title>
    <style>
        /* 样式保持不变 */
        body {
            font-family: Arial, sans-serif;
            margin: 0;
            padding: 20px;
            background-color: #f5f5f5;
        }
        .header {
            background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
            color: white;
            padding: 20px;
            border-radius: 10px;
            margin-bottom: 20px;
            box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
        }
        .test-card {
            background: white;
            border-radius: 10px;
            padding: 20px;
            margin-bottom: 20px;
            box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1);
            transition: transform 0.2s ease;
        }
        .test-card:hover {
            transform: translateY(-2px);
            box-shadow: 0 4px 8px rgba(0, 0, 0, 0.15);
        }
        .test-header {
            display: flex;
            justify-content: space-between;
            align-items: center;
            margin-bottom: 15px;
            padding-bottom: 10px;
            border-bottom: 2px solid #eaeaea;
        }
        .test-title {
            font-size: 18px;
            font-weight: bold;
            color: #333;
        }
        .test-stats {
            display: grid;
            grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
            gap: 15px;
            margin-bottom: 15px;
        }
        .stat-item {
            background: #f8f9fa;
            padding: 12px;
            border-radius: 8px;
            text-align: center;
        }
        .stat-label {
            font-size: 12px;
            color: #666;
            margin-bottom: 5px;
        }
        .stat-value {
            font-size: 16px;
            font-weight: bold;
            color: #333;
        }
        .plot-container {
            text-align: center;
            margin: 20px 0;
        }
        .plot-image {
            max-width: 100%;
            height: auto;
            border-radius: 8px;
            box-shadow: 0 2px 8px rgba(0, 0, 0, 0.1);
        }
        .sn-plots-container {
            display: grid;
            grid-template-columns: repeat(auto-fit, minmax(400px, 1fr));
            gap: 20px;
            margin: 20px 0;
        }
        .sn-plot-item {
            background: #f8f9fa;
            padding: 15px;
            border-radius: 8px;
            text-align: center;
        }
        .sn-plot-title {
            font-size: 14px;
            font-weight: bold;
            margin-bottom: 10px;
            color: #555;
        }
        .summary {
            background: white;
            border-radius: 10px;
            padding: 20px;
            margin-top: 20px;
            box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1);
        }
        .summary-item {
            margin: 10px 0;
            padding: 10px;
            background: #f8f9fa;
            border-radius: 6px;
        }
        .timestamp {
            text-align: center;
            color: #666;
            margin-top: 30px;
            font-size: 12px;
        }
        .badge {
            padding: 4px 8px;
            border-radius: 12px;
            font-size: 12px;
            font-weight: bold;
        }
        .badge-success {
            background: #d4edda;
            color: #155724;
        }
        .badge-warning {
            background: #fff3cd;
            color: #856404;
        }
        .badge-danger {
            background: #f8d7da;
            color: #721c24;
        }
        .section-title {
            font-size: 16px;
            font-weight: bold;
            margin: 20px 0 10px 0;
            color: #333;
            border-left: 4px solid #667eea;
            padding-left: 10px;
        }
        .file-info {
            background: #e7f3ff;
            padding: 10px;
            border-radius: 6px;
            margin: 10px 0;
        }
    </style>
</head>
<body>
    <div class="header">
        <h1>📊 测试报告分析 (多文件合并)</h1>
        <p>关键词: <strong>{{ keyword|e }}</strong> | 生成时间: {{ timestamp }}</p>
        <p>共分析 {{ test_count }} 个测试项,{{ total_points }} 个数据点,来自 {{ file_count }} 个文件</p>
    </div>

    <div class="summary">
        <h3>📁 处理的文件列表</h3>
        {% for file_info in file_infos %}
        <div class="file-info">
            <strong>{{ loop.index }}. {{ file_info.filename|e }}</strong><br>
            路径: {{ file_info.path|e }}<br>
            数据行数: {{ file_info.rows }} | 测试项数: {{ file_info.tests }}
        </div>
        {% endfor %}
    </div>

    {% for test in tests %}
    <div class="test-card">
        <div class="test-header">
            <div class="test-title">📋 {{ test.name|e }}</div>
            <div class="badge badge-{{ test.status }}">
                {{ test.status_display }}
            </div>
        </div>

        <div class="test-stats">
            <div class="stat-item">
                <div class="stat-label">数据点数</div>
                <div class="stat-value">{{ test.stats.count }}</div>
            </div>
            <div class="stat-item">
                <div class="stat-label">平均值</div>
                <div class="stat-value">{{ "%.4f"|format(test.stats.mean) }}</div>
            </div>
            <div class="stat-item">
                <div class="stat-label">中位数</div>
                <div class="stat-value">{{ "%.4f"|format(test.stats.median) }}</div>
            </div>
            <div class="stat-item">
                <div class="stat-label">标准差</div>
                <div class="stat-value">{{ "%.4f"|format(test.stats.std) }}</div>
            </div>
            <div class="stat-item">
                <div class="stat-label">最小值</div>
                <div class="stat-value">{{ "%.4f"|format(test.stats.min) }}</div>
            </div>
            <div class="stat-item">
                <div class="stat-label">最大值</div>
                <div class="stat-value">{{ "%.4f"|format(test.stats.max) }}</div>
            </div>
        </div>

        {% if test.limits.lower is not none or test.limits.upper is not none %}
        <div class="test-stats">
            {% if test.limits.lower is not none %}
            <div class="stat-item">
                <div class="stat-label">下限值</div>
                <div class="stat-value">{{ "%.4f"|format(test.limits.lower) }}</div>
            </div>
            {% endif %}
            {% if test.limits.upper is not none %}
            <div class="stat-item">
                <div class="stat-label">上限值</div>
                <div class="stat-value">{{ "%.4f"|format(test.limits.upper) }}</div>
            </div>
            {% endif %}
        </div>
        {% endif %}

        <!-- 汇总图 -->
        <div class="section-title">📈 汇总视图 (所有SN)</div>
        <div class="plot-container">
            <img src="data:image/png;base64,{{ test.summary_plot_image }}" alt="{{ test.name|e }} 汇总散点图" class="plot-image">
        </div>

        <!-- SN独立图 -->
        {% if test.sn_plot_images %}
        <div class="section-title">🔍 SN独立视图 ({{ test.sn_plot_images|length }}个SN)</div>
        <div class="sn-plots-container">
            {% for sn_plot in test.sn_plot_images %}
            <div class="sn-plot-item">
                <div class="sn-plot-title">SN: {{ sn_plot.sn|e }}</div>
                <img src="data:image/png;base64,{{ sn_plot.image }}" alt="{{ test.name|e }} - SN {{ sn_plot.sn|e }} 散点图" class="plot-image">
            </div>
            {% endfor %}
        </div>
        {% endif %}
    </div>
    {% endfor %}

    <div class="summary">
        <h3>📈 分析摘要</h3>
        <div class="summary-item">
            <strong>文件夹路径:</strong> {{ folder_path|e }}
        </div>
        <div class="summary-item">
            <strong>分析时间:</strong> {{ analysis_time }}秒
        </div>
        <div class="summary-item">
            <strong>测试项分布:</strong>
            <ul>
                <li>正常: {{ status_counts.normal }} 个</li>
                <li>警告: {{ status_counts.warning }} 个</li>
                <li>异常: {{ status_counts.abnormal }} 个</li>
            </ul>
        </div>
        <div class="summary-item">
            <strong>数据摘要:</strong>
            <ul>
                <li>总文件数: {{ file_count }} 个</li>
                <li>总数据行数: {{ total_rows }} 行</li>
                <li>总测试项数: {{ test_count }} 个</li>
                <li>总数据点数: {{ total_points }} 个</li>
            </ul>
        </div>
    </div>

    <div class="timestamp">
        报告生成于 {{ timestamp }} | 多文件测试报告分析系统
    </div>
</body>
</html>
"""
|
||
|
||
|
||
class MultiFileTestReportScatterPlotter:
|
||
def __init__(self):
|
||
self.folder_path: Optional[str] = None
|
||
self.df: Optional[pd.DataFrame] = None
|
||
self.output_dir: Optional[str] = None
|
||
self.required_columns = ["Test Name New", "SN", "Measurement", "Test Time", "Lower Limit", "Upper Limit"]
|
||
self.col_lower: Optional[str] = None
|
||
self.col_upper: Optional[str] = None
|
||
self.html_report_path: Optional[str] = None
|
||
self.file_infos: List[Dict[str, Any]] = []
|
||
|
||
# 缓存处理过的数据
|
||
self._processed_data_cache: Dict[str, Any] = {}
|
||
|
||
# 性能监控
|
||
self.performance_stats = {
|
||
'load_times': [],
|
||
'memory_usage': [],
|
||
'file_sizes': []
|
||
}
|
||
|
||
def _print_stage(self, msg: str, color=Fore.CYAN) -> None:
    """Print *msg* as a stage banner framed by two 50-character rules."""
    rule = '=' * 50
    print(f"\n{color}{rule}")
    print(f"📋 {msg}")
    print(f"{rule}{Style.RESET_ALL}")
|
||
|
||
def _print_progress(self, current: int, total: int, prefix: str = "进度",
                    color=Fore.YELLOW) -> None:
    """Redraw a 40-cell progress bar on the current console line.

    Nothing is printed when *total* is not positive. When *current* equals
    *total* a completion marker is printed, which also ends the line.
    """
    if total <= 0:
        return

    width = 40
    done_cells = int(width * current / total)
    bar = "".join(("█" * done_cells, "░" * (width - done_cells)))
    pct = (current / total) * 100

    # The leading carriage return rewinds to the start of the line so the
    # bar overwrites its previous state.
    line = f"\r{color}{prefix}: [{bar}] {current}/{total} ({pct:.1f}%){Style.RESET_ALL}"
    sys.stdout.write(line)
    sys.stdout.flush()

    if current == total:
        print(f"{Fore.GREEN} ✅ 完成{Style.RESET_ALL}")
|
||
|
||
def _print_warning(self, msg: str) -> None:
    """Print *msg* in yellow, prefixed with a warning sign."""
    text = f"{Fore.YELLOW}⚠️ {msg}{Style.RESET_ALL}"
    print(text)
|
||
|
||
def _print_success(self, msg: str) -> None:
    """Print *msg* in green, prefixed with a check mark."""
    text = f"{Fore.GREEN}✅ {msg}{Style.RESET_ALL}"
    print(text)
|
||
|
||
def _print_error(self, msg: str) -> None:
    """Print *msg* in red, prefixed with a cross mark."""
    text = f"{Fore.RED}❌ {msg}{Style.RESET_ALL}"
    print(text)
|
||
|
||
def _get_memory_usage(self) -> float:
    """Return the resident set size (RSS) of the current process in GB."""
    rss_bytes = psutil.Process().memory_info().rss
    return rss_bytes / (1024 ** 3)
|
||
|
||
def _check_memory_safe(self, file_size_mb: float) -> bool:
    """Tell whether a file of *file_size_mb* megabytes may be loaded safely.

    The memory needed is estimated as five times the file size. Loading is
    considered safe only if the available RAM exceeds that estimate by more
    than 1 GB.
    """
    projected_gb = file_size_mb * 5 / 1024
    free_gb = psutil.virtual_memory().available / (1024 ** 3)
    return free_gb > projected_gb + 1
|
||
|
||
def _load_single_file_optimized(self, file_info: Dict[str, Any]) -> Optional[pd.DataFrame]:
    """Load the required columns of one Excel workbook.

    The sheet is chosen in this order: "Merged All Tests", "All Tests",
    then the first sheet of the workbook. On success *file_info* is updated
    in place with ``load_time``, ``file_size_mb``, ``engine``, ``rows`` and
    ``tests`` (number of distinct "Test Name New" values), and the load is
    recorded in ``self.performance_stats``.

    Args:
        file_info: dict with at least ``'path'`` and ``'filename'``.

    Returns:
        The loaded frame with an extra ``_source_file`` column, or ``None``
        when the file is skipped or cannot be read. Problems are reported
        on the console, never raised.
    """
    file_path = file_info['path']
    filename = file_info['filename']

    try:
        start_time = time.time()
        file_size_mb = os.path.getsize(file_path) / (1024 ** 2)

        # Skip files whose estimated footprint does not fit in free RAM.
        if not self._check_memory_safe(file_size_mb):
            self._print_warning(f"内存不足,跳过大文件: {filename} ({file_size_mb:.1f}MB)")
            return None

        # Pick the reader engine from the file extension.
        lowered_path = file_path.lower()
        if lowered_path.endswith('.xlsx'):
            engine = 'openpyxl'
        elif lowered_path.endswith('.xls'):
            engine = 'xlrd'
        else:
            self._print_warning(f"不支持的文件格式: {filename}")
            return None

        try:
            excel_file = pd.ExcelFile(file_path, engine=engine)
        except Exception as e:
            self._print_warning(f"无法读取工作表信息 {filename}: {e}")
            return None

        # The context manager closes the workbook handle on every exit path;
        # reading through the already opened ExcelFile also avoids opening
        # the workbook a second time.
        with excel_file:
            sheet_names = excel_file.sheet_names
            target_sheets = ["Merged All Tests", "All Tests", sheet_names[0] if sheet_names else None]
            selected_sheet = next((s for s in target_sheets if s and s in sheet_names), None)

            if not selected_sheet:
                self._print_warning(f"未找到目标工作表: {filename}")
                return None

            # A callable `usecols` keeps whichever required columns exist.
            # A list of names would make pandas raise as soon as one is
            # absent, so the explicit missing-column report below could
            # never be reached.
            # NOTE: pandas.read_excel has no `chunksize` parameter, so the
            # sheet is always read in a single call regardless of file size.
            wanted = set(self.required_columns)
            df = pd.read_excel(
                excel_file,
                sheet_name=selected_sheet,
                dtype='object',
                na_filter=False,
                usecols=lambda column: column in wanted,
            )

        # Report missing columns first: a sheet with none of the required
        # columns yields a frame without columns, which would otherwise be
        # misreported as an empty file.
        missing_columns = [col for col in self.required_columns if col not in df.columns]
        if missing_columns:
            self._print_warning(f"缺少必要列 {filename}: {missing_columns}")
            return None

        if df.empty:
            self._print_warning(f"文件为空: {filename}")
            return None

        # Tag every row with the workbook it came from.
        df['_source_file'] = filename

        load_time = time.time() - start_time
        file_info.update({
            'load_time': round(load_time, 2),
            'file_size_mb': round(file_size_mb, 2),
            'engine': engine,
            'rows': len(df),
            # Read by the HTML report's per-file summary.
            'tests': int(df['Test Name New'].nunique()),
        })

        self.performance_stats['load_times'].append(load_time)
        self.performance_stats['file_sizes'].append(file_size_mb)
        self.performance_stats['memory_usage'].append(self._get_memory_usage())

        self._print_success(f"加载完成: {filename} ({len(df)}行, {load_time:.2f}s)")
        return df

    except Exception as e:
        # Best effort: one unreadable workbook must not abort the whole run.
        self._print_error(f"加载文件失败 {filename}: {e}")
        return None
|
||
|
||
|
||
|
||
def _find_column_case_insensitive(self, candidates: List[str]) -> Optional[str]:
|
||
"""优化的大小写不敏感列查找"""
|
||
if self.df is None:
|
||
return None
|
||
|
||
columns_lower = {col.lower().strip(): col for col in self.df.columns}
|
||
for candidate in candidates:
|
||
key = candidate.lower().strip()
|
||
if key in columns_lower:
|
||
return columns_lower[key]
|
||
return None
|
||
|
||
def get_folder_path(self) -> None:
    """Prompt until an existing directory is entered, then store it.

    The resolved absolute path is saved in ``self.folder_path``. Blank
    input is ignored silently; anything that is not an existing directory
    is reported and asked for again.
    """
    self._print_stage("输入文件夹路径")

    while True:
        print(f"{Fore.WHITE}请输入包含Excel文件的文件夹路径: ")
        folder_path = input("> ").strip()
        if not folder_path:
            continue

        candidate = Path(folder_path)
        if not (candidate.exists() and candidate.is_dir()):
            self._print_error(f"文件夹不存在: {folder_path},请重新输入")
            continue

        self.folder_path = str(candidate.resolve())
        print(f"{Fore.GREEN}已选择文件夹: {self.folder_path}{Style.RESET_ALL}")
        return
|
||
|
||
def find_excel_files(self) -> List[str]:
    """Recursively collect the Excel workbooks below ``self.folder_path``.

    Files ending in ``.xlsx`` or ``.xls`` (case-insensitive) are returned as
    resolved absolute paths, sorted, and listed on the console. Office lock
    files (names starting with ``~$``, created while a workbook is open)
    are skipped because they carry an Excel extension but are not readable
    workbooks.

    Returns:
        The sorted list of paths, or an empty list when scanning fails.
    """
    self._print_stage("扫描Excel文件")

    valid_extensions = ('.xlsx', '.xls')

    try:
        excel_files = sorted(
            str(path.resolve())
            for path in Path(self.folder_path).rglob('*')
            if path.suffix.lower() in valid_extensions
            and not path.name.startswith('~$')
            and path.is_file()
        )

        self._print_success(f"找到 {len(excel_files)} 个Excel文件")
        for i, file_path in enumerate(excel_files, 1):
            print(f" {i:2d}. {os.path.basename(file_path)}")

        return excel_files

    except Exception as e:
        self._print_error(f"扫描文件夹时发生错误: {e}")
        return []
|
||
|
||
def load_multiple_files_optimized(self, excel_files: List[str]) -> None:
    """Load all workbooks and merge them into ``self.df``.

    Files are loaded with a thread pool when threading is enabled in
    ``OPTIMIZATION_CONFIG`` and more than one file is given, otherwise one
    after another. Successfully loaded files are recorded in
    ``self.file_infos``. After merging, the limit column names are resolved
    into ``self.col_lower`` / ``self.col_upper``.

    Args:
        excel_files: paths of the workbooks to load.

    Raises:
        ValueError: if not a single workbook could be loaded.
        Exception: whatever ``pd.concat`` raises while merging (re-raised
            after being reported).
    """
    self._print_stage("并行加载Excel文件")
    start_time = time.time()

    file_infos = [{'path': path, 'filename': os.path.basename(path)} for path in excel_files]

    all_dataframes = []
    self.file_infos = []
    total_files = len(excel_files)

    if OPTIMIZATION_CONFIG['use_threading'] and total_files > 1:
        with ThreadPoolExecutor(max_workers=OPTIMIZATION_CONFIG['max_workers']) as executor:
            futures = {executor.submit(self._load_single_file_optimized, file_info): file_info
                       for file_info in file_infos}

            # Results are collected in submission order so the merged frame
            # has a deterministic row order.
            completed = 0
            for future, file_info in futures.items():
                try:
                    df = future.result(timeout=300)  # 5-minute timeout per file
                    if df is not None:
                        all_dataframes.append(df)
                        self.file_infos.append(file_info)
                except Exception as e:
                    self._print_error(f"加载失败 {file_info['filename']}: {e}")
                finally:
                    # Count failures as well, otherwise the bar can never
                    # reach 100% once a single file fails.
                    completed += 1
                    self._print_progress(completed, total_files, "并行加载文件")
    else:
        for i, file_info in enumerate(file_infos, 1):
            df = self._load_single_file_optimized(file_info)
            if df is not None:
                all_dataframes.append(df)
                self.file_infos.append(file_info)
            # Report progress after the load so that "done" is shown only
            # once the last file has actually been processed.
            self._print_progress(i, total_files, "加载文件")

    if not all_dataframes:
        raise ValueError("没有成功加载任何Excel文件")

    self._print_stage("合并数据")
    merge_start = time.time()

    try:
        self.df = pd.concat(all_dataframes, ignore_index=True, sort=False)
        merge_time = time.time() - merge_start

        total_time = time.time() - start_time
        load_times = self.performance_stats['load_times']
        avg_load_time = np.mean(load_times) if load_times else 0
        # default= guards against an empty list.
        peak_memory = max(self.performance_stats['memory_usage'], default=0.0)

        self._print_success(f"合并完成: {len(self.df)}行, {len(all_dataframes)}个文件")
        self._print_success(f"加载耗时: {total_time:.2f}s (平均: {avg_load_time:.2f}s/文件)")
        self._print_success(f"合并耗时: {merge_time:.2f}s")

        print(f"\n{Fore.CYAN}📊 性能统计:")
        print(f" 平均加载时间: {avg_load_time:.2f}s")
        print(f" 峰值内存使用: {peak_memory:.2f}GB")
        print(f" 总文件大小: {sum(self.performance_stats['file_sizes']):.1f}MB{Style.RESET_ALL}")

    except Exception as e:
        self._print_error(f"合并数据失败: {e}")
        raise

    # Remember the limit column names as they appear in the merged frame.
    self.col_lower = self._find_column_case_insensitive([
        "Lower Limit", "lower limit", "lower_limit", "ll", "lower"
    ])
    self.col_upper = self._find_column_case_insensitive([
        "Upper Limit", "upper limit", "upper_limit", "ul", "upper"
    ])
|
||
|
||
def get_keyword(self) -> Tuple[pd.DataFrame, str, List[str]]:
    """Ask for a keyword and filter the merged data on "Test Name New".

    Matching is case-insensitive. The keyword is used as a regular
    expression when it is a valid one; otherwise (for example an unbalanced
    parenthesis in a test name) it is matched as literal text instead of
    failing. When nothing matches, the user may retry, use all data, or
    abort.

    Returns:
        ``(filtered_df, keyword, unique_tests)``. ``filtered_df`` is empty
        when there is no data, the name column is missing, or the user
        aborts. ``keyword`` is ``""`` when the user chose to use all data.
    """
    self._print_stage("筛选关键词")

    while True:
        keyword = input("请输入筛选关键词(匹配 'Test Name New'): ").strip()

        if not keyword:
            print("❌ 关键词不能为空,请重新输入")
            continue

        # Treat "nothing loaded yet" the same way as an empty frame.
        if self.df is None or self.df.empty:
            print("⚠️ 数据框为空,无法进行筛选")
            return pd.DataFrame(), keyword, []

        if "Test Name New" not in self.df.columns:
            print("❌ 列 'Test Name New' 不存在于数据框中")
            print(f"可用列: {list(self.df.columns)}")
            return pd.DataFrame(), keyword, []

        try:
            # Fall back to literal matching when the keyword is not a valid
            # regular expression.
            try:
                re.compile(keyword)
                use_regex = True
            except re.error:
                use_regex = False

            names = self.df["Test Name New"].astype(str)
            mask = names.str.contains(keyword, case=False, na=False, regex=use_regex)
            filtered_df = self.df.loc[mask].copy()

            if not filtered_df.empty:
                unique_tests = filtered_df["Test Name New"].unique().tolist()
                print(f"✅ 匹配到 {len(filtered_df)} 行数据,涉及 {len(unique_tests)} 个不同测试项")
                return filtered_df, keyword, unique_tests

            print(f"⚠️ 没有找到包含关键词 '{keyword}' 的测试项")

            # Show a few available test names as a hint.
            available_tests = self.df["Test Name New"].dropna().unique()
            if len(available_tests) > 0:
                print("📋 可用的测试项示例:")
                for test in available_tests[:5]:
                    print(f" - {test}")
                if len(available_tests) > 5:
                    print(f" ... 还有 {len(available_tests) - 5} 个测试项")

            # Strip the answer so " 1" is not mistaken for "quit".
            choice = input("请选择: 1-重新输入关键词 2-使用所有数据 3-退出当前操作: ").strip()
            if choice == "1":
                continue
            if choice == "2":
                filtered_df = self.df.copy()
                unique_tests = filtered_df["Test Name New"].unique().tolist()
                print(f"✅ 使用所有数据: {len(filtered_df)} 行,{len(unique_tests)} 个测试项")
                return filtered_df, "", unique_tests

            print("👋 退出筛选操作")
            return pd.DataFrame(), keyword, []

        except Exception as e:
            print(f"❌ 筛选过程中发生错误: {e}")
            print("请检查数据格式或重新输入关键词")
            continue
|
||
|
||
def create_output_dir(self, keyword) -> None:
    """Create the output folder and decide where the HTML report goes.

    The folder is ``scatter_report_out`` inside ``self.folder_path``. The
    report file name combines the sanitised keyword (``all_data`` when the
    keyword is empty) with the current timestamp.

    Raises:
        ValueError: if no input folder has been selected yet.
    """
    self._print_stage("创建输出目录")

    if not self.folder_path:
        raise ValueError("文件夹路径未设置")

    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    out_dir = os.path.join(self.folder_path, "scatter_report_out")
    self.output_dir = out_dir

    report_stem = self._safe_filename(keyword) if keyword else "all_data"
    report_name = f"{report_stem}_report_{stamp}.html"
    self.html_report_path = os.path.join(out_dir, report_name)

    os.makedirs(out_dir, exist_ok=True)
    print(f"输出目录: {self.output_dir}")
|
||
|
||
@staticmethod
|
||
def _safe_filename(name: str) -> str:
|
||
"""生成安全的文件名"""
|
||
safe = "".join(c for c in str(name) if c.isalnum() or c in (" ", "_", "-")).strip()
|
||
return safe or "Unknown_Test"
|
||
|
||
def _extract_limits(self, df_one_test: pd.DataFrame) -> Tuple[
|
||
Optional[float], Optional[float], List[float], List[float]]:
|
||
"""提取某个测试项的上下限数值"""
|
||
lower_plot = upper_plot = None
|
||
lower_set = []
|
||
upper_set = []
|
||
|
||
if self.col_lower and self.col_lower in df_one_test.columns:
|
||
lower_vals = self._clean_and_convert_series(df_one_test[self.col_lower], 'numeric').dropna().unique()
|
||
lower_set = sorted(lower_vals.tolist()) if len(lower_vals) > 0 else []
|
||
if lower_set:
|
||
lower_plot = min(lower_set)
|
||
|
||
if self.col_upper and self.col_upper in df_one_test.columns:
|
||
upper_vals = self._clean_and_convert_series(df_one_test[self.col_upper], 'numeric').dropna().unique()
|
||
upper_set = sorted(upper_vals.tolist()) if len(upper_vals) > 0 else []
|
||
if upper_set:
|
||
upper_plot = max(upper_set)
|
||
|
||
return lower_plot, upper_plot, lower_set, upper_set
|
||
|
||
@staticmethod
|
||
def _clean_and_convert_series(series: pd.Series, target_type: str = 'numeric') -> pd.Series:
|
||
"""统一的系列清洗和转换方法 - 修复了 ast 方法名错误"""
|
||
if series.empty:
|
||
return series
|
||
|
||
if target_type == 'numeric':
|
||
# 数值转换优化
|
||
if pd.api.types.is_numeric_dtype(series):
|
||
return series.astype(float)
|
||
|
||
# 批量字符串处理 - 修复这里的问题
|
||
cleaned = series.astype(str).str.replace(r'[, ]', '', regex=True).str.strip()
|
||
return pd.to_numeric(cleaned, errors='coerce')
|
||
|
||
elif target_type == 'datetime':
|
||
return MultiFileTestReportScatterPlotter._convert_to_datetime(series)
|
||
|
||
return series
|
||
|
||
@staticmethod
|
||
def _convert_to_datetime(series: pd.Series) -> pd.Series:
|
||
"""优化的日期时间转换"""
|
||
if pd.api.types.is_datetime64_any_dtype(series):
|
||
return series
|
||
|
||
# 预处理:转换为数值和字符串两种形式
|
||
numeric_series = pd.to_numeric(series, errors='coerce')
|
||
string_series = series.astype(str).str.strip()
|
||
|
||
result = pd.Series(pd.NaT, index=series.index, dtype='datetime64[ns]')
|
||
|
||
# 数值时间戳处理
|
||
masks = {
|
||
'ms': numeric_series >= 1e11,
|
||
's': (numeric_series >= 1e9) & (numeric_series < 1e11),
|
||
'excel': (numeric_series > 20000) & (numeric_series < 60000)
|
||
}
|
||
|
||
for mask_type, mask in masks.items():
|
||
if mask.any():
|
||
if mask_type == 'ms':
|
||
result.loc[mask] = pd.to_datetime(numeric_series.loc[mask], unit='ms')
|
||
elif mask_type == 's':
|
||
result.loc[mask] = pd.to_datetime(numeric_series.loc[mask], unit='s')
|
||
elif mask_type == 'excel':
|
||
origin = pd.Timestamp('1899-12-30')
|
||
result.loc[mask] = origin + pd.to_timedelta(numeric_series.loc[mask], unit='D')
|
||
|
||
# 字符串日期处理
|
||
remaining_mask = result.isna()
|
||
if remaining_mask.any():
|
||
remaining_strings = string_series.loc[remaining_mask]
|
||
|
||
# 特定格式优先处理
|
||
format_patterns = [
|
||
(r'^\d{4}-\d{2}-\d{2} \d{2}-\d{2}-\d{2}$', '%Y-%m-%d %H-%M-%S'),
|
||
]
|
||
|
||
for pattern, date_format in format_patterns:
|
||
format_mask = remaining_strings.str.match(pattern)
|
||
if format_mask.any():
|
||
result.loc[remaining_mask[remaining_mask].index[format_mask]] = pd.to_datetime(
|
||
remaining_strings.loc[format_mask], format=date_format, errors='coerce'
|
||
)
|
||
|
||
# 通用解析
|
||
still_na_mask = result.isna() & remaining_mask
|
||
if still_na_mask.any():
|
||
result.loc[still_na_mask] = pd.to_datetime(
|
||
string_series.loc[still_na_mask], errors='coerce'
|
||
)
|
||
|
||
return result
|
||
|
||
def _preprocess_test_data(self, test_data: pd.DataFrame) -> pd.DataFrame:
|
||
"""数据预处理"""
|
||
# 数值转换
|
||
test_data['Measurement_num'] = self._clean_and_convert_series(
|
||
test_data['Measurement'], 'numeric'
|
||
)
|
||
test_data['TestTime_dt'] = self._clean_and_convert_series(
|
||
test_data['Test Time'], 'datetime'
|
||
)
|
||
|
||
# 去除无效数据
|
||
valid_data = test_data.dropna(subset=['Measurement_num', 'TestTime_dt'])
|
||
return valid_data.sort_values('TestTime_dt')
|
||
|
||
def _calculate_statistics(self, y_data: pd.Series) -> Dict[str, float]:
|
||
"""计算统计信息"""
|
||
stats = {
|
||
'count': len(y_data),
|
||
'mean': y_data.mean(),
|
||
'median': y_data.median(),
|
||
'min': y_data.min(),
|
||
'max': y_data.max(),
|
||
'std': y_data.std(),
|
||
'q1': y_data.quantile(0.25),
|
||
'q3': y_data.quantile(0.75)
|
||
}
|
||
return stats
|
||
|
||
def _plot_to_base64(self, fig) -> str:
    """Render *fig* as a PNG and return it base64-encoded.

    The figure is always closed afterwards, including when rendering fails,
    so a failed save cannot leave an open figure behind.

    Args:
        fig: the matplotlib figure to render.

    Returns:
        The PNG image (150 dpi, tight bounding box) as a base64 string.
    """
    buf = BytesIO()
    try:
        fig.savefig(buf, format='png', dpi=150, bbox_inches='tight')
    finally:
        plt.close(fig)
    return base64.b64encode(buf.getvalue()).decode('utf-8')
|
||
|
||
def _create_summary_plot(self, test_data: pd.DataFrame, test_name: str,
                         lower_plot: Optional[float], upper_plot: Optional[float]) -> str:
    """Draw one scatter plot containing every SN of a test item.

    Each SN is drawn as its own series. Limit lines are drawn when given,
    and the mean and median are drawn across the observed time range.

    Returns:
        The plot as a base64-encoded PNG.
    """
    fig, ax = plt.subplots(figsize=(12, 8))

    # One scatter series per serial number; a single "Unknown_SN" series
    # when the frame has no SN column.
    if "SN" in test_data.columns:
        sn_groups = list(test_data.groupby("SN"))
    else:
        sn_groups = [("Unknown_SN", test_data)]
    for sn_label, sn_rows in sn_groups:
        ax.scatter(sn_rows['TestTime_dt'], sn_rows['Measurement_num'],
                   label=str(sn_label), alpha=0.7, s=25)

    stats = self._calculate_statistics(test_data['Measurement_num'])
    times = test_data['TestTime_dt']
    t_first, t_last = times.min(), times.max()

    # Limit lines.
    if lower_plot is not None:
        ax.axhline(y=lower_plot, color='green', linestyle='--', linewidth=1.2, label="Lower Limit")
    if upper_plot is not None:
        ax.axhline(y=upper_plot, color='red', linestyle='--', linewidth=1.2, label="Upper Limit")

    # Mean and median lines share everything except colour, style and label.
    shared = dict(xmin=t_first, xmax=t_last, linewidth=1.5, alpha=0.7)
    ax.hlines(y=stats['mean'], colors='orange', linestyles='-', label='Mean', **shared)
    ax.hlines(y=stats['median'], colors='purple', linestyles='-.', label='Median', **shared)

    # Titles, axes and legend.
    ax.set_title(f"汇总图 - {test_name}")
    ax.set_xlabel("Test Time")
    ax.set_ylabel("Measurement Value")
    ax.grid(True, alpha=0.3)
    ax.tick_params(axis='x', rotation=45)
    ax.legend(bbox_to_anchor=(1.05, 1), loc='upper left')

    return self._plot_to_base64(fig)
|
||
|
||
def _create_sn_plots(self, test_data: pd.DataFrame, test_name: str,
                     lower_plot: Optional[float], upper_plot: Optional[float]) -> List[Dict[str, str]]:
    """Draw one scatter plot per SN of a test item.

    Returns:
        A list of ``{"sn": <serial number>, "image": <base64 PNG>}`` dicts,
        one per SN group. The list is empty when the frame has no SN column.
    """
    if "SN" not in test_data.columns:
        return []

    sn_plots = []
    for sn, sn_rows in test_data.groupby("SN"):
        if sn_rows.empty:
            continue

        fig, ax = plt.subplots(figsize=(10, 6))

        # Data points of this SN only.
        ax.scatter(sn_rows['TestTime_dt'], sn_rows['Measurement_num'],
                   color='blue', alpha=0.7, s=30, label=f"SN: {sn}")

        # Statistics and time range are computed per SN, not per test.
        stats = self._calculate_statistics(sn_rows['Measurement_num'])
        times = sn_rows['TestTime_dt']
        t_first, t_last = times.min(), times.max()

        # Limit lines.
        if lower_plot is not None:
            ax.axhline(y=lower_plot, color='green', linestyle='--', linewidth=1.2, label="Lower Limit")
        if upper_plot is not None:
            ax.axhline(y=upper_plot, color='red', linestyle='--', linewidth=1.2, label="Upper Limit")

        # Mean and median lines.
        shared = dict(xmin=t_first, xmax=t_last, linewidth=1.5, alpha=0.7)
        ax.hlines(y=stats['mean'], colors='orange', linestyles='-', label='Mean', **shared)
        ax.hlines(y=stats['median'], colors='purple', linestyles='-.', label='Median', **shared)

        # Titles, axes and legend.
        ax.set_title(f"SN独立图 - {test_name} (SN: {sn})")
        ax.set_xlabel("Test Time")
        ax.set_ylabel("Measurement Value")
        ax.grid(True, alpha=0.3)
        ax.tick_params(axis='x', rotation=45)
        ax.legend()

        sn_plots.append({"sn": str(sn), "image": self._plot_to_base64(fig)})

    return sn_plots
|
||
|
||
def _determine_test_status(self, stats: Dict[str, float],
|
||
lower_limit: Optional[float],
|
||
upper_limit: Optional[float]) -> Dict[str, Any]:
|
||
"""确定测试状态"""
|
||
status = "success"
|
||
status_display = "正常"
|
||
|
||
if lower_limit is not None and upper_limit is not None:
|
||
# 检查是否超出限值
|
||
if stats['min'] < lower_limit or stats['max'] > upper_limit:
|
||
status = "danger"
|
||
status_display = "异常"
|
||
elif (stats['mean'] < lower_limit * 1.1 or stats['mean'] > upper_limit * 0.9 or
|
||
stats['std'] > (upper_limit - lower_limit) * 0.2):
|
||
status = "warning"
|
||
status_display = "警告"
|
||
|
||
return {"status": status, "status_display": status_display}
|
||
|
||
def generate_html_report(self, filtered_df: pd.DataFrame, keyword: str,
                         unique_tests: List[str]) -> None:
    """Build the HTML report for the selected test items and write it to disk.

    For every test name the data is preprocessed, statistics and limits are
    computed, a summary plot and per-SN plots are rendered, and a status is
    assigned. Test items without any usable row are left out. The result is
    rendered through ``HTML_TEMPLATE`` and written to
    ``self.html_report_path`` as UTF-8.

    Args:
        filtered_df: rows selected by the keyword filter.
        keyword: the keyword used; an empty value is shown as "所有数据".
        unique_tests: test names to report, in display order.

    Raises:
        ValueError: if the report path has not been prepared yet.
    """
    self._print_stage("生成HTML报告")

    # Fail before the expensive plotting rather than at the final write.
    if not self.html_report_path:
        raise ValueError("报告输出路径未设置")

    start_time = time.time()

    test_results = []
    total_points = 0
    status_counts = {"success": 0, "warning": 0, "danger": 0}

    for i, test_name in enumerate(unique_tests, 1):
        self._print_progress(i, len(unique_tests), "生成测试报告")

        # Work on a copy: preprocessing adds columns in place.
        test_data = filtered_df[filtered_df["Test Name New"] == test_name].copy()
        test_data = self._preprocess_test_data(test_data)

        if test_data.empty:
            continue

        lower_plot, upper_plot, _, _ = self._extract_limits(test_data)

        stats = self._calculate_statistics(test_data['Measurement_num'])
        total_points += stats['count']

        summary_plot_image = self._create_summary_plot(test_data, test_name, lower_plot, upper_plot)
        sn_plot_images = self._create_sn_plots(test_data, test_name, lower_plot, upper_plot)

        status_info = self._determine_test_status(stats, lower_plot, upper_plot)
        status_counts[status_info["status"]] += 1

        test_results.append({
            "name": test_name,
            "stats": stats,
            "limits": {"lower": lower_plot, "upper": upper_plot},
            "summary_plot_image": summary_plot_image,
            "sn_plot_images": sn_plot_images,
            "status": status_info["status"],
            "status_display": status_info["status_display"],
        })

    # Test names, serial numbers and file names come straight from the
    # workbooks, so autoescaping keeps any markup in them from being
    # injected into the report.
    template = Template(HTML_TEMPLATE, autoescape=True)
    html_content = template.render(
        keyword=keyword if keyword else "所有数据",
        timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        test_count=len(test_results),
        total_points=total_points,
        tests=test_results,
        folder_path=self.folder_path,
        analysis_time=round(time.time() - start_time, 2),
        status_counts={"normal": status_counts["success"], "warning": status_counts["warning"],
                       "abnormal": status_counts["danger"]},
        file_count=len(self.file_infos),
        file_infos=self.file_infos,
        total_rows=len(self.df) if self.df is not None else 0
    )

    with open(self.html_report_path, 'w', encoding='utf-8') as f:
        f.write(html_content)

    self._print_success(f"HTML报告已生成: {self.html_report_path}")
    self._print_success(
        f"共处理 {len(self.file_infos)} 个文件,{len(test_results)} 个测试项,{total_points} 个数据点")
|
||
|
||
|
||
def run(self) -> None:
    """Run the interactive workflow from folder selection to report output.

    Steps: ask for the folder, find and load the workbooks, then repeatedly
    ask for a keyword and generate a report until the user declines or no
    data is selected. Ctrl+C ends the run with a warning. Any other error
    is reported together with its traceback instead of being propagated.
    """
    try:
        self.get_folder_path()

        excel_files = self.find_excel_files()
        if not excel_files:
            self._print_error("没有找到可用的Excel文件")
            return

        self.load_multiple_files_optimized(excel_files)

        keep_going = True
        while keep_going:
            filtered_df, keyword, unique_tests = self.get_keyword()
            if filtered_df.empty:
                self._print_warning("没有数据可处理,退出程序")
                break

            self.create_output_dir(keyword)
            self.generate_html_report(filtered_df, keyword, unique_tests)

            self._print_success("分析完成!")
            print(f"📊 报告文件: {self.html_report_path}")
            print(f"📁 输出目录: {self.output_dir}")

            # Offer another round with a different keyword.
            answer = input("\n是否继续分析其他关键词?(y/n): ").strip().lower()
            keep_going = answer in ('y', 'yes', '是')

    except KeyboardInterrupt:
        self._print_warning("用户中断程序")
    except Exception as e:
        self._print_error(f"发生错误: {type(e).__name__}: {e}")
        import traceback
        traceback.print_exc()
|
||
|
||
|
||
# Script entry point: start the interactive report workflow.
if __name__ == "__main__":
    MultiFileTestReportScatterPlotter().run()
|