""" 端到端业务流程测试用例 """ import pytest import time from api.auth_api import AuthAPI from api.user_api import UserAPI from api.role_api import RoleAPI from api.notice_api import SysNoticeAPI @pytest.mark.e2e @pytest.mark.regression class TestBusinessFlow: """端到端业务流程测试类""" @pytest.mark.asyncio async def test_complete_user_lifecycle(self, authenticated_client): """测试完整用户生命周期""" auth_api = AuthAPI(authenticated_client) user_api = UserAPI(authenticated_client) timestamp = int(time.time() * 1000) new_user_data = { "username": f"e2e_user_{timestamp}", "password": "Test123!@#", "email": f"e2e_{timestamp}@example.com", "phone": "13800138000", "status": 1 } create_response = await user_api.create_user(new_user_data) assert create_response.status_code == 201 user_id = create_response.json()["id"] get_response = await user_api.get_user_by_id(user_id) assert get_response.status_code == 200 user_data = get_response.json() assert user_data["username"] == new_user_data["username"] update_data = {"email": f"updated_{timestamp}@example.com"} update_response = await user_api.update_user(user_id, update_data) assert update_response.status_code == 200 delete_response = await user_api.delete_user(user_id) assert delete_response.status_code in [200, 204] final_get_response = await user_api.get_user_by_id(user_id) assert final_get_response.status_code == 404 @pytest.mark.asyncio async def test_role_assignment_workflow(self, authenticated_client): """测试角色分配工作流""" user_api = UserAPI(authenticated_client) role_api = RoleAPI(authenticated_client) timestamp = int(time.time() * 1000) role_data = { "roleName": f"E2E_Role_{timestamp}", "roleKey": f"e2e_role_{timestamp}", "roleSort": 1, "status": 1 } role_response = await role_api.create_role(role_data) assert role_response.status_code == 201 role_id = role_response.json()["id"] user_data = { "username": f"e2e_user_{timestamp}", "password": "Test123!@#", "email": f"e2e_{timestamp}@example.com", "status": 1 } user_response = await user_api.create_user(user_data) assert user_response.status_code == 201 user_id = user_response.json()["id"] assign_response = await user_api.update_user(user_id, {"roleId": role_id}) assert assign_response.status_code == 200 verify_response = await user_api.get_user_by_id(user_id) assert verify_response.json()["roleId"] == role_id await user_api.delete_user(user_id) await role_api.delete_role(role_id) @pytest.mark.asyncio async def test_notification_workflow(self, authenticated_client): """测试通知工作流""" notice_api = SysNoticeAPI(authenticated_client) user_api = UserAPI(authenticated_client) timestamp = int(time.time() * 1000) notice_data = { "noticeTitle": f"E2E_Notice_{timestamp}", "noticeType": "1", "noticeContent": "This is an E2E test notice", "status": "0" } create_response = await notice_api.create(notice_data) assert create_response.status_code in [200, 201] notice_data_response = create_response.json() notice_id = notice_data_response.get("id") if not notice_id: notice_title = notice_data_response.get("noticeTitle") all_notices = await notice_api.get_all() notices = all_notices.json() notice = next((n for n in notices if n["noticeTitle"] == notice_title), None) notice_id = notice["id"] if notice else None assert notice_id is not None get_response = await notice_api.get_by_id(notice_id) assert get_response.status_code == 200 all_notices = await notice_api.get_all() assert all_notices.status_code == 200 notices = all_notices.json() assert any(notice["id"] == notice_id for notice in notices) update_data = {"noticeTitle": f"Updated_Notice_{timestamp}"} update_response = await notice_api.update(notice_id, update_data) assert update_response.status_code == 200 await notice_api.delete(notice_id) final_get = await notice_api.get_by_id(notice_id) assert final_get.status_code in [200, 404] @pytest.mark.asyncio async def test_multi_role_user_management(self, authenticated_client): """测试多角色用户管理""" user_api = UserAPI(authenticated_client) role_api = RoleAPI(authenticated_client) timestamp = int(time.time() * 1000) admin_role_data = { "roleName": f"Admin_{timestamp}", "roleKey": f"admin_{timestamp}", "roleSort": 1, "status": 1 } admin_role = await role_api.create_role(admin_role_data) admin_role_id = admin_role.json()["id"] user_role_data = { "roleName": f"User_{timestamp}", "roleKey": f"user_{timestamp}", "roleSort": 2, "status": 1 } user_role = await role_api.create_role(user_role_data) user_role_id = user_role.json()["id"] admin_user_data = { "username": f"admin_{timestamp}", "password": "Admin123!@#", "email": f"admin_{timestamp}@example.com", "status": 1 } admin_user = await user_api.create_user(admin_user_data) admin_user_id = admin_user.json()["id"] regular_user_data = { "username": f"regular_{timestamp}", "password": "User123!@#", "email": f"regular_{timestamp}@example.com", "status": 1 } regular_user = await user_api.create_user(regular_user_data) regular_user_id = regular_user.json()["id"] await user_api.update_user(admin_user_id, {"roleId": admin_role_id}) await user_api.update_user(regular_user_id, {"roleId": user_role_id}) admin_verify = await user_api.get_user_by_id(admin_user_id) assert admin_verify.json()["roleId"] == admin_role_id regular_verify = await user_api.get_user_by_id(regular_user_id) assert regular_verify.json()["roleId"] == user_role_id all_users = await user_api.get_all_users() users = all_users.json() assert len(users) >= 2 await user_api.delete_user(admin_user_id) await user_api.delete_user(regular_user_id) await role_api.delete_role(admin_role_id) await role_api.delete_role(user_role_id) @pytest.mark.asyncio async def test_user_role_cascade_operations(self, authenticated_client): """测试用户角色级联操作""" user_api = UserAPI(authenticated_client) role_api = RoleAPI(authenticated_client) timestamp = int(time.time() * 1000) role_data = { "roleName": f"Cascade_Role_{timestamp}", "roleKey": f"cascade_role_{timestamp}", "roleSort": 1, "status": 1 } role_response = await role_api.create_role(role_data) role_id = role_response.json()["id"] user_ids = [] for i in range(3): user_data = { "username": f"cascade_user_{timestamp}_{i}", "password": "Test123!@#", "email": f"cascade_{timestamp}_{i}@example.com", "status": 1 } user_response = await user_api.create_user(user_data) user_id = user_response.json()["id"] user_ids.append(user_id) await user_api.update_user(user_id, {"roleId": role_id}) await role_api.update_role(role_id, {"status": 0}) for user_id in user_ids: user_data = await user_api.get_user_by_id(user_id) assert user_data.json()["roleId"] == role_id for user_id in user_ids: await user_api.delete_user(user_id) await role_api.delete_role(role_id) @pytest.mark.asyncio async def test_search_and_filter_workflow(self, authenticated_client): """测试搜索和过滤工作流""" user_api = UserAPI(authenticated_client) role_api = RoleAPI(authenticated_client) timestamp = int(time.time() * 1000) role_data = { "roleName": f"Search_Role_{timestamp}", "roleKey": f"search_role_{timestamp}", "roleSort": 1, "status": 1 } role_response = await role_api.create_role(role_data) role_id = role_response.json()["id"] user_ids = [] for i in range(5): user_data = { "username": f"search_{timestamp}_{i}", "password": "Test123!@#", "email": f"search_{timestamp}_{i}@example.com", "status": 1 } user_response = await user_api.create_user(user_data) user_id = user_response.json()["id"] user_ids.append(user_id) search_response = await user_api.get_users_by_page(keyword=f"search_{timestamp}") assert search_response.status_code == 200 search_data = search_response.json() assert len(search_data["content"]) >= 5 all_users = await user_api.get_all_users() assert all_users.status_code == 200 for user_id in user_ids: await user_api.delete_user(user_id) await role_api.delete_role(role_id) @pytest.mark.asyncio async def test_error_recovery_workflow(self, authenticated_client): """测试错误恢复工作流""" user_api = UserAPI(authenticated_client) timestamp = int(time.time() * 1000) invalid_user_data = { "username": "", "password": "123", "email": "invalid-email" } invalid_response = await user_api.create_user(invalid_user_data) assert invalid_response.status_code in [400, 409, 422] valid_user_data = { "username": f"recovery_{timestamp}", "password": "Valid123!@#", "email": f"recovery_{timestamp}@example.com", "status": 1 } valid_response = await user_api.create_user(valid_user_data) assert valid_response.status_code == 201 user_id = valid_response.json()["id"] get_response = await user_api.get_user_by_id(user_id) assert get_response.status_code == 200 await user_api.delete_user(user_id)