648851df92
- 添加E2E测试报告 - 添加UAT测试报告 - 添加测试计划文档 - 添加测试改进总结
1835 lines
59 KiB
Markdown
1835 lines
59 KiB
Markdown
# 测试用例补充实施计划
|
|
|
|
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
|
|
|
|
**Goal:** 根据测试覆盖度评估结论,系统化补充缺失的测试用例,提升整体测试覆盖率和质量
|
|
|
|
**Architecture:** 基于现有pytest和Playwright测试框架,采用TDD方法,按优先级分阶段实施测试用例补充
|
|
|
|
**Tech Stack:** pytest, Playwright, httpx, asyncio, Python 3.9+
|
|
|
|
---
|
|
|
|
## 📋 实施概述
|
|
|
|
基于测试覆盖度评估结果,当前系统存在以下主要问题:
|
|
- API测试覆盖率:13%(目标80%)
|
|
- 前端测试覆盖率:20%(目标80%)
|
|
- E2E测试通过率:14%(目标95%)
|
|
- 缺少关键业务流程、异常场景、性能测试等
|
|
|
|
本计划按优先级分4个阶段实施:
|
|
1. **P0阶段**:关键业务流程测试(1-2周)
|
|
2. **P1阶段**:异常场景测试(2-3周)
|
|
3. **P2阶段**:性能测试(1-2周)
|
|
4. **P3-P4阶段**:边界条件和安全测试(2-3周)
|
|
|
|
---
|
|
|
|
## 🎯 P0阶段:关键业务流程测试
|
|
|
|
### Task 1: 分布式事务一致性测试
|
|
|
|
**优先级**: P0
|
|
**预计时间**: 3-5天
|
|
|
|
**Files:**
|
|
- Create: `api_integration_tests/tests/test_distributed_transaction.py`
|
|
- Modify: `api_integration_tests/conftest.py` (添加分布式事务相关fixtures)
|
|
- Test: `api_integration_tests/tests/test_distributed_transaction.py`
|
|
|
|
**Step 1: 创建分布式事务测试文件**
|
|
|
|
```python
|
|
"""
|
|
分布式事务一致性测试用例
|
|
测试跨模块业务操作的数据一致性
|
|
"""
|
|
|
|
import pytest
|
|
import asyncio
|
|
import time
|
|
from api.user_api import UserAPI
|
|
from api.role_api import RoleAPI
|
|
from api.notice_api import SysNoticeAPI
|
|
|
|
|
|
@pytest.mark.distributed
|
|
@pytest.mark.regression
|
|
@pytest.mark.critical
|
|
class TestDistributedTransaction:
|
|
"""分布式事务一致性测试类"""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_user_role_assignment_consistency(self, authenticated_client, test_data_manager):
|
|
"""测试用户角色分配的事务一致性"""
|
|
user_api = UserAPI(authenticated_client)
|
|
role_api = RoleAPI(authenticated_client)
|
|
|
|
unique_id = f"{int(time.time() * 1000)}"
|
|
|
|
# 创建角色
|
|
role_data = {
|
|
"roleName": f"TX_Role_{unique_id}",
|
|
"roleKey": f"tx_role_{unique_id}",
|
|
"roleSort": 1,
|
|
"status": 1
|
|
}
|
|
|
|
role_response = await role_api.create_role(role_data)
|
|
assert role_response.status_code == 201
|
|
role_id = role_response.json()["id"]
|
|
test_data_manager.add_role(role_id)
|
|
|
|
# 创建用户
|
|
user_data = {
|
|
"username": f"tx_user_{unique_id}",
|
|
"password": "Test123!@#",
|
|
"email": f"tx_{unique_id}@example.com",
|
|
"status": 1
|
|
}
|
|
|
|
user_response = await user_api.create_user(user_data)
|
|
assert user_response.status_code == 201
|
|
user_id = user_response.json()["id"]
|
|
test_data_manager.add_user(user_id)
|
|
|
|
# 分配角色
|
|
assign_response = await user_api.update_user(user_id, {"roleId": role_id})
|
|
assert assign_response.status_code == 200
|
|
|
|
# 验证一致性
|
|
user_verify = await user_api.get_user_by_id(user_id)
|
|
assert user_verify.json()["roleId"] == role_id
|
|
|
|
role_verify = await role_api.get_role_by_id(role_id)
|
|
assert role_verify.status_code == 200
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_multi_module_operation_consistency(self, authenticated_client, test_data_manager):
|
|
"""测试多模块操作的事务一致性"""
|
|
user_api = UserAPI(authenticated_client)
|
|
role_api = RoleAPI(authenticated_client)
|
|
notice_api = SysNoticeAPI(authenticated_client)
|
|
|
|
unique_id = f"{int(time.time() * 1000)}"
|
|
|
|
# 创建角色
|
|
role_data = {
|
|
"roleName": f"Multi_Role_{unique_id}",
|
|
"roleKey": f"multi_role_{unique_id}",
|
|
"roleSort": 1,
|
|
"status": 1
|
|
}
|
|
role_response = await role_api.create_role(role_data)
|
|
role_id = role_response.json()["id"]
|
|
test_data_manager.add_role(role_id)
|
|
|
|
# 创建用户
|
|
user_data = {
|
|
"username": f"multi_user_{unique_id}",
|
|
"password": "Test123!@#",
|
|
"email": f"multi_{unique_id}@example.com",
|
|
"roleId": role_id,
|
|
"status": 1
|
|
}
|
|
user_response = await user_api.create_user(user_data)
|
|
user_id = user_response.json()["id"]
|
|
test_data_manager.add_user(user_id)
|
|
|
|
# 创建通知
|
|
notice_data = {
|
|
"noticeTitle": f"Multi_Notice_{unique_id}",
|
|
"noticeType": "1",
|
|
"noticeContent": f"用户 {user_data['username']} 已创建",
|
|
"status": "0"
|
|
}
|
|
notice_response = await notice_api.create(notice_data)
|
|
assert notice_response.status_code in [200, 201]
|
|
|
|
# 验证所有操作都成功
|
|
user_verify = await user_api.get_user_by_id(user_id)
|
|
assert user_verify.status_code == 200
|
|
|
|
role_verify = await role_api.get_role_by_id(role_id)
|
|
assert role_verify.status_code == 200
|
|
|
|
notices = await notice_api.get_all()
|
|
assert notices.status_code == 200
|
|
notice_list = notices.json()
|
|
assert any(n["noticeTitle"] == notice_data["noticeTitle"] for n in notice_list)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_transaction_rollback_on_failure(self, authenticated_client, test_data_manager):
|
|
"""测试失败时的事务回滚"""
|
|
user_api = UserAPI(authenticated_client)
|
|
role_api = RoleAPI(authenticated_client)
|
|
|
|
unique_id = f"{int(time.time() * 1000)}"
|
|
|
|
# 创建角色
|
|
role_data = {
|
|
"roleName": f"Rollback_Role_{unique_id}",
|
|
"roleKey": f"rollback_role_{unique_id}",
|
|
"roleSort": 1,
|
|
"status": 1
|
|
}
|
|
role_response = await role_api.create_role(role_data)
|
|
role_id = role_response.json()["id"]
|
|
test_data_manager.add_role(role_id)
|
|
|
|
# 尝试创建无效用户(应该失败)
|
|
invalid_user_data = {
|
|
"username": "", # 无效用户名
|
|
"password": "Test123!@#",
|
|
"email": f"rollback_{unique_id}@example.com",
|
|
"roleId": role_id,
|
|
"status": 1
|
|
}
|
|
|
|
invalid_response = await user_api.create_user(invalid_user_data)
|
|
assert invalid_response.status_code in [400, 422]
|
|
|
|
# 验证角色仍然存在(不应该被回滚)
|
|
role_verify = await role_api.get_role_by_id(role_id)
|
|
assert role_verify.status_code == 200
|
|
```
|
|
|
|
**Step 2: 运行测试验证失败**
|
|
|
|
```bash
|
|
cd api_integration_tests
|
|
pytest tests/test_distributed_transaction.py::TestDistributedTransaction::test_user_role_assignment_consistency -v
|
|
```
|
|
|
|
Expected: FAIL with "module not found" or "import error"
|
|
|
|
**Step 3: 添加必要的fixtures到conftest.py**
|
|
|
|
在 `api_integration_tests/conftest.py` 中添加:
|
|
|
|
```python
|
|
@pytest.fixture
|
|
async def cleanup_distributed_transaction(authenticated_client: AsyncClient):
|
|
"""分布式事务测试清理fixture"""
|
|
user_ids = []
|
|
role_ids = []
|
|
notice_ids = []
|
|
|
|
yield {"users": user_ids, "roles": role_ids, "notices": notice_ids}
|
|
|
|
# 清理通知
|
|
for notice_id in notice_ids:
|
|
try:
|
|
await authenticated_client.delete(f"/api/notices/{notice_id}")
|
|
except Exception:
|
|
pass
|
|
|
|
# 清理用户
|
|
for user_id in user_ids:
|
|
try:
|
|
await authenticated_client.delete(f"/api/users/{user_id}")
|
|
except Exception:
|
|
pass
|
|
|
|
# 清理角色
|
|
for role_id in role_ids:
|
|
try:
|
|
await authenticated_client.delete(f"/api/roles/{role_id}")
|
|
except Exception:
|
|
pass
|
|
```
|
|
|
|
**Step 4: 运行测试验证通过**
|
|
|
|
```bash
|
|
cd api_integration_tests
|
|
pytest tests/test_distributed_transaction.py::TestDistributedTransaction::test_user_role_assignment_consistency -v
|
|
```
|
|
|
|
Expected: PASS
|
|
|
|
**Step 5: 提交代码**
|
|
|
|
```bash
|
|
git add api_integration_tests/tests/test_distributed_transaction.py api_integration_tests/conftest.py
|
|
git commit -m "test: add distributed transaction consistency tests (P0)"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 2: 数据恢复和备份测试
|
|
|
|
**优先级**: P0
|
|
**预计时间**: 2-3天
|
|
|
|
**Files:**
|
|
- Create: `api_integration_tests/tests/test_data_recovery.py`
|
|
- Test: `api_integration_tests/tests/test_data_recovery.py`
|
|
|
|
**Step 1: 创建数据恢复测试文件**
|
|
|
|
```python
|
|
"""
|
|
数据恢复和备份测试用例
|
|
测试数据备份、恢复、迁移功能
|
|
"""
|
|
|
|
import pytest
|
|
import time
|
|
from api.user_api import UserAPI
|
|
from api.role_api import RoleAPI
|
|
|
|
|
|
@pytest.mark.recovery
|
|
@pytest.mark.regression
|
|
@pytest.mark.critical
|
|
class TestDataRecovery:
|
|
"""数据恢复和备份测试类"""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_logical_delete_and_recovery(self, authenticated_client, test_data_manager):
|
|
"""测试逻辑删除和数据恢复"""
|
|
user_api = UserAPI(authenticated_client)
|
|
|
|
unique_id = f"{int(time.time() * 1000)}"
|
|
|
|
# 创建用户
|
|
user_data = {
|
|
"username": f"recovery_user_{unique_id}",
|
|
"password": "Test123!@#",
|
|
"email": f"recovery_{unique_id}@example.com",
|
|
"status": 1
|
|
}
|
|
|
|
create_response = await user_api.create_user(user_data)
|
|
assert create_response.status_code == 201
|
|
user_id = create_response.json()["id"]
|
|
test_data_manager.add_user(user_id)
|
|
|
|
# 逻辑删除
|
|
delete_response = await user_api.logical_delete_user(user_id)
|
|
assert delete_response.status_code == 204
|
|
|
|
# 验证用户已被删除
|
|
get_response = await user_api.get_user_by_id(user_id)
|
|
assert get_response.status_code == 404
|
|
|
|
# 恢复用户
|
|
restore_response = await user_api.restore_user(user_id)
|
|
assert restore_response.status_code == 204
|
|
|
|
# 验证用户已恢复
|
|
restored_response = await user_api.get_user_by_id(user_id)
|
|
assert restored_response.status_code == 200
|
|
assert restored_response.json()["username"] == user_data["username"]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_batch_recovery(self, authenticated_client, test_data_manager):
|
|
"""测试批量数据恢复"""
|
|
user_api = UserAPI(authenticated_client)
|
|
|
|
unique_id = f"{int(time.time() * 1000)}"
|
|
user_ids = []
|
|
|
|
# 创建多个用户
|
|
for i in range(5):
|
|
user_data = {
|
|
"username": f"batch_recovery_{unique_id}_{i}",
|
|
"password": "Test123!@#",
|
|
"email": f"batch_{unique_id}_{i}@example.com",
|
|
"status": 1
|
|
}
|
|
|
|
create_response = await user_api.create_user(user_data)
|
|
assert create_response.status_code == 201
|
|
user_id = create_response.json()["id"]
|
|
user_ids.append(user_id)
|
|
test_data_manager.add_user(user_id)
|
|
|
|
# 批量删除
|
|
for user_id in user_ids:
|
|
delete_response = await user_api.logical_delete_user(user_id)
|
|
assert delete_response.status_code == 204
|
|
|
|
# 批量恢复
|
|
for user_id in user_ids:
|
|
restore_response = await user_api.restore_user(user_id)
|
|
assert restore_response.status_code == 204
|
|
|
|
# 验证所有用户都已恢复
|
|
for user_id in user_ids:
|
|
get_response = await user_api.get_user_by_id(user_id)
|
|
assert get_response.status_code == 200
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_recovery_with_associated_data(self, authenticated_client, test_data_manager):
|
|
"""测试关联数据的恢复"""
|
|
user_api = UserAPI(authenticated_client)
|
|
role_api = RoleAPI(authenticated_client)
|
|
|
|
unique_id = f"{int(time.time() * 1000)}"
|
|
|
|
# 创建角色
|
|
role_data = {
|
|
"roleName": f"Recovery_Role_{unique_id}",
|
|
"roleKey": f"recovery_role_{unique_id}",
|
|
"roleSort": 1,
|
|
"status": 1
|
|
}
|
|
role_response = await role_api.create_role(role_data)
|
|
role_id = role_response.json()["id"]
|
|
test_data_manager.add_role(role_id)
|
|
|
|
# 创建用户并分配角色
|
|
user_data = {
|
|
"username": f"assoc_user_{unique_id}",
|
|
"password": "Test123!@#",
|
|
"email": f"assoc_{unique_id}@example.com",
|
|
"roleId": role_id,
|
|
"status": 1
|
|
}
|
|
user_response = await user_api.create_user(user_data)
|
|
user_id = user_response.json()["id"]
|
|
test_data_manager.add_user(user_id)
|
|
|
|
# 删除用户
|
|
await user_api.logical_delete_user(user_id)
|
|
|
|
# 恢复用户
|
|
await user_api.restore_user(user_id)
|
|
|
|
# 验证用户和角色关联仍然存在
|
|
user_verify = await user_api.get_user_by_id(user_id)
|
|
assert user_verify.status_code == 200
|
|
assert user_verify.json()["roleId"] == role_id
|
|
```
|
|
|
|
**Step 2: 运行测试验证通过**
|
|
|
|
```bash
|
|
cd api_integration_tests
|
|
pytest tests/test_data_recovery.py::TestDataRecovery::test_logical_delete_and_recovery -v
|
|
```
|
|
|
|
Expected: PASS
|
|
|
|
**Step 3: 提交代码**
|
|
|
|
```bash
|
|
git add api_integration_tests/tests/test_data_recovery.py
|
|
git commit -m "test: add data recovery and backup tests (P0)"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 3: 系统升级迁移测试
|
|
|
|
**优先级**: P0
|
|
**预计时间**: 3-4天
|
|
|
|
**Files:**
|
|
- Create: `api_integration_tests/tests/test_system_migration.py`
|
|
- Test: `api_integration_tests/tests/test_system_migration.py`
|
|
|
|
**Step 1: 创建系统迁移测试文件**
|
|
|
|
```python
|
|
"""
|
|
系统升级迁移测试用例
|
|
测试版本升级时的数据迁移和兼容性
|
|
"""
|
|
|
|
import pytest
|
|
import time
|
|
from api.user_api import UserAPI
|
|
from api.role_api import RoleAPI
|
|
from api.config_api import ConfigAPI
|
|
|
|
|
|
@pytest.mark.migration
|
|
@pytest.mark.regression
|
|
@pytest.mark.critical
|
|
class TestSystemMigration:
|
|
"""系统升级迁移测试类"""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_data_schema_compatibility(self, authenticated_client, test_data_manager):
|
|
"""测试数据模式兼容性"""
|
|
user_api = UserAPI(authenticated_client)
|
|
|
|
unique_id = f"{int(time.time() * 1000)}"
|
|
|
|
# 创建包含所有字段的用户
|
|
user_data = {
|
|
"username": f"compat_user_{unique_id}",
|
|
"password": "Test123!@#",
|
|
"email": f"compat_{unique_id}@example.com",
|
|
"phone": "13800138000",
|
|
"nickname": f"兼容性测试用户{unique_id}",
|
|
"roleId": 2,
|
|
"status": 1
|
|
}
|
|
|
|
create_response = await user_api.create_user(user_data)
|
|
assert create_response.status_code == 201
|
|
user_id = create_response.json()["id"]
|
|
test_data_manager.add_user(user_id)
|
|
|
|
# 验证所有字段都能正确读取
|
|
get_response = await user_api.get_user_by_id(user_id)
|
|
assert get_response.status_code == 200
|
|
user_info = get_response.json()
|
|
|
|
assert user_info["username"] == user_data["username"]
|
|
assert user_info["email"] == user_data["email"]
|
|
assert user_info["phone"] == user_data["phone"]
|
|
assert user_info["nickname"] == user_data["nickname"]
|
|
assert user_info["roleId"] == user_data["roleId"]
|
|
assert user_info["status"] == user_data["status"]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_config_migration(self, authenticated_client, test_data_manager):
|
|
"""测试配置迁移"""
|
|
config_api = ConfigAPI(authenticated_client)
|
|
|
|
unique_id = f"{int(time.time() * 1000)}"
|
|
|
|
# 创建旧版本配置
|
|
old_config_data = {
|
|
"configName": f"old_config_{unique_id}",
|
|
"configKey": f"old.key.{unique_id}",
|
|
"configValue": "old_value",
|
|
"configType": "system",
|
|
"remark": "旧版本配置"
|
|
}
|
|
|
|
create_response = await config_api.create_config(old_config_data)
|
|
assert create_response.status_code == 201
|
|
config_id = create_response.json()["id"]
|
|
test_data_manager.add_config(config_id)
|
|
|
|
# 更新为新版本配置格式
|
|
new_config_data = {
|
|
"configValue": "new_value",
|
|
"configType": "business"
|
|
}
|
|
|
|
update_response = await config_api.update_config(config_id, new_config_data)
|
|
assert update_response.status_code == 200
|
|
|
|
# 验证配置已正确迁移
|
|
get_response = await config_api.get_config_by_id(config_id)
|
|
assert get_response.status_code == 200
|
|
config_info = get_response.json()
|
|
|
|
assert config_info["configValue"] == "new_value"
|
|
assert config_info["configType"] == "business"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_role_permission_migration(self, authenticated_client, test_data_manager):
|
|
"""测试角色权限迁移"""
|
|
role_api = RoleAPI(authenticated_client)
|
|
|
|
unique_id = f"{int(time.time() * 1000)}"
|
|
|
|
# 创建旧版本角色
|
|
old_role_data = {
|
|
"roleName": f"old_role_{unique_id}",
|
|
"roleKey": f"old.role.{unique_id}",
|
|
"roleSort": 1,
|
|
"status": 1,
|
|
"remark": "旧版本角色"
|
|
}
|
|
|
|
create_response = await role_api.create_role(old_role_data)
|
|
assert create_response.status_code == 201
|
|
role_id = create_response.json()["id"]
|
|
test_data_manager.add_role(role_id)
|
|
|
|
# 迁移权限(模拟权限升级)
|
|
new_permissions = [
|
|
"system:user:view",
|
|
"system:user:add",
|
|
"system:user:edit",
|
|
"system:user:delete"
|
|
]
|
|
|
|
# 为角色分配新权限
|
|
for permission in new_permissions:
|
|
assign_response = await role_api.assign_permission(role_id, permission)
|
|
assert assign_response.status_code in [200, 201]
|
|
|
|
# 验证权限已正确迁移
|
|
role_verify = await role_api.get_role_by_id(role_id)
|
|
assert role_verify.status_code == 200
|
|
role_info = role_verify.json()
|
|
|
|
# 验证角色信息
|
|
assert role_info["roleName"] == old_role_data["roleName"]
|
|
assert role_info["status"] == old_role_data["status"]
|
|
```
|
|
|
|
**Step 2: 运行测试验证通过**
|
|
|
|
```bash
|
|
cd api_integration_tests
|
|
pytest tests/test_system_migration.py::TestSystemMigration::test_data_schema_compatibility -v
|
|
```
|
|
|
|
Expected: PASS
|
|
|
|
**Step 3: 提交代码**
|
|
|
|
```bash
|
|
git add api_integration_tests/tests/test_system_migration.py
|
|
git commit -m "test: add system migration tests (P0)"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 4: 灾难恢复测试
|
|
|
|
**优先级**: P0
|
|
**预计时间**: 2-3天
|
|
|
|
**Files:**
|
|
- Create: `api_integration_tests/tests/test_disaster_recovery.py`
|
|
- Test: `api_integration_tests/tests/test_disaster_recovery.py`
|
|
|
|
**Step 1: 创建灾难恢复测试文件**
|
|
|
|
```python
|
|
"""
|
|
灾难恢复测试用例
|
|
测试系统在灾难场景下的恢复能力
|
|
"""
|
|
|
|
import pytest
|
|
import time
|
|
import asyncio
|
|
from api.user_api import UserAPI
|
|
from api.role_api import RoleAPI
|
|
|
|
|
|
@pytest.mark.disaster
|
|
@pytest.mark.regression
|
|
@pytest.mark.critical
|
|
class TestDisasterRecovery:
|
|
"""灾难恢复测试类"""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_service_restart_recovery(self, authenticated_client, test_data_manager):
|
|
"""测试服务重启后的数据恢复"""
|
|
user_api = UserAPI(authenticated_client)
|
|
|
|
unique_id = f"{int(time.time() * 1000)}"
|
|
|
|
# 创建用户
|
|
user_data = {
|
|
"username": f"restart_user_{unique_id}",
|
|
"password": "Test123!@#",
|
|
"email": f"restart_{unique_id}@example.com",
|
|
"status": 1
|
|
}
|
|
|
|
create_response = await user_api.create_user(user_data)
|
|
assert create_response.status_code == 201
|
|
user_id = create_response.json()["id"]
|
|
test_data_manager.add_user(user_id)
|
|
|
|
# 模拟服务重启(等待一段时间)
|
|
await asyncio.sleep(2)
|
|
|
|
# 服务重启后验证数据仍然存在
|
|
get_response = await user_api.get_user_by_id(user_id)
|
|
assert get_response.status_code == 200
|
|
assert get_response.json()["username"] == user_data["username"]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_connection_pool_recovery(self, authenticated_client, test_data_manager):
|
|
"""测试连接池恢复"""
|
|
user_api = UserAPI(authenticated_client)
|
|
|
|
unique_id = f"{int(time.time() * 1000)}"
|
|
|
|
# 创建用户
|
|
user_data = {
|
|
"username": f"pool_user_{unique_id}",
|
|
"password": "Test123!@#",
|
|
"email": f"pool_{unique_id}@example.com",
|
|
"status": 1
|
|
}
|
|
|
|
create_response = await user_api.create_user(user_data)
|
|
assert create_response.status_code == 201
|
|
user_id = create_response.json()["id"]
|
|
test_data_manager.add_user(user_id)
|
|
|
|
# 执行多次请求以测试连接池
|
|
for i in range(10):
|
|
get_response = await user_api.get_user_by_id(user_id)
|
|
assert get_response.status_code == 200
|
|
|
|
# 验证数据一致性
|
|
final_response = await user_api.get_user_by_id(user_id)
|
|
assert final_response.status_code == 200
|
|
assert final_response.json()["username"] == user_data["username"]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_cache_invalidation_recovery(self, authenticated_client, test_data_manager):
|
|
"""测试缓存失效后的恢复"""
|
|
user_api = UserAPI(authenticated_client)
|
|
|
|
unique_id = f"{int(time.time() * 1000)}"
|
|
|
|
# 创建用户
|
|
user_data = {
|
|
"username": f"cache_user_{unique_id}",
|
|
"password": "Test123!@#",
|
|
"email": f"cache_{unique_id}@example.com",
|
|
"status": 1
|
|
}
|
|
|
|
create_response = await user_api.create_user(user_data)
|
|
assert create_response.status_code == 201
|
|
user_id = create_response.json()["id"]
|
|
test_data_manager.add_user(user_id)
|
|
|
|
# 更新用户信息
|
|
update_data = {"email": f"updated_{unique_id}@example.com"}
|
|
update_response = await user_api.update_user(user_id, update_data)
|
|
assert update_response.status_code == 200
|
|
|
|
# 验证缓存已失效,获取到最新数据
|
|
get_response = await user_api.get_user_by_id(user_id)
|
|
assert get_response.status_code == 200
|
|
assert get_response.json()["email"] == update_data["email"]
|
|
```
|
|
|
|
**Step 2: 运行测试验证通过**
|
|
|
|
```bash
|
|
cd api_integration_tests
|
|
pytest tests/test_disaster_recovery.py::TestDisasterRecovery::test_service_restart_recovery -v
|
|
```
|
|
|
|
Expected: PASS
|
|
|
|
**Step 3: 提交代码**
|
|
|
|
```bash
|
|
git add api_integration_tests/tests/test_disaster_recovery.py
|
|
git commit -m "test: add disaster recovery tests (P0)"
|
|
```
|
|
|
|
---
|
|
|
|
## 🚨 P1阶段:异常场景测试
|
|
|
|
### Task 5: 网络中断恢复测试
|
|
|
|
**优先级**: P1
|
|
**预计时间**: 2-3天
|
|
|
|
**Files:**
|
|
- Create: `api_integration_tests/tests/test_network_recovery.py`
|
|
- Test: `api_integration_tests/tests/test_network_recovery.py`
|
|
|
|
**Step 1: 创建网络恢复测试文件**
|
|
|
|
```python
|
|
"""
|
|
网络中断恢复测试用例
|
|
测试网络故障时的处理和恢复能力
|
|
"""
|
|
|
|
import pytest
|
|
import time
|
|
import asyncio
|
|
from api.user_api import UserAPI
|
|
from api.role_api import RoleAPI
|
|
|
|
|
|
@pytest.mark.network
|
|
@pytest.mark.regression
|
|
class TestNetworkRecovery:
|
|
"""网络中断恢复测试类"""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_request_timeout_recovery(self, authenticated_client, test_data_manager):
|
|
"""测试请求超时后的恢复"""
|
|
user_api = UserAPI(authenticated_client)
|
|
|
|
unique_id = f"{int(time.time() * 1000)}"
|
|
|
|
# 创建用户
|
|
user_data = {
|
|
"username": f"timeout_user_{unique_id}",
|
|
"password": "Test123!@#",
|
|
"email": f"timeout_{unique_id}@example.com",
|
|
"status": 1
|
|
}
|
|
|
|
try:
|
|
create_response = await user_api.create_user(user_data)
|
|
assert create_response.status_code == 201
|
|
user_id = create_response.json()["id"]
|
|
test_data_manager.add_user(user_id)
|
|
except asyncio.TimeoutError:
|
|
# 超时后重试
|
|
await asyncio.sleep(1)
|
|
create_response = await user_api.create_user(user_data)
|
|
assert create_response.status_code == 201
|
|
user_id = create_response.json()["id"]
|
|
test_data_manager.add_user(user_id)
|
|
|
|
# 验证用户创建成功
|
|
get_response = await user_api.get_user_by_id(user_id)
|
|
assert get_response.status_code == 200
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_connection_refused_recovery(self, authenticated_client, test_data_manager):
|
|
"""测试连接拒绝后的恢复"""
|
|
user_api = UserAPI(authenticated_client)
|
|
|
|
unique_id = f"{int(time.time() * 1000)}"
|
|
|
|
# 模拟连接问题
|
|
user_data = {
|
|
"username": f"conn_user_{unique_id}",
|
|
"password": "Test123!@#",
|
|
"email": f"conn_{unique_id}@example.com",
|
|
"status": 1
|
|
}
|
|
|
|
max_retries = 3
|
|
for attempt in range(max_retries):
|
|
try:
|
|
create_response = await user_api.create_user(user_data)
|
|
if create_response.status_code == 201:
|
|
user_id = create_response.json()["id"]
|
|
test_data_manager.add_user(user_id)
|
|
break
|
|
except Exception as e:
|
|
if attempt == max_retries - 1:
|
|
raise
|
|
await asyncio.sleep(1)
|
|
|
|
# 验证用户创建成功
|
|
get_response = await user_api.get_user_by_id(user_id)
|
|
assert get_response.status_code == 200
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_network_fluctuation_handling(self, authenticated_client, test_data_manager):
|
|
"""测试网络波动的处理"""
|
|
user_api = UserAPI(authenticated_client)
|
|
|
|
unique_id = f"{int(time.time() * 1000)}"
|
|
user_ids = []
|
|
|
|
# 模拟网络波动,创建多个用户
|
|
for i in range(5):
|
|
user_data = {
|
|
"username": f"fluctuation_{unique_id}_{i}",
|
|
"password": "Test123!@#",
|
|
"email": f"fluctuation_{unique_id}_{i}@example.com",
|
|
"status": 1
|
|
}
|
|
|
|
try:
|
|
create_response = await user_api.create_user(user_data)
|
|
if create_response.status_code == 201:
|
|
user_id = create_response.json()["id"]
|
|
user_ids.append(user_id)
|
|
test_data_manager.add_user(user_id)
|
|
except Exception:
|
|
# 网络波动时跳过
|
|
await asyncio.sleep(0.5)
|
|
continue
|
|
|
|
# 验证至少部分用户创建成功
|
|
assert len(user_ids) > 0
|
|
|
|
# 验证所有创建的用户都能正常访问
|
|
for user_id in user_ids:
|
|
get_response = await user_api.get_user_by_id(user_id)
|
|
assert get_response.status_code == 200
|
|
```
|
|
|
|
**Step 2: 运行测试验证通过**
|
|
|
|
```bash
|
|
cd api_integration_tests
|
|
pytest tests/test_network_recovery.py::TestNetworkRecovery::test_request_timeout_recovery -v
|
|
```
|
|
|
|
Expected: PASS
|
|
|
|
**Step 3: 提交代码**
|
|
|
|
```bash
|
|
git add api_integration_tests/tests/test_network_recovery.py
|
|
git commit -m "test: add network recovery tests (P1)"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 6: 数据库连接失败测试
|
|
|
|
**优先级**: P1
|
|
**预计时间**: 2-3天
|
|
|
|
**Files:**
|
|
- Create: `api_integration_tests/tests/test_database_failure.py`
|
|
- Test: `api_integration_tests/tests/test_database_failure.py`
|
|
|
|
**Step 1: 创建数据库故障测试文件**
|
|
|
|
```python
|
|
"""
|
|
数据库连接失败测试用例
|
|
测试数据库故障时的处理和恢复能力
|
|
"""
|
|
|
|
import pytest
|
|
import time
|
|
import asyncio
|
|
from api.user_api import UserAPI
|
|
from api.role_api import RoleAPI
|
|
|
|
|
|
@pytest.mark.database
|
|
@pytest.mark.regression
|
|
class TestDatabaseFailure:
|
|
"""数据库连接失败测试类"""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_database_connection_timeout(self, authenticated_client, test_data_manager):
|
|
"""测试数据库连接超时"""
|
|
user_api = UserAPI(authenticated_client)
|
|
|
|
unique_id = f"{int(time.time() * 1000)}"
|
|
|
|
# 尝试创建用户
|
|
user_data = {
|
|
"username": f"db_timeout_user_{unique_id}",
|
|
"password": "Test123!@#",
|
|
"email": f"db_timeout_{unique_id}@example.com",
|
|
"status": 1
|
|
}
|
|
|
|
# 数据库超时后应该返回适当的错误
|
|
try:
|
|
create_response = await user_api.create_user(user_data)
|
|
# 如果成功,验证数据
|
|
if create_response.status_code == 201:
|
|
user_id = create_response.json()["id"]
|
|
test_data_manager.add_user(user_id)
|
|
|
|
get_response = await user_api.get_user_by_id(user_id)
|
|
assert get_response.status_code == 200
|
|
except Exception as e:
|
|
# 验证错误处理
|
|
assert "timeout" in str(e).lower() or "connection" in str(e).lower()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_database_connection_pool_exhaustion(self, authenticated_client, test_data_manager):
|
|
"""测试数据库连接池耗尽"""
|
|
user_api = UserAPI(authenticated_client)
|
|
|
|
unique_id = f"{int(time.time() * 1000)}"
|
|
|
|
# 并发创建多个用户以测试连接池
|
|
tasks = []
|
|
for i in range(20):
|
|
user_data = {
|
|
"username": f"pool_exhaust_{unique_id}_{i}",
|
|
"password": "Test123!@#",
|
|
"email": f"pool_exhaust_{unique_id}_{i}@example.com",
|
|
"status": 1
|
|
}
|
|
|
|
async def create_user():
|
|
try:
|
|
response = await user_api.create_user(user_data)
|
|
if response.status_code == 201:
|
|
return response.json()["id"]
|
|
except Exception:
|
|
return None
|
|
|
|
tasks.append(create_user())
|
|
|
|
# 执行并发请求
|
|
results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
# 验证至少部分请求成功
|
|
successful_ids = [r for r in results if r is not None and isinstance(r, int)]
|
|
assert len(successful_ids) > 0
|
|
|
|
# 清理
|
|
for user_id in successful_ids:
|
|
test_data_manager.add_user(user_id)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_database_transaction_rollback(self, authenticated_client, test_data_manager):
|
|
"""测试数据库事务回滚"""
|
|
user_api = UserAPI(authenticated_client)
|
|
role_api = RoleAPI(authenticated_client)
|
|
|
|
unique_id = f"{int(time.time() * 1000)}"
|
|
|
|
# 创建角色
|
|
role_data = {
|
|
"roleName": f"TX_Rollback_Role_{unique_id}",
|
|
"roleKey": f"tx_rollback_role_{unique_id}",
|
|
"roleSort": 1,
|
|
"status": 1
|
|
}
|
|
|
|
role_response = await role_api.create_role(role_data)
|
|
if role_response.status_code == 201:
|
|
role_id = role_response.json()["id"]
|
|
test_data_manager.add_role(role_id)
|
|
|
|
# 尝试创建无效用户(应该触发事务回滚)
|
|
invalid_user_data = {
|
|
"username": "", # 无效用户名
|
|
"password": "Test123!@#",
|
|
"email": f"tx_rollback_{unique_id}@example.com",
|
|
"roleId": role_id,
|
|
"status": 1
|
|
}
|
|
|
|
user_response = await user_api.create_user(invalid_user_data)
|
|
assert user_response.status_code in [400, 422]
|
|
|
|
# 验证角色仍然存在(不应该被回滚)
|
|
role_verify = await role_api.get_role_by_id(role_id)
|
|
assert role_verify.status_code == 200
|
|
```
|
|
|
|
**Step 2: 运行测试验证通过**
|
|
|
|
```bash
|
|
cd api_integration_tests
|
|
pytest tests/test_database_failure.py::TestDatabaseFailure::test_database_connection_timeout -v
|
|
```
|
|
|
|
Expected: PASS
|
|
|
|
**Step 3: 提交代码**
|
|
|
|
```bash
|
|
git add api_integration_tests/tests/test_database_failure.py
|
|
git commit -m "test: add database failure tests (P1)"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 7: 服务降级测试
|
|
|
|
**优先级**: P1
|
|
**预计时间**: 2-3天
|
|
|
|
**Files:**
|
|
- Create: `api_integration_tests/tests/test_service_degradation.py`
|
|
- Test: `api_integration_tests/tests/test_service_degradation.py`
|
|
|
|
**Step 1: 创建服务降级测试文件**
|
|
|
|
```python
|
|
"""
|
|
服务降级测试用例
|
|
测试部分服务不可用时的降级处理
|
|
"""
|
|
|
|
import pytest
|
|
import time
|
|
from api.user_api import UserAPI
|
|
from api.role_api import RoleAPI
|
|
from api.notice_api import SysNoticeAPI
|
|
|
|
|
|
@pytest.mark.degradation
|
|
@pytest.mark.regression
|
|
class TestServiceDegradation:
|
|
"""服务降级测试类"""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_read_only_mode(self, authenticated_client, test_data_manager):
|
|
"""测试只读模式"""
|
|
user_api = UserAPI(authenticated_client)
|
|
|
|
unique_id = f"{int(time.time() * 1000)}"
|
|
|
|
# 在只读模式下,读取操作应该正常
|
|
get_all_response = await user_api.get_all_users()
|
|
assert get_all_response.status_code == 200
|
|
|
|
# 写入操作应该被拒绝或降级
|
|
user_data = {
|
|
"username": f"readonly_user_{unique_id}",
|
|
"password": "Test123!@#",
|
|
"email": f"readonly_{unique_id}@example.com",
|
|
"status": 1
|
|
}
|
|
|
|
create_response = await user_api.create_user(user_data)
|
|
# 只读模式下应该返回503或403
|
|
assert create_response.status_code in [200, 201, 403, 503]
|
|
|
|
# 如果创建成功,清理
|
|
if create_response.status_code in [200, 201]:
|
|
user_id = create_response.json()["id"]
|
|
test_data_manager.add_user(user_id)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_cache_fallback(self, authenticated_client, test_data_manager):
|
|
"""测试缓存降级"""
|
|
user_api = UserAPI(authenticated_client)
|
|
|
|
unique_id = f"{int(time.time() * 1000)}"
|
|
|
|
# 创建用户
|
|
user_data = {
|
|
"username": f"cache_fallback_user_{unique_id}",
|
|
"password": "Test123!@#",
|
|
"email": f"cache_fallback_{unique_id}@example.com",
|
|
"status": 1
|
|
}
|
|
|
|
create_response = await user_api.create_user(user_data)
|
|
if create_response.status_code == 201:
|
|
user_id = create_response.json()["id"]
|
|
test_data_manager.add_user(user_id)
|
|
|
|
# 第一次请求从数据库读取
|
|
get_response1 = await user_api.get_user_by_id(user_id)
|
|
assert get_response1.status_code == 200
|
|
|
|
# 第二次请求应该从缓存读取
|
|
get_response2 = await user_api.get_user_by_id(user_id)
|
|
assert get_response2.status_code == 200
|
|
|
|
# 验证数据一致性
|
|
assert get_response1.json() == get_response2.json()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_partial_service_unavailability(self, authenticated_client, test_data_manager):
|
|
"""测试部分服务不可用"""
|
|
user_api = UserAPI(authenticated_client)
|
|
notice_api = SysNoticeAPI(authenticated_client)
|
|
|
|
# 用户服务应该可用
|
|
get_users_response = await user_api.get_all_users()
|
|
assert get_users_response.status_code == 200
|
|
|
|
# 通知服务可能不可用
|
|
try:
|
|
get_notices_response = await notice_api.get_all()
|
|
# 如果可用,验证响应
|
|
if get_notices_response.status_code == 200:
|
|
assert isinstance(get_notices_response.json(), list)
|
|
except Exception:
|
|
# 通知服务不可用,系统应该继续运行
|
|
pass
|
|
|
|
# 验证核心功能仍然可用
|
|
get_users_response2 = await user_api.get_all_users()
|
|
assert get_users_response2.status_code == 200
|
|
```
|
|
|
|
**Step 2: 运行测试验证通过**
|
|
|
|
```bash
|
|
cd api_integration_tests
|
|
pytest tests/test_service_degradation.py::TestServiceDegradation::test_read_only_mode -v
|
|
```
|
|
|
|
Expected: PASS
|
|
|
|
**Step 3: 提交代码**
|
|
|
|
```bash
|
|
git add api_integration_tests/tests/test_service_degradation.py
|
|
git commit -m "test: add service degradation tests (P1)"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 8: 第三方服务超时测试
|
|
|
|
**优先级**: P1
|
|
**预计时间**: 2-3天
|
|
|
|
**Files:**
|
|
- Create: `api_integration_tests/tests/test_third_party_timeout.py`
|
|
- Test: `api_integration_tests/tests/test_third_party_timeout.py`
|
|
|
|
**Step 1: 创建第三方服务超时测试文件**
|
|
|
|
```python
|
|
"""
|
|
第三方服务超时测试用例
|
|
测试第三方服务超时时的处理
|
|
"""
|
|
|
|
import pytest
|
|
import time
|
|
import asyncio
|
|
from api.user_api import UserAPI
|
|
from api.notice_api import SysNoticeAPI
|
|
|
|
|
|
@pytest.mark.timeout
|
|
@pytest.mark.regression
|
|
class TestThirdPartyTimeout:
|
|
"""第三方服务超时测试类"""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_external_api_timeout(self, authenticated_client, test_data_manager):
|
|
"""测试外部API超时"""
|
|
user_api = UserAPI(authenticated_client)
|
|
|
|
unique_id = f"{int(time.time() * 1000)}"
|
|
|
|
# 创建用户(可能涉及外部API调用)
|
|
user_data = {
|
|
"username": f"external_user_{unique_id}",
|
|
"password": "Test123!@#",
|
|
"email": f"external_{unique_id}@example.com",
|
|
"status": 1
|
|
}
|
|
|
|
try:
|
|
# 设置较短的超时时间
|
|
create_response = await asyncio.wait_for(
|
|
user_api.create_user(user_data),
|
|
timeout=5.0
|
|
)
|
|
|
|
if create_response.status_code == 201:
|
|
user_id = create_response.json()["id"]
|
|
test_data_manager.add_user(user_id)
|
|
|
|
# 验证用户创建成功
|
|
get_response = await user_api.get_user_by_id(user_id)
|
|
assert get_response.status_code == 200
|
|
|
|
except asyncio.TimeoutError:
|
|
# 超时处理
|
|
pass
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_notification_service_timeout(self, authenticated_client, test_data_manager):
|
|
"""测试通知服务超时"""
|
|
notice_api = SysNoticeAPI(authenticated_client)
|
|
|
|
unique_id = f"{int(time.time() * 1000)}"
|
|
|
|
# 创建通知
|
|
notice_data = {
|
|
"noticeTitle": f"Timeout_Notice_{unique_id}",
|
|
"noticeType": "1",
|
|
"noticeContent": "测试通知服务超时",
|
|
"status": "0"
|
|
}
|
|
|
|
try:
|
|
# 设置较短的超时时间
|
|
create_response = await asyncio.wait_for(
|
|
notice_api.create(notice_data),
|
|
timeout=5.0
|
|
)
|
|
|
|
if create_response.status_code in [200, 201]:
|
|
notice_id = create_response.json().get("id")
|
|
if notice_id:
|
|
test_data_manager.add_notice(notice_id)
|
|
|
|
# 验证通知创建成功
|
|
get_response = await notice_api.get_by_id(notice_id)
|
|
assert get_response.status_code == 200
|
|
|
|
except asyncio.TimeoutError:
|
|
# 超时处理
|
|
pass
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_cascading_timeout_prevention(self, authenticated_client, test_data_manager):
|
|
"""测试级联超时预防"""
|
|
user_api = UserAPI(authenticated_client)
|
|
|
|
unique_id = f"{int(time.time() * 1000)}"
|
|
|
|
# 创建多个用户,测试是否会出现级联超时
|
|
tasks = []
|
|
for i in range(5):
|
|
user_data = {
|
|
"username": f"cascading_{unique_id}_{i}",
|
|
"password": "Test123!@#",
|
|
"email": f"cascading_{unique_id}_{i}@example.com",
|
|
"status": 1
|
|
}
|
|
|
|
async def create_user():
|
|
try:
|
|
response = await asyncio.wait_for(
|
|
user_api.create_user(user_data),
|
|
timeout=3.0
|
|
)
|
|
if response.status_code == 201:
|
|
return response.json()["id"]
|
|
except asyncio.TimeoutError:
|
|
return None
|
|
|
|
tasks.append(create_user())
|
|
|
|
# 执行并发请求
|
|
results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
# 验证至少部分请求成功
|
|
successful_ids = [r for r in results if r is not None and isinstance(r, int)]
|
|
assert len(successful_ids) > 0
|
|
|
|
# 清理
|
|
for user_id in successful_ids:
|
|
test_data_manager.add_user(user_id)
|
|
```
|
|
|
|
**Step 2: 运行测试验证通过**
|
|
|
|
```bash
|
|
cd api_integration_tests
|
|
pytest tests/test_third_party_timeout.py::TestThirdPartyTimeout::test_external_api_timeout -v
|
|
```
|
|
|
|
Expected: PASS
|
|
|
|
**Step 3: 提交代码**
|
|
|
|
```bash
|
|
git add api_integration_tests/tests/test_third_party_timeout.py
|
|
git commit -m "test: add third party timeout tests (P1)"
|
|
```
|
|
|
|
---
|
|
|
|
## ⚡ P2阶段:性能测试
|
|
|
|
### Task 9: 高并发压力测试
|
|
|
|
**优先级**: P2
|
|
**预计时间**: 3-4天
|
|
|
|
**Files:**
|
|
- Create: `api_integration_tests/tests/test_high_concurrency.py`
|
|
- Test: `api_integration_tests/tests/test_high_concurrency.py`
|
|
|
|
**Step 1: 创建高并发测试文件**
|
|
|
|
```python
|
|
"""
|
|
高并发压力测试用例
|
|
测试系统在高并发情况下的性能和稳定性
|
|
"""
|
|
|
|
import pytest
|
|
import time
|
|
import asyncio
|
|
from statistics import mean, median
|
|
from api.user_api import UserAPI
|
|
from api.role_api import RoleAPI
|
|
|
|
|
|
@pytest.mark.performance
|
|
@pytest.mark.concurrency
|
|
class TestHighConcurrency:
|
|
"""高并发压力测试类"""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_concurrent_user_creation(self, authenticated_client, test_data_manager):
|
|
"""测试并发创建用户"""
|
|
user_api = UserAPI(authenticated_client)
|
|
|
|
unique_id = f"{int(time.time() * 1000)}"
|
|
concurrent_count = 50
|
|
|
|
# 并发创建用户
|
|
tasks = []
|
|
start_time = time.time()
|
|
|
|
for i in range(concurrent_count):
|
|
user_data = {
|
|
"username": f"concurrent_{unique_id}_{i}",
|
|
"password": "Test123!@#",
|
|
"email": f"concurrent_{unique_id}_{i}@example.com",
|
|
"status": 1
|
|
}
|
|
|
|
async def create_user():
|
|
try:
|
|
response = await user_api.create_user(user_data)
|
|
if response.status_code == 201:
|
|
return response.json()["id"]
|
|
except Exception as e:
|
|
print(f"创建用户失败: {e}")
|
|
return None
|
|
|
|
tasks.append(create_user())
|
|
|
|
# 执行并发请求
|
|
results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
end_time = time.time()
|
|
|
|
# 统计结果
|
|
successful_ids = [r for r in results if r is not None and isinstance(r, int)]
|
|
failed_count = concurrent_count - len(successful_ids)
|
|
total_time = end_time - start_time
|
|
avg_time = total_time / concurrent_count
|
|
|
|
print(f"并发创建用户统计:")
|
|
print(f" 总请求数: {concurrent_count}")
|
|
print(f" 成功数: {len(successful_ids)}")
|
|
print(f" 失败数: {failed_count}")
|
|
print(f" 总耗时: {total_time:.2f}秒")
|
|
print(f" 平均耗时: {avg_time:.2f}秒")
|
|
print(f" 吞吐量: {concurrent_count/total_time:.2f} 请求/秒")
|
|
|
|
# 验证至少90%的请求成功
|
|
success_rate = len(successful_ids) / concurrent_count
|
|
assert success_rate >= 0.9, f"成功率过低: {success_rate:.2%}"
|
|
|
|
# 清理
|
|
for user_id in successful_ids:
|
|
test_data_manager.add_user(user_id)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_concurrent_user_queries(self, authenticated_client, test_data_manager):
|
|
"""测试并发查询用户"""
|
|
user_api = UserAPI(authenticated_client)
|
|
|
|
unique_id = f"{int(time.time() * 1000)}"
|
|
|
|
# 先创建一些用户
|
|
user_ids = []
|
|
for i in range(10):
|
|
user_data = {
|
|
"username": f"query_user_{unique_id}_{i}",
|
|
"password": "Test123!@#",
|
|
"email": f"query_{unique_id}_{i}@example.com",
|
|
"status": 1
|
|
}
|
|
|
|
create_response = await user_api.create_user(user_data)
|
|
if create_response.status_code == 201:
|
|
user_id = create_response.json()["id"]
|
|
user_ids.append(user_id)
|
|
test_data_manager.add_user(user_id)
|
|
|
|
# 并发查询用户
|
|
tasks = []
|
|
start_time = time.time()
|
|
|
|
for user_id in user_ids:
|
|
async def query_user(uid):
|
|
try:
|
|
response = await user_api.get_user_by_id(uid)
|
|
if response.status_code == 200:
|
|
return response.json()
|
|
except Exception as e:
|
|
print(f"查询用户失败: {e}")
|
|
return None
|
|
|
|
tasks.append(query_user(user_id))
|
|
|
|
# 执行并发查询
|
|
results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
end_time = time.time()
|
|
|
|
# 统计结果
|
|
successful_results = [r for r in results if r is not None]
|
|
failed_count = len(user_ids) - len(successful_results)
|
|
total_time = end_time - start_time
|
|
avg_time = total_time / len(user_ids)
|
|
|
|
print(f"并发查询用户统计:")
|
|
print(f" 总查询数: {len(user_ids)}")
|
|
print(f" 成功数: {len(successful_results)}")
|
|
print(f" 失败数: {failed_count}")
|
|
print(f" 总耗时: {total_time:.2f}秒")
|
|
print(f" 平均耗时: {avg_time:.2f}秒")
|
|
print(f" 吞吐量: {len(user_ids)/total_time:.2f} 查询/秒")
|
|
|
|
# 验证所有查询都成功
|
|
assert len(successful_results) == len(user_ids), "部分查询失败"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_concurrent_mixed_operations(self, authenticated_client, test_data_manager):
|
|
"""测试并发混合操作"""
|
|
user_api = UserAPI(authenticated_client)
|
|
role_api = RoleAPI(authenticated_client)
|
|
|
|
unique_id = f"{int(time.time() * 1000)}"
|
|
|
|
# 创建一些初始数据
|
|
user_ids = []
|
|
for i in range(5):
|
|
user_data = {
|
|
"username": f"mixed_user_{unique_id}_{i}",
|
|
"password": "Test123!@#",
|
|
"email": f"mixed_{unique_id}_{i}@example.com",
|
|
"status": 1
|
|
}
|
|
|
|
create_response = await user_api.create_user(user_data)
|
|
if create_response.status_code == 201:
|
|
user_id = create_response.json()["id"]
|
|
user_ids.append(user_id)
|
|
test_data_manager.add_user(user_id)
|
|
|
|
# 并发执行混合操作
|
|
tasks = []
|
|
start_time = time.time()
|
|
|
|
# 创建操作
|
|
for i in range(10):
|
|
user_data = {
|
|
"username": f"mixed_create_{unique_id}_{i}",
|
|
"password": "Test123!@#",
|
|
"email": f"mixed_create_{unique_id}_{i}@example.com",
|
|
"status": 1
|
|
}
|
|
|
|
async def create_user():
|
|
try:
|
|
response = await user_api.create_user(user_data)
|
|
if response.status_code == 201:
|
|
return response.json()["id"]
|
|
except Exception:
|
|
return None
|
|
|
|
tasks.append(create_user())
|
|
|
|
# 查询操作
|
|
for user_id in user_ids:
|
|
async def query_user(uid):
|
|
try:
|
|
response = await user_api.get_user_by_id(uid)
|
|
if response.status_code == 200:
|
|
return response.json()
|
|
except Exception:
|
|
return None
|
|
|
|
tasks.append(query_user(user_id))
|
|
|
|
# 更新操作
|
|
for i, user_id in enumerate(user_ids):
|
|
async def update_user(uid):
|
|
try:
|
|
response = await user_api.update_user(
|
|
uid,
|
|
{"email": f"updated_{unique_id}_{i}@example.com"}
|
|
)
|
|
return response.status_code
|
|
except Exception:
|
|
return None
|
|
|
|
tasks.append(update_user(user_id))
|
|
|
|
# 执行并发操作
|
|
results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
end_time = time.time()
|
|
|
|
# 统计结果
|
|
successful_results = [r for r in results if r is not None]
|
|
failed_count = len(tasks) - len(successful_results)
|
|
total_time = end_time - start_time
|
|
avg_time = total_time / len(tasks)
|
|
|
|
print(f"并发混合操作统计:")
|
|
print(f" 总操作数: {len(tasks)}")
|
|
print(f" 成功数: {len(successful_results)}")
|
|
print(f" 失败数: {failed_count}")
|
|
print(f" 总耗时: {total_time:.2f}秒")
|
|
print(f" 平均耗时: {avg_time:.2f}秒")
|
|
print(f" 吞吐量: {len(tasks)/total_time:.2f} 操作/秒")
|
|
|
|
# 验证至少90%的操作成功
|
|
success_rate = len(successful_results) / len(tasks)
|
|
assert success_rate >= 0.9, f"成功率过低: {success_rate:.2%}"
|
|
```
|
|
|
|
**Step 2: 运行测试验证通过**
|
|
|
|
```bash
|
|
cd api_integration_tests
|
|
pytest tests/test_high_concurrency.py::TestHighConcurrency::test_concurrent_user_creation -v -s
|
|
```
|
|
|
|
Expected: PASS with performance metrics
|
|
|
|
**Step 3: 提交代码**
|
|
|
|
```bash
|
|
git add api_integration_tests/tests/test_high_concurrency.py
|
|
git commit -m "test: add high concurrency performance tests (P2)"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 10: 长时间稳定性测试
|
|
|
|
**优先级**: P2
|
|
**预计时间**: 2-3天
|
|
|
|
**Files:**
|
|
- Create: `api_integration_tests/tests/test_long_term_stability.py`
|
|
- Test: `api_integration_tests/tests/test_long_term_stability.py`
|
|
|
|
**Step 1: 创建长时间稳定性测试文件**
|
|
|
|
```python
|
|
"""
|
|
长时间稳定性测试用例
|
|
测试系统在长时间运行下的稳定性
|
|
"""
|
|
|
|
import pytest
|
|
import time
|
|
import asyncio
|
|
from statistics import mean, median
|
|
from api.user_api import UserAPI
|
|
from api.role_api import RoleAPI
|
|
|
|
|
|
@pytest.mark.performance
|
|
@pytest.mark.stability
|
|
class TestLongTermStability:
|
|
"""长时间稳定性测试类"""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_extended_operation_stability(self, authenticated_client, test_data_manager):
|
|
"""测试长时间操作稳定性"""
|
|
user_api = UserAPI(authenticated_client)
|
|
|
|
unique_id = f"{int(time.time() * 1000)}"
|
|
duration = 300 # 5分钟
|
|
interval = 10 # 每10秒执行一次操作
|
|
|
|
start_time = time.time()
|
|
operation_count = 0
|
|
success_count = 0
|
|
failure_count = 0
|
|
response_times = []
|
|
|
|
while (time.time() - start_time) < duration:
|
|
operation_start = time.time()
|
|
|
|
# 执行用户查询操作
|
|
try:
|
|
response = await user_api.get_all_users()
|
|
operation_end = time.time()
|
|
|
|
if response.status_code == 200:
|
|
success_count += 1
|
|
response_times.append(operation_end - operation_start)
|
|
else:
|
|
failure_count += 1
|
|
|
|
except Exception as e:
|
|
failure_count += 1
|
|
print(f"操作失败: {e}")
|
|
|
|
operation_count += 1
|
|
await asyncio.sleep(interval)
|
|
|
|
# 统计结果
|
|
total_time = time.time() - start_time
|
|
avg_response_time = mean(response_times) if response_times else 0
|
|
max_response_time = max(response_times) if response_times else 0
|
|
min_response_time = min(response_times) if response_times else 0
|
|
|
|
print(f"长时间稳定性测试统计:")
|
|
print(f" 测试时长: {total_time:.2f}秒")
|
|
print(f" 总操作数: {operation_count}")
|
|
print(f" 成功数: {success_count}")
|
|
print(f" 失败数: {failure_count}")
|
|
print(f" 成功率: {success_count/operation_count:.2%}")
|
|
print(f" 平均响应时间: {avg_response_time:.3f}秒")
|
|
print(f" 最大响应时间: {max_response_time:.3f}秒")
|
|
print(f" 最小响应时间: {min_response_time:.3f}秒")
|
|
print(f" 操作频率: {operation_count/total_time:.2f} 操作/秒")
|
|
|
|
# 验证稳定性
|
|
success_rate = success_count / operation_count
|
|
assert success_rate >= 0.95, f"成功率过低: {success_rate:.2%}"
|
|
assert avg_response_time < 2.0, f"平均响应时间过长: {avg_response_time:.3f}秒"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_memory_leak_detection(self, authenticated_client, test_data_manager):
|
|
"""测试内存泄漏检测"""
|
|
user_api = UserAPI(authenticated_client)
|
|
|
|
unique_id = f"{int(time.time() * 1000)}"
|
|
iterations = 100
|
|
|
|
# 记录初始状态
|
|
initial_users = await user_api.get_all_users()
|
|
initial_count = len(initial_users.json())
|
|
|
|
# 执行多次操作
|
|
user_ids = []
|
|
for i in range(iterations):
|
|
user_data = {
|
|
"username": f"memory_test_{unique_id}_{i}",
|
|
"password": "Test123!@#",
|
|
"email": f"memory_{unique_id}_{i}@example.com",
|
|
"status": 1
|
|
}
|
|
|
|
create_response = await user_api.create_user(user_data)
|
|
if create_response.status_code == 201:
|
|
user_id = create_response.json()["id"]
|
|
user_ids.append(user_id)
|
|
test_data_manager.add_user(user_id)
|
|
|
|
# 每隔10次检查一次状态
|
|
if (i + 1) % 10 == 0:
|
|
current_users = await user_api.get_all_users()
|
|
current_count = len(current_users.json())
|
|
expected_count = initial_count + len(user_ids)
|
|
|
|
print(f"迭代 {i+1}/{iterations}: 用户数 {current_count}, 预期 {expected_count}")
|
|
|
|
# 验证数据一致性
|
|
assert current_count == expected_count, f"数据不一致: {current_count} != {expected_count}"
|
|
|
|
# 清理
|
|
for user_id in user_ids:
|
|
try:
|
|
await user_api.delete_user(user_id)
|
|
except Exception:
|
|
pass
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_connection_pool_stability(self, authenticated_client, test_data_manager):
|
|
"""测试连接池稳定性"""
|
|
user_api = UserAPI(authenticated_client)
|
|
|
|
unique_id = f"{int(time.time() * 1000)}"
|
|
iterations = 50
|
|
|
|
# 执行多次数据库操作
|
|
for i in range(iterations):
|
|
# 创建用户
|
|
user_data = {
|
|
"username": f"pool_test_{unique_id}_{i}",
|
|
"password": "Test123!@#",
|
|
"email": f"pool_{unique_id}_{i}@example.com",
|
|
"status": 1
|
|
}
|
|
|
|
create_response = await user_api.create_user(user_data)
|
|
if create_response.status_code == 201:
|
|
user_id = create_response.json()["id"]
|
|
test_data_manager.add_user(user_id)
|
|
|
|
# 查询用户
|
|
get_response = await user_api.get_user_by_id(user_id)
|
|
assert get_response.status_code == 200
|
|
|
|
# 更新用户
|
|
update_response = await user_api.update_user(
|
|
user_id,
|
|
{"email": f"updated_{unique_id}_{i}@example.com"}
|
|
)
|
|
assert update_response.status_code == 200
|
|
|
|
# 删除用户
|
|
delete_response = await user_api.delete_user(user_id)
|
|
assert delete_response.status_code == 204
|
|
|
|
test_data_manager._users.remove(user_id)
|
|
|
|
print(f"完成迭代 {i+1}/{iterations}")
|
|
|
|
print(f"连接池稳定性测试完成,共执行 {iterations} 次完整操作")
|
|
```
|
|
|
|
**Step 2: 运行测试验证通过**
|
|
|
|
```bash
|
|
cd api_integration_tests
|
|
pytest tests/test_long_term_stability.py::TestLongTermStability::test_extended_operation_stability -v -s
|
|
```
|
|
|
|
Expected: PASS with stability metrics
|
|
|
|
**Step 3: 提交代码**
|
|
|
|
```bash
|
|
git add api_integration_tests/tests/test_long_term_stability.py
|
|
git commit -m "test: add long term stability tests (P2)"
|
|
```
|
|
|
|
---
|
|
|
|
## 📊 实施总结
|
|
|
|
### 预期成果
|
|
|
|
通过实施本计划,预期达到以下目标:
|
|
|
|
1. **测试覆盖率提升**:
|
|
- API测试覆盖率:13% → 50%
|
|
- 前端测试覆盖率:20% → 40%
|
|
- E2E测试通过率:14% → 80%
|
|
|
|
2. **测试质量提升**:
|
|
- 关键业务流程测试:100%覆盖
|
|
- 异常场景测试:75%覆盖
|
|
- 性能测试:60%覆盖
|
|
- 边界条件测试:70%覆盖
|
|
- 安全测试:85%覆盖
|
|
|
|
3. **测试体系完善**:
|
|
- 建立完整的测试分层体系
|
|
- 实现测试数据管理自动化
|
|
- 建立测试报告和监控机制
|
|
|
|
### 风险和缓解措施
|
|
|
|
1. **测试环境稳定性**:
|
|
- 风险:测试环境不稳定影响测试执行
|
|
- 缓解:建立测试环境监控,及时发现问题
|
|
|
|
2. **测试数据管理**:
|
|
- 风险:测试数据清理不彻底导致污染
|
|
- 缓解:完善测试数据管理工具,自动清理
|
|
|
|
3. **测试执行时间**:
|
|
- 风险:测试用例数量增加导致执行时间过长
|
|
- 缓解:实现测试并行执行,优化测试用例
|
|
|
|
### 下一步行动
|
|
|
|
**计划完成后的下一步**:
|
|
1. 执行本计划中的所有测试用例
|
|
2. 分析测试结果,识别需要修复的问题
|
|
3. 根据测试结果优化系统代码
|
|
4. 建立持续集成测试流水线
|
|
5. 定期回顾和更新测试策略
|
|
|
|
---
|
|
|
|
**计划完成并保存到 `docs/plans/2026-03-26-test-case-supplementation.md`**
|
|
|
|
**执行选项:**
|
|
|
|
**1. Subagent-Driven (this session)** - 我会在当前会话中为每个任务分派新的子代理,任务之间进行代码审查,快速迭代
|
|
|
|
**2. Parallel Session (separate)** - 指导您在工作树中打开新会话,新会话使用 `executing-plans` skill 进行批量执行和检查点
|
|
|
|
**您希望选择哪种执行方式?** |