""" 灾难恢复测试用例 测试系统在灾难场景下的恢复能力 """ import pytest import asyncio import time from api.user_api import UserAPI from api.role_api import RoleAPI from api.notice_api import SysNoticeAPI @pytest.mark.disaster @pytest.mark.regression @pytest.mark.critical class TestDisasterRecovery: """灾难恢复测试类""" @pytest.mark.asyncio async def test_service_restart_recovery(self, authenticated_client, test_data_manager): """测试服务重启后的数据恢复""" user_api = UserAPI(authenticated_client) unique_id = f"{int(time.time() * 1000)}" # 创建测试用户 user_data = { "username": f"restart_user_{unique_id}", "password": "Test123!@#", "email": f"restart_{unique_id}@example.com", "status": 1 } create_response = await user_api.create_user(user_data) assert create_response.status_code == 201 user_id = create_response.json()["id"] test_data_manager.add_user(user_id) # 模拟服务重启:等待一段时间后重新验证数据 await asyncio.sleep(2) # 验证数据在服务重启后仍然存在 verify_response = await user_api.get_user_by_id(user_id) assert verify_response.status_code == 200 assert verify_response.json()["username"] == user_data["username"] assert verify_response.json()["email"] == user_data["email"] @pytest.mark.asyncio async def test_data_consistency_after_failure(self, authenticated_client, test_data_manager): """测试故障后的数据一致性""" user_api = UserAPI(authenticated_client) role_api = RoleAPI(authenticated_client) unique_id = f"{int(time.time() * 1000)}" # 创建角色 role_data = { "roleName": f"Failure_Role_{unique_id}", "roleKey": f"failure_role_{unique_id}", "roleSort": 1, "status": 1 } role_response = await role_api.create_role(role_data) role_id = role_response.json()["id"] test_data_manager.add_role(role_id) # 创建用户并分配角色 user_data = { "username": f"failure_user_{unique_id}", "password": "Test123!@#", "email": f"failure_{unique_id}@example.com", "roleId": role_id, "status": 1 } user_response = await user_api.create_user(user_data) user_id = user_response.json()["id"] test_data_manager.add_user(user_id) # 模拟故障:等待一段时间 await asyncio.sleep(1) # 验证数据一致性 user_verify = await user_api.get_user_by_id(user_id) assert user_verify.status_code == 200 role_verify = await role_api.get_role_by_id(role_id) assert role_verify.status_code == 200 # 验证用户和角色关系仍然正确 user_data_verify = user_verify.json() if "roleId" in user_data_verify and user_data_verify["roleId"]: assert user_data_verify["roleId"] == role_id @pytest.mark.asyncio async def test_system_recovery_after_connection_loss(self, authenticated_client, test_data_manager): """测试连接丢失后的系统恢复""" user_api = UserAPI(authenticated_client) unique_id = f"{int(time.time() * 1000)}" # 创建测试用户 user_data = { "username": f"connection_user_{unique_id}", "password": "Test123!@#", "email": f"connection_{unique_id}@example.com", "status": 1 } create_response = await user_api.create_user(user_data) assert create_response.status_code == 201 user_id = create_response.json()["id"] test_data_manager.add_user(user_id) # 模拟连接丢失:等待一段时间 await asyncio.sleep(2) # 模拟连接恢复:重新验证数据 verify_response = await user_api.get_user_by_id(user_id) assert verify_response.status_code == 200 assert verify_response.json()["username"] == user_data["username"] @pytest.mark.asyncio async def test_partial_data_recovery(self, authenticated_client, test_data_manager): """测试部分数据恢复""" user_api = UserAPI(authenticated_client) unique_id = f"{int(time.time() * 1000)}" # 创建多个测试用户 user_ids = [] for i in range(3): user_data = { "username": f"partial_user_{unique_id}_{i}", "password": "Test123!@#", "email": f"partial_{unique_id}_{i}@example.com", "status": 1 } create_response = await user_api.create_user(user_data) assert create_response.status_code == 201 user_id = create_response.json()["id"] user_ids.append(user_id) test_data_manager.add_user(user_id) # 模拟部分数据丢失:验证剩余数据 await asyncio.sleep(1) # 验证所有用户数据仍然存在 for user_id in user_ids: verify_response = await user_api.get_user_by_id(user_id) assert verify_response.status_code == 200