feat: 重构测试框架并优化代码结构

refactor(tests): 将e2e_tests迁移到tests_suite和api_integration_tests
style: 为Java类添加文档注释
docs: 更新.gitignore和配置文件
test: 添加性能测试和Playwright测试脚本
chore: 清理旧测试文件和配置
This commit is contained in:
张翔
2026-03-14 13:49:39 +08:00
parent 9e187f42e5
commit c50ccd258f
178 changed files with 8655 additions and 2519 deletions
+3
View File
@@ -0,0 +1,3 @@
"""
工具模块
"""
@@ -0,0 +1 @@
"""API模块"""
@@ -0,0 +1,64 @@
"""
审计日志API封装
"""
from typing import Dict, Any
from httpx import AsyncClient
class SysLogAPI:
"""审计日志API"""
def __init__(self, client: AsyncClient):
self.client = client
self.base_path = "/api/logs"
async def get_login_logs(self) -> Any:
"""获取所有登录日志"""
return await self.client.get(f"{self.base_path}/login")
async def get_login_log_by_id(self, log_id: int) -> Any:
"""根据ID获取登录日志"""
return await self.client.get(f"{self.base_path}/login/{log_id}")
async def create_login_log(self, data: Dict[str, Any]) -> Any:
"""创建登录日志"""
return await self.client.post(f"{self.base_path}/login", json=data)
async def get_exception_logs(self) -> Any:
"""获取所有异常日志"""
return await self.client.get(f"{self.base_path}/exception")
async def get_exception_log_by_id(self, log_id: int) -> Any:
"""根据ID获取异常日志"""
return await self.client.get(f"{self.base_path}/exception/{log_id}")
async def create_exception_log(self, data: Dict[str, Any]) -> Any:
"""创建异常日志"""
return await self.client.post(f"{self.base_path}/exception", json=data)
async def get_login_logs_by_page(self, page: int = 0, size: int = 10,
sort: str = "id", order: str = "asc",
keyword: str = None) -> Any:
"""分页获取登录日志"""
params = {"page": page, "size": size, "sort": sort, "order": order}
if keyword:
params["keyword"] = keyword
return await self.client.get(f"{self.base_path}/login/page", params=params)
async def get_operation_logs_by_page(self, page: int = 0, size: int = 10,
sort: str = "id", order: str = "asc",
keyword: str = None) -> Any:
"""分页获取操作日志"""
params = {"page": page, "size": size, "sort": sort, "order": order}
if keyword:
params["keyword"] = keyword
return await self.client.get(f"{self.base_path}/operation/page", params=params)
async def get_login_log_count(self) -> Any:
"""获取登录日志总数"""
return await self.client.get(f"{self.base_path}/login/count")
async def get_operation_log_count(self) -> Any:
"""获取操作日志总数"""
return await self.client.get(f"{self.base_path}/operation/count")
@@ -0,0 +1,33 @@
"""
认证API
"""
from typing import Dict, Any
from httpx import AsyncClient, Response
from .base_api import BaseAPI
class AuthAPI(BaseAPI):
"""认证API"""
def __init__(self, client: AsyncClient):
super().__init__(client, "/api/auth")
async def login(self, username: str, password: str) -> Response:
"""用户登录"""
return await self.post("/login", json={
"username": username,
"password": password
})
async def refresh_token(self, refresh_token: str) -> Response:
"""刷新token"""
return await self.post("/refresh", json={
"refreshToken": refresh_token
})
async def logout(self, token: str) -> Response:
"""用户登出"""
return await self.post("/logout", headers={
"Authorization": f"Bearer {token}"
})
@@ -0,0 +1,58 @@
"""
基础API类
"""
from typing import Optional, Dict, Any
from httpx import AsyncClient, Response
from loguru import logger
class BaseAPI:
"""基础API类"""
def __init__(self, client: AsyncClient, base_url: str = ""):
self.client = client
self.base_url = base_url
async def get(self, endpoint: str, params: Optional[Dict[str, Any]] = None, **kwargs) -> Response:
"""GET请求"""
url = f"{self.base_url}{endpoint}"
logger.info(f"GET {url} - Params: {params}")
response = await self.client.get(url, params=params, **kwargs)
logger.info(f"Response: {response.status_code}")
return response
async def post(self, endpoint: str, data: Optional[Dict[str, Any]] = None, json: Optional[Dict[str, Any]] = None, **kwargs) -> Response:
"""POST请求"""
url = f"{self.base_url}{endpoint}"
logger.info(f"POST {url} - Data: {data} - JSON: {json}")
response = await self.client.post(url, data=data, json=json, **kwargs)
logger.info(f"Response: {response.status_code}")
return response
async def put(self, endpoint: str, data: Optional[Dict[str, Any]] = None, json: Optional[Dict[str, Any]] = None, **kwargs) -> Response:
"""PUT请求"""
url = f"{self.base_url}{endpoint}"
logger.info(f"PUT {url} - Data: {data} - JSON: {json}")
response = await self.client.put(url, data=data, json=json, **kwargs)
logger.info(f"Response: {response.status_code}")
return response
async def delete(self, endpoint: str, **kwargs) -> Response:
"""DELETE请求"""
url = f"{self.base_url}{endpoint}"
logger.info(f"DELETE {url}")
response = await self.client.delete(url, **kwargs)
logger.info(f"Response: {response.status_code}")
return response
async def assert_status_code(self, response: Response, expected_status: int):
"""断言状态码"""
assert response.status_code == expected_status, f"Expected {expected_status}, got {response.status_code}. Response: {response.text}"
async def assert_response_contains(self, response: Response, key: str, value: Any = None):
"""断言响应包含指定字段"""
data = response.json()
assert key in data, f"Response does not contain key '{key}'"
if value is not None:
assert data[key] == value, f"Expected {value}, got {data[key]}"
@@ -0,0 +1,38 @@
"""
系统配置API封装
"""
from typing import Dict, Any
from httpx import AsyncClient
class SysConfigAPI:
"""系统参数配置API"""
def __init__(self, client: AsyncClient):
self.client = client
self.base_path = "/api/config"
async def get_all(self) -> Any:
"""获取所有配置"""
return await self.client.get(self.base_path)
async def get_by_key(self, config_key: str) -> Any:
"""根据key获取配置"""
return await self.client.get(f"{self.base_path}/key/{config_key}")
async def create(self, data: Dict[str, Any]) -> Any:
"""创建配置"""
return await self.client.post(self.base_path, json=data)
async def update(self, config_id: int, data: Dict[str, Any]) -> Any:
"""更新配置"""
return await self.client.put(f"{self.base_path}/{config_id}", json=data)
async def delete(self, config_id: int) -> Any:
"""删除配置"""
return await self.client.delete(f"{self.base_path}/{config_id}")
async def refresh_cache(self) -> Any:
"""刷新缓存"""
return await self.client.post(f"{self.base_path}/refresh")
@@ -0,0 +1,66 @@
"""
字典管理API封装
"""
from typing import Dict, Any, Optional
from httpx import AsyncClient
class DictTypeAPI:
"""字典类型API"""
def __init__(self, client: AsyncClient):
self.client = client
self.base_path = "/api/dict/types"
async def get_all(self) -> Any:
"""获取所有字典类型"""
return await self.client.get(self.base_path)
async def get_by_id(self, dict_id: int) -> Any:
"""根据ID获取字典类型"""
return await self.client.get(f"{self.base_path}/{dict_id}")
async def create(self, data: Dict[str, Any]) -> Any:
"""创建字典类型"""
return await self.client.post(self.base_path, json=data)
async def update(self, dict_id: int, data: Dict[str, Any]) -> Any:
"""更新字典类型"""
return await self.client.put(f"{self.base_path}/{dict_id}", json=data)
async def delete(self, dict_id: int) -> Any:
"""删除字典类型"""
return await self.client.delete(f"{self.base_path}/{dict_id}")
class DictDataAPI:
"""字典数据API"""
def __init__(self, client: AsyncClient):
self.client = client
self.base_path = "/api/dict/data"
async def get_all(self) -> Any:
"""获取所有字典数据"""
return await self.client.get(self.base_path)
async def get_by_id(self, data_id: int) -> Any:
"""根据ID获取字典数据"""
return await self.client.get(f"{self.base_path}/{data_id}")
async def get_by_type(self, dict_type: str) -> Any:
"""根据字典类型获取字典数据"""
return await self.client.get(f"{self.base_path}/type/{dict_type}")
async def create(self, data: Dict[str, Any]) -> Any:
"""创建字典数据"""
return await self.client.post(self.base_path, json=data)
async def update(self, data_id: int, data: Dict[str, Any]) -> Any:
"""更新字典数据"""
return await self.client.put(f"{self.base_path}/{data_id}", json=data)
async def delete(self, data_id: int) -> Any:
"""删除字典数据"""
return await self.client.delete(f"{self.base_path}/{data_id}")
@@ -0,0 +1,42 @@
"""
字典管理API
"""
from typing import Dict, Any
from httpx import AsyncClient, Response
from .base_api import BaseAPI
class DictionaryAPI(BaseAPI):
"""字典管理API"""
def __init__(self, client: AsyncClient):
super().__init__(client, "/api/dictionaries")
async def create_dictionary(self, dict_data: Dict[str, Any]) -> Response:
"""创建字典"""
return await self.post("", json=dict_data)
async def get_dictionary_by_id(self, dict_id: int) -> Response:
"""根据ID获取字典"""
return await self.get(f"/{dict_id}")
async def get_dictionaries_by_type(self, dict_type: str) -> Response:
"""根据类型获取字典"""
return await self.get(f"/type/{dict_type}")
async def get_all_dictionaries(self) -> Response:
"""获取所有字典"""
return await self.get("")
async def update_dictionary(self, dict_id: int, dict_data: Dict[str, Any]) -> Response:
"""更新字典"""
return await self.put(f"/{dict_id}", json=dict_data)
async def delete_dictionary(self, dict_id: int) -> Response:
"""删除字典"""
return await self.delete(f"/{dict_id}")
async def check_type_and_code_exists(self, dict_type: str, code: str) -> Response:
"""检查类型和编码是否存在"""
return await self.get("/check/exists", params={"type": dict_type, "code": code})
@@ -0,0 +1,41 @@
"""
文件管理API封装
"""
from typing import Dict, Any
from httpx import AsyncClient
class SysFileAPI:
"""文件管理API"""
def __init__(self, client: AsyncClient):
self.client = client
self.base_path = "/api/files"
async def get_all(self) -> Any:
"""获取所有文件"""
return await self.client.get(self.base_path)
async def get_by_id(self, file_id: int) -> Any:
"""根据ID获取文件信息"""
return await self.client.get(f"{self.base_path}/{file_id}")
async def upload(self, file_path: str, create_by: str = "test") -> Any:
"""上传文件"""
with open(file_path, "rb") as f:
files = {"file": f}
data = {"createBy": create_by}
return await self.client.post(f"{self.base_path}/upload", files=files, data=data)
async def download(self, file_name: str) -> Any:
"""下载文件"""
return await self.client.get(f"{self.base_path}/download/{file_name}")
async def preview(self, file_name: str) -> Any:
"""预览文件"""
return await self.client.get(f"{self.base_path}/preview/{file_name}")
async def delete(self, file_id: int) -> Any:
"""删除文件"""
return await self.client.delete(f"{self.base_path}/{file_id}")
@@ -0,0 +1,46 @@
"""
菜单管理API
"""
from typing import Dict, Any, List
from httpx import AsyncClient, Response
from .base_api import BaseAPI
class MenuAPI(BaseAPI):
"""菜单管理API"""
def __init__(self, client: AsyncClient):
super().__init__(client, "/api/menus")
async def create_menu(self, menu_data: Dict[str, Any]) -> Response:
"""创建菜单"""
return await self.post("", json=menu_data)
async def get_menu_by_id(self, menu_id: int) -> Response:
"""根据ID获取菜单"""
return await self.get(f"/{menu_id}")
async def get_all_menus(self) -> Response:
"""获取所有菜单"""
return await self.get("")
async def get_menu_tree(self) -> Response:
"""获取菜单树"""
return await self.get("/tree")
async def update_menu(self, menu_id: int, menu_data: Dict[str, Any]) -> Response:
"""更新菜单"""
return await self.put(f"/{menu_id}", json=menu_data)
async def delete_menu(self, menu_id: int) -> Response:
"""删除菜单"""
return await self.delete(f"/{menu_id}")
async def get_menus_by_parent(self, parent_id: int) -> Response:
"""根据父菜单ID获取子菜单"""
return await self.get("", params={"parentId": parent_id})
async def get_menus_by_type(self, menu_type: str) -> Response:
"""根据菜单类型获取菜单"""
return await self.get("", params={"menuType": menu_type})
@@ -0,0 +1,70 @@
"""
通知公告API封装
"""
from typing import Dict, Any
from httpx import AsyncClient
class SysNoticeAPI:
"""系统公告API"""
def __init__(self, client: AsyncClient):
self.client = client
self.base_path = "/api/notices"
async def get_all(self) -> Any:
"""获取所有公告"""
return await self.client.get(self.base_path)
async def get_by_id(self, notice_id: int) -> Any:
"""根据ID获取公告"""
return await self.client.get(f"{self.base_path}/{notice_id}")
async def get_by_status(self, status: str) -> Any:
"""根据状态获取公告"""
return await self.client.get(f"{self.base_path}/status/{status}")
async def create(self, data: Dict[str, Any]) -> Any:
"""创建公告"""
return await self.client.post(self.base_path, json=data)
async def update(self, notice_id: int, data: Dict[str, Any]) -> Any:
"""更新公告"""
return await self.client.put(f"{self.base_path}/{notice_id}", json=data)
async def delete(self, notice_id: int) -> Any:
"""删除公告"""
return await self.client.delete(f"{self.base_path}/{notice_id}")
class SysMessageAPI:
"""用户消息API"""
def __init__(self, client: AsyncClient):
self.client = client
self.base_path = "/api/messages"
async def get_by_user(self, user_id: int) -> Any:
"""获取用户所有消息"""
return await self.client.get(f"{self.base_path}/user/{user_id}")
async def get_unread_count(self, user_id: int) -> Any:
"""获取未读消息数量"""
return await self.client.get(f"{self.base_path}/user/{user_id}/unread")
async def get_unread_list(self, user_id: int) -> Any:
"""获取未读消息列表"""
return await self.client.get(f"{self.base_path}/user/{user_id}/unread/list")
async def create(self, data: Dict[str, Any]) -> Any:
"""创建消息"""
return await self.client.post(self.base_path, json=data)
async def mark_as_read(self, message_id: int) -> Any:
"""标记消息为已读"""
return await self.client.put(f"{self.base_path}/{message_id}/read")
async def delete(self, message_id: int) -> Any:
"""删除消息"""
return await self.client.delete(f"{self.base_path}/{message_id}")
@@ -0,0 +1,59 @@
"""
角色管理API
"""
from typing import Dict, Any, List
from httpx import AsyncClient, Response
from .base_api import BaseAPI
class RoleAPI(BaseAPI):
"""角色管理API"""
def __init__(self, client: AsyncClient):
super().__init__(client, "/api/roles")
async def create_role(self, role_data: Dict[str, Any]) -> Response:
"""创建角色"""
return await self.post("", json=role_data)
async def get_role_by_id(self, role_id: int) -> Response:
"""根据ID获取角色"""
return await self.get(f"/{role_id}")
async def get_role_by_name(self, role_name: str) -> Response:
"""根据名称获取角色"""
return await self.get(f"/name/{role_name}")
async def get_all_roles(self, include_deleted: bool = False) -> Response:
"""获取所有角色"""
return await self.get("", params={"includeDeleted": include_deleted})
async def update_role(self, role_id: int, role_data: Dict[str, Any]) -> Response:
"""更新角色"""
return await self.put(f"/{role_id}", json=role_data)
async def delete_role(self, role_id: int) -> Response:
"""删除角色(逻辑删除)"""
return await self.delete(f"/{role_id}")
async def restore_role(self, role_id: int) -> Response:
"""恢复角色"""
return await self.post(f"/{role_id}/restore")
async def check_name_exists(self, role_name: str) -> Response:
"""检查角色名是否存在"""
return await self.get("/check-name", params={"name": role_name})
async def get_roles_by_page(self, page: int = 0, size: int = 10,
sort: str = "id", order: str = "asc",
keyword: str = None) -> Response:
"""分页获取角色"""
params = {"page": page, "size": size, "sort": sort, "order": order}
if keyword:
params["keyword"] = keyword
return await self.get("/page", params=params)
async def get_role_count(self) -> Response:
"""获取角色总数"""
return await self.get("/count")
@@ -0,0 +1,71 @@
"""
用户管理API
"""
from typing import Dict, Any, List
from httpx import AsyncClient, Response
from .base_api import BaseAPI
class UserAPI(BaseAPI):
"""用户管理API"""
def __init__(self, client: AsyncClient):
super().__init__(client, "/api/users")
async def create_user(self, user_data: Dict[str, Any]) -> Response:
"""创建用户"""
return await self.post("", json=user_data)
async def get_user_by_id(self, user_id: int) -> Response:
"""根据ID获取用户"""
return await self.get(f"/{user_id}")
async def get_all_users(self, include_deleted: bool = False) -> Response:
"""获取所有用户"""
return await self.get("", params={"includeDeleted": include_deleted})
async def update_user(self, user_id: int, user_data: Dict[str, Any]) -> Response:
"""更新用户"""
return await self.put(f"/{user_id}", json=user_data)
async def delete_user(self, user_id: int) -> Response:
"""删除用户"""
return await self.delete(f"/{user_id}")
async def logical_delete_user(self, user_id: int) -> Response:
"""逻辑删除用户"""
return await self.delete(f"/{user_id}/logical")
async def logical_delete_users(self, user_ids: List[int]) -> Response:
"""批量逻辑删除用户"""
return await self.post("/logical-delete", json=user_ids)
async def restore_user(self, user_id: int) -> Response:
"""恢复用户"""
return await self.post(f"/{user_id}/restore")
async def restore_users(self, user_ids: List[int]) -> Response:
"""批量恢复用户"""
return await self.post("/restore", json=user_ids)
async def check_username_exists(self, username: str) -> Response:
"""检查用户名是否存在"""
return await self.get("/check/username", params={"username": username})
async def check_email_exists(self, email: str) -> Response:
"""检查邮箱是否存在"""
return await self.get("/check/email", params={"email": email})
async def get_users_by_page(self, page: int = 0, size: int = 10,
sort: str = "id", order: str = "asc",
keyword: str = None) -> Response:
"""分页获取用户"""
params = {"page": page, "size": size, "sort": sort, "order": order}
if keyword:
params["keyword"] = keyword
return await self.get("/page", params=params)
async def get_user_count(self) -> Response:
"""获取用户总数"""
return await self.get("/count")
+83
View File
@@ -0,0 +1,83 @@
"""
断言工具
"""
from typing import Any, Dict, List
from httpx import Response
class Assertions:
"""断言工具类"""
@staticmethod
def assert_status_code(response: Response, expected_status: int):
"""断言状态码"""
assert response.status_code == expected_status, \
f"Expected status code {expected_status}, got {response.status_code}. Response: {response.text}"
@staticmethod
def assert_response_contains(response: Response, key: str, value: Any = None):
"""断言响应包含指定字段"""
data = response.json()
assert key in data, f"Response does not contain key '{key}'. Response: {data}"
if value is not None:
assert data[key] == value, \
f"Expected {value} for key '{key}', got {data[key]}"
@staticmethod
def assert_response_is_list(response: Response):
"""断言响应是列表"""
data = response.json()
assert isinstance(data, list), f"Expected list, got {type(data)}. Response: {data}"
@staticmethod
def assert_response_not_empty(response: Response):
"""断言响应不为空"""
data = response.json()
assert data, f"Response is empty. Response: {data}"
@staticmethod
def assert_response_field_type(response: Response, field: str, expected_type: type):
"""断言响应字段类型"""
data = response.json()
assert field in data, f"Response does not contain field '{field}'"
assert isinstance(data[field], expected_type), \
f"Expected field '{field}' to be {expected_type}, got {type(data[field])}"
@staticmethod
def assert_response_fields_present(response: Response, fields: List[str]):
"""断言响应包含所有指定字段"""
data = response.json()
missing_fields = [field for field in fields if field not in data]
assert not missing_fields, \
f"Response is missing fields: {missing_fields}. Response: {data}"
@staticmethod
def assert_response_field_length(response: Response, field: str, min_length: int = None, max_length: int = None):
"""断言响应字段长度"""
data = response.json()
assert field in data, f"Response does not contain field '{field}'"
field_value = data[field]
if isinstance(field_value, (str, list, dict)):
length = len(field_value)
if min_length is not None:
assert length >= min_length, \
f"Field '{field}' length {length} is less than minimum {min_length}"
if max_length is not None:
assert length <= max_length, \
f"Field '{field}' length {length} is greater than maximum {max_length}"
else:
raise AssertionError(f"Field '{field}' is not a string, list, or dict")
@staticmethod
def assert_error_response(response: Response, expected_message: str = None):
"""断言错误响应"""
Assertions.assert_status_code(response, 400)
if expected_message:
data = response.json()
assert expected_message in str(data), \
f"Expected error message '{expected_message}' not found in response: {data}"
assertions = Assertions()
+72
View File
@@ -0,0 +1,72 @@
"""
测试数据生成器
"""
import random
import string
from faker import Faker
class DataGenerator:
"""测试数据生成器"""
def __init__(self, locale: str = "zh_CN"):
self.faker = Faker(locale)
def generate_username(self) -> str:
"""生成用户名"""
return f"testuser_{''.join(random.choices(string.ascii_lowercase + string.digits, k=8))}"
def generate_password(self, length: int = 12) -> str:
"""生成密码"""
chars = string.ascii_letters + string.digits + "!@#$%^&*"
return ''.join(random.choices(chars, k=length))
def generate_email(self) -> str:
"""生成邮箱"""
return self.faker.email()
def generate_phone(self) -> str:
"""生成手机号"""
return self.faker.phone_number()
def generate_name(self) -> str:
"""生成姓名"""
return self.faker.name()
def generate_role_name(self) -> str:
"""生成角色名"""
return f"ROLE_{''.join(random.choices(string.ascii_uppercase, k=6))}"
def generate_dict_type(self) -> str:
"""生成字典类型"""
return f"DICT_TYPE_{''.join(random.choices(string.ascii_uppercase, k=4))}"
def generate_dict_code(self) -> str:
"""生成字典编码"""
return f"CODE_{''.join(random.choices(string.ascii_uppercase + string.digits, k=6))}"
def generate_url(self) -> str:
"""生成URL"""
return self.faker.url()
def generate_company_name(self) -> str:
"""生成公司名"""
return self.faker.company()
def generate_address(self) -> str:
"""生成地址"""
return self.faker.address()
def generate_description(self) -> str:
"""生成描述"""
return self.faker.text(max_nb_chars=200)
def generate_permissions(self) -> str:
"""生成权限字符串"""
permissions = ["READ", "WRITE", "DELETE", "ADMIN", "MANAGE"]
selected = random.sample(permissions, random.randint(1, len(permissions)))
return ",".join(selected)
data_generator = DataGenerator()
@@ -0,0 +1,204 @@
"""
测试数据管理工具(简化版)
"""
import asyncio
from typing import List, Dict, Any, Callable
from httpx import AsyncClient
from loguru import logger
class TestDataManager:
"""测试数据管理器"""
def __init__(self, client: AsyncClient):
self.client = client
self._users: List[int] = []
self._roles: List[int] = []
self._menus: List[int] = []
self._dictionaries: List[int] = []
self._dict_types: List[int] = []
self._configs: List[int] = []
self._notices: List[int] = []
self._files: List[int] = []
self._messages: List[int] = []
def add_user(self, user_id: int):
"""添加用户到清理列表"""
self._users.append(user_id)
def add_role(self, role_id: int):
"""添加角色到清理列表"""
self._roles.append(role_id)
def add_menu(self, menu_id: int):
"""添加菜单到清理列表"""
self._menus.append(menu_id)
def add_dictionary(self, dict_id: int):
"""添加字典到清理列表"""
self._dictionaries.append(dict_id)
def add_dict_type(self, dict_type_id: int):
"""添加字典类型到清理列表"""
self._dict_types.append(dict_type_id)
def add_config(self, config_id: int):
"""添加系统配置到清理列表"""
self._configs.append(config_id)
def add_notice(self, notice_id: int):
"""添加系统公告到清理列表"""
self._notices.append(notice_id)
def add_file(self, file_id: int):
"""添加文件到清理列表"""
self._files.append(file_id)
def add_message(self, message_id: int):
"""添加消息到清理列表"""
self._messages.append(message_id)
async def cleanup_all(self):
"""清理所有测试数据"""
logger.info("Starting test data cleanup...")
cleanup_tasks = []
if self._messages:
cleanup_tasks.extend([self._delete_message(msg_id) for msg_id in self._messages])
self._messages.clear()
if self._files:
cleanup_tasks.extend([self._delete_file(file_id) for file_id in self._files])
self._files.clear()
if self._notices:
cleanup_tasks.extend([self._delete_notice(notice_id) for notice_id in self._notices])
self._notices.clear()
if self._configs:
cleanup_tasks.extend([self._delete_config(config_id) for config_id in self._configs])
self._configs.clear()
if self._dictionaries:
cleanup_tasks.extend([self._delete_dictionary(dict_id) for dict_id in self._dictionaries])
self._dictionaries.clear()
if self._dict_types:
cleanup_tasks.extend([self._delete_dict_type(dict_type_id) for dict_type_id in self._dict_types])
self._dict_types.clear()
if self._users:
cleanup_tasks.extend([self._delete_user(user_id) for user_id in self._users])
self._users.clear()
if self._roles:
cleanup_tasks.extend([self._delete_role(role_id) for role_id in self._roles])
self._roles.clear()
if self._menus:
cleanup_tasks.extend([self._delete_menu(menu_id) for menu_id in self._menus])
self._menus.clear()
if cleanup_tasks:
results = await asyncio.gather(*cleanup_tasks, return_exceptions=True)
failed_count = sum(1 for r in results if isinstance(r, Exception))
if failed_count > 0:
logger.warning(f"Failed to cleanup {failed_count} resources")
logger.info("Test data cleanup completed")
async def _delete_user(self, user_id: int):
"""删除用户"""
try:
await self.client.delete(f"/api/users/{user_id}")
logger.info(f"Cleaned up user {user_id}")
except Exception as e:
logger.warning(f"Failed to cleanup user {user_id}: {e}")
async def _delete_role(self, role_id: int):
"""删除角色"""
try:
await self.client.delete(f"/api/roles/{role_id}")
logger.info(f"Cleaned up role {role_id}")
except Exception as e:
logger.warning(f"Failed to cleanup role {role_id}: {e}")
async def _delete_menu(self, menu_id: int):
"""删除菜单"""
try:
await self.client.delete(f"/api/menus/{menu_id}")
logger.info(f"Cleaned up menu {menu_id}")
except Exception as e:
logger.warning(f"Failed to cleanup menu {menu_id}: {e}")
async def _delete_dictionary(self, dict_id: int):
"""删除字典"""
try:
await self.client.delete(f"/api/dictionaries/{dict_id}")
logger.info(f"Cleaned up dictionary {dict_id}")
except Exception as e:
logger.warning(f"Failed to cleanup dictionary {dict_id}: {e}")
async def _delete_dict_type(self, dict_type_id: int):
"""删除字典类型"""
try:
await self.client.delete(f"/api/dict/types/{dict_type_id}")
logger.info(f"Cleaned up dict type {dict_type_id}")
except Exception as e:
logger.warning(f"Failed to cleanup dict type {dict_type_id}: {e}")
async def _delete_config(self, config_id: int):
"""删除系统配置"""
try:
await self.client.delete(f"/api/config/{config_id}")
logger.info(f"Cleaned up config {config_id}")
except Exception as e:
logger.warning(f"Failed to cleanup config {config_id}: {e}")
async def _delete_notice(self, notice_id: int):
"""删除系统公告"""
try:
await self.client.delete(f"/api/notices/{notice_id}")
logger.info(f"Cleaned up notice {notice_id}")
except Exception as e:
logger.warning(f"Failed to cleanup notice {notice_id}: {e}")
async def _delete_file(self, file_id: int):
"""删除文件"""
try:
await self.client.delete(f"/api/files/{file_id}")
logger.info(f"Cleaned up file {file_id}")
except Exception as e:
logger.warning(f"Failed to cleanup file {file_id}: {e}")
async def _delete_message(self, message_id: int):
"""删除消息"""
try:
await self.client.delete(f"/api/messages/{message_id}")
logger.info(f"Cleaned up message {message_id}")
except Exception as e:
logger.warning(f"Failed to cleanup message {message_id}: {e}")
def get_stats(self) -> Dict[str, int]:
"""获取统计信息"""
return {
"users": len(self._users),
"roles": len(self._roles),
"menus": len(self._menus),
"dictionaries": len(self._dictionaries),
"dict_types": len(self._dict_types),
"configs": len(self._configs),
"notices": len(self._notices),
"files": len(self._files),
"messages": len(self._messages)
}
def has_data(self) -> bool:
"""检查是否有待清理数据"""
return any([
self._users, self._roles, self._menus,
self._dictionaries, self._dict_types, self._configs,
self._notices, self._files, self._messages
])
+33
View File
@@ -0,0 +1,33 @@
"""
日志工具
"""
import sys
from loguru import logger
from pathlib import Path
def setup_logger(log_file: str = "e2e_tests.log", log_level: str = "INFO"):
"""配置日志"""
logger.remove()
logger.add(
sys.stdout,
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
level=log_level,
colorize=True
)
logger.add(
log_file,
format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
level=log_level,
rotation="10 MB",
retention="7 days",
compression="zip"
)
return logger
setup_logger()
+204
View File
@@ -0,0 +1,204 @@
"""
测试数据管理工具(简化版)
"""
import asyncio
from typing import List, Dict, Any, Callable
from httpx import AsyncClient
from loguru import logger
class TestDataManager:
"""测试数据管理器"""
def __init__(self, client: AsyncClient):
self.client = client
self._users: List[int] = []
self._roles: List[int] = []
self._menus: List[int] = []
self._dictionaries: List[int] = []
self._dict_types: List[int] = []
self._configs: List[int] = []
self._notices: List[int] = []
self._files: List[int] = []
self._messages: List[int] = []
def add_user(self, user_id: int):
"""添加用户到清理列表"""
self._users.append(user_id)
def add_role(self, role_id: int):
"""添加角色到清理列表"""
self._roles.append(role_id)
def add_menu(self, menu_id: int):
"""添加菜单到清理列表"""
self._menus.append(menu_id)
def add_dictionary(self, dict_id: int):
"""添加字典到清理列表"""
self._dictionaries.append(dict_id)
def add_dict_type(self, dict_type_id: int):
"""添加字典类型到清理列表"""
self._dict_types.append(dict_type_id)
def add_config(self, config_id: int):
"""添加系统配置到清理列表"""
self._configs.append(config_id)
def add_notice(self, notice_id: int):
"""添加系统公告到清理列表"""
self._notices.append(notice_id)
def add_file(self, file_id: int):
"""添加文件到清理列表"""
self._files.append(file_id)
def add_message(self, message_id: int):
"""添加消息到清理列表"""
self._messages.append(message_id)
async def cleanup_all(self):
"""清理所有测试数据"""
logger.info("Starting test data cleanup...")
cleanup_tasks = []
if self._messages:
cleanup_tasks.extend([self._delete_message(msg_id) for msg_id in self._messages])
self._messages.clear()
if self._files:
cleanup_tasks.extend([self._delete_file(file_id) for file_id in self._files])
self._files.clear()
if self._notices:
cleanup_tasks.extend([self._delete_notice(notice_id) for notice_id in self._notices])
self._notices.clear()
if self._configs:
cleanup_tasks.extend([self._delete_config(config_id) for config_id in self._configs])
self._configs.clear()
if self._dictionaries:
cleanup_tasks.extend([self._delete_dictionary(dict_id) for dict_id in self._dictionaries])
self._dictionaries.clear()
if self._dict_types:
cleanup_tasks.extend([self._delete_dict_type(dict_type_id) for dict_type_id in self._dict_types])
self._dict_types.clear()
if self._users:
cleanup_tasks.extend([self._delete_user(user_id) for user_id in self._users])
self._users.clear()
if self._roles:
cleanup_tasks.extend([self._delete_role(role_id) for role_id in self._roles])
self._roles.clear()
if self._menus:
cleanup_tasks.extend([self._delete_menu(menu_id) for menu_id in self._menus])
self._menus.clear()
if cleanup_tasks:
results = await asyncio.gather(*cleanup_tasks, return_exceptions=True)
failed_count = sum(1 for r in results if isinstance(r, Exception))
if failed_count > 0:
logger.warning(f"Failed to cleanup {failed_count} resources")
logger.info("Test data cleanup completed")
async def _delete_user(self, user_id: int):
"""删除用户"""
try:
await self.client.delete(f"/api/users/{user_id}")
logger.info(f"Cleaned up user {user_id}")
except Exception as e:
logger.warning(f"Failed to cleanup user {user_id}: {e}")
async def _delete_role(self, role_id: int):
"""删除角色"""
try:
await self.client.delete(f"/api/roles/{role_id}")
logger.info(f"Cleaned up role {role_id}")
except Exception as e:
logger.warning(f"Failed to cleanup role {role_id}: {e}")
async def _delete_menu(self, menu_id: int):
"""删除菜单"""
try:
await self.client.delete(f"/api/menus/{menu_id}")
logger.info(f"Cleaned up menu {menu_id}")
except Exception as e:
logger.warning(f"Failed to cleanup menu {menu_id}: {e}")
async def _delete_dictionary(self, dict_id: int):
"""删除字典"""
try:
await self.client.delete(f"/api/dictionaries/{dict_id}")
logger.info(f"Cleaned up dictionary {dict_id}")
except Exception as e:
logger.warning(f"Failed to cleanup dictionary {dict_id}: {e}")
async def _delete_dict_type(self, dict_type_id: int):
"""删除字典类型"""
try:
await self.client.delete(f"/api/dict/types/{dict_type_id}")
logger.info(f"Cleaned up dict type {dict_type_id}")
except Exception as e:
logger.warning(f"Failed to cleanup dict type {dict_type_id}: {e}")
async def _delete_config(self, config_id: int):
"""删除系统配置"""
try:
await self.client.delete(f"/api/config/{config_id}")
logger.info(f"Cleaned up config {config_id}")
except Exception as e:
logger.warning(f"Failed to cleanup config {config_id}: {e}")
async def _delete_notice(self, notice_id: int):
"""删除系统公告"""
try:
await self.client.delete(f"/api/notices/{notice_id}")
logger.info(f"Cleaned up notice {notice_id}")
except Exception as e:
logger.warning(f"Failed to cleanup notice {notice_id}: {e}")
async def _delete_file(self, file_id: int):
"""删除文件"""
try:
await self.client.delete(f"/api/files/{file_id}")
logger.info(f"Cleaned up file {file_id}")
except Exception as e:
logger.warning(f"Failed to cleanup file {file_id}: {e}")
async def _delete_message(self, message_id: int):
"""删除消息"""
try:
await self.client.delete(f"/api/messages/{message_id}")
logger.info(f"Cleaned up message {message_id}")
except Exception as e:
logger.warning(f"Failed to cleanup message {message_id}: {e}")
def get_stats(self) -> Dict[str, int]:
"""获取统计信息"""
return {
"users": len(self._users),
"roles": len(self._roles),
"menus": len(self._menus),
"dictionaries": len(self._dictionaries),
"dict_types": len(self._dict_types),
"configs": len(self._configs),
"notices": len(self._notices),
"files": len(self._files),
"messages": len(self._messages)
}
def has_data(self) -> bool:
"""检查是否有待清理数据"""
return any([
self._users, self._roles, self._menus,
self._dictionaries, self._dict_types, self._configs,
self._notices, self._files, self._messages
])