Files
张翔 1e3dc11d59 refactor(test): 重构测试套件结构并优化测试配置
feat(test-suite): 新增测试套件模块,包含API测试客户端和测试配置
fix(api): 修复数据库实体和仓库的删除操作返回值
style(api): 统一数据库表名和字段命名
perf(api): 添加缓存注解提升配置查询性能
test(api): 添加H2测试数据库配置支持
chore: 清理旧的测试文件和脚本
2026-04-01 20:57:24 +08:00

303 lines
9.4 KiB
Python

"""
SQL注入防护测试套件
测试范围:
1. 用户输入SQL注入测试
2. 查询参数注入测试
3. 排序字段注入测试
4. 搜索关键词注入测试
5. 批量操作注入测试
作者: 张翔
日期: 2026-04-01
"""
import pytest
from api.auth_api import AuthAPI
from api.user_api import UserAPI
from api.role_api import RoleAPI
from api.menu_api import MenuAPI
from config.settings import settings
@pytest.mark.security
@pytest.mark.asyncio
class TestSQLInjection:
"""SQL注入防护测试类"""
async def test_user_search_sql_injection(self, authenticated_client):
"""
SEC-SQL-01: 用户搜索SQL注入测试
验证点:
1. 搜索框无法注入SQL
2. 特殊字符被正确处理
3. 查询参数化
"""
user_api = UserAPI(authenticated_client)
sql_injection_payloads = [
"admin' OR '1'='1",
"admin'; DROP TABLE users; --",
"admin' UNION SELECT * FROM users --",
"admin' AND 1=1 --",
"admin' AND 1=2 --",
"admin' OR 'x'='x",
"1; SELECT * FROM users",
"admin'/*",
"admin'--",
"' OR 1=1#",
"admin' AND SLEEP(5)--",
"admin'; WAITFOR DELAY '0:0:5'; --"
]
for payload in sql_injection_payloads:
response = await user_api.get_users_by_page(
page=0,
size=10,
username=payload
)
assert response.status_code in [200, 400], \
f"SQL注入payload '{payload}' 导致异常响应"
if response.status_code == 200:
data = response.json()
assert "content" in data or "users" in data, \
f"响应格式异常,payload: {payload}"
async def test_user_create_sql_injection(self, authenticated_client):
"""
SEC-SQL-02: 用户创建SQL注入测试
验证点:
1. 用户名字段防注入
2. 邮箱字段防注入
3. 电话字段防注入
"""
user_api = UserAPI(authenticated_client)
malicious_user_data = {
"username": "test'; DROP TABLE users; --",
"password": "Test123!@#",
"email": "test@example.com'; DROP TABLE users; --",
"phone": "13800138000'; DROP TABLE users; --",
"nickname": "测试用户",
"status": 1
}
response = await user_api.create_user(malicious_user_data)
if response.status_code in [201, 200]:
user_id = response.json().get("id")
if user_id:
await user_api.delete_user(user_id)
users_response = await user_api.get_users_by_page()
assert users_response.status_code == 200, "用户表应该仍然存在"
else:
assert response.status_code in [400, 422], \
"恶意数据应被拒绝或清洗"
async def test_role_search_sql_injection(self, authenticated_client):
"""
SEC-SQL-03: 角色搜索SQL注入测试
验证点:
1. 角色名搜索防注入
2. 角色键搜索防注入
"""
role_api = RoleAPI(authenticated_client)
injection_payloads = [
"admin' OR '1'='1",
"admin'; DELETE FROM roles WHERE '1'='1",
"admin' UNION SELECT * FROM roles --"
]
for payload in injection_payloads:
response = await role_api.get_roles_by_page(
page=0,
size=10,
roleName=payload
)
assert response.status_code in [200, 400], \
f"SQL注入payload '{payload}' 导致异常"
async def test_menu_search_sql_injection(self, authenticated_client):
"""
SEC-SQL-04: 菜单搜索SQL注入测试
验证点:
1. 菜单名搜索防注入
2. 菜单路径搜索防注入
"""
menu_api = MenuAPI(authenticated_client)
injection_payloads = [
"系统管理' OR '1'='1",
"系统管理'; DROP TABLE menus; --",
"/system' UNION SELECT * FROM menus --"
]
for payload in injection_payloads:
response = await menu_api.get_menus(
menuName=payload
)
assert response.status_code in [200, 400], \
f"SQL注入payload '{payload}' 导致异常"
async def test_order_by_sql_injection(self, authenticated_client):
"""
SEC-SQL-05: 排序字段SQL注入测试
验证点:
1. 排序字段防注入
2. 排序方向防注入
"""
user_api = UserAPI(authenticated_client)
malicious_sort_fields = [
"id; DROP TABLE users",
"id; SELECT * FROM users",
"id UNION SELECT * FROM users",
"(SELECT CASE WHEN 1=1 THEN id ELSE username END)",
"id; INSERT INTO users VALUES (999, 'hacker', 'hacked')"
]
for sort_field in malicious_sort_fields:
response = await user_api.get_users_by_page(
page=0,
size=10,
sortBy=sort_field,
sortOrder="asc"
)
assert response.status_code in [200, 400], \
f"排序注入 '{sort_field}' 导致异常"
async def test_batch_delete_sql_injection(self, authenticated_client):
"""
SEC-SQL-06: 批量删除SQL注入测试
验证点:
1. 批量删除ID列表防注入
2. 删除操作参数化
"""
user_api = UserAPI(authenticated_client)
malicious_ids = [
"1,2,3; DROP TABLE users",
"1 OR 1=1",
"1; DELETE FROM users WHERE 1=1"
]
for ids in malicious_ids:
try:
response = await user_api.batch_delete_users(ids)
assert response.status_code in [400, 404, 422], \
f"批量删除注入 '{ids}' 应被拒绝"
except Exception:
pass
async def test_filter_sql_injection(self, authenticated_client):
"""
SEC-SQL-07: 过滤条件SQL注入测试
验证点:
1. 过滤参数防注入
2. 复杂查询条件安全
"""
user_api = UserAPI(authenticated_client)
injection_filters = {
"status": "1 OR 1=1",
"email": "test@example.com' OR '1'='1",
"phone": "13800138000' OR '1'='1"
}
for field, value in injection_filters.items():
response = await user_api.get_users_by_page(
page=0,
size=10,
**{field: value}
)
assert response.status_code in [200, 400], \
f"过滤注入 '{field}={value}' 导致异常"
async def test_time_based_sql_injection(self, authenticated_client):
"""
SEC-SQL-08: 时间盲注测试
验证点:
1. SLEEP函数被过滤
2. WAITFOR命令被过滤
3. 时间盲注无效
"""
user_api = UserAPI(authenticated_client)
time_based_payloads = [
"admin' AND SLEEP(5)--",
"admin' AND (SELECT * FROM (SELECT(SLEEP(5)))a)--",
"admin'; WAITFOR DELAY '0:0:5'; --",
"admin' AND BENCHMARK(5000000,SHA1('test'))--"
]
import time
for payload in time_based_payloads:
start_time = time.time()
response = await user_api.get_users_by_page(
page=0,
size=10,
username=payload
)
elapsed_time = time.time() - start_time
assert elapsed_time < 6, \
f"时间盲注 '{payload}' 可能成功,响应时间: {elapsed_time}"
assert response.status_code in [200, 400]
async def test_union_based_sql_injection(self, authenticated_client):
"""
SEC-SQL-09: UNION注入测试
验证点:
1. UNION SELECT被阻止
2. 列数探测无效
"""
user_api = UserAPI(authenticated_client)
union_payloads = [
"admin' UNION SELECT NULL--",
"admin' UNION SELECT NULL,NULL--",
"admin' UNION SELECT NULL,NULL,NULL--",
"admin' UNION SELECT username,password,email FROM users--",
"admin' UNION ALL SELECT 1,2,3,4,5,6,7,8,9,10--"
]
for payload in union_payloads:
response = await user_api.get_users_by_page(
page=0,
size=10,
username=payload
)
assert response.status_code in [200, 400], \
f"UNION注入 '{payload}' 导致异常"
if response.status_code == 200:
data = response.json()
content = data.get("content", [])
for item in content:
assert isinstance(item, dict), \
f"UNION注入可能成功,返回了非预期数据: {item}"