"""
End-to-end test suite for complete business workflows.

Scope:
    1. Full user-management lifecycle
    2. Role and permission configuration
    3. Menu permission configuration
    4. File upload and download
    5. System configuration management

Author: Zhang Xiang
Date: 2026-04-01
"""
import time
import uuid

import pytest

from api.auth_api import AuthAPI
from api.user_api import UserAPI
from api.role_api import RoleAPI
from api.menu_api import MenuAPI
from api.file_api import FileAPI
from api.config_api import ConfigAPI


def _unique_id(prefix):
    """Return ``prefix`` followed by a millisecond timestamp and 8 random hex chars.

    The random suffix keeps names distinct even when two tests start within
    the same millisecond (e.g. parallel workers).
    """
    return f"{prefix}_{int(time.time() * 1000)}_{uuid.uuid4().hex[:8]}"


@pytest.mark.e2e
@pytest.mark.asyncio
class TestE2ECompleteWorkflows:
    """End-to-end tests that drive complete business workflows through the API clients."""

    async def test_e2e_complete_user_lifecycle(
        self, authenticated_client, test_data_manager
    ):
        """
        E2E-WF-01: full user-management lifecycle.

        Steps:
            1. Look up one existing role (if any) to attach to the new user
            2. Create a new user
            3. Log in as that user
            4. Update the user's nickname and email
            5. Switch the user's status to 0
            6. Delete the user
        """
        user_api = UserAPI(authenticated_client)
        role_api = RoleAPI(authenticated_client)
        auth_api = AuthAPI(authenticated_client)

        unique_id = _unique_id("e2e")

        roles_response = await role_api.get_roles_by_page(size=1)
        assert roles_response.status_code == 200
        roles = roles_response.json().get("content", [])
        # The user is created without a role when the system has none.
        role_id = roles[0]["id"] if roles else None

        user_data = {
            "username": f"lifecycle_user_{unique_id}",
            "password": "Test123!@#",
            "email": f"lifecycle_{unique_id}@test.com",
            "phone": "13800138000",
            "nickname": "生命周期测试用户",
            "status": 1,
            "roleId": role_id,
        }

        create_response = await user_api.create_user(user_data)
        assert create_response.status_code in [201, 200], "创建用户失败"
        user_id = create_response.json().get("id")
        # Register for fixture-managed cleanup in case a later step fails.
        test_data_manager.add_user(user_id)

        login_response = await auth_api.login(
            user_data["username"], user_data["password"]
        )
        assert login_response.status_code == 200, "新用户登录失败"

        update_data = {
            "nickname": "已更新昵称",
            "email": f"updated_{unique_id}@test.com",
        }
        update_response = await user_api.update_user(user_id, update_data)
        assert update_response.status_code == 200, "更新用户信息失败"

        status_response = await user_api.update_user(user_id, {"status": 0})
        assert status_response.status_code == 200, "用户状态切换失败"

        delete_response = await user_api.delete_user(user_id)
        assert delete_response.status_code in [200, 204], "删除用户失败"

    async def test_e2e_role_permission_workflow(
        self, authenticated_client, test_data_manager
    ):
        """
        E2E-WF-02: role and permission configuration.

        Steps:
            1. Create a new role
            2. Assign up to three existing menus to the role (if any exist)
            3. Assign the role to one existing user (if any exist)
        """
        role_api = RoleAPI(authenticated_client)
        menu_api = MenuAPI(authenticated_client)
        user_api = UserAPI(authenticated_client)

        unique_id = _unique_id("role")

        role_data = {
            "roleName": f"测试角色_{unique_id}",
            "roleKey": f"test_role_{unique_id}",
            "roleSort": 999,
            "status": 1,
            "remark": "E2E测试角色",
        }

        create_response = await role_api.create_role(role_data)
        assert create_response.status_code in [201, 200], "创建角色失败"
        role_id = create_response.json().get("id")
        test_data_manager.add_role(role_id)

        menus_response = await menu_api.get_menus()
        assert menus_response.status_code == 200
        # Parse the body once; it is either a bare list or wrapped under "data".
        menus_payload = menus_response.json()
        if isinstance(menus_payload, list):
            menus = menus_payload
        else:
            menus = menus_payload.get("data", [])

        if menus:
            menu_ids = [m["id"] for m in menus[:3]]
            permission_data = {"menuIds": menu_ids}
            perm_response = await role_api.assign_permissions(
                role_id, permission_data
            )
            assert perm_response.status_code == 200, "分配权限失败"

        users_response = await user_api.get_users_by_page(size=1)
        users = users_response.json().get("content", [])
        if users:
            user_id = users[0]["id"]
            assign_response = await user_api.assign_roles(user_id, [role_id])
            assert assign_response.status_code == 200, "分配角色失败"

    async def test_e2e_file_management_workflow(
        self, authenticated_client, test_data_manager
    ):
        """
        E2E-WF-03: file management.

        Steps:
            1. Upload a file
            2. List files
            3. Download the uploaded file
            4. Delete the uploaded file

        The test is skipped when the upload is rejected or when a
        non-assertion error occurs; assertion failures are reported as
        failures.
        """
        file_api = FileAPI(authenticated_client)

        test_file_content = b"E2E test file content"
        # Millisecond timestamp plus random suffix avoids same-second clashes.
        test_filename = f"{_unique_id('test_file')}.txt"

        try:
            upload_response = await file_api.upload_file(
                file_content=test_file_content,
                filename=test_filename,
            )
            if upload_response.status_code not in [201, 200]:
                pytest.skip("文件上传功能不可用")

            file_id = upload_response.json().get("id")
            test_data_manager.add_file(file_id)

            list_response = await file_api.get_files_by_page()
            assert list_response.status_code == 200, "查询文件列表失败"

            download_response = await file_api.download_file(file_id)
            assert download_response.status_code == 200, "下载文件失败"

            delete_response = await file_api.delete_file(file_id)
            assert delete_response.status_code in [200, 204], "删除文件失败"
        except AssertionError:
            # A failed assertion is a real test failure, not a reason to skip.
            raise
        except Exception as e:
            pytest.skip(f"文件管理测试跳过: {e}")

    async def test_e2e_system_config_workflow(
        self, authenticated_client, test_data_manager
    ):
        """
        E2E-WF-04: system configuration management.

        Steps:
            1. Create a configuration entry
            2. Fetch it by key
            3. Update its value
            4. Delete it

        The test is skipped when creation is rejected or when a
        non-assertion error occurs; assertion failures are reported as
        failures.
        """
        config_api = ConfigAPI(authenticated_client)

        unique_id = _unique_id("config")

        config_data = {
            "configKey": f"test_config_{unique_id}",
            "configValue": "test_value",
            "configName": "测试配置项",
            "remark": "E2E测试配置",
        }

        try:
            create_response = await config_api.create_config(config_data)
            if create_response.status_code not in [201, 200]:
                pytest.skip("系统配置功能不可用")

            # NOTE(review): the config entry is not registered with
            # test_data_manager, so it is left behind if a later step fails.
            config_id = create_response.json().get("id")

            get_response = await config_api.get_config_by_key(
                config_data["configKey"]
            )
            assert get_response.status_code == 200, "查询配置失败"

            update_data = {"configValue": "updated_value"}
            update_response = await config_api.update_config(
                config_id, update_data
            )
            assert update_response.status_code == 200, "更新配置失败"

            delete_response = await config_api.delete_config(config_id)
            assert delete_response.status_code in [200, 204], "删除配置失败"
        except AssertionError:
            # A failed assertion is a real test failure, not a reason to skip.
            raise
        except Exception as e:
            pytest.skip(f"系统配置测试跳过: {e}")

    async def test_e2e_dictionary_management_workflow(
        self, authenticated_client, test_data_manager
    ):
        """
        E2E-WF-05: dictionary management.

        Steps:
            1. Create a dictionary type
            2. Create a dictionary data entry under that type
            3. Query the dictionary by type
            4. Delete the data entry and the dictionary type

        The test is skipped when type creation is rejected or when a
        non-assertion error occurs; assertion failures are reported as
        failures.
        """
        from api.dict_api import DictAPI

        dict_api = DictAPI(authenticated_client)

        unique_id = _unique_id("dict")

        dict_type_data = {
            "dictName": f"测试字典类型_{unique_id}",
            "dictType": f"test_dict_{unique_id}",
            "status": 1,
            "remark": "E2E测试字典",
        }

        try:
            create_type_response = await dict_api.create_dict_type(dict_type_data)
            if create_type_response.status_code not in [201, 200]:
                pytest.skip("字典管理功能不可用")

            dict_type_id = create_type_response.json().get("id")
            test_data_manager.add_dict_type(dict_type_id)

            dict_data = {
                "dictType": dict_type_data["dictType"],
                "dictLabel": "测试数据",
                "dictValue": "test_value",
                "dictSort": 1,
                "status": 1,
            }

            create_data_response = await dict_api.create_dict_data(dict_data)
            if create_data_response.status_code in [201, 200]:
                dict_data_id = create_data_response.json().get("id")

                get_response = await dict_api.get_dict_by_type(
                    dict_type_data["dictType"]
                )
                assert get_response.status_code == 200, "查询字典失败"

                await dict_api.delete_dict_data(dict_data_id)

            # NOTE(review): the type is removed whenever it was created, even
            # if the data entry could not be; confirm this matches the
            # intended nesting of the original cleanup.
            await dict_api.delete_dict_type(dict_type_id)
        except AssertionError:
            # A failed assertion is a real test failure, not a reason to skip.
            raise
        except Exception as e:
            pytest.skip(f"字典管理测试跳过: {e}")

    async def test_e2e_audit_log_workflow(
        self, authenticated_client, test_data_manager
    ):
        """
        E2E-WF-06: audit log queries.

        Steps:
            1. Create and delete a user so that operations are performed
            2. Query the operation logs
            3. Query the login logs

        The test is skipped when the user cannot be created.
        """
        from api.audit_api import AuditAPI

        audit_api = AuditAPI(authenticated_client)
        user_api = UserAPI(authenticated_client)

        # Same timestamp+random scheme as the other workflows to avoid
        # username collisions between concurrent runs.
        unique_id = _unique_id("audit")

        user_data = {
            "username": f"audit_test_{unique_id}",
            "password": "Test123!@#",
            "email": f"audit_{unique_id}@test.com",
            "phone": "13800138000",
            "status": 1,
        }

        create_response = await user_api.create_user(user_data)
        if create_response.status_code not in [201, 200]:
            pytest.skip("审计日志功能不可用")

        user_id = create_response.json().get("id")
        test_data_manager.add_user(user_id)

        await user_api.delete_user(user_id)

        operation_logs = await audit_api.get_operation_logs(page=0, size=10)
        assert operation_logs.status_code == 200, "查询操作日志失败"

        login_logs = await audit_api.get_login_logs(page=0, size=10)
        assert login_logs.status_code == 200, "查询登录日志失败"