Files
张翔 08ea5fbe98 feat(admin): 添加用户管理相关文件
添加用户管理视图、API和状态管理文件
2026-03-28 14:37:29 +08:00

428 lines
12 KiB
Python

"""
审计日志模块
提供操作日志记录和JaVers风格的对象变更审计功能。
"""
import csv
import functools
import io
import json
import time
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional
@dataclass
class OperationLogEntry:
    """A single operation-log record.

    The first five fields are required. Every other field is optional and
    defaults to ``None``, except ``status`` which defaults to ``"SUCCESS"``.
    """

    id: str
    operation_time: datetime
    module_name: str
    operation_desc: str
    operator: str
    operator_id: Optional[int] = None
    request_method: Optional[str] = None
    request_path: Optional[str] = None
    request_params: Optional[str] = None
    response_result: Optional[str] = None
    ip_address: Optional[str] = None
    execution_time: Optional[int] = None  # execution time in milliseconds
    status: str = "SUCCESS"
    exception_message: Optional[str] = None
    diff_json: Optional[str] = None
@dataclass
class ObjectChange:
    """One changed field: its name plus the value before and after."""

    field_name: str
    old_value: Any
    new_value: Any
@dataclass
class DiffResult:
    """Outcome of comparing two objects.

    ``has_changes`` is the caller-supplied flag saying whether anything
    differed; ``changes`` is the list of per-field change records.
    """

    has_changes: bool
    changes: List["ObjectChange"]
@dataclass
class AuditStatistics:
    """Aggregate counters over a set of audit-log entries.

    All counters start at zero; ``module_distribution`` maps a module name
    to a count and starts as a fresh empty dict for every instance.
    """

    total_operations: int = 0
    success_count: int = 0
    failure_count: int = 0
    module_distribution: Dict[str, int] = field(default_factory=dict)
class AuditLogStorage(ABC):
    """Abstract base class for audit-log storage backends.

    Concrete subclasses must implement all four methods below before they
    can be instantiated.
    """

    @abstractmethod
    def save(self, log_entry: Dict[str, Any]) -> None:
        """Persist a single log entry given as a dict."""

    @abstractmethod
    def query(
        self,
        module_name: Optional[str] = None,
        operator: Optional[str] = None,
        start_time: Optional[float] = None,
        end_time: Optional[float] = None,
        status: Optional[str] = None,
    ) -> List[Dict[str, Any]]:
        """Return the log entries that match the given optional filters."""

    @abstractmethod
    def get_all(self) -> List[Dict[str, Any]]:
        """Return every stored log entry."""

    @abstractmethod
    def delete_old(self, max_keep: int) -> int:
        """Delete old log entries and return an int result.

        NOTE(review): the in-file memory backend keeps the ``max_keep``
        newest entries and returns the number removed; other backends are
        presumably expected to do the same.
        """
