"""
Real end-to-end (E2E) tests - verify frontend/backend integration with Playwright.

The tests drive the web UI through a headless Chromium browser and cross-check
the results against the backend HTTP API.
"""
import logging
import time

import pytest
from playwright.async_api import async_playwright, Page, Browser, BrowserContext
from httpx import AsyncClient

from config.settings import settings

logger = logging.getLogger(__name__)

# Base URL of the frontend under test (single place to change the host/port).
FRONTEND_BASE_URL = "http://localhost:3002"

# Password used for every user account created by these tests.
_NEW_USER_PASSWORD = 'Test123!@#'


async def _login(page: Page, *, wait_for_idle: bool = False) -> None:
    """Log in through the frontend login form with the configured test account.

    Args:
        page: Playwright page to drive.
        wait_for_idle: When True, wait for the "networkidle" load state after
            opening the login page and before filling the form.
    """
    await page.goto(f"{FRONTEND_BASE_URL}/login")
    if wait_for_idle:
        await page.wait_for_load_state("networkidle")
    await page.fill('input[name="username"]', settings.TEST_USERNAME)
    await page.fill('input[name="password"]', settings.TEST_PASSWORD)
    await page.click('button[type="submit"]')
    await page.wait_for_url("**/")


async def _delete_quietly(client: AsyncClient, path: str) -> None:
    """Issue a best-effort DELETE; log (instead of raising) on failure.

    Cleanup must never mask the original test failure, so errors are only
    reported as warnings.
    """
    try:
        await client.delete(path)
    except Exception:
        logger.warning("Failed to clean up %s", path, exc_info=True)


async def _delete_user_by_username(client: AsyncClient, username: str) -> None:
    """Best-effort removal of the user named *username* via the API.

    Looks the user up in the ``/api/users`` listing and deletes it by id.
    Does nothing when the user is not present.
    """
    try:
        response = await client.get("/api/users")
        user = next(
            (u for u in response.json() if u['username'] == username), None
        )
    except Exception:
        logger.warning(
            "Failed to look up user %s for cleanup", username, exc_info=True
        )
        return
    if user is not None:
        await _delete_quietly(client, f"/api/users/{user['id']}")


