""" 异常场景测试用例 """ import pytest import time from api.user_api import UserAPI from api.role_api import RoleAPI from api.notice_api import SysNoticeAPI @pytest.mark.exception @pytest.mark.regression class TestExceptionScenarios: """异常场景测试类""" @pytest.mark.asyncio async def test_create_user_with_duplicate_username(self, authenticated_client, test_user_data, cleanup_user): """测试创建重复用户名的用户""" user_api = UserAPI(authenticated_client) create_response = await user_api.create_user(test_user_data) assert create_response.status_code == 201 user_id = create_response.json()["id"] cleanup_user.append(user_id) duplicate_response = await user_api.create_user(test_user_data) assert duplicate_response.status_code in [400, 409] @pytest.mark.asyncio async def test_create_user_with_invalid_email(self, authenticated_client): """测试创建邮箱格式无效的用户""" user_api = UserAPI(authenticated_client) invalid_emails = [ "invalid-email", "@example.com", "user@", "user@domain", "user name@example.com" ] for invalid_email in invalid_emails: timestamp = int(time.time() * 1000) user_data = { "username": f"test_{timestamp}", "password": "Test123!@#", "email": invalid_email, "status": 1 } response = await user_api.create_user(user_data) assert response.status_code in [400, 422] @pytest.mark.asyncio async def test_create_user_with_weak_password(self, authenticated_client): """测试创建弱密码用户""" user_api = UserAPI(authenticated_client) weak_passwords = [ "123456", "password", "qwerty", "111111", "abc123" ] for weak_password in weak_passwords: timestamp = int(time.time() * 1000) user_data = { "username": f"test_{timestamp}", "password": weak_password, "email": f"test_{timestamp}@example.com", "status": 1 } response = await user_api.create_user(user_data) assert response.status_code in [400, 422] @pytest.mark.asyncio async def test_create_user_with_missing_fields(self, authenticated_client): """测试创建缺少必填字段的用户""" user_api = UserAPI(authenticated_client) missing_field_scenarios = [ {"password": "Test123!@#", "email": "test@example.com"}, {"username": "testuser", "email": "test@example.com"}, {"username": "testuser", "password": "Test123!@#"} ] for scenario in missing_field_scenarios: response = await user_api.create_user(scenario) assert response.status_code in [400, 422] @pytest.mark.asyncio async def test_update_nonexistent_user(self, authenticated_client): """测试更新不存在的用户""" user_api = UserAPI(authenticated_client) update_data = {"email": "updated@example.com"} response = await user_api.update_user(999999, update_data) assert response.status_code == 404 @pytest.mark.asyncio async def test_delete_nonexistent_user(self, authenticated_client): """测试删除不存在的用户""" user_api = UserAPI(authenticated_client) response = await user_api.delete_user(999999) assert response.status_code == 404 @pytest.mark.asyncio async def test_create_role_with_duplicate_key(self, authenticated_client, test_role_data, cleanup_role): """测试创建重复角色键的角色""" role_api = RoleAPI(authenticated_client) create_response = await role_api.create_role(test_role_data) assert create_response.status_code == 201 role_id = create_response.json()["id"] cleanup_role.append(role_id) duplicate_response = await role_api.create_role(test_role_data) assert duplicate_response.status_code in [400, 409] @pytest.mark.asyncio async def test_create_role_with_invalid_status(self, authenticated_client): """测试创建状态无效的角色""" role_api = RoleAPI(authenticated_client) invalid_statuses = ["2", "3", "invalid", "true", "false"] for invalid_status in invalid_statuses: timestamp = int(time.time() * 1000) role_data = { "roleName": f"TestRole_{timestamp}", "roleKey": f"test_role_{timestamp}", "roleSort": 1, "status": invalid_status } response = await role_api.create_role(role_data) assert response.status_code in [400, 422] @pytest.mark.asyncio async def test_update_nonexistent_role(self, authenticated_client): """测试更新不存在的角色""" role_api = RoleAPI(authenticated_client) update_data = {"roleName": "Updated Role"} response = await role_api.update_role(999999, update_data) assert response.status_code == 404 @pytest.mark.asyncio async def test_create_notice_with_invalid_type(self, authenticated_client): """测试创建类型无效的公告""" notice_api = SysNoticeAPI(authenticated_client) invalid_types = ["3", "4", "invalid", "true", "false"] for invalid_type in invalid_types: timestamp = int(time.time() * 1000) notice_data = { "noticeTitle": f"TestNotice_{timestamp}", "noticeType": invalid_type, "noticeContent": "Test content", "status": "0" } response = await notice_api.create(notice_data) assert response.status_code in [400, 422] @pytest.mark.asyncio async def test_create_notice_with_empty_content(self, authenticated_client): """测试创建内容为空的公告""" notice_api = SysNoticeAPI(authenticated_client) empty_content_scenarios = [ {"noticeTitle": "Test", "noticeType": "1", "noticeContent": "", "status": "0"}, {"noticeTitle": "", "noticeType": "1", "noticeContent": "Test", "status": "0"}, {"noticeTitle": "Test", "noticeType": "1", "noticeContent": " ", "status": "0"} ] for scenario in empty_content_scenarios: response = await notice_api.create(scenario) assert response.status_code in [400, 422] @pytest.mark.asyncio async def test_update_nonexistent_notice(self, authenticated_client): """测试更新不存在的公告""" notice_api = SysNoticeAPI(authenticated_client) update_data = {"noticeTitle": "Updated Notice"} response = await notice_api.update(999999, update_data) assert response.status_code == 404 @pytest.mark.asyncio async def test_delete_nonexistent_notice(self, authenticated_client): """测试删除不存在的公告""" notice_api = SysNoticeAPI(authenticated_client) response = await notice_api.delete(999999) assert response.status_code == 404 @pytest.mark.asyncio async def test_get_user_with_invalid_id(self, authenticated_client): """测试获取ID无效的用户""" user_api = UserAPI(authenticated_client) invalid_ids = [-1, 0, "abc", "1.5", "999999999999"] for invalid_id in invalid_ids: try: response = await user_api.get_user_by_id(int(invalid_id) if isinstance(invalid_id, (int, str)) else invalid_id) assert response.status_code in [400, 404] except (ValueError, TypeError): pass @pytest.mark.asyncio async def test_pagination_with_invalid_params(self, authenticated_client): """测试分页参数无效的查询""" user_api = UserAPI(authenticated_client) invalid_params = [ {"page": -1, "size": 10}, {"page": 0, "size": -1}, {"page": 0, "size": 0}, {"page": 0, "size": 10000}, {"page": "abc", "size": 10}, {"page": 0, "size": "abc"} ] for params in invalid_params: try: response = await user_api.get_users_by_page(**params) assert response.status_code in [400, 422] except Exception: pass @pytest.mark.asyncio async def test_search_with_special_characters(self, authenticated_client): """测试搜索特殊字符""" user_api = UserAPI(authenticated_client) special_chars = [ "", "'; DROP TABLE users; --", "../../../etc/passwd", "{{7*7}}", "%00%00%00%00" ] for search_term in special_chars: response = await user_api.get_users_by_page(keyword=search_term) assert response.status_code in [200, 400] if response.status_code == 200: data = response.json() assert "content" in data for user in data["content"]: assert search_term.lower() not in str(user).lower() @pytest.mark.asyncio async def test_concurrent_same_resource_update(self, authenticated_client, test_user_data, cleanup_user): """测试并发更新同一资源""" user_api = UserAPI(authenticated_client) create_response = await user_api.create_user(test_user_data) assert create_response.status_code == 201 user_id = create_response.json()["id"] cleanup_user.append(user_id) import asyncio update_tasks = [ user_api.update_user(user_id, {"email": f"concurrent1_{time.time()}@example.com"}), user_api.update_user(user_id, {"email": f"concurrent2_{time.time()}@example.com"}), user_api.update_user(user_id, {"email": f"concurrent3_{time.time()}@example.com"}) ] results = await asyncio.gather(*update_tasks, return_exceptions=True) successful_updates = sum(1 for r in results if r.status_code == 200) assert successful_updates >= 1, "至少应该有一个更新成功" @pytest.mark.asyncio async def test_large_payload_handling(self, authenticated_client): """测试大数据负载处理""" user_api = UserAPI(authenticated_client) large_content = "x" * 10000 user_data = { "username": f"large_payload_{int(time.time() * 1000)}", "password": "Test123!@#", "email": f"large_{int(time.time() * 1000)}@example.com", "phone": large_content } response = await user_api.create_user(user_data) assert response.status_code in [201, 400, 413] if response.status_code in [400, 413]: logger.info("系统正确拒绝了过大的负载") @pytest.mark.asyncio async def test_unauthorized_access(self, http_client): """测试未授权访问""" user_api = UserAPI(http_client) response = await user_api.get_all_users() assert response.status_code == 401 @pytest.mark.asyncio async def test_rate_limiting(self, authenticated_client): """测试速率限制""" user_api = UserAPI(authenticated_client) requests_made = 0 rate_limit_hit = False for i in range(100): response = await user_api.get_all_users() requests_made += 1 if response.status_code == 429: rate_limit_hit = True logger.info(f"速率限制在第 {requests_made} 个请求时触发") break if rate_limit_hit: logger.info("系统正确实施了速率限制") else: logger.info("未触发速率限制(可能未配置或阈值较高)")