class MemoryAuditStorage(AuditLogStorage):
    """Audit-log storage that keeps entries in an in-process list.

    Entries are held in insertion order; nothing is persisted.
    """

    def __init__(self):
        self._logs: List[Dict[str, Any]] = []

    def save(self, log_entry: Dict[str, Any]) -> None:
        """Append ``log_entry`` to the in-memory list."""
        self._logs.append(log_entry)

    def query(
        self,
        module_name: Optional[str] = None,
        operator: Optional[str] = None,
        start_time: Optional[float] = None,
        end_time: Optional[float] = None,
        status: Optional[str] = None,
    ) -> List[Dict[str, Any]]:
        """Return the entries matching every filter that was supplied.

        Args:
            module_name: Exact ``module_name`` to match; empty/None disables
                the filter.
            operator: Exact ``operator`` to match; empty/None disables the
                filter.
            start_time: Inclusive lower bound on ``timestamp``; None disables
                the bound.
            end_time: Inclusive upper bound on ``timestamp``; None disables
                the bound.
            status: Exact ``status`` to match; empty/None disables the filter.

        Returns:
            A new list, so callers cannot mutate the storage through it.
            Entries without a ``timestamp`` key are treated as timestamp 0.
        """
        matches = []
        for log in self._logs:
            if module_name and log.get("module_name") != module_name:
                continue
            if operator and log.get("operator") != operator:
                continue
            # Compare against None explicitly so a bound of 0 is honoured.
            if start_time is not None and log.get("timestamp", 0) < start_time:
                continue
            if end_time is not None and log.get("timestamp", 0) > end_time:
                continue
            if status and log.get("status") != status:
                continue
            matches.append(log)
        return matches

    def get_all(self) -> List[Dict[str, Any]]:
        """Return a shallow copy of all stored entries."""
        return self._logs.copy()

    def delete_old(self, max_keep: int) -> int:
        """Keep only the ``max_keep`` newest entries.

        Args:
            max_keep: Maximum number of entries to retain; must be >= 0.

        Returns:
            The number of entries removed.

        Raises:
            ValueError: If ``max_keep`` is negative (a negative slice bound
                would otherwise keep an arbitrary number of entries).
        """
        if max_keep < 0:
            raise ValueError("max_keep must be non-negative")
        total = len(self._logs)
        if total <= max_keep:
            return 0
        # Rank positions newest-first by timestamp (missing timestamp == 0).
        newest_first = sorted(
            range(total),
            key=lambda index: self._logs[index].get("timestamp", 0),
            reverse=True,
        )
        # Re-sort the surviving positions so the list stays in insertion
        # order and later appends remain chronological.
        survivors = sorted(newest_first[:max_keep])
        self._logs = [self._logs[index] for index in survivors]
        return total - len(self._logs)
class OperationLogRecorder:
    """Records operation logs into an audit-log storage backend."""

    def __init__(self, storage: Optional[AuditLogStorage] = None):
        """Use ``storage`` if given, otherwise a new in-memory storage."""
        self._storage = storage or MemoryAuditStorage()

    def record(
        self,
        module_name: str,
        operation_desc: str,
        operator: str,
        operator_id: Optional[int] = None,
        request_method: Optional[str] = None,
        request_path: Optional[str] = None,
        request_params: Optional[str] = None,
        ip_address: Optional[str] = None,
        execution_time: Optional[int] = None,
        status: str = "SUCCESS",
        exception_message: Optional[str] = None,
        diff_json: Optional[str] = None,
    ) -> OperationLogEntry:
        """Build a log entry, save it to storage as a dict, and return it.

        The entry receives a fresh UUID4 ``id``. Its ``operation_time`` and
        the stored ``timestamp`` are derived from one clock reading so the
        returned entry and the stored record agree.

        Returns:
            The ``OperationLogEntry`` that was created.
        """
        # Read the clock once and derive both representations from it.
        now = time.time()
        entry = OperationLogEntry(
            id=str(uuid.uuid4()),
            operation_time=datetime.fromtimestamp(now),
            module_name=module_name,
            operation_desc=operation_desc,
            operator=operator,
            operator_id=operator_id,
            request_method=request_method,
            request_path=request_path,
            request_params=request_params,
            ip_address=ip_address,
            execution_time=execution_time,
            status=status,
            exception_message=exception_message,
            diff_json=diff_json,
        )
        # Storage backends work with plain dicts keyed by field name.
        log_dict = {
            "id": entry.id,
            "timestamp": now,
            "module_name": entry.module_name,
            "operation_desc": entry.operation_desc,
            "operator": entry.operator,
            "operator_id": entry.operator_id,
            "request_method": entry.request_method,
            "request_path": entry.request_path,
            "request_params": entry.request_params,
            "ip_address": entry.ip_address,
            "execution_time": entry.execution_time,
            "status": entry.status,
            "exception_message": entry.exception_message,
            "diff_json": entry.diff_json,
        }
        self._storage.save(log_dict)
        return entry

    def query_logs(
        self,
        module_name: Optional[str] = None,
        operator: Optional[str] = None,
        start_time: Optional[float] = None,
        end_time: Optional[float] = None,
        status: Optional[str] = None,
    ) -> List[Dict[str, Any]]:
        """Forward the given filters to the storage's ``query`` method."""
        return self._storage.query(
            module_name=module_name,
            operator=operator,
            start_time=start_time,
            end_time=end_time,
            status=status,
        )

    def get_statistics(self) -> AuditStatistics:
        """Count all stored entries by outcome and by module.

        An entry counts as a success only when its ``status`` is exactly
        ``"SUCCESS"`` (the default when the key is missing); anything else
        counts as a failure. Entries without ``module_name`` are grouped
        under ``"unknown"``.
        """
        logs = self._storage.get_all()
        stats = AuditStatistics()
        stats.total_operations = len(logs)
        for log in logs:
            if log.get("status", "SUCCESS") == "SUCCESS":
                stats.success_count += 1
            else:
                stats.failure_count += 1
            module = log.get("module_name", "unknown")
            stats.module_distribution[module] = (
                stats.module_distribution.get(module, 0) + 1
            )
        return stats

    def cleanup(self, max_keep: int = 1000) -> int:
        """Ask the storage to delete old logs; return its result."""
        return self._storage.delete_old(max_keep)