@pytest.mark.e2e
@pytest.mark.playwright
class TestRealE2E:
    """Real end-to-end tests exercising the frontend and backend together."""

    @pytest.fixture
    async def browser(self):
        """Browser fixture - Chromium in headless mode."""
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            yield browser
            await browser.close()

    @pytest.fixture
    async def context(self, browser: Browser):
        """Browser context fixture."""
        context = await browser.new_context()
        yield context
        await context.close()

    @pytest.fixture
    async def page(self, context: BrowserContext):
        """Page fixture with a 30 second default timeout."""
        page = await context.new_page()
        page.set_default_timeout(30000)
        yield page
        await page.close()

    @pytest.fixture
    async def authenticated_client(self):
        """HTTP client that carries a bearer token obtained via /api/auth/login."""
        async with AsyncClient(base_url=settings.API_BASE_URL) as client:
            response = await client.post(
                "/api/auth/login",
                json={
                    "username": settings.TEST_USERNAME,
                    "password": settings.TEST_PASSWORD,
                },
            )
            assert response.status_code == 200
            token = response.json().get("token")
            # Fail here rather than sending "Bearer None" on every later call.
            assert token, "Login response did not contain a token"
            client.headers.update({"Authorization": f"Bearer {token}"})
            yield client

    @pytest.mark.asyncio
    async def test_complete_user_lifecycle_e2e(
        self, page: Page, authenticated_client: AsyncClient
    ):
        """Create a user through the UI and verify it via both API and UI."""
        timestamp = int(time.time() * 1000)
        username = f"e2e_user_{timestamp}"
        email = f"e2e_{timestamp}@example.com"

        # 1. Log in through the frontend
        await _login(page, wait_for_idle=True)
        assert await page.title() != ""

        try:
            # 2. Create the user through the frontend
            await page.click('text=用户管理')
            await page.wait_for_url("**/users")
            await page.click('text=创建用户')
            await page.fill('input[name="username"]', username)
            await page.fill('input[name="email"]', email)
            await page.fill('input[name="phone"]', '13800138000')
            await page.fill('input[name="password"]', _NEW_USER_PASSWORD)
            await page.fill('input[name="confirmPassword"]', _NEW_USER_PASSWORD)
            await page.click('button[type="submit"]')
            await page.wait_for_selector('.success-message', timeout=10000)
            # text_content() may return None; normalise so the assertion
            # fails cleanly instead of raising TypeError.
            success_message = await page.text_content('.success-message') or ""
            assert '成功' in success_message or 'success' in success_message.lower()

            # 3. Verify through the API that the user was created
            response = await authenticated_client.get("/api/users")
            assert response.status_code == 200
            users = response.json()
            user_exists = any(user['username'] == username for user in users)
            assert user_exists, f"User {username} not found in API response"

            # 4. Verify through the frontend that the user is displayed
            await page.reload()
            await page.wait_for_load_state("networkidle")
            page_content = await page.content()
            assert username in page_content, f"Username {username} not found in page content"
        finally:
            # 5. Clean up so repeated runs do not accumulate test users
            await _delete_user_by_username(authenticated_client, username)

    @pytest.mark.asyncio
    async def test_role_assignment_e2e(
        self, page: Page, authenticated_client: AsyncClient
    ):
        """Create a role via API, a user via UI, then assign and verify the role."""
        timestamp = int(time.time() * 1000)
        role_name = f"E2E_Role_{timestamp}"
        role_key = f"e2e_role_{timestamp}"
        username = f"e2e_user_{timestamp}"

        # 1. Create the role through the API
        role_response = await authenticated_client.post(
            "/api/roles",
            json={
                "roleName": role_name,
                "roleKey": role_key,
                "roleSort": 1,
                "status": 1,
            },
        )
        assert role_response.status_code == 201
        role_id = role_response.json()["id"]

        try:
            # 2. Log in through the frontend
            await _login(page)

            # 3. Create the user through the frontend
            await page.click('text=用户管理')
            await page.wait_for_url("**/users")
            await page.click('text=创建用户')
            await page.fill('input[name="username"]', username)
            await page.fill('input[name="email"]', f"e2e_{timestamp}@example.com")
            await page.fill('input[name="password"]', _NEW_USER_PASSWORD)
            await page.fill('input[name="confirmPassword"]', _NEW_USER_PASSWORD)
            await page.click('button[type="submit"]')
            await page.wait_for_selector('.success-message', timeout=10000)

            # 4. Fetch the user id through the API and assign the role
            users_response = await authenticated_client.get("/api/users")
            users = users_response.json()
            user = next((u for u in users if u['username'] == username), None)
            assert user is not None
            assign_response = await authenticated_client.put(
                f"/api/users/{user['id']}",
                json={"roleId": role_id},
            )
            assert assign_response.status_code < 400, (
                f"Role assignment failed with status {assign_response.status_code}"
            )

            # 5. Verify the role assignment through the API
            user_response = await authenticated_client.get(f"/api/users/{user['id']}")
            assert user_response.status_code == 200
            user_data = user_response.json()
            assert user_data["roleId"] == role_id
        finally:
            # 6. Clean up test data even when an assertion above failed.
            # The user is removed first because it may reference the role.
            await _delete_user_by_username(authenticated_client, username)
            await _delete_quietly(authenticated_client, f"/api/roles/{role_id}")

    @pytest.mark.asyncio
    async def test_login_and_navigation_e2e(self, page: Page):
        """Log in through the UI and navigate between the main sections."""
        # 1. Open the login page
        await page.goto(f"{FRONTEND_BASE_URL}/login")
        await page.wait_for_load_state("networkidle")
        title = await page.title()
        # Compare against a lowercase literal: the title is lowercased first.
        assert "登录" in title or "login" in title.lower()

        # 2. Fill in the login form
        await page.fill('input[name="username"]', settings.TEST_USERNAME)
        await page.fill('input[name="password"]', settings.TEST_PASSWORD)

        # 3. Click the login button
        await page.click('button[type="submit"]')

        # 4. Wait for the redirect to the home page
        await page.wait_for_url("**/", timeout=10000)

        # 5. Verify the user info is displayed
        user_info = await page.query_selector('.user-info')
        assert user_info is not None, "User info element not found"
        user_text = await user_info.text_content() or ""
        assert settings.TEST_USERNAME in user_text

        # 6. Navigate to the different pages
        await page.click('text=用户管理')
        await page.wait_for_url("**/users")
        await page.click('text=角色管理')
        await page.wait_for_url("**/roles")
        await page.click('text=系统配置')
        await page.wait_for_url("**/config")

    @pytest.mark.asyncio
    async def test_system_config_e2e(
        self, page: Page, authenticated_client: AsyncClient
    ):
        """Check that the config page shows the entries returned by the API."""
        # 1. Log in through the frontend
        await _login(page)

        # 2. Open the system configuration page through the frontend
        await page.click('text=系统配置')
        await page.wait_for_url("**/config")

        # 3. Verify the configuration list is displayed
        table = await page.query_selector('table')
        assert table is not None, "Config table not found"

        # 4. Fetch the configuration through the API
        config_response = await authenticated_client.get("/api/config")
        assert config_response.status_code == 200
        configs = config_response.json()

        # 5. Verify frontend and backend agree (first three entries only)
        page_content = await page.content()
        for config in configs[:3]:
            assert config['configKey'] in page_content or config['configName'] in page_content

    @pytest.mark.asyncio
    async def test_search_and_filter_e2e(
        self, page: Page, authenticated_client: AsyncClient
    ):
        """Create users via API, search for them in the UI and via the API."""
        timestamp = int(time.time() * 1000)
        keyword = f"search_{timestamp}"
        user_ids = []

        try:
            # 1. Create several test users through the API. This happens
            # inside the try so a failure part-way still cleans up the
            # users that were already created.
            for i in range(3):
                response = await authenticated_client.post(
                    "/api/users",
                    json={
                        "username": f"search_{timestamp}_{i}",
                        "password": _NEW_USER_PASSWORD,
                        "email": f"search_{timestamp}_{i}@example.com",
                        "status": 1,
                    },
                )
                assert response.status_code == 201
                user_ids.append(response.json()["id"])

            # 2. Log in through the frontend
            await _login(page)

            # 3. Search for the users through the frontend
            await page.click('text=用户管理')
            await page.wait_for_url("**/users")
            await page.fill('input[name="keyword"]', keyword)
            await page.click('button[type="search"]')
            await page.wait_for_load_state("networkidle")

            # 4. Verify the search results are displayed
            page_content = await page.content()
            assert keyword in page_content

            # 5. Verify the search results through the API
            search_response = await authenticated_client.get(
                "/api/users/page",
                params={"keyword": keyword, "page": 0, "size": 10},
            )
            assert search_response.status_code == 200
            search_data = search_response.json()
            assert len(search_data["content"]) >= 3
        finally:
            # 6. Clean up test data (best effort; failures are logged)
            for user_id in user_ids:
                await _delete_quietly(authenticated_client, f"/api/users/{user_id}")