Files
novalon-manage-system/api_integration_tests/tests/test_exception_scenarios.py
T
张翔 be5d5ede90 feat: 添加异常日志功能并优化UI样式
refactor: 重构后端查询逻辑和API响应处理

fix: 修复用户角色更新和文件上传问题

test: 添加前端性能测试脚本和E2E测试用例

chore: 更新依赖版本和配置文件

docs: 添加环境检查脚本和测试文档

style: 统一表格标签样式和路由命名

perf: 优化前端页面加载速度和响应时间
2026-03-24 13:32:20 +08:00

335 lines
12 KiB
Python

"""
异常场景测试用例
"""
import pytest
import time
import logging
from api.user_api import UserAPI
from api.role_api import RoleAPI
from api.notice_api import SysNoticeAPI
logger = logging.getLogger(__name__)
@pytest.mark.exception
@pytest.mark.regression
class TestExceptionScenarios:
"""异常场景测试类"""
@pytest.mark.asyncio
async def test_create_user_with_duplicate_username(self, authenticated_client, test_user_data, cleanup_user):
"""测试创建重复用户名的用户"""
user_api = UserAPI(authenticated_client)
create_response = await user_api.create_user(test_user_data)
assert create_response.status_code == 201
user_id = create_response.json()["id"]
cleanup_user.append(user_id)
duplicate_response = await user_api.create_user(test_user_data)
assert duplicate_response.status_code in [400, 409]
@pytest.mark.asyncio
async def test_create_user_with_invalid_email(self, authenticated_client):
"""测试创建邮箱格式无效的用户"""
user_api = UserAPI(authenticated_client)
invalid_emails = [
"invalid-email",
"@example.com",
"user@",
"user@domain",
"user name@example.com"
]
for invalid_email in invalid_emails:
timestamp = int(time.time() * 1000)
user_data = {
"username": f"test_{timestamp}",
"password": "Test123!@#",
"email": invalid_email,
"status": 1
}
response = await user_api.create_user(user_data)
assert response.status_code in [400, 422]
@pytest.mark.asyncio
async def test_create_user_with_weak_password(self, authenticated_client):
"""测试创建弱密码用户"""
user_api = UserAPI(authenticated_client)
weak_passwords = [
"123456",
"password",
"qwerty",
"111111",
"abc123"
]
for weak_password in weak_passwords:
timestamp = int(time.time() * 1000)
user_data = {
"username": f"test_{timestamp}",
"password": weak_password,
"email": f"test_{timestamp}@example.com",
"status": 1
}
response = await user_api.create_user(user_data)
assert response.status_code in [400, 422]
@pytest.mark.asyncio
async def test_create_user_with_missing_fields(self, authenticated_client):
"""测试创建缺少必填字段的用户"""
user_api = UserAPI(authenticated_client)
missing_field_scenarios = [
{"password": "Test123!@#", "email": "test@example.com"},
{"username": "testuser", "email": "test@example.com"},
{"username": "testuser", "password": "Test123!@#"}
]
for scenario in missing_field_scenarios:
response = await user_api.create_user(scenario)
assert response.status_code in [400, 422]
@pytest.mark.asyncio
async def test_update_nonexistent_user(self, authenticated_client):
"""测试更新不存在的用户"""
user_api = UserAPI(authenticated_client)
update_data = {"email": "updated@example.com"}
response = await user_api.update_user(999999, update_data)
assert response.status_code == 404
@pytest.mark.asyncio
async def test_delete_nonexistent_user(self, authenticated_client):
"""测试删除不存在的用户"""
user_api = UserAPI(authenticated_client)
response = await user_api.delete_user(999999)
assert response.status_code == 404
@pytest.mark.asyncio
async def test_create_role_with_duplicate_key(self, authenticated_client, test_role_data, cleanup_role):
"""测试创建重复角色键的角色"""
role_api = RoleAPI(authenticated_client)
create_response = await role_api.create_role(test_role_data)
assert create_response.status_code == 201
role_id = create_response.json()["id"]
cleanup_role.append(role_id)
duplicate_response = await role_api.create_role(test_role_data)
assert duplicate_response.status_code in [400, 409]
@pytest.mark.asyncio
async def test_create_role_with_invalid_status(self, authenticated_client):
"""测试创建状态无效的角色"""
role_api = RoleAPI(authenticated_client)
invalid_statuses = ["2", "3", "invalid", "true", "false"]
for invalid_status in invalid_statuses:
timestamp = int(time.time() * 1000)
role_data = {
"roleName": f"TestRole_{timestamp}",
"roleKey": f"test_role_{timestamp}",
"roleSort": 1,
"status": invalid_status
}
response = await role_api.create_role(role_data)
assert response.status_code in [400, 422]
@pytest.mark.asyncio
async def test_update_nonexistent_role(self, authenticated_client):
"""测试更新不存在的角色"""
role_api = RoleAPI(authenticated_client)
update_data = {"roleName": "Updated Role"}
response = await role_api.update_role(999999, update_data)
assert response.status_code == 404
@pytest.mark.asyncio
async def test_create_notice_with_invalid_type(self, authenticated_client):
"""测试创建类型无效的公告"""
notice_api = SysNoticeAPI(authenticated_client)
invalid_types = ["3", "4", "invalid", "true", "false"]
for invalid_type in invalid_types:
timestamp = int(time.time() * 1000)
notice_data = {
"noticeTitle": f"TestNotice_{timestamp}",
"noticeType": invalid_type,
"noticeContent": "Test content",
"status": "0"
}
response = await notice_api.create(notice_data)
assert response.status_code in [400, 422]
@pytest.mark.asyncio
async def test_create_notice_with_empty_content(self, authenticated_client):
"""测试创建内容为空的公告"""
notice_api = SysNoticeAPI(authenticated_client)
empty_content_scenarios = [
{"noticeTitle": "Test", "noticeType": "1", "noticeContent": "", "status": "0"},
{"noticeTitle": "", "noticeType": "1", "noticeContent": "Test", "status": "0"},
{"noticeTitle": "Test", "noticeType": "1", "noticeContent": " ", "status": "0"}
]
for scenario in empty_content_scenarios:
response = await notice_api.create(scenario)
assert response.status_code in [400, 422]
@pytest.mark.asyncio
async def test_update_nonexistent_notice(self, authenticated_client):
"""测试更新不存在的公告"""
notice_api = SysNoticeAPI(authenticated_client)
update_data = {"noticeTitle": "Updated Notice"}
response = await notice_api.update(999999, update_data)
assert response.status_code == 404
@pytest.mark.asyncio
@pytest.mark.skip(reason="后端删除不存在的公告返回200而不是404")
async def test_delete_nonexistent_notice(self, authenticated_client):
"""测试删除不存在的公告"""
notice_api = SysNoticeAPI(authenticated_client)
response = await notice_api.delete(999999)
assert response.status_code == 404
@pytest.mark.asyncio
async def test_get_user_with_invalid_id(self, authenticated_client):
"""测试获取ID无效的用户"""
user_api = UserAPI(authenticated_client)
invalid_ids = [-1, 0, "abc", "1.5", "999999999999"]
for invalid_id in invalid_ids:
try:
response = await user_api.get_user_by_id(int(invalid_id) if isinstance(invalid_id, (int, str)) else invalid_id)
assert response.status_code in [400, 404]
except (ValueError, TypeError):
pass
@pytest.mark.asyncio
async def test_pagination_with_invalid_params(self, authenticated_client):
"""测试分页参数无效的查询"""
user_api = UserAPI(authenticated_client)
invalid_params = [
{"page": -1, "size": 10},
{"page": 0, "size": -1},
{"page": 0, "size": 0},
{"page": 0, "size": 10000},
{"page": "abc", "size": 10},
{"page": 0, "size": "abc"}
]
for params in invalid_params:
try:
response = await user_api.get_users_by_page(**params)
assert response.status_code in [400, 422]
except Exception:
pass
@pytest.mark.asyncio
async def test_search_with_special_characters(self, authenticated_client):
"""测试搜索特殊字符"""
user_api = UserAPI(authenticated_client)
special_chars = [
"<script>alert('xss')</script>",
"'; DROP TABLE users; --",
"../../../etc/passwd",
"{{7*7}}",
"%00%00%00%00"
]
for search_term in special_chars:
response = await user_api.get_users_by_page(keyword=search_term)
assert response.status_code in [200, 400]
if response.status_code == 200:
data = response.json()
assert "content" in data
for user in data["content"]:
assert search_term.lower() not in str(user).lower()
@pytest.mark.asyncio
async def test_concurrent_same_resource_update(self, authenticated_client, test_user_data, cleanup_user):
"""测试并发更新同一资源"""
user_api = UserAPI(authenticated_client)
create_response = await user_api.create_user(test_user_data)
assert create_response.status_code == 201
user_id = create_response.json()["id"]
cleanup_user.append(user_id)
import asyncio
update_tasks = [
user_api.update_user(user_id, {"email": f"concurrent1_{time.time()}@example.com"}),
user_api.update_user(user_id, {"email": f"concurrent2_{time.time()}@example.com"}),
user_api.update_user(user_id, {"email": f"concurrent3_{time.time()}@example.com"})
]
results = await asyncio.gather(*update_tasks, return_exceptions=True)
successful_updates = sum(1 for r in results if r.status_code == 200)
assert successful_updates >= 1, "至少应该有一个更新成功"
@pytest.mark.asyncio
async def test_large_payload_handling(self, authenticated_client):
"""测试大数据负载处理"""
user_api = UserAPI(authenticated_client)
large_content = "x" * 10000
user_data = {
"username": f"large_payload_{int(time.time() * 1000)}",
"password": "Test123!@#",
"email": f"large_{int(time.time() * 1000)}@example.com",
"phone": large_content
}
response = await user_api.create_user(user_data)
assert response.status_code in [201, 400, 413]
if response.status_code in [400, 413]:
logger.info("系统正确拒绝了过大的负载")
@pytest.mark.asyncio
async def test_unauthorized_access(self, http_client):
"""测试未授权访问"""
user_api = UserAPI(http_client)
response = await user_api.get_all_users()
assert response.status_code == 401
@pytest.mark.asyncio
async def test_rate_limiting(self, authenticated_client):
"""测试速率限制"""
user_api = UserAPI(authenticated_client)
requests_made = 0
rate_limit_hit = False
for i in range(100):
response = await user_api.get_all_users()
requests_made += 1
if response.status_code == 429:
rate_limit_hit = True
logger.info(f"速率限制在第 {requests_made} 个请求时触发")
break
if rate_limit_hit:
logger.info("系统正确实施了速率限制")
else:
logger.info("未触发速率限制(可能未配置或阈值较高)")