class ObjectChangeAuditor:
    """Object-change auditor (JaVers style) for dict-shaped objects."""

    def compare(self, old_object: Dict[str, Any], new_object: Dict[str, Any]) -> DiffResult:
        """Compare two dicts field by field.

        Args:
            old_object: The object before the change.
            new_object: The object after the change.

        Returns:
            A ``DiffResult`` with one ``ObjectChange`` per key whose values
            differ. Changes follow a deterministic order: keys of
            ``old_object`` in their own order, then keys that appear only in
            ``new_object``. A key missing from one side is read as ``None``,
            so a missing key and an explicit ``None`` compare as equal.
        """
        # dict.fromkeys de-duplicates while preserving first-seen order,
        # unlike a set whose iteration order can vary between runs.
        ordered_keys = dict.fromkeys([*old_object, *new_object])
        changes = []
        for key in ordered_keys:
            old_value = old_object.get(key)
            new_value = new_object.get(key)
            if old_value != new_value:
                changes.append(
                    ObjectChange(
                        field_name=key,
                        old_value=old_value,
                        new_value=new_value,
                    )
                )
        return DiffResult(has_changes=bool(changes), changes=changes)

    def get_changed_fields(
        self,
        old_object: Dict[str, Any],
        new_object: Dict[str, Any],
    ) -> List[ObjectChange]:
        """Return only the list of changes produced by ``compare``."""
        return self.compare(old_object, new_object).changes

    def to_json(self, obj: Any) -> str:
        """Serialize ``obj`` to a JSON string.

        Non-ASCII characters are kept as-is, and values the JSON encoder
        cannot handle are converted with ``str``.
        """
        return json.dumps(obj, ensure_ascii=False, default=str)
class AuditLogExporter:
    """Exports audit logs obtained from a recorder as JSON or CSV text."""

    def __init__(self, recorder: "OperationLogRecorder"):
        """Keep the recorder whose ``query_logs`` supplies the entries."""
        self._recorder = recorder

    def export_to_json(
        self,
        module_name: Optional[str] = None,
        operator: Optional[str] = None,
    ) -> str:
        """Return the matching logs as an indented JSON array.

        Non-ASCII text is kept as-is and values the JSON encoder cannot
        handle are converted with ``str``.
        """
        logs = self._recorder.query_logs(
            module_name=module_name,
            operator=operator,
        )
        return json.dumps(logs, ensure_ascii=False, indent=2, default=str)

    def export_to_csv(
        self,
        module_name: Optional[str] = None,
        operator: Optional[str] = None,
    ) -> str:
        """Return the matching logs as CSV text.

        The columns are ``timestamp``, ``module_name``, ``operation_desc``,
        ``operator`` and ``status``. Lines are separated by ``"\\n"`` with no
        trailing newline. Values containing commas, quotes or newlines are
        quoted by the ``csv`` module so the output stays parseable.

        Returns:
            The CSV text, or an empty string when no log matches.
        """
        logs = self._recorder.query_logs(
            module_name=module_name,
            operator=operator,
        )
        if not logs:
            return ""
        headers = ["timestamp", "module_name", "operation_desc", "operator", "status"]
        buffer = io.StringIO()
        writer = csv.writer(buffer, lineterminator="\n")
        writer.writerow(headers)
        # Missing keys become empty cells; every value is stringified.
        writer.writerows([str(log.get(h, "")) for h in headers] for log in logs)
        text = buffer.getvalue()
        # The writer terminates every row; drop only the final terminator.
        return text[:-1] if text.endswith("\n") else text
