feat: 添加系统配置、审计中心、通知中心、文件管理模块

This commit is contained in:
张翔
2026-03-11 12:11:59 +08:00
commit 52c66444a5
264 changed files with 10109 additions and 0 deletions
+22
View File
@@ -0,0 +1,22 @@
# E2E测试环境配置
# API配置
API_BASE_URL=http://localhost:8080
# 数据库配置
DATABASE_HOST=localhost
DATABASE_PORT=5432
DATABASE_NAME=manage_system
DATABASE_USERNAME=postgres
DATABASE_PASSWORD=postgres
# 测试用户凭证
TEST_USERNAME=admin
TEST_PASSWORD=admin123
# 浏览器配置
HEADLESS_BROWSER=true
BROWSER_TYPE=chromium
# 超时配置(毫秒)
REQUEST_TIMEOUT=30000
+55
View File
@@ -0,0 +1,55 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
env/
venv/
ENV/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Pytest
.pytest_cache/
.coverage
htmlcov/
*.cover
.hypothesis/
# Allure
allure-results/
allure-report/
# Logs
*.log
# Environment variables
.env
# IDE
.vscode/
.idea/
*.swp
*.swo
*~
# OS
.DS_Store
Thumbs.db
# Playwright
.playwright/
+104
View File
@@ -0,0 +1,104 @@
# E2E测试项目
## 项目概述
本项目使用Python + Playwright框架对Novalon管理系统进行端到端测试。
## 技术栈
- **Python**: 3.9+
- **Playwright**: 1.40+
- **Pytest**: 7.0+
- **Allure**: 测试报告
## 项目结构
```
e2e_tests/
├── __init__.py
├── conftest.py # Pytest配置和fixtures
├── pytest.ini # Pytest配置文件
├── requirements.txt # Python依赖
├── config/
│ ├── __init__.py
│ └── settings.py # 配置管理
├── pages/
│ ├── __init__.py
│ ├── base_page.py # 基础页面类
│ ├── auth_page.py # 认证相关页面
│ ├── user_page.py # 用户管理页面
│ ├── role_page.py # 角色管理页面
│ └── dictionary_page.py # 字典管理页面
├── api/
│ ├── __init__.py
│ ├── base_api.py # 基础API类
│ ├── auth_api.py # 认证API
│ ├── user_api.py # 用户API
│ ├── role_api.py # 角色API
│ └── dictionary_api.py # 字典API
├── tests/
│ ├── __init__.py
│ ├── test_auth.py # 认证测试
│ ├── test_user.py # 用户管理测试
│ ├── test_role.py # 角色管理测试
│ ├── test_dictionary.py # 字典管理测试
│ └── test_oauth2.py # OAuth2测试
├── utils/
│ ├── __init__.py
│ ├── data_generator.py # 测试数据生成器
│ ├── assertions.py # 断言工具
│ └── logger.py # 日志工具
└── reports/ # 测试报告目录
```
## 安装依赖
```bash
# 安装Python依赖
pip install -r requirements.txt
# 安装Playwright浏览器
playwright install
```
## 运行测试
```bash
# 运行所有测试
pytest
# 运行特定测试文件
pytest tests/test_auth.py
# 运行特定测试用例
pytest tests/test_auth.py::test_login_success
# 生成Allure报告
pytest --alluredir=allure-results
allure serve allure-results
# 并发运行测试
pytest -n auto
```
## 配置说明
`config/settings.py` 中配置:
- API基础URL
- 测试数据库连接
- 测试用户凭证
- 超时设置
## 测试数据管理
测试数据自动准备和清理:
- 每个测试用例独立运行
- 使用fixture自动创建和清理测试数据
- 支持数据回滚
## 持续集成
测试可在CI/CD流程中自动运行:
- GitHub Actions
- GitLab CI
- Jenkins
+6
View File
@@ -0,0 +1,6 @@
"""
E2E测试项目 - Novalon管理系统
使用Playwright进行端到端测试
"""
__version__ = "1.0.0"
+1
View File
@@ -0,0 +1 @@
"""API模块"""
+33
View File
@@ -0,0 +1,33 @@
"""
认证API
"""
from typing import Dict, Any
from httpx import AsyncClient, Response
from .base_api import BaseAPI
class AuthAPI(BaseAPI):
"""认证API"""
def __init__(self, client: AsyncClient):
super().__init__(client, "/api/auth")
async def login(self, username: str, password: str) -> Response:
"""用户登录"""
return await self.post("/login", json={
"username": username,
"password": password
})
async def refresh_token(self, refresh_token: str) -> Response:
"""刷新token"""
return await self.post("/refresh", json={
"refreshToken": refresh_token
})
async def logout(self, token: str) -> Response:
"""用户登出"""
return await self.post("/logout", headers={
"Authorization": f"Bearer {token}"
})
+58
View File
@@ -0,0 +1,58 @@
"""
基础API类
"""
from typing import Optional, Dict, Any
from httpx import AsyncClient, Response
from loguru import logger
class BaseAPI:
"""基础API类"""
def __init__(self, client: AsyncClient, base_url: str = ""):
self.client = client
self.base_url = base_url
async def get(self, endpoint: str, params: Optional[Dict[str, Any]] = None, **kwargs) -> Response:
"""GET请求"""
url = f"{self.base_url}{endpoint}"
logger.info(f"GET {url} - Params: {params}")
response = await self.client.get(url, params=params, **kwargs)
logger.info(f"Response: {response.status_code}")
return response
async def post(self, endpoint: str, data: Optional[Dict[str, Any]] = None, json: Optional[Dict[str, Any]] = None, **kwargs) -> Response:
"""POST请求"""
url = f"{self.base_url}{endpoint}"
logger.info(f"POST {url} - Data: {data} - JSON: {json}")
response = await self.client.post(url, data=data, json=json, **kwargs)
logger.info(f"Response: {response.status_code}")
return response
async def put(self, endpoint: str, data: Optional[Dict[str, Any]] = None, json: Optional[Dict[str, Any]] = None, **kwargs) -> Response:
"""PUT请求"""
url = f"{self.base_url}{endpoint}"
logger.info(f"PUT {url} - Data: {data} - JSON: {json}")
response = await self.client.put(url, data=data, json=json, **kwargs)
logger.info(f"Response: {response.status_code}")
return response
async def delete(self, endpoint: str, **kwargs) -> Response:
"""DELETE请求"""
url = f"{self.base_url}{endpoint}"
logger.info(f"DELETE {url}")
response = await self.client.delete(url, **kwargs)
logger.info(f"Response: {response.status_code}")
return response
async def assert_status_code(self, response: Response, expected_status: int):
"""断言状态码"""
assert response.status_code == expected_status, f"Expected {expected_status}, got {response.status_code}. Response: {response.text}"
async def assert_response_contains(self, response: Response, key: str, value: Any = None):
"""断言响应包含指定字段"""
data = response.json()
assert key in data, f"Response does not contain key '{key}'"
if value is not None:
assert data[key] == value, f"Expected {value}, got {data[key]}"
+42
View File
@@ -0,0 +1,42 @@
"""
字典管理API
"""
from typing import Dict, Any
from httpx import AsyncClient, Response
from .base_api import BaseAPI
class DictionaryAPI(BaseAPI):
"""字典管理API"""
def __init__(self, client: AsyncClient):
super().__init__(client, "/api/dictionaries")
async def create_dictionary(self, dict_data: Dict[str, Any]) -> Response:
"""创建字典"""
return await self.post("", json=dict_data)
async def get_dictionary_by_id(self, dict_id: int) -> Response:
"""根据ID获取字典"""
return await self.get(f"/{dict_id}")
async def get_dictionaries_by_type(self, dict_type: str) -> Response:
"""根据类型获取字典"""
return await self.get(f"/type/{dict_type}")
async def get_all_dictionaries(self) -> Response:
"""获取所有字典"""
return await self.get("")
async def update_dictionary(self, dict_id: int, dict_data: Dict[str, Any]) -> Response:
"""更新字典"""
return await self.put(f"/{dict_id}", json=dict_data)
async def delete_dictionary(self, dict_id: int) -> Response:
"""删除字典"""
return await self.delete(f"/{dict_id}")
async def check_type_and_code_exists(self, dict_type: str, code: str) -> Response:
"""检查类型和编码是否存在"""
return await self.get("/check/exists", params={"type": dict_type, "code": code})
+58
View File
@@ -0,0 +1,58 @@
"""
角色管理API
"""
from typing import Dict, Any, List
from httpx import AsyncClient, Response
from .base_api import BaseAPI
class RoleAPI(BaseAPI):
"""角色管理API"""
def __init__(self, client: AsyncClient):
super().__init__(client, "/api/roles")
async def create_role(self, role_data: Dict[str, Any]) -> Response:
"""创建角色"""
return await self.post("", json=role_data)
async def get_role_by_id(self, role_id: int) -> Response:
"""根据ID获取角色"""
return await self.get(f"/{role_id}")
async def get_role_by_name(self, role_name: str) -> Response:
"""根据名称获取角色"""
return await self.get(f"/name/{role_name}")
async def get_all_roles(self, include_deleted: bool = False) -> Response:
"""获取所有角色"""
return await self.get("", params={"includeDeleted": include_deleted})
async def update_role(self, role_id: int, role_data: Dict[str, Any]) -> Response:
"""更新角色"""
return await self.put(f"/{role_id}", json=role_data)
async def delete_role(self, role_id: int) -> Response:
"""删除角色"""
return await self.delete(f"/{role_id}")
async def logical_delete_role(self, role_id: int) -> Response:
"""逻辑删除角色"""
return await self.delete(f"/{role_id}/logical")
async def logical_delete_roles(self, role_ids: List[int]) -> Response:
"""批量逻辑删除角色"""
return await self.post("/logical-delete", json=role_ids)
async def restore_role(self, role_id: int) -> Response:
"""恢复角色"""
return await self.post(f"/{role_id}/restore")
async def restore_roles(self, role_ids: List[int]) -> Response:
"""批量恢复角色"""
return await self.post("/restore", json=role_ids)
async def check_name_exists(self, role_name: str) -> Response:
"""检查角色名是否存在"""
return await self.get("/check/name", params={"name": role_name})
+58
View File
@@ -0,0 +1,58 @@
"""
用户管理API
"""
from typing import Dict, Any, List
from httpx import AsyncClient, Response
from .base_api import BaseAPI
class UserAPI(BaseAPI):
"""用户管理API"""
def __init__(self, client: AsyncClient):
super().__init__(client, "/api/users")
async def create_user(self, user_data: Dict[str, Any]) -> Response:
"""创建用户"""
return await self.post("", json=user_data)
async def get_user_by_id(self, user_id: int) -> Response:
"""根据ID获取用户"""
return await self.get(f"/{user_id}")
async def get_all_users(self, include_deleted: bool = False) -> Response:
"""获取所有用户"""
return await self.get("", params={"includeDeleted": include_deleted})
async def update_user(self, user_id: int, user_data: Dict[str, Any]) -> Response:
"""更新用户"""
return await self.put(f"/{user_id}", json=user_data)
async def delete_user(self, user_id: int) -> Response:
"""删除用户"""
return await self.delete(f"/{user_id}")
async def logical_delete_user(self, user_id: int) -> Response:
"""逻辑删除用户"""
return await self.delete(f"/{user_id}/logical")
async def logical_delete_users(self, user_ids: List[int]) -> Response:
"""批量逻辑删除用户"""
return await self.post("/logical-delete", json=user_ids)
async def restore_user(self, user_id: int) -> Response:
"""恢复用户"""
return await self.post(f"/{user_id}/restore")
async def restore_users(self, user_ids: List[int]) -> Response:
"""批量恢复用户"""
return await self.post("/restore", json=user_ids)
async def check_username_exists(self, username: str) -> Response:
"""检查用户名是否存在"""
return await self.get("/check/username", params={"username": username})
async def check_email_exists(self, email: str) -> Response:
"""检查邮箱是否存在"""
return await self.get("/check/email", params={"email": email})
+1
View File
@@ -0,0 +1 @@
"""配置模块"""
+74
View File
@@ -0,0 +1,74 @@
"""
配置管理模块
"""
import os
from typing import Optional
from pydantic_settings import BaseSettings
from pydantic import Field
class Settings(BaseSettings):
"""应用配置"""
API_BASE_URL: str = Field(
default="http://localhost:8080",
description="API基础URL"
)
DATABASE_HOST: str = Field(
default="localhost",
description="数据库主机"
)
DATABASE_PORT: int = Field(
default=55432,
description="数据库端口"
)
DATABASE_NAME: str = Field(
default="manage_system",
description="数据库名称"
)
DATABASE_USERNAME: str = Field(
default="postgres",
description="数据库用户名"
)
DATABASE_PASSWORD: str = Field(
default="postgres",
description="数据库密码"
)
TEST_USERNAME: str = Field(
default="admin",
description="测试用户名"
)
TEST_PASSWORD: str = Field(
default="admin123",
description="测试用户密码"
)
REQUEST_TIMEOUT: int = Field(
default=30000,
description="请求超时时间(毫秒)"
)
HEADLESS_BROWSER: bool = Field(
default=True,
description="无头浏览器模式"
)
BROWSER_TYPE: str = Field(
default="chromium",
description="浏览器类型"
)
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
settings = Settings()
+161
View File
@@ -0,0 +1,161 @@
"""
Pytest配置和fixtures
"""
import asyncio
import pytest
from typing import AsyncGenerator, Generator
from playwright.async_api import async_playwright, Browser, BrowserContext, Page
from httpx import AsyncClient
from config.settings import settings
@pytest.fixture(scope="session")
def event_loop():
"""创建事件循环"""
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
@pytest.fixture(scope="session")
async def browser() -> AsyncGenerator[Browser, None]:
"""浏览器fixture"""
async with async_playwright() as p:
browser = await p.launch(
headless=settings.HEADLESS_BROWSER,
browser_type=settings.BROWSER_TYPE
)
yield browser
await browser.close()
@pytest.fixture
async def context(browser: Browser) -> AsyncGenerator[BrowserContext, None]:
"""浏览器上下文fixture"""
context = await browser.new_context()
yield context
await context.close()
@pytest.fixture
async def page(context: BrowserContext) -> AsyncGenerator[Page, None]:
"""页面fixture"""
page = await context.new_page()
page.set_default_timeout(settings.REQUEST_TIMEOUT)
yield page
await page.close()
@pytest.fixture
async def http_client() -> AsyncGenerator[AsyncClient, None]:
"""HTTP客户端fixture"""
async with AsyncClient(
base_url=settings.API_BASE_URL,
timeout=settings.REQUEST_TIMEOUT / 1000
) as client:
yield client
@pytest.fixture
async def auth_token(http_client: AsyncClient) -> str:
"""获取认证token"""
response = await http_client.post(
"/api/auth/login",
json={
"username": settings.TEST_USERNAME,
"password": settings.TEST_PASSWORD
}
)
assert response.status_code == 200
data = response.json()
return data.get("accessToken")
@pytest.fixture
async def authenticated_client(http_client: AsyncClient, auth_token: str) -> AsyncClient:
"""已认证的HTTP客户端"""
http_client.headers.update({"Authorization": f"Bearer {auth_token}"})
return http_client
@pytest.fixture
def test_user_data():
"""测试用户数据"""
import time
timestamp = int(time.time() * 1000)
return {
"username": f"testuser_{timestamp}",
"password": "password123",
"email": f"test_{timestamp}@example.com",
"roleId": 2,
"status": 1
}
@pytest.fixture
def test_role_data():
"""测试角色数据"""
import time
timestamp = int(time.time() * 1000)
return {
"name": f"TEST_ROLE_{timestamp}",
"description": "测试角色",
"permissions": "READ,WRITE"
}
@pytest.fixture
def test_dictionary_data():
"""测试字典数据"""
return {
"type": "USER_STATUS",
"code": "ACTIVE",
"name": "激活",
"value": "1",
"remark": "用户激活状态",
"sort": 1
}
@pytest.fixture
async def cleanup_user(authenticated_client: AsyncClient):
"""清理测试用户"""
user_ids = []
yield user_ids
for user_id in user_ids:
try:
await authenticated_client.delete(f"/api/users/{user_id}")
except Exception:
pass
@pytest.fixture
async def cleanup_role(authenticated_client: AsyncClient):
"""清理测试角色"""
role_ids = []
yield role_ids
for role_id in role_ids:
try:
await authenticated_client.delete(f"/api/roles/{role_id}")
except Exception:
pass
@pytest.fixture
async def cleanup_dictionary(authenticated_client: AsyncClient):
"""清理测试字典"""
dict_ids = []
yield dict_ids
for dict_id in dict_ids:
try:
await authenticated_client.delete(f"/api/dictionaries/{dict_id}")
except Exception:
pass
+23
View File
@@ -0,0 +1,23 @@
[pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
addopts =
-v
--strict-markers
--tb=short
--cov=.
--cov-report=html
--cov-report=term-missing
--alluredir=allure-results
markers =
auth: 认证相关测试
user: 用户管理测试
role: 角色管理测试
dictionary: 字典管理测试
oauth2: OAuth2相关测试
smoke: 冒烟测试
regression: 回归测试
slow: 慢速测试
asyncio_mode = auto
+29
View File
@@ -0,0 +1,29 @@
# Python依赖包
# 测试框架
pytest==7.4.3
pytest-asyncio==0.21.1
pytest-cov==4.1.0
pytest-xdist==3.5.0
# Playwright
playwright==1.40.0
# HTTP客户端
httpx==0.25.2
requests==2.31.0
# 数据处理
pydantic==2.5.2
pydantic-settings==2.1.0
faker==20.1.0
# 配置管理
python-dotenv==1.0.0
pyyaml==6.0.1
# 测试报告
allure-pytest==2.13.2
# 工具库
loguru==0.7.2
+1
View File
@@ -0,0 +1 @@
"""测试模块"""
+83
View File
@@ -0,0 +1,83 @@
"""
认证测试用例
"""
import pytest
from api.auth_api import AuthAPI
from config.settings import settings
@pytest.mark.auth
@pytest.mark.smoke
class TestAuth:
"""认证测试类"""
@pytest.mark.asyncio
async def test_login_success(self, http_client):
"""测试成功登录"""
auth_api = AuthAPI(http_client)
response = await auth_api.login(settings.TEST_USERNAME, settings.TEST_PASSWORD)
assert response.status_code == 200
data = response.json()
assert "accessToken" in data
assert "refreshToken" in data
assert isinstance(data["accessToken"], str)
assert isinstance(data["refreshToken"], str)
@pytest.mark.asyncio
async def test_login_invalid_credentials(self, http_client):
"""测试无效凭证登录"""
auth_api = AuthAPI(http_client)
response = await auth_api.login("invalid_user", "invalid_password")
assert response.status_code == 401
@pytest.mark.asyncio
async def test_login_missing_fields(self, http_client):
"""测试缺少必填字段"""
auth_api = AuthAPI(http_client)
response = await http_client.post("/api/auth/login", json={
"username": "test"
})
assert response.status_code == 400
@pytest.mark.asyncio
async def test_refresh_token_success(self, http_client, auth_token):
"""测试刷新token成功"""
auth_api = AuthAPI(http_client)
login_response = await auth_api.login(settings.TEST_USERNAME, settings.TEST_PASSWORD)
refresh_token = login_response.json().get("refreshToken")
response = await auth_api.refresh_token(refresh_token)
assert response.status_code == 200
data = response.json()
assert "accessToken" in data
assert "refreshToken" in data
@pytest.mark.asyncio
async def test_refresh_token_invalid(self, http_client):
"""测试无效刷新token"""
auth_api = AuthAPI(http_client)
response = await auth_api.refresh_token("invalid_refresh_token")
assert response.status_code == 401
@pytest.mark.asyncio
async def test_logout_success(self, http_client, auth_token):
"""测试登出成功"""
auth_api = AuthAPI(http_client)
response = await auth_api.logout(auth_token)
assert response.status_code == 200
@pytest.mark.asyncio
async def test_logout_without_token(self, http_client):
"""测试无token登出"""
auth_api = AuthAPI(http_client)
response = await http_client.post("/api/auth/logout")
assert response.status_code == 400
+149
View File
@@ -0,0 +1,149 @@
"""
字典管理测试用例
"""
import pytest
from api.dictionary_api import DictionaryAPI
@pytest.mark.dictionary
@pytest.mark.regression
class TestDictionary:
"""字典管理测试类"""
@pytest.mark.asyncio
async def test_create_dictionary_success(self, authenticated_client, test_dictionary_data, cleanup_dictionary):
"""测试创建字典成功"""
dict_api = DictionaryAPI(authenticated_client)
response = await dict_api.create_dictionary(test_dictionary_data)
assert response.status_code == 201
data = response.json()
assert "id" in data
assert data["type"] == test_dictionary_data["type"]
assert data["code"] == test_dictionary_data["code"]
assert data["name"] == test_dictionary_data["name"]
cleanup_dictionary.append(data["id"])
@pytest.mark.asyncio
async def test_create_dictionary_duplicate_type_code(self, authenticated_client, test_dictionary_data, cleanup_dictionary):
"""测试创建重复类型和编码"""
dict_api = DictionaryAPI(authenticated_client)
create_response = await dict_api.create_dictionary(test_dictionary_data)
dict_id = create_response.json()["id"]
response = await dict_api.create_dictionary(test_dictionary_data)
assert response.status_code in [400, 409]
cleanup_dictionary.append(dict_id)
@pytest.mark.asyncio
async def test_get_dictionary_by_id_success(self, authenticated_client, test_dictionary_data, cleanup_dictionary):
"""测试根据ID获取字典成功"""
dict_api = DictionaryAPI(authenticated_client)
create_response = await dict_api.create_dictionary(test_dictionary_data)
dict_id = create_response.json()["id"]
response = await dict_api.get_dictionary_by_id(dict_id)
assert response.status_code == 200
data = response.json()
assert data["id"] == dict_id
assert data["type"] == test_dictionary_data["type"]
cleanup_dictionary.append(dict_id)
@pytest.mark.asyncio
async def test_get_dictionary_by_id_not_found(self, authenticated_client):
"""测试获取不存在的字典"""
dict_api = DictionaryAPI(authenticated_client)
response = await dict_api.get_dictionary_by_id(999999)
assert response.status_code == 404
@pytest.mark.asyncio
async def test_get_dictionaries_by_type_success(self, authenticated_client, test_dictionary_data, cleanup_dictionary):
"""测试根据类型获取字典成功"""
dict_api = DictionaryAPI(authenticated_client)
create_response = await dict_api.create_dictionary(test_dictionary_data)
dict_id = create_response.json()["id"]
response = await dict_api.get_dictionaries_by_type(test_dictionary_data["type"])
assert response.status_code == 200
data = response.json()
assert isinstance(data, list)
assert any(d["id"] == dict_id for d in data)
cleanup_dictionary.append(dict_id)
@pytest.mark.asyncio
async def test_get_all_dictionaries_success(self, authenticated_client):
"""测试获取所有字典成功"""
dict_api = DictionaryAPI(authenticated_client)
response = await dict_api.get_all_dictionaries()
assert response.status_code == 200
data = response.json()
assert isinstance(data, list)
@pytest.mark.asyncio
async def test_update_dictionary_success(self, authenticated_client, test_dictionary_data, cleanup_dictionary):
"""测试更新字典成功"""
dict_api = DictionaryAPI(authenticated_client)
create_response = await dict_api.create_dictionary(test_dictionary_data)
dict_id = create_response.json()["id"]
update_data = {"name": "Updated name"}
response = await dict_api.update_dictionary(dict_id, update_data)
assert response.status_code == 200
data = response.json()
assert data["name"] == "Updated name"
cleanup_dictionary.append(dict_id)
@pytest.mark.asyncio
async def test_delete_dictionary_success(self, authenticated_client, test_dictionary_data, cleanup_dictionary):
"""测试删除字典成功"""
dict_api = DictionaryAPI(authenticated_client)
create_response = await dict_api.create_dictionary(test_dictionary_data)
dict_id = create_response.json()["id"]
response = await dict_api.delete_dictionary(dict_id)
assert response.status_code == 204
@pytest.mark.asyncio
async def test_check_type_and_code_exists_true(self, authenticated_client, test_dictionary_data, cleanup_dictionary):
"""测试检查类型和编码存在-返回true"""
dict_api = DictionaryAPI(authenticated_client)
create_response = await dict_api.create_dictionary(test_dictionary_data)
dict_id = create_response.json()["id"]
response = await dict_api.check_type_and_code_exists(
test_dictionary_data["type"],
test_dictionary_data["code"]
)
assert response.status_code == 200
assert response.json() is True
cleanup_dictionary.append(dict_id)
@pytest.mark.asyncio
async def test_check_type_and_code_exists_false(self, authenticated_client):
"""测试检查类型和编码存在-返回false"""
dict_api = DictionaryAPI(authenticated_client)
response = await dict_api.check_type_and_code_exists("NONEXISTENT_TYPE", "NONEXISTENT_CODE")
assert response.status_code == 200
assert response.json() is False
+127
View File
@@ -0,0 +1,127 @@
"""
OAuth2客户端管理测试用例
"""
import pytest
from httpx import AsyncClient
@pytest.mark.oauth2
@pytest.mark.regression
class TestOAuth2:
"""OAuth2客户端管理测试类"""
@pytest.fixture
def test_oauth2_client_data(self):
"""测试OAuth2客户端数据"""
import time
timestamp = int(time.time() * 1000)
return {
"clientId": f"test-client-{timestamp}",
"clientSecret": "secret123",
"clientName": "Test Client",
"webServerRedirectUri": "http://localhost:8080/callback",
"scope": "read,write",
"authorizedGrantTypes": "authorization_code,refresh_token",
"accessTokenValiditySeconds": 7200,
"refreshTokenValiditySeconds": 2592000,
"autoApprove": False,
"enabled": True
}
@pytest.fixture
async def cleanup_oauth2_client(self, authenticated_client: AsyncClient):
"""清理测试OAuth2客户端"""
client_ids = []
yield client_ids
for client_id in client_ids:
try:
await authenticated_client.delete(f"/api/oauth2/clients/{client_id}")
except Exception:
pass
@pytest.mark.asyncio
async def test_create_oauth2_client_success(self, authenticated_client, test_oauth2_client_data, cleanup_oauth2_client):
"""测试创建OAuth2客户端成功"""
response = await authenticated_client.post("/api/oauth2/clients", json=test_oauth2_client_data)
assert response.status_code == 201
data = response.json()
assert "id" in data
assert data["clientId"] == test_oauth2_client_data["clientId"]
assert data["clientName"] == test_oauth2_client_data["clientName"]
assert "clientSecret" not in data or data["clientSecret"] != test_oauth2_client_data["clientSecret"]
cleanup_oauth2_client.append(data["id"])
@pytest.mark.asyncio
async def test_get_oauth2_client_by_id_success(self, authenticated_client, test_oauth2_client_data, cleanup_oauth2_client):
"""测试根据ID获取OAuth2客户端成功"""
create_response = await authenticated_client.post("/api/oauth2/clients", json=test_oauth2_client_data)
client_id = create_response.json()["id"]
response = await authenticated_client.get(f"/api/oauth2/clients/{client_id}")
assert response.status_code == 200
data = response.json()
assert data["id"] == client_id
assert data["clientId"] == test_oauth2_client_data["clientId"]
cleanup_oauth2_client.append(client_id)
@pytest.mark.asyncio
async def test_get_oauth2_client_by_id_not_found(self, authenticated_client):
"""测试获取不存在的OAuth2客户端"""
response = await authenticated_client.get("/api/oauth2/clients/999999")
assert response.status_code == 404
@pytest.mark.asyncio
async def test_get_oauth2_client_by_client_id_success(self, authenticated_client, test_oauth2_client_data, cleanup_oauth2_client):
"""测试根据clientId获取OAuth2客户端成功"""
create_response = await authenticated_client.post("/api/oauth2/clients", json=test_oauth2_client_data)
client_id = create_response.json()["id"]
response = await authenticated_client.get(f"/api/oauth2/clients/client-id/{test_oauth2_client_data['clientId']}")
assert response.status_code == 200
data = response.json()
assert data["clientId"] == test_oauth2_client_data["clientId"]
cleanup_oauth2_client.append(client_id)
@pytest.mark.asyncio
async def test_get_all_oauth2_clients_success(self, authenticated_client):
"""测试获取所有OAuth2客户端成功"""
response = await authenticated_client.get("/api/oauth2/clients")
assert response.status_code == 200
data = response.json()
assert isinstance(data, list)
@pytest.mark.asyncio
async def test_update_oauth2_client_success(self, authenticated_client, test_oauth2_client_data, cleanup_oauth2_client):
"""测试更新OAuth2客户端成功"""
create_response = await authenticated_client.post("/api/oauth2/clients", json=test_oauth2_client_data)
client_id = create_response.json()["id"]
update_data = {"clientName": "Updated Client Name"}
response = await authenticated_client.put(f"/api/oauth2/clients/{client_id}", json=update_data)
assert response.status_code == 200
data = response.json()
assert data["clientName"] == "Updated Client Name"
cleanup_oauth2_client.append(client_id)
@pytest.mark.asyncio
async def test_delete_oauth2_client_success(self, authenticated_client, test_oauth2_client_data, cleanup_oauth2_client):
"""测试删除OAuth2客户端成功"""
create_response = await authenticated_client.post("/api/oauth2/clients", json=test_oauth2_client_data)
client_id = create_response.json()["id"]
response = await authenticated_client.delete(f"/api/oauth2/clients/{client_id}")
assert response.status_code == 204
+185
View File
@@ -0,0 +1,185 @@
"""
角色管理测试用例
"""
import pytest
from api.role_api import RoleAPI
@pytest.mark.role
@pytest.mark.regression
class TestRole:
"""角色管理测试类"""
@pytest.mark.asyncio
async def test_create_role_success(self, authenticated_client, test_role_data, cleanup_role):
"""测试创建角色成功"""
role_api = RoleAPI(authenticated_client)
response = await role_api.create_role(test_role_data)
assert response.status_code == 201
data = response.json()
assert "id" in data
assert data["name"] == test_role_data["name"]
assert data["description"] == test_role_data["description"]
assert data["permissions"] == test_role_data["permissions"]
cleanup_role.append(data["id"])
@pytest.mark.asyncio
async def test_create_role_duplicate_name(self, authenticated_client, test_role_data, cleanup_role):
"""测试创建重复角色名"""
role_api = RoleAPI(authenticated_client)
create_response = await role_api.create_role(test_role_data)
role_id = create_response.json()["id"]
response = await role_api.create_role(test_role_data)
assert response.status_code in [400, 409]
cleanup_role.append(role_id)
@pytest.mark.asyncio
async def test_get_role_by_id_success(self, authenticated_client, test_role_data, cleanup_role):
"""测试根据ID获取角色成功"""
role_api = RoleAPI(authenticated_client)
create_response = await role_api.create_role(test_role_data)
role_id = create_response.json()["id"]
response = await role_api.get_role_by_id(role_id)
assert response.status_code == 200
data = response.json()
assert data["id"] == role_id
assert data["name"] == test_role_data["name"]
cleanup_role.append(role_id)
@pytest.mark.asyncio
async def test_get_role_by_id_not_found(self, authenticated_client):
"""测试获取不存在的角色"""
role_api = RoleAPI(authenticated_client)
response = await role_api.get_role_by_id(999999)
assert response.status_code == 404
@pytest.mark.asyncio
async def test_get_role_by_name_success(self, authenticated_client, test_role_data, cleanup_role):
"""测试根据名称获取角色成功"""
role_api = RoleAPI(authenticated_client)
create_response = await role_api.create_role(test_role_data)
role_id = create_response.json()["id"]
response = await role_api.get_role_by_name(test_role_data["name"])
assert response.status_code == 200
data = response.json()
assert data["name"] == test_role_data["name"]
cleanup_role.append(role_id)
@pytest.mark.asyncio
async def test_get_all_roles_success(self, authenticated_client):
"""测试获取所有角色成功"""
role_api = RoleAPI(authenticated_client)
response = await role_api.get_all_roles()
assert response.status_code == 200
data = response.json()
assert isinstance(data, list)
@pytest.mark.asyncio
async def test_update_role_success(self, authenticated_client, test_role_data, cleanup_role):
"""测试更新角色成功"""
role_api = RoleAPI(authenticated_client)
create_response = await role_api.create_role(test_role_data)
role_id = create_response.json()["id"]
update_data = {"description": "Updated description"}
response = await role_api.update_role(role_id, update_data)
assert response.status_code == 200
data = response.json()
assert data["description"] == "Updated description"
cleanup_role.append(role_id)
@pytest.mark.asyncio
async def test_delete_role_success(self, authenticated_client, test_role_data, cleanup_role):
"""测试删除角色成功"""
role_api = RoleAPI(authenticated_client)
create_response = await role_api.create_role(test_role_data)
role_id = create_response.json()["id"]
response = await role_api.delete_role(role_id)
assert response.status_code == 204
@pytest.mark.asyncio
async def test_logical_delete_role_success(self, authenticated_client, test_role_data, cleanup_role):
"""测试逻辑删除角色成功"""
role_api = RoleAPI(authenticated_client)
create_response = await role_api.create_role(test_role_data)
role_id = create_response.json()["id"]
response = await role_api.logical_delete_role(role_id)
assert response.status_code == 200
get_response = await role_api.get_role_by_id(role_id)
assert get_response.status_code == 404
get_deleted_response = await role_api.get_all_roles(include_deleted=True)
deleted_roles = get_deleted_response.json()
assert any(r["id"] == role_id for r in deleted_roles)
cleanup_role.append(role_id)
@pytest.mark.asyncio
async def test_restore_role_success(self, authenticated_client, test_role_data, cleanup_role):
"""测试恢复角色成功"""
role_api = RoleAPI(authenticated_client)
create_response = await role_api.create_role(test_role_data)
role_id = create_response.json()["id"]
await role_api.logical_delete_role(role_id)
response = await role_api.restore_role(role_id)
assert response.status_code == 200
get_response = await role_api.get_role_by_id(role_id)
assert get_response.status_code == 200
cleanup_role.append(role_id)
@pytest.mark.asyncio
async def test_check_name_exists_true(self, authenticated_client, test_role_data, cleanup_role):
"""测试检查角色名存在-返回true"""
role_api = RoleAPI(authenticated_client)
create_response = await role_api.create_role(test_role_data)
role_id = create_response.json()["id"]
response = await role_api.check_name_exists(test_role_data["name"])
assert response.status_code == 200
assert response.json() is True
cleanup_role.append(role_id)
@pytest.mark.asyncio
async def test_check_name_exists_false(self, authenticated_client):
"""测试检查角色名存在-返回false"""
role_api = RoleAPI(authenticated_client)
response = await role_api.check_name_exists("NONEXISTENT_ROLE")
assert response.status_code == 200
assert response.json() is False
+193
View File
@@ -0,0 +1,193 @@
"""
用户管理测试用例
"""
import pytest
from api.user_api import UserAPI
from config.settings import settings
@pytest.mark.user
@pytest.mark.regression
class TestUser:
"""用户管理测试类"""
@pytest.mark.asyncio
async def test_create_user_success(self, authenticated_client, test_user_data, cleanup_user):
"""测试创建用户成功"""
user_api = UserAPI(authenticated_client)
response = await user_api.create_user(test_user_data)
print(f"Response status: {response.status_code}")
print(f"Response text: {response.text}")
assert response.status_code == 201
data = response.json()
assert "id" in data
assert data["username"] == test_user_data["username"]
assert data["email"] == test_user_data["email"]
assert "password" not in data or data["password"] != test_user_data["password"]
cleanup_user.append(data["id"])
@pytest.mark.asyncio
async def test_create_user_duplicate_username(self, authenticated_client, test_user_data, cleanup_user):
"""测试创建重复用户名"""
user_api = UserAPI(authenticated_client)
await user_api.create_user(test_user_data)
response = await user_api.create_user(test_user_data)
assert response.status_code in [400, 409]
@pytest.mark.asyncio
async def test_get_user_by_id_success(self, authenticated_client, test_user_data, cleanup_user):
"""测试根据ID获取用户成功"""
user_api = UserAPI(authenticated_client)
create_response = await user_api.create_user(test_user_data)
user_id = create_response.json()["id"]
response = await user_api.get_user_by_id(user_id)
assert response.status_code == 200
data = response.json()
assert data["id"] == user_id
assert data["username"] == test_user_data["username"]
cleanup_user.append(user_id)
@pytest.mark.asyncio
async def test_get_user_by_id_not_found(self, authenticated_client):
"""测试获取不存在的用户"""
user_api = UserAPI(authenticated_client)
response = await user_api.get_user_by_id(999999)
assert response.status_code == 404
@pytest.mark.asyncio
async def test_get_all_users_success(self, authenticated_client):
"""测试获取所有用户成功"""
user_api = UserAPI(authenticated_client)
response = await user_api.get_all_users()
assert response.status_code == 200
data = response.json()
assert isinstance(data, list)
@pytest.mark.asyncio
async def test_update_user_success(self, authenticated_client, test_user_data, cleanup_user):
"""测试更新用户成功"""
user_api = UserAPI(authenticated_client)
create_response = await user_api.create_user(test_user_data)
user_id = create_response.json()["id"]
update_data = {"email": "updated@example.com"}
response = await user_api.update_user(user_id, update_data)
assert response.status_code == 200
data = response.json()
assert data["email"] == "updated@example.com"
cleanup_user.append(user_id)
@pytest.mark.asyncio
async def test_delete_user_success(self, authenticated_client, test_user_data, cleanup_user):
"""测试删除用户成功"""
user_api = UserAPI(authenticated_client)
create_response = await user_api.create_user(test_user_data)
user_id = create_response.json()["id"]
response = await user_api.delete_user(user_id)
assert response.status_code == 204
@pytest.mark.asyncio
async def test_logical_delete_user_success(self, authenticated_client, test_user_data, cleanup_user):
"""测试逻辑删除用户成功"""
user_api = UserAPI(authenticated_client)
create_response = await user_api.create_user(test_user_data)
user_id = create_response.json()["id"]
response = await user_api.logical_delete_user(user_id)
assert response.status_code == 200
get_response = await user_api.get_user_by_id(user_id)
assert get_response.status_code == 404
get_deleted_response = await user_api.get_all_users(include_deleted=True)
deleted_users = get_deleted_response.json()
assert any(u["id"] == user_id for u in deleted_users)
cleanup_user.append(user_id)
@pytest.mark.asyncio
async def test_restore_user_success(self, authenticated_client, test_user_data, cleanup_user):
"""测试恢复用户成功"""
user_api = UserAPI(authenticated_client)
create_response = await user_api.create_user(test_user_data)
user_id = create_response.json()["id"]
await user_api.logical_delete_user(user_id)
response = await user_api.restore_user(user_id)
assert response.status_code == 200
get_response = await user_api.get_user_by_id(user_id)
assert get_response.status_code == 200
cleanup_user.append(user_id)
@pytest.mark.asyncio
async def test_check_username_exists_true(self, authenticated_client, test_user_data, cleanup_user):
"""测试检查用户名存在-返回true"""
user_api = UserAPI(authenticated_client)
create_response = await user_api.create_user(test_user_data)
user_id = create_response.json()["id"]
response = await user_api.check_username_exists(test_user_data["username"])
assert response.status_code == 200
assert response.json() is True
cleanup_user.append(user_id)
@pytest.mark.asyncio
async def test_check_username_exists_false(self, authenticated_client):
"""测试检查用户名存在-返回false"""
user_api = UserAPI(authenticated_client)
response = await user_api.check_username_exists("nonexistent_user")
assert response.status_code == 200
assert response.json() is False
@pytest.mark.asyncio
async def test_check_email_exists_true(self, authenticated_client, test_user_data, cleanup_user):
"""测试检查邮箱存在-返回true"""
user_api = UserAPI(authenticated_client)
create_response = await user_api.create_user(test_user_data)
user_id = create_response.json()["id"]
response = await user_api.check_email_exists(test_user_data["email"])
assert response.status_code == 200
assert response.json() is True
cleanup_user.append(user_id)
@pytest.mark.asyncio
async def test_check_email_exists_false(self, authenticated_client):
"""测试检查邮箱存在-返回false"""
user_api = UserAPI(authenticated_client)
response = await user_api.check_email_exists("nonexistent@example.com")
assert response.status_code == 200
assert response.json() is False
+3
View File
@@ -0,0 +1,3 @@
"""
工具模块
"""
+83
View File
@@ -0,0 +1,83 @@
"""
断言工具
"""
from typing import Any, Dict, List
from httpx import Response
class Assertions:
"""断言工具类"""
@staticmethod
def assert_status_code(response: Response, expected_status: int):
"""断言状态码"""
assert response.status_code == expected_status, \
f"Expected status code {expected_status}, got {response.status_code}. Response: {response.text}"
@staticmethod
def assert_response_contains(response: Response, key: str, value: Any = None):
"""断言响应包含指定字段"""
data = response.json()
assert key in data, f"Response does not contain key '{key}'. Response: {data}"
if value is not None:
assert data[key] == value, \
f"Expected {value} for key '{key}', got {data[key]}"
@staticmethod
def assert_response_is_list(response: Response):
"""断言响应是列表"""
data = response.json()
assert isinstance(data, list), f"Expected list, got {type(data)}. Response: {data}"
@staticmethod
def assert_response_not_empty(response: Response):
"""断言响应不为空"""
data = response.json()
assert data, f"Response is empty. Response: {data}"
@staticmethod
def assert_response_field_type(response: Response, field: str, expected_type: type):
"""断言响应字段类型"""
data = response.json()
assert field in data, f"Response does not contain field '{field}'"
assert isinstance(data[field], expected_type), \
f"Expected field '{field}' to be {expected_type}, got {type(data[field])}"
@staticmethod
def assert_response_fields_present(response: Response, fields: List[str]):
"""断言响应包含所有指定字段"""
data = response.json()
missing_fields = [field for field in fields if field not in data]
assert not missing_fields, \
f"Response is missing fields: {missing_fields}. Response: {data}"
@staticmethod
def assert_response_field_length(response: Response, field: str, min_length: int = None, max_length: int = None):
"""断言响应字段长度"""
data = response.json()
assert field in data, f"Response does not contain field '{field}'"
field_value = data[field]
if isinstance(field_value, (str, list, dict)):
length = len(field_value)
if min_length is not None:
assert length >= min_length, \
f"Field '{field}' length {length} is less than minimum {min_length}"
if max_length is not None:
assert length <= max_length, \
f"Field '{field}' length {length} is greater than maximum {max_length}"
else:
raise AssertionError(f"Field '{field}' is not a string, list, or dict")
@staticmethod
def assert_error_response(response: Response, expected_message: str = None):
"""断言错误响应"""
Assertions.assert_status_code(response, 400)
if expected_message:
data = response.json()
assert expected_message in str(data), \
f"Expected error message '{expected_message}' not found in response: {data}"
assertions = Assertions()
+72
View File
@@ -0,0 +1,72 @@
"""
测试数据生成器
"""
import random
import string
from faker import Faker
class DataGenerator:
"""测试数据生成器"""
def __init__(self, locale: str = "zh_CN"):
self.faker = Faker(locale)
def generate_username(self) -> str:
"""生成用户名"""
return f"testuser_{''.join(random.choices(string.ascii_lowercase + string.digits, k=8))}"
def generate_password(self, length: int = 12) -> str:
"""生成密码"""
chars = string.ascii_letters + string.digits + "!@#$%^&*"
return ''.join(random.choices(chars, k=length))
def generate_email(self) -> str:
"""生成邮箱"""
return self.faker.email()
def generate_phone(self) -> str:
"""生成手机号"""
return self.faker.phone_number()
def generate_name(self) -> str:
"""生成姓名"""
return self.faker.name()
def generate_role_name(self) -> str:
"""生成角色名"""
return f"ROLE_{''.join(random.choices(string.ascii_uppercase, k=6))}"
def generate_dict_type(self) -> str:
"""生成字典类型"""
return f"DICT_TYPE_{''.join(random.choices(string.ascii_uppercase, k=4))}"
def generate_dict_code(self) -> str:
"""生成字典编码"""
return f"CODE_{''.join(random.choices(string.ascii_uppercase + string.digits, k=6))}"
def generate_url(self) -> str:
"""生成URL"""
return self.faker.url()
def generate_company_name(self) -> str:
"""生成公司名"""
return self.faker.company()
def generate_address(self) -> str:
"""生成地址"""
return self.faker.address()
def generate_description(self) -> str:
"""生成描述"""
return self.faker.text(max_nb_chars=200)
def generate_permissions(self) -> str:
"""生成权限字符串"""
permissions = ["READ", "WRITE", "DELETE", "ADMIN", "MANAGE"]
selected = random.sample(permissions, random.randint(1, len(permissions)))
return ",".join(selected)
data_generator = DataGenerator()
+33
View File
@@ -0,0 +1,33 @@
"""
日志工具
"""
import sys
from loguru import logger
from pathlib import Path
def setup_logger(log_file: str = "e2e_tests.log", log_level: str = "INFO"):
"""配置日志"""
logger.remove()
logger.add(
sys.stdout,
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
level=log_level,
colorize=True
)
logger.add(
log_file,
format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
level=log_level,
rotation="10 MB",
retention="7 days",
compression="zip"
)
return logger
setup_logger()