""" 错误诊断模块 提供详细的错误分析、分类、归因和恢复建议 """ import traceback import re from typing import Dict, Any, List, Optional, Tuple from dataclasses import dataclass from enum import Enum import json class ErrorCategory(Enum): """错误分类""" NETWORK_ERROR = "network_error" AUTH_ERROR = "auth_error" VALIDATION_ERROR = "validation_error" SERVER_ERROR = "server_error" TIMEOUT_ERROR = "timeout_error" DATA_ERROR = "data_error" CONFIG_ERROR = "config_error" UNKNOWN_ERROR = "unknown_error" class ErrorSeverity(Enum): """错误严重程度""" LOW = "low" MEDIUM = "medium" HIGH = "high" CRITICAL = "critical" @dataclass class ErrorAnalysis: """错误分析结果""" error_type: str error_category: ErrorCategory error_severity: ErrorSeverity error_message: str stack_trace: str context: Dict[str, Any] possible_causes: List[str] suggested_solutions: List[str] recovery_actions: List[str] related_tests: List[str] class ErrorPattern: """错误模式""" def __init__(self, pattern: str, category: ErrorCategory, severity: ErrorSeverity, causes: List[str], solutions: List[str], recovery: List[str]): """ 初始化错误模式 Args: pattern: 错误模式(正则表达式) category: 错误分类 severity: 错误严重程度 causes: 可能的原因 solutions: 建议的解决方案 recovery: 恢复操作 """ self.pattern = re.compile(pattern, re.IGNORECASE) self.category = category self.severity = severity self.causes = causes self.solutions = solutions self.recovery = recovery def matches(self, error_message: str) -> bool: """ 检查错误消息是否匹配此模式 Args: error_message: 错误消息 Returns: 是否匹配 """ return bool(self.pattern.search(error_message)) class ErrorDiagnoser: """错误诊断器""" def __init__(self): """初始化错误诊断器""" self.error_patterns = self._initialize_error_patterns() self.error_history: List[ErrorAnalysis] = [] def _initialize_error_patterns(self) -> List[ErrorPattern]: """ 初始化错误模式 Returns: 错误模式列表 """ patterns = [ # 网络错误 ErrorPattern( r"connection\s+(refused|timeout|reset|closed)", ErrorCategory.NETWORK_ERROR, ErrorSeverity.HIGH, [ "网络连接被拒绝", "服务器未启动或不可达", "防火墙阻止连接", "网络配置错误" ], [ "检查服务器是否正在运行", "验证网络连接", "检查防火墙设置", "确认服务器地址和端口正确" ], [ "重启服务器", "检查网络配置", "重试连接" ] ), ErrorPattern( r"host\s+(unreachable|not\s+found)", ErrorCategory.NETWORK_ERROR, ErrorSeverity.HIGH, [ "主机地址不存在", "DNS解析失败", "网络不可达" ], [ "验证主机地址正确", "检查DNS配置", "确认网络连接" ], [ "修正主机地址", "重试连接" ] ), # 认证错误 ErrorPattern( r"(unauthorized|authentication\s+failed|invalid\s+(token|credentials))", ErrorCategory.AUTH_ERROR, ErrorSeverity.HIGH, [ "认证令牌无效或过期", "用户名或密码错误", "权限不足" ], [ "检查认证令牌", "验证用户凭据", "确认用户权限" ], [ "重新登录", "刷新认证令牌", "联系管理员" ] ), ErrorPattern( r"(forbidden|access\s+denied)", ErrorCategory.AUTH_ERROR, ErrorSeverity.HIGH, [ "访问被拒绝", "权限不足", "资源不存在" ], [ "检查用户权限", "验证资源是否存在", "确认访问控制配置" ], [ "联系管理员", "申请相应权限" ] ), # 验证错误 ErrorPattern( r"(validation\s+failed|invalid\s+(parameter|input|data))", ErrorCategory.VALIDATION_ERROR, ErrorSeverity.MEDIUM, [ "输入数据格式错误", "参数验证失败", "数据不符合要求" ], [ "检查输入数据格式", "验证参数类型和范围", "参考API文档" ], [ "修正输入数据", "调整参数值" ] ), ErrorPattern( r"(required\s+field\s+missing|missing\s+required\s+parameter)", ErrorCategory.VALIDATION_ERROR, ErrorSeverity.MEDIUM, [ "缺少必填字段", "参数不完整" ], [ "检查请求参数", "确认必填字段", "参考API文档" ], [ "补充必填字段", "修正请求参数" ] ), # 服务器错误 ErrorPattern( r"(internal\s+server\s+error|server\s+error)", ErrorCategory.SERVER_ERROR, ErrorSeverity.CRITICAL, [ "服务器内部错误", "服务器处理异常", "服务器配置问题" ], [ "检查服务器日志", "验证服务器配置", "检查数据库连接" ], [ "联系技术支持", "检查服务器状态", "重启服务器" ] ), ErrorPattern( r"(database\s+(error|connection\s+failed|timeout))", ErrorCategory.SERVER_ERROR, ErrorSeverity.CRITICAL, [ "数据库连接失败", "数据库查询错误", "数据库超时" ], [ "检查数据库服务状态", "验证数据库连接配置", "检查SQL语句" ], [ "重启数据库服务", "修正连接配置", "优化SQL查询" ] ), # 超时错误 ErrorPattern( r"(request\s+timeout|operation\s+timed\s+out)", ErrorCategory.TIMEOUT_ERROR, ErrorSeverity.MEDIUM, [ "请求超时", "服务器响应慢", "网络延迟高" ], [ "检查网络连接", "增加超时时间", "优化请求" ], [ "重试请求", "增加超时配置", "优化网络环境" ] ), # 数据错误 ErrorPattern( r"(data\s+(not\s+found|does\s+not\s+exist)|record\s+not\s+found)", ErrorCategory.DATA_ERROR, ErrorSeverity.MEDIUM, [ "数据不存在", "记录未找到", "数据已被删除" ], [ "验证数据ID", "检查数据是否存在", "确认数据状态" ], [ "使用正确的数据ID", "重新创建数据" ] ), ErrorPattern( r"(duplicate\s+(key|entry|record)|constraint\s+violation)", ErrorCategory.DATA_ERROR, ErrorSeverity.MEDIUM, [ "数据重复", "唯一约束冲突", "数据已存在" ], [ "检查数据是否已存在", "验证唯一字段", "使用不同的值" ], [ "删除重复数据", "使用不同的值", "更新现有数据" ] ), # 配置错误 ErrorPattern( r"(configuration\s+error|invalid\s+configuration|config\s+not\s+found)", ErrorCategory.CONFIG_ERROR, ErrorSeverity.HIGH, [ "配置错误", "配置文件缺失", "配置参数无效" ], [ "检查配置文件", "验证配置参数", "参考配置文档" ], [ "修正配置文件", "重置配置", "重新加载配置" ] ) ] return patterns def diagnose_error(self, exception: Exception, context: Dict[str, Any] = None) -> ErrorAnalysis: """ 诊断错误 Args: exception: 异常对象 context: 上下文信息 Returns: 错误分析结果 """ error_message = str(exception) stack_trace = traceback.format_exc() # 查找匹配的错误模式 matched_pattern = None for pattern in self.error_patterns: if pattern.matches(error_message): matched_pattern = pattern break # 如果没有匹配的模式,使用默认分类 if matched_pattern is None: matched_pattern = ErrorPattern( r".*", ErrorCategory.UNKNOWN_ERROR, ErrorSeverity.MEDIUM, ["未知错误"], ["检查日志", "联系技术支持"], ["重试操作", "联系支持"] ) # 分析上下文 context = context or {} context.update({ "error_type": type(exception).__name__, "error_message": error_message, "timestamp": context.get("timestamp"), "test_name": context.get("test_name"), "url": context.get("url"), "method": context.get("method") }) # 创建错误分析结果 analysis = ErrorAnalysis( error_type=type(exception).__name__, error_category=matched_pattern.category, error_severity=matched_pattern.severity, error_message=error_message, stack_trace=stack_trace, context=context, possible_causes=matched_pattern.causes, suggested_solutions=matched_pattern.solutions, recovery_actions=matched_pattern.recovery, related_tests=self._find_related_tests(context) ) # 记录错误历史 self.error_history.append(analysis) return analysis def _find_related_tests(self, context: Dict[str, Any]) -> List[str]: """ 查找相关测试 Args: context: 上下文信息 Returns: 相关测试列表 """ related_tests = [] url = context.get("url", "") method = context.get("method", "") # 根据URL和方法查找相关测试 if "/auth/login" in url: related_tests.extend(["用户登录", "认证测试", "权限测试"]) elif "/user/" in url: related_tests.extend(["用户管理测试", "用户列表测试", "用户创建测试"]) elif "/role/" in url: related_tests.extend(["角色管理测试", "角色列表测试", "角色创建测试"]) elif "/menu/" in url: related_tests.extend(["菜单管理测试", "菜单列表测试"]) return related_tests def get_error_statistics(self) -> Dict[str, Any]: """ 获取错误统计信息 Returns: 错误统计信息 """ if not self.error_history: return {} # 按分类统计 category_stats = {} for analysis in self.error_history: category = analysis.error_category.value if category not in category_stats: category_stats[category] = 0 category_stats[category] += 1 # 按严重程度统计 severity_stats = {} for analysis in self.error_history: severity = analysis.error_severity.value if severity not in severity_stats: severity_stats[severity] = 0 severity_stats[severity] += 1 # 按错误类型统计 type_stats = {} for analysis in self.error_history: error_type = analysis.error_type if error_type not in type_stats: type_stats[error_type] = 0 type_stats[error_type] += 1 return { "total_errors": len(self.error_history), "by_category": category_stats, "by_severity": severity_stats, "by_type": type_stats, "most_common_errors": self._get_most_common_errors(5) } def _get_most_common_errors(self, limit: int = 5) -> List[Dict[str, Any]]: """ 获取最常见的错误 Args: limit: 返回数量限制 Returns: 最常见错误列表 """ error_counts = {} for analysis in self.error_history: error_key = f"{analysis.error_type}: {analysis.error_message[:50]}" if error_key not in error_counts: error_counts[error_key] = 0 error_counts[error_key] += 1 sorted_errors = sorted(error_counts.items(), key=lambda x: x[1], reverse=True) return [ { "error": error[0], "count": error[1] } for error in sorted_errors[:limit] ] def generate_error_report(self, analysis: ErrorAnalysis) -> str: """ 生成错误报告 Args: analysis: 错误分析结果 Returns: 错误报告(Markdown格式) """ report = f"""# 错误诊断报告 ## 错误信息 - **错误类型**: {analysis.error_type} - **错误分类**: {analysis.error_category.value} - **严重程度**: {analysis.error_severity.value} - **错误消息**: {analysis.error_message} ## 错误堆栈 ``` {analysis.stack_trace} ``` ## 上下文信息 {self._format_context(analysis.context)} ## 可能的原因 {self._format_list(analysis.possible_causes)} ## 建议的解决方案 {self._format_list(analysis.suggested_solutions)} ## 恢复操作 {self._format_list(analysis.recovery_actions)} ## 相关测试 {self._format_list(analysis.related_tests)} --- *报告生成时间: {self._get_current_timestamp()}* """ return report def _format_context(self, context: Dict[str, Any]) -> str: """格式化上下文信息""" lines = [] for key, value in context.items(): if value is not None: lines.append(f"- **{key}**: {value}") return "\n".join(lines) def _format_list(self, items: List[str]) -> str: """格式化列表""" return "\n".join([f"- {item}" for item in items]) def _get_current_timestamp(self) -> str: """获取当前时间戳""" from datetime import datetime return datetime.now().strftime("%Y-%m-%d %H:%M:%S") def clear_history(self) -> None: """清空错误历史""" self.error_history.clear()