Files
PythonApp/tempReportProcess/tempReportProcess_V1.py

283 lines
11 KiB
Python
Raw Normal View History

2026-02-02 15:19:30 +08:00
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime
import tkinter as tk
from tkinter import filedialog
import os
import matplotlib.dates as mdates
from jinja2 import Template
from matplotlib import font_manager, rcParams
class TemperatureDataAnalyzer:
def __init__(self):
self.data = None
self.file_path = None
self.timestamps = []
self.temperatures = []
self.statuses = []
self._configure_chinese_font() # 配置中文字体,修复中文字符缺失警告
def _configure_chinese_font(self):
"""
配置 Matplotlib 中文字体避免中文字符缺失的警告
会尝试常见的中文字体并设置 axes.unicode_minus False
"""
try:
# 常见中文字体候选(跨平台)
candidates = [
"Microsoft YaHei", "Microsoft YaHei UI", # Windows
"SimHei", "SimSun", # Windows黑体/宋体)
"PingFang SC", "Heiti SC", # macOS
2026-02-02 15:19:30 +08:00
"Noto Sans CJK SC", "Source Han Sans SC", "WenQuanYi Micro Hei", # Linux
"Arial Unicode MS" # 覆盖广的 Unicode 字体
2026-02-02 15:19:30 +08:00
]
available = {f.name for f in font_manager.fontManager.ttflist}
for name in candidates:
if name in available:
rcParams["font.sans-serif"] = [name]
rcParams["axes.unicode_minus"] = False
# 可选:打印使用的字体名称
# print(f"使用中文字体: {name}")
return
# 如果没有找到常见中文字体,给出提示
rcParams["axes.unicode_minus"] = False
print("未检测到常见中文字体,图中中文可能无法正常显示。建议安装 'Noto Sans CJK SC''Microsoft YaHei'")
except Exception as e:
print(f"中文字体配置失败: {e}")
def select_file(self):
"""手动选择CSV文件"""
root = tk.Tk()
root.withdraw() # 隐藏主窗口
file_types = [("CSV files", "*.csv"), ("All files", "*.*")]
self.file_path = filedialog.askopenfilename(title="选择温度数据CSV文件", filetypes=file_types)
if not self.file_path:
print("未选择文件,程序退出")
return False
return True
def load_and_process_data(self):
"""加载和处理数据,并保存带时间戳的新文件"""
2026-02-02 15:19:30 +08:00
try:
# 读取CSV文件无表头
self.data = pd.read_csv(self.file_path, header=None)
# 重命名列以便于引用
self.data.columns = ['timestamp', 'temperature', 'status']
# 转换时间戳格式文本例如10/29/2025 2:20:41 PM
self.data['datetime'] = pd.to_datetime(self.data['timestamp'], format='%m/%d/%Y %I:%M:%S %p')
# 将转换后的datetime对象存储到D列原数据只有3列所以新增第4列
self.data['converted_timestamp'] = self.data['datetime']
# 新增第5列存储转换后的时间戳 UTC的时间戳精确到ms
# self.data['utc_timestamp_ms'] = (self.data['datetime'].astype('int64') // 10**6)
self.data['utc_timestamp_ms'] = (self.data['datetime'].astype('int64') // 10**9)
# 保存带时间戳的新CSV文件
self._save_csv_with_timestamp()
2026-02-02 15:19:30 +08:00
# 提取处理后的数据
self.timestamps = self.data['datetime']
self.temperatures = self.data['temperature']
self.statuses = self.data['status']
print(f"成功加载 {len(self.data)} 条记录")
return True
except Exception as e:
print(f"数据处理错误: {e}")
return False
def _save_csv_with_timestamp(self):
"""保存带时间戳的新CSV文件"""
try:
# 生成新文件名(原文件名+时间戳)
base_filename = os.path.splitext(os.path.basename(self.file_path))[0]
timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S")
output_filename = f"{base_filename}_with_timestamp_{timestamp_str}.csv"
output_dir = os.path.dirname(self.file_path)
output_path = os.path.join(output_dir, output_filename)
# # 选择需要保存的列:原始三列 + 转换后的时间戳列
# columns_to_save = ['timestamp', 'temperature', 'status', 'converted_timestamp']
# 选择需要保存的列:原始三列 + 转换后的时间戳列 + 格式化时间戳列
columns_to_save = ['timestamp', 'temperature', 'status', 'converted_timestamp', 'utc_timestamp_ms']
self.data[columns_to_save].to_csv(output_path, index=False, header=False)
print(f"已保存带时间戳的新CSV文件: {output_path}")
except Exception as e:
print(f"保存带时间戳CSV文件时出错: {e}")
2026-02-02 15:19:30 +08:00
def create_scatter_plots(self):
"""创建散点图"""
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))
# 温度散点图
sc1 = ax1.scatter(self.timestamps, self.temperatures, c=self.temperatures,
cmap='coolwarm', alpha=0.7, s=20)
ax1.set_title('温度随时间变化趋势')
ax1.set_ylabel('温度 (°C)')
ax1.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M'))
ax1.grid(True, linestyle='--', alpha=0.7)
ax1.tick_params(axis='x', rotation=45)
plt.colorbar(sc1, ax=ax1, label="温度(°C)")
# 状态散点图
sc2 = ax2.scatter(self.timestamps, self.statuses, c=self.statuses,
cmap='viridis', alpha=0.7, s=20)
ax2.set_title('状态随时间变化')
ax2.set_xlabel('时间')
ax2.set_ylabel('状态值')
ax2.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M'))
ax2.grid(True, linestyle='--', alpha=0.7)
ax2.tick_params(axis='x', rotation=45)
plt.colorbar(sc2, ax=ax2, label="状态值")
plt.tight_layout()
return fig
def generate_statistics_report(self):
"""生成统计报告"""
stats = {
'total_records': len(self.temperatures),
'avg_temperature': round(self.temperatures.mean(), 2),
'max_temperature': round(self.temperatures.max(), 2),
'min_temperature': round(self.temperatures.min(), 2),
'std_deviation': round(self.temperatures.std(), 2),
'temp_range': round(self.temperatures.max() - self.temperatures.min(), 2),
'start_time': self.timestamps.iloc[0].strftime('%Y-%m-%d %H:%M:%S'),
'end_time': self.timestamps.iloc[-1].strftime('%Y-%m-%d %H:%M:%S'),
'duration_hours': round((self.timestamps.iloc[-1] - self.timestamps.iloc[0]).total_seconds() / 3600, 2)
}
# 状态分布统计
status_counts = self.statuses.value_counts().to_dict()
stats['status_distribution'] = status_counts
return stats
def save_fig_to_html(self, fig, output_path):
"""将图形保存为HTML"""
import io
import base64
# 将图形转换为base64编码
buf = io.BytesIO()
fig.savefig(buf, format='png', dpi=150, bbox_inches='tight')
buf.seek(0)
img_str = base64.b64encode(buf.read()).decode('utf-8')
buf.close()
# HTML模板修复了多余的 '}'
html_template = """
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>温度数据分析报告</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
.header { background-color: #f0f0f0; padding: 15px; border-radius: 5px; }
.section { margin-bottom: 30px; }
.stats-table { width: 100%; border-collapse: collapse; }
.stats-table th, .stats-table td { border: 1px solid #ddd; padding: 8px; text-align: left; }
.stats-table th { background-color: #f2f2f2; }
.image-container { text-align: center; margin: 20px 0; }
h1, h2 { color: #333; }
</style>
</head>
<body>
<div class="header">
<h1>温度数据分析报告</h1>
<p><strong>数据文件:</strong> {{ file_name }}</p>
<p><strong>生成时间:</strong> {{ generation_time }}</p>
</div>
<div class="section">
<h2>数据概览</h2>
<table class="stats-table">
<tr><th>项目</th><th>数值</th></tr>
{% for key, value in statistics.items() %}
{% if key != 'status_distribution' %}
<tr><td>{{ key.replace('_', ' ').title() }}</td><td>{{ value }}</td></tr>
{% endif %}
{% endfor %}
</table>
</div>
<div class="section">
<h2>状态分布</h2>
<table class="stats-table">
<tr><th>状态值</th><th>出现次数</th></tr>
{% for status, count in statistics.status_distribution.items() %}
<tr><td>{{ status }}</td><td>{{ count }}</td></tr>
{% endfor %}
</table>
</div>
<div class="section">
<h2>温度与状态时序图</h2>
<div class="image-container">
<img src="data:image/png;base64,{{ image_data }}" alt="温度与状态时序图">
</div>
</div>
</body>
</html>
"""
template = Template(html_template)
rendered_html = template.render(
file_name=self.file_path,
generation_time=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
statistics=self.generate_statistics_report(),
image_data=img_str
)
with open(output_path, 'w', encoding='utf-8') as f:
f.write(rendered_html)
def run_analysis(self):
"""运行完整分析流程"""
if not self.select_file():
return
if not self.load_and_process_data():
return
# 创建图形
fig = self.create_scatter_plots()
# 生成输出文件名(保存到选择的文件所在文件夹)
base_filename = os.path.splitext(os.path.basename(self.file_path))[0]
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_filename = f"{base_filename}_{timestamp}.html"
output_dir = os.path.dirname(self.file_path)
output_path = os.path.join(output_dir, output_filename)
# 保存HTML报告到同一文件夹
self.save_fig_to_html(fig, output_path)
print(f"分析完成!报告已保存至: {output_path}")
# 显示统计摘要
stats = self.generate_statistics_report()
print("\n=== 数据统计摘要 ===")
for key, value in stats.items():
if key != 'status_distribution':
print(f"{key.replace('_', ' ').title()}: {value}")
if __name__ == "__main__":
analyzer = TemperatureDataAnalyzer()
analyzer.run_analysis()