class AuditLogRecorder:
    """Unified facade over operation logging and object-change auditing."""

    def __init__(self, storage: Optional[AuditLogStorage] = None):
        """Create the operation recorder (on ``storage``) and the auditor."""
        self._operation_recorder = OperationLogRecorder(storage)
        self._change_auditor = ObjectChangeAuditor()

    def record_operation(self, **kwargs) -> OperationLogEntry:
        """Record an operation log; ``kwargs`` go to the recorder's ``record``."""
        return self._operation_recorder.record(**kwargs)

    def record_change(
        self,
        old_object: Dict[str, Any],
        new_object: Dict[str, Any],
        **kwargs
    ) -> OperationLogEntry:
        """Diff two objects and record an operation log carrying the diff.

        Args:
            old_object: The object before the change.
            new_object: The object after the change.
            **kwargs: Remaining arguments for the operation recorder's
                ``record`` (for example ``module_name``, ``operation_desc``,
                ``operator``). ``diff_json`` is supplied by this method.

        Returns:
            The recorded ``OperationLogEntry``; its ``diff_json`` is a JSON
            array of ``{"field", "old", "new"}`` objects.
        """
        diff_result = self._change_auditor.compare(old_object, new_object)
        # Serialize via the auditor so values the JSON encoder cannot handle
        # (datetimes, decimals, ...) are stringified instead of raising.
        diff_json = self._change_auditor.to_json([
            {
                "field": change.field_name,
                "old": change.old_value,
                "new": change.new_value,
            }
            for change in diff_result.changes
        ])
        return self._operation_recorder.record(
            diff_json=diff_json,
            **kwargs
        )

    def query_logs(self, **kwargs) -> List[Dict[str, Any]]:
        """Query logs; ``kwargs`` go to the recorder's ``query_logs``."""
        return self._operation_recorder.query_logs(**kwargs)
def audit_log(
    recorder: "AuditLogRecorder",
    module_name: str,
    operation_desc: str
):
    """Build a decorator that records an audit log for every call.

    Args:
        recorder: Object whose ``record_operation`` receives the log fields.
        module_name: Module name stored with each log.
        operation_desc: Operation description stored with each log.

    Returns:
        A decorator. The wrapped function's return value and exceptions pass
        through unchanged; after each call one log is recorded with the
        elapsed time in milliseconds, the call arguments as JSON, and a
        status of ``"SUCCESS"`` or ``"FAILURE"``.
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # perf_counter is monotonic, so clock adjustments cannot skew
            # (or make negative) the measured duration.
            started = time.perf_counter()
            status = "SUCCESS"
            exception_msg = None
            try:
                return func(*args, **kwargs)
            except BaseException as exc:
                # BaseException also covers KeyboardInterrupt/SystemExit, so
                # an aborted call is never logged as a success.
                status = "FAILURE"
                exception_msg = str(exc)
                raise
            finally:
                elapsed_ms = int((time.perf_counter() - started) * 1000)
                recorder.record_operation(
                    module_name=module_name,
                    operation_desc=operation_desc,
                    operator="system",  # could be taken from request context
                    request_params=json.dumps({"args": args, "kwargs": kwargs}, default=str),
                    execution_time=elapsed_ms,
                    status=status,
                    exception_message=exception_msg
                )
        return wrapper
    return decorator