c50ccd258f
refactor(tests): 将e2e_tests迁移到tests_suite和api_integration_tests style: 为Java类添加文档注释 docs: 更新.gitignore和配置文件 test: 添加性能测试和Playwright测试脚本 chore: 清理旧测试文件和配置
65 lines
2.5 KiB
Python
65 lines
2.5 KiB
Python
"""
|
|
审计日志API封装
|
|
"""
|
|
|
|
from typing import Any, Dict, Optional

from httpx import AsyncClient
|
|
|
|
|
|
class SysLogAPI:
    """Thin async wrapper around the audit-log HTTP endpoints.

    Every public method issues exactly one request through the injected
    client and returns the awaited result of that call unchanged; no status
    checking or body decoding happens in this class.
    """

    def __init__(self, client: AsyncClient):
        """Store the HTTP client and the base path shared by all endpoints.

        Args:
            client: Async HTTP client used to send every request.
        """
        self.client = client
        self.base_path = "/api/logs"

    @staticmethod
    def _page_params(
        page: int, size: int, sort: str, order: str, keyword: Optional[str]
    ) -> Dict[str, Any]:
        """Build the query parameters shared by the paged endpoints."""
        params: Dict[str, Any] = {
            "page": page,
            "size": size,
            "sort": sort,
            "order": order,
        }
        # A falsy keyword (None or "") is left out of the query entirely
        # rather than being sent as an empty filter.
        if keyword:
            params["keyword"] = keyword
        return params

    async def _get_page(
        self,
        resource: str,
        page: int,
        size: int,
        sort: str,
        order: str,
        keyword: Optional[str],
    ) -> Any:
        """GET ``<base_path>/<resource>/page`` with the paging query parameters."""
        return await self.client.get(
            f"{self.base_path}/{resource}/page",
            params=self._page_params(page, size, sort, order, keyword),
        )

    async def get_login_logs(self) -> Any:
        """Fetch all login logs (GET ``/login``).

        Returns:
            The awaited result of ``client.get``.
        """
        return await self.client.get(f"{self.base_path}/login")

    async def get_login_log_by_id(self, log_id: int) -> Any:
        """Fetch a single login log by ID (GET ``/login/{log_id}``).

        Args:
            log_id: Identifier interpolated into the request path.

        Returns:
            The awaited result of ``client.get``.
        """
        return await self.client.get(f"{self.base_path}/login/{log_id}")

    async def create_login_log(self, data: Dict[str, Any]) -> Any:
        """Create a login log (POST ``/login``).

        Args:
            data: Payload passed to the client as the JSON request body.

        Returns:
            The awaited result of ``client.post``.
        """
        return await self.client.post(f"{self.base_path}/login", json=data)

    async def get_exception_logs(self) -> Any:
        """Fetch all exception logs (GET ``/exception``).

        Returns:
            The awaited result of ``client.get``.
        """
        return await self.client.get(f"{self.base_path}/exception")

    async def get_exception_log_by_id(self, log_id: int) -> Any:
        """Fetch a single exception log by ID (GET ``/exception/{log_id}``).

        Args:
            log_id: Identifier interpolated into the request path.

        Returns:
            The awaited result of ``client.get``.
        """
        return await self.client.get(f"{self.base_path}/exception/{log_id}")

    async def create_exception_log(self, data: Dict[str, Any]) -> Any:
        """Create an exception log (POST ``/exception``).

        Args:
            data: Payload passed to the client as the JSON request body.

        Returns:
            The awaited result of ``client.post``.
        """
        return await self.client.post(f"{self.base_path}/exception", json=data)

    async def get_login_logs_by_page(self, page: int = 0, size: int = 10,
                                     sort: str = "id", order: str = "asc",
                                     keyword: Optional[str] = None) -> Any:
        """Fetch one page of login logs (GET ``/login/page``).

        Args:
            page: Value sent as the ``page`` query parameter.
            size: Value sent as the ``size`` query parameter.
            sort: Value sent as the ``sort`` query parameter.
            order: Value sent as the ``order`` query parameter.
            keyword: Optional filter sent as ``keyword``; omitted from the
                query when ``None`` or empty.

        Returns:
            The awaited result of ``client.get``.
        """
        return await self._get_page("login", page, size, sort, order, keyword)

    async def get_operation_logs_by_page(self, page: int = 0, size: int = 10,
                                         sort: str = "id", order: str = "asc",
                                         keyword: Optional[str] = None) -> Any:
        """Fetch one page of operation logs (GET ``/operation/page``).

        Args:
            page: Value sent as the ``page`` query parameter.
            size: Value sent as the ``size`` query parameter.
            sort: Value sent as the ``sort`` query parameter.
            order: Value sent as the ``order`` query parameter.
            keyword: Optional filter sent as ``keyword``; omitted from the
                query when ``None`` or empty.

        Returns:
            The awaited result of ``client.get``.
        """
        return await self._get_page("operation", page, size, sort, order, keyword)

    async def get_login_log_count(self) -> Any:
        """Fetch the total number of login logs (GET ``/login/count``).

        Returns:
            The awaited result of ``client.get``.
        """
        return await self.client.get(f"{self.base_path}/login/count")

    async def get_operation_log_count(self) -> Any:
        """Fetch the total number of operation logs (GET ``/operation/count``).

        Returns:
            The awaited result of ``client.get``.
        """
        return await self.client.get(f"{self.base_path}/operation/count")
|