Files
novalon-manage-system/api_integration_tests/tests/test_real_e2e.py
T
张翔 c50ccd258f feat: 重构测试框架并优化代码结构
refactor(tests): 将e2e_tests迁移到tests_suite和api_integration_tests
style: 为Java类添加文档注释
docs: 更新.gitignore和配置文件
test: 添加性能测试和Playwright测试脚本
chore: 清理旧测试文件和配置
2026-03-14 13:49:39 +08:00

292 lines
11 KiB
Python

"""
真实的端到端(E2E)测试 - 使用Playwright测试前后端联通
"""
import pytest
import time
from playwright.async_api import async_playwright, Page, Browser, BrowserContext
from httpx import AsyncClient
from config.settings import settings
@pytest.mark.e2e
@pytest.mark.playwright
class TestRealE2E:
"""真实的端到端测试类"""
@pytest.fixture
async def browser(self):
"""浏览器fixture - headless模式"""
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
yield browser
await browser.close()
@pytest.fixture
async def context(self, browser):
"""浏览器上下文fixture"""
context = await browser.new_context()
yield context
await context.close()
@pytest.fixture
async def page(self, context):
"""页面fixture"""
page = await context.new_page()
page.set_default_timeout(30000)
yield page
await page.close()
@pytest.fixture
async def authenticated_client(self):
"""已认证的HTTP客户端"""
async with AsyncClient(base_url=settings.API_BASE_URL) as client:
response = await client.post(
"/api/auth/login",
json={
"username": settings.TEST_USERNAME,
"password": settings.TEST_PASSWORD
}
)
assert response.status_code == 200
token = response.json().get("token")
client.headers.update({"Authorization": f"Bearer {token}"})
yield client
@pytest.mark.asyncio
async def test_complete_user_lifecycle_e2e(self, page, authenticated_client):
"""测试完整的用户生命周期 - 前后端联通"""
timestamp = int(time.time() * 1000)
username = f"e2e_user_{timestamp}"
email = f"e2e_{timestamp}@example.com"
# 1. 通过前端登录
await page.goto("http://localhost:3002/login")
await page.wait_for_load_state("networkidle")
await page.fill('input[name="username"]', settings.TEST_USERNAME)
await page.fill('input[name="password"]', settings.TEST_PASSWORD)
await page.click('button[type="submit"]')
await page.wait_for_url("**/")
assert await page.title() != ""
# 2. 通过前端创建用户
await page.click('text=用户管理')
await page.wait_for_url("**/users")
await page.click('text=创建用户')
await page.fill('input[name="username"]', username)
await page.fill('input[name="email"]', email)
await page.fill('input[name="phone"]', '13800138000')
await page.fill('input[name="password"]', 'Test123!@#')
await page.fill('input[name="confirmPassword"]', 'Test123!@#')
await page.click('button[type="submit"]')
await page.wait_for_selector('.success-message', timeout=10000)
success_message = await page.text_content('.success-message')
assert '成功' in success_message or 'success' in success_message.lower()
# 3. 通过API验证用户已创建
response = await authenticated_client.get("/api/users")
assert response.status_code == 200
users = response.json()
user_exists = any(user['username'] == username for user in users)
assert user_exists, f"User {username} not found in API response"
# 4. 通过前端验证用户显示
await page.reload()
await page.wait_for_load_state("networkidle")
page_content = await page.content()
assert username in page_content, f"Username {username} not found in page content"
@pytest.mark.asyncio
async def test_role_assignment_e2e(self, page, authenticated_client):
"""测试角色分配 - 前后端联通"""
timestamp = int(time.time() * 1000)
role_name = f"E2E_Role_{timestamp}"
role_key = f"e2e_role_{timestamp}"
# 1. 通过API创建角色
role_response = await authenticated_client.post(
"/api/roles",
json={
"roleName": role_name,
"roleKey": role_key,
"roleSort": 1,
"status": 1
}
)
assert role_response.status_code == 201
role_id = role_response.json()["id"]
# 2. 通过前端登录
await page.goto("http://localhost:3002/login")
await page.fill('input[name="username"]', settings.TEST_USERNAME)
await page.fill('input[name="password"]', settings.TEST_PASSWORD)
await page.click('button[type="submit"]')
await page.wait_for_url("**/")
# 3. 通过前端创建用户
await page.click('text=用户管理')
await page.wait_for_url("**/users")
await page.click('text=创建用户')
username = f"e2e_user_{timestamp}"
await page.fill('input[name="username"]', username)
await page.fill('input[name="email"]', f"e2e_{timestamp}@example.com")
await page.fill('input[name="password"]', 'Test123!@#')
await page.fill('input[name="confirmPassword"]', 'Test123!@#')
await page.click('button[type="submit"]')
await page.wait_for_selector('.success-message', timeout=10000)
# 4. 通过API获取用户ID并分配角色
users_response = await authenticated_client.get("/api/users")
users = users_response.json()
user = next((u for u in users if u['username'] == username), None)
assert user is not None
await authenticated_client.put(
f"/api/users/{user['id']}",
json={"roleId": role_id}
)
# 5. 通过API验证角色分配
user_response = await authenticated_client.get(f"/api/users/{user['id']}")
assert user_response.status_code == 200
user_data = user_response.json()
assert user_data["roleId"] == role_id
# 6. 清理测试数据
await authenticated_client.delete(f"/api/users/{user['id']}")
await authenticated_client.delete(f"/api/roles/{role_id}")
@pytest.mark.asyncio
async def test_login_and_navigation_e2e(self, page):
"""测试登录和导航 - 前后端联通"""
# 1. 访问登录页面
await page.goto("http://localhost:3002/login")
await page.wait_for_load_state("networkidle")
title = await page.title()
assert "登录" in title or "Login" in title.lower()
# 2. 填写登录表单
await page.fill('input[name="username"]', settings.TEST_USERNAME)
await page.fill('input[name="password"]', settings.TEST_PASSWORD)
# 3. 点击登录按钮
await page.click('button[type="submit"]')
# 4. 等待跳转到首页
await page.wait_for_url("**/", timeout=10000)
# 5. 验证用户信息显示
user_info = await page.query_selector('.user-info')
assert user_info is not None, "User info element not found"
user_text = await user_info.text_content()
assert settings.TEST_USERNAME in user_text
# 6. 测试导航到不同页面
await page.click('text=用户管理')
await page.wait_for_url("**/users")
await page.click('text=角色管理')
await page.wait_for_url("**/roles")
await page.click('text=系统配置')
await page.wait_for_url("**/config")
@pytest.mark.asyncio
async def test_system_config_e2e(self, page, authenticated_client):
"""测试系统配置 - 前后端联通"""
# 1. 通过前端登录
await page.goto("http://localhost:3002/login")
await page.fill('input[name="username"]', settings.TEST_USERNAME)
await page.fill('input[name="password"]', settings.TEST_PASSWORD)
await page.click('button[type="submit"]')
await page.wait_for_url("**/")
# 2. 通过前端访问系统配置
await page.click('text=系统配置')
await page.wait_for_url("**/config")
# 3. 验证配置列表显示
table = await page.query_selector('table')
assert table is not None, "Config table not found"
# 4. 通过API获取配置
config_response = await authenticated_client.get("/api/config")
assert config_response.status_code == 200
configs = config_response.json()
# 5. 验证前后端数据一致
page_content = await page.content()
for config in configs[:3]:
assert config['configKey'] in page_content or config['configName'] in page_content
@pytest.mark.asyncio
async def test_search_and_filter_e2e(self, page, authenticated_client):
"""测试搜索和过滤 - 前后端联通"""
timestamp = int(time.time() * 1000)
# 1. 通过API创建多个测试用户
user_ids = []
for i in range(3):
username = f"search_{timestamp}_{i}"
response = await authenticated_client.post(
"/api/users",
json={
"username": username,
"password": "Test123!@#",
"email": f"search_{timestamp}_{i}@example.com",
"status": 1
}
)
assert response.status_code == 201
user_ids.append(response.json()["id"])
try:
# 2. 通过前端登录
await page.goto("http://localhost:3002/login")
await page.fill('input[name="username"]', settings.TEST_USERNAME)
await page.fill('input[name="password"]', settings.TEST_PASSWORD)
await page.click('button[type="submit"]')
await page.wait_for_url("**/")
# 3. 通过前端搜索用户
await page.click('text=用户管理')
await page.wait_for_url("**/users")
await page.fill('input[name="keyword"]', f"search_{timestamp}")
await page.click('button[type="search"]')
await page.wait_for_load_state("networkidle")
# 4. 验证搜索结果显示
page_content = await page.content()
assert f"search_{timestamp}" in page_content
# 5. 通过API验证搜索结果
search_response = await authenticated_client.get(
"/api/users/page",
params={"keyword": f"search_{timestamp}", "page": 0, "size": 10}
)
assert search_response.status_code == 200
search_data = search_response.json()
assert len(search_data["content"]) >= 3
finally:
# 6. 清理测试数据
for user_id in user_ids:
try:
await authenticated_client.delete(f"/api/users/{user_id}")
except Exception:
pass