"""
Audit log module.

Provides operation-log recording and JaVers-style object change auditing.
"""

import csv
import functools
import io
import json
import time
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional


@dataclass
class OperationLogEntry:
    """A single operation log entry."""

    id: str
    operation_time: datetime
    module_name: str
    operation_desc: str
    operator: str
    operator_id: Optional[int] = None
    request_method: Optional[str] = None
    request_path: Optional[str] = None
    request_params: Optional[str] = None
    response_result: Optional[str] = None
    ip_address: Optional[str] = None
    execution_time: Optional[int] = None  # execution time in milliseconds
    status: str = "SUCCESS"
    exception_message: Optional[str] = None
    diff_json: Optional[str] = None


@dataclass
class ObjectChange:
    """One changed field: its name plus the old and the new value."""

    field_name: str
    old_value: Any
    new_value: Any


@dataclass
class DiffResult:
    """Result of comparing two objects."""

    has_changes: bool
    changes: List[ObjectChange]


@dataclass
class AuditStatistics:
    """Aggregated counters over the stored audit logs."""

    total_operations: int = 0
    success_count: int = 0
    failure_count: int = 0
    module_distribution: Dict[str, int] = field(default_factory=dict)


class AuditLogStorage(ABC):
    """Abstract base class for audit log storage back ends."""

    @abstractmethod
    def save(self, log_entry: Dict[str, Any]) -> None:
        """Persist one log entry."""

    @abstractmethod
    def query(
        self,
        module_name: Optional[str] = None,
        operator: Optional[str] = None,
        start_time: Optional[float] = None,
        end_time: Optional[float] = None,
        status: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """Return the log entries matching every given filter."""

    @abstractmethod
    def get_all(self) -> List[Dict[str, Any]]:
        """Return all stored log entries."""

    @abstractmethod
    def delete_old(self, max_keep: int) -> int:
        """Delete old entries, keeping at most ``max_keep``; return the number deleted."""


class MemoryAuditStorage(AuditLogStorage):
    """Audit log storage that keeps entries in an in-memory list."""

    def __init__(self):
        self._logs: List[Dict[str, Any]] = []

    def save(self, log_entry: Dict[str, Any]) -> None:
        """Append ``log_entry`` to the in-memory list."""
        self._logs.append(log_entry)

    def query(
        self,
        module_name: Optional[str] = None,
        operator: Optional[str] = None,
        start_time: Optional[float] = None,
        end_time: Optional[float] = None,
        status: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """Return the entries matching every given filter.

        Args:
            module_name: Exact module name to match; falsy means no filter.
            operator: Exact operator to match; falsy means no filter.
            start_time: Inclusive lower bound on ``timestamp``; ``None``
                means no bound.
            end_time: Inclusive upper bound on ``timestamp``; ``None``
                means no bound.
            status: Exact status to match; falsy means no filter.

        Returns:
            A new list, so callers can never mutate the internal storage
            through the result. Entries without a ``timestamp`` are treated
            as having timestamp 0.
        """

        def matches(log: Dict[str, Any]) -> bool:
            if module_name and log.get("module_name") != module_name:
                return False
            if operator and log.get("operator") != operator:
                return False
            # Compare against None explicitly: 0 is a legitimate time bound.
            if start_time is not None and log.get("timestamp", 0) < start_time:
                return False
            if end_time is not None and log.get("timestamp", 0) > end_time:
                return False
            if status and log.get("status") != status:
                return False
            return True

        return [log for log in self._logs if matches(log)]

    def get_all(self) -> List[Dict[str, Any]]:
        """Return a shallow copy of all stored entries."""
        return self._logs.copy()

    def delete_old(self, max_keep: int) -> int:
        """Drop the oldest entries so that at most ``max_keep`` remain.

        The retained entries are kept in ascending ``timestamp`` order, so
        the storage stays oldest-first after a cleanup.

        Args:
            max_keep: Maximum number of entries to retain.

        Returns:
            The number of entries removed.

        Raises:
            ValueError: If ``max_keep`` is negative.
        """
        if max_keep < 0:
            raise ValueError("max_keep must be non-negative")

        excess = len(self._logs) - max_keep
        if excess <= 0:
            return 0

        # Oldest first; everything before ``excess`` is dropped.
        by_time = sorted(self._logs, key=lambda log: log.get("timestamp", 0))
        self._logs = by_time[excess:]
        return excess


class OperationLogRecorder:
    """Records operation logs into an ``AuditLogStorage``."""

    def __init__(self, storage: Optional[AuditLogStorage] = None):
        # ``is None`` rather than truthiness: a storage object that happens
        # to be falsy (e.g. defines ``__len__`` and is empty) must be kept.
        self._storage = storage if storage is not None else MemoryAuditStorage()

    def record(
        self,
        module_name: str,
        operation_desc: str,
        operator: str,
        operator_id: Optional[int] = None,
        request_method: Optional[str] = None,
        request_path: Optional[str] = None,
        request_params: Optional[str] = None,
        ip_address: Optional[str] = None,
        execution_time: Optional[int] = None,
        status: str = "SUCCESS",
        exception_message: Optional[str] = None,
        diff_json: Optional[str] = None
    ) -> OperationLogEntry:
        """Create an operation log entry, store it and return it.

        The stored dict carries a float ``timestamp`` and the returned entry
        carries ``operation_time``; both come from one clock reading.

        Returns:
            The created ``OperationLogEntry`` (with a fresh UUID4 ``id``).
        """
        now = time.time()
        entry = OperationLogEntry(
            id=str(uuid.uuid4()),
            operation_time=datetime.fromtimestamp(now),
            module_name=module_name,
            operation_desc=operation_desc,
            operator=operator,
            operator_id=operator_id,
            request_method=request_method,
            request_path=request_path,
            request_params=request_params,
            ip_address=ip_address,
            execution_time=execution_time,
            status=status,
            exception_message=exception_message,
            diff_json=diff_json
        )

        # Convert to a plain dict for the storage layer.
        log_dict = {
            "id": entry.id,
            "timestamp": now,
            "module_name": entry.module_name,
            "operation_desc": entry.operation_desc,
            "operator": entry.operator,
            "operator_id": entry.operator_id,
            "request_method": entry.request_method,
            "request_path": entry.request_path,
            "request_params": entry.request_params,
            "ip_address": entry.ip_address,
            "execution_time": entry.execution_time,
            "status": entry.status,
            "exception_message": entry.exception_message,
            "diff_json": entry.diff_json,
        }

        self._storage.save(log_dict)
        return entry

    def query_logs(
        self,
        module_name: Optional[str] = None,
        operator: Optional[str] = None,
        start_time: Optional[float] = None,
        end_time: Optional[float] = None,
        status: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """Query stored operation logs; all filters are passed to the storage."""
        return self._storage.query(
            module_name=module_name,
            operator=operator,
            start_time=start_time,
            end_time=end_time,
            status=status
        )

    def get_statistics(self) -> AuditStatistics:
        """Compute totals, success/failure counts and per-module counts.

        An entry without a ``status`` counts as ``"SUCCESS"``; an entry
        without a ``module_name`` is counted under ``"unknown"``.
        """
        logs = self._storage.get_all()
        stats = AuditStatistics()
        stats.total_operations = len(logs)

        for log in logs:
            if log.get("status", "SUCCESS") == "SUCCESS":
                stats.success_count += 1
            else:
                stats.failure_count += 1

            module = log.get("module_name", "unknown")
            stats.module_distribution[module] = (
                stats.module_distribution.get(module, 0) + 1
            )

        return stats

    def cleanup(self, max_keep: int = 1000) -> int:
        """Delete old logs via the storage; return the number deleted."""
        return self._storage.delete_old(max_keep)


class ObjectChangeAuditor:
    """Object change auditor (JaVers style)."""

    def compare(self, old_object: Dict[str, Any], new_object: Dict[str, Any]) -> DiffResult:
        """Compare two objects field by field.

        A missing key is read as ``None``, so a key that is absent on one
        side and ``None`` on the other is not reported as a change.

        Args:
            old_object: The object before the change.
            new_object: The object after the change.

        Returns:
            A ``DiffResult`` whose changes are listed in a deterministic
            order: keys of ``old_object`` first, then keys that exist only
            in ``new_object``.
        """
        # dict.fromkeys de-duplicates while preserving first-seen order,
        # unlike a set whose iteration order varies between runs.
        all_keys = dict.fromkeys((*old_object, *new_object))

        changes = [
            ObjectChange(
                field_name=key,
                old_value=old_object.get(key),
                new_value=new_object.get(key)
            )
            for key in all_keys
            if old_object.get(key) != new_object.get(key)
        ]

        return DiffResult(has_changes=bool(changes), changes=changes)

    def get_changed_fields(
        self,
        old_object: Dict[str, Any],
        new_object: Dict[str, Any]
    ) -> List[ObjectChange]:
        """Return only the list of changed fields from ``compare``."""
        return self.compare(old_object, new_object).changes

    def to_json(self, obj: Any) -> str:
        """Serialize ``obj`` to JSON; non-serializable values fall back to ``str``."""
        return json.dumps(obj, ensure_ascii=False, default=str)


class AuditLogExporter:
    """Exports audit logs from an ``OperationLogRecorder``."""

    def __init__(self, recorder: OperationLogRecorder):
        self._recorder = recorder

    def export_to_json(
        self,
        module_name: Optional[str] = None,
        operator: Optional[str] = None
    ) -> str:
        """Export the matching logs as an indented JSON array."""
        logs = self._recorder.query_logs(
            module_name=module_name,
            operator=operator
        )
        return json.dumps(logs, ensure_ascii=False, indent=2, default=str)

    def export_to_csv(
        self,
        module_name: Optional[str] = None,
        operator: Optional[str] = None
    ) -> str:
        """Export the matching logs as CSV text.

        Values containing commas, quotes or newlines are quoted by the
        ``csv`` module so every record stays a well-formed row.

        Returns:
            A header row plus one row per log, joined by ``"\\n"`` with no
            trailing newline; an empty string when no log matches.
        """
        logs = self._recorder.query_logs(
            module_name=module_name,
            operator=operator
        )

        if not logs:
            return ""

        headers = ["timestamp", "module_name", "operation_desc", "operator", "status"]

        buffer = io.StringIO()
        writer = csv.writer(buffer, lineterminator="\n")
        writer.writerow(headers)
        writer.writerows(
            [str(log.get(header, "")) for header in headers] for log in logs
        )

        text = buffer.getvalue()
        # Drop only the final row terminator to keep the "no trailing
        # newline" shape of the result.
        return text[:-1] if text.endswith("\n") else text


class AuditLogRecorder:
    """Unified facade over operation logging and object change auditing."""

    def __init__(self, storage: Optional[AuditLogStorage] = None):
        self._operation_recorder = OperationLogRecorder(storage)
        self._change_auditor = ObjectChangeAuditor()

    def record_operation(self, **kwargs) -> OperationLogEntry:
        """Record an operation log; kwargs go to ``OperationLogRecorder.record``."""
        return self._operation_recorder.record(**kwargs)

    def query_logs(self, **kwargs) -> List[Dict[str, Any]]:
        """Query logs; kwargs go to ``OperationLogRecorder.query_logs``."""
        return self._operation_recorder.query_logs(**kwargs)

    def record_change(
        self,
        old_object: Dict[str, Any],
        new_object: Dict[str, Any],
        **kwargs
    ) -> OperationLogEntry:
        """Record an operation log carrying the diff between two objects.

        Args:
            old_object: The object before the change.
            new_object: The object after the change.
            **kwargs: Remaining arguments for ``OperationLogRecorder.record``
                (``diff_json`` is supplied by this method).

        Returns:
            The created ``OperationLogEntry`` whose ``diff_json`` is a JSON
            array of ``{"field", "old", "new"}`` objects.
        """
        diff_result = self._change_auditor.compare(old_object, new_object)

        # default=str keeps values such as datetime from raising TypeError,
        # matching ObjectChangeAuditor.to_json.
        diff_json = json.dumps(
            [
                {
                    "field": change.field_name,
                    "old": change.old_value,
                    "new": change.new_value
                }
                for change in diff_result.changes
            ],
            ensure_ascii=False,
            default=str
        )

        return self._operation_recorder.record(
            diff_json=diff_json,
            **kwargs
        )


def audit_log(
    recorder: AuditLogRecorder,
    module_name: str,
    operation_desc: str
):
    """
    Decorator that records an audit log entry for every call.

    The entry is written whether the wrapped function returns or raises;
    an exception from the wrapped function is logged with status
    ``"FAILURE"`` and then re-raised unchanged.

    Args:
        recorder: The audit log recorder that receives the entries.
        module_name: Module name stored on each entry.
        operation_desc: Operation description stored on each entry.
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            status = "SUCCESS"
            exception_msg = None

            try:
                return func(*args, **kwargs)
            except Exception as e:
                status = "FAILURE"
                exception_msg = str(e)
                raise
            finally:
                execution_time = int((time.time() - start_time) * 1000)

                recorder.record_operation(
                    module_name=module_name,
                    operation_desc=operation_desc,
                    operator="system",  # could be taken from a request context
                    request_params=json.dumps({"args": args, "kwargs": kwargs}, default=str),
                    execution_time=execution_time,
                    status=status,
                    exception_message=exception_msg
                )

        return wrapper
    return decorator