1e3dc11d59
feat(test-suite): 新增测试套件模块,包含API测试客户端和测试配置 fix(api): 修复数据库实体和仓库的删除操作返回值 style(api): 统一数据库表名和字段命名 perf(api): 添加缓存注解提升配置查询性能 test(api): 添加H2测试数据库配置支持 chore: 清理旧的测试文件和脚本
303 lines
9.4 KiB
Python
303 lines
9.4 KiB
Python
"""
|
|
SQL注入防护测试套件
|
|
|
|
测试范围:
|
|
1. 用户输入SQL注入测试
|
|
2. 查询参数注入测试
|
|
3. 排序字段注入测试
|
|
4. 搜索关键词注入测试
|
|
5. 批量操作注入测试
|
|
|
|
作者: 张翔
|
|
日期: 2026-04-01
|
|
"""
|
|
|
|
import pytest
|
|
from api.auth_api import AuthAPI
|
|
from api.user_api import UserAPI
|
|
from api.role_api import RoleAPI
|
|
from api.menu_api import MenuAPI
|
|
from config.settings import settings
|
|
|
|
|
|
@pytest.mark.security
|
|
@pytest.mark.asyncio
|
|
class TestSQLInjection:
|
|
"""SQL注入防护测试类"""
|
|
|
|
async def test_user_search_sql_injection(self, authenticated_client):
|
|
"""
|
|
SEC-SQL-01: 用户搜索SQL注入测试
|
|
|
|
验证点:
|
|
1. 搜索框无法注入SQL
|
|
2. 特殊字符被正确处理
|
|
3. 查询参数化
|
|
"""
|
|
user_api = UserAPI(authenticated_client)
|
|
|
|
sql_injection_payloads = [
|
|
"admin' OR '1'='1",
|
|
"admin'; DROP TABLE users; --",
|
|
"admin' UNION SELECT * FROM users --",
|
|
"admin' AND 1=1 --",
|
|
"admin' AND 1=2 --",
|
|
"admin' OR 'x'='x",
|
|
"1; SELECT * FROM users",
|
|
"admin'/*",
|
|
"admin'--",
|
|
"' OR 1=1#",
|
|
"admin' AND SLEEP(5)--",
|
|
"admin'; WAITFOR DELAY '0:0:5'; --"
|
|
]
|
|
|
|
for payload in sql_injection_payloads:
|
|
response = await user_api.get_users_by_page(
|
|
page=0,
|
|
size=10,
|
|
username=payload
|
|
)
|
|
|
|
assert response.status_code in [200, 400], \
|
|
f"SQL注入payload '{payload}' 导致异常响应"
|
|
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
assert "content" in data or "users" in data, \
|
|
f"响应格式异常,payload: {payload}"
|
|
|
|
async def test_user_create_sql_injection(self, authenticated_client):
|
|
"""
|
|
SEC-SQL-02: 用户创建SQL注入测试
|
|
|
|
验证点:
|
|
1. 用户名字段防注入
|
|
2. 邮箱字段防注入
|
|
3. 电话字段防注入
|
|
"""
|
|
user_api = UserAPI(authenticated_client)
|
|
|
|
malicious_user_data = {
|
|
"username": "test'; DROP TABLE users; --",
|
|
"password": "Test123!@#",
|
|
"email": "test@example.com'; DROP TABLE users; --",
|
|
"phone": "13800138000'; DROP TABLE users; --",
|
|
"nickname": "测试用户",
|
|
"status": 1
|
|
}
|
|
|
|
response = await user_api.create_user(malicious_user_data)
|
|
|
|
if response.status_code in [201, 200]:
|
|
user_id = response.json().get("id")
|
|
if user_id:
|
|
await user_api.delete_user(user_id)
|
|
|
|
users_response = await user_api.get_users_by_page()
|
|
assert users_response.status_code == 200, "用户表应该仍然存在"
|
|
else:
|
|
assert response.status_code in [400, 422], \
|
|
"恶意数据应被拒绝或清洗"
|
|
|
|
async def test_role_search_sql_injection(self, authenticated_client):
|
|
"""
|
|
SEC-SQL-03: 角色搜索SQL注入测试
|
|
|
|
验证点:
|
|
1. 角色名搜索防注入
|
|
2. 角色键搜索防注入
|
|
"""
|
|
role_api = RoleAPI(authenticated_client)
|
|
|
|
injection_payloads = [
|
|
"admin' OR '1'='1",
|
|
"admin'; DELETE FROM roles WHERE '1'='1",
|
|
"admin' UNION SELECT * FROM roles --"
|
|
]
|
|
|
|
for payload in injection_payloads:
|
|
response = await role_api.get_roles_by_page(
|
|
page=0,
|
|
size=10,
|
|
roleName=payload
|
|
)
|
|
|
|
assert response.status_code in [200, 400], \
|
|
f"SQL注入payload '{payload}' 导致异常"
|
|
|
|
async def test_menu_search_sql_injection(self, authenticated_client):
|
|
"""
|
|
SEC-SQL-04: 菜单搜索SQL注入测试
|
|
|
|
验证点:
|
|
1. 菜单名搜索防注入
|
|
2. 菜单路径搜索防注入
|
|
"""
|
|
menu_api = MenuAPI(authenticated_client)
|
|
|
|
injection_payloads = [
|
|
"系统管理' OR '1'='1",
|
|
"系统管理'; DROP TABLE menus; --",
|
|
"/system' UNION SELECT * FROM menus --"
|
|
]
|
|
|
|
for payload in injection_payloads:
|
|
response = await menu_api.get_menus(
|
|
menuName=payload
|
|
)
|
|
|
|
assert response.status_code in [200, 400], \
|
|
f"SQL注入payload '{payload}' 导致异常"
|
|
|
|
async def test_order_by_sql_injection(self, authenticated_client):
|
|
"""
|
|
SEC-SQL-05: 排序字段SQL注入测试
|
|
|
|
验证点:
|
|
1. 排序字段防注入
|
|
2. 排序方向防注入
|
|
"""
|
|
user_api = UserAPI(authenticated_client)
|
|
|
|
malicious_sort_fields = [
|
|
"id; DROP TABLE users",
|
|
"id; SELECT * FROM users",
|
|
"id UNION SELECT * FROM users",
|
|
"(SELECT CASE WHEN 1=1 THEN id ELSE username END)",
|
|
"id; INSERT INTO users VALUES (999, 'hacker', 'hacked')"
|
|
]
|
|
|
|
for sort_field in malicious_sort_fields:
|
|
response = await user_api.get_users_by_page(
|
|
page=0,
|
|
size=10,
|
|
sortBy=sort_field,
|
|
sortOrder="asc"
|
|
)
|
|
|
|
assert response.status_code in [200, 400], \
|
|
f"排序注入 '{sort_field}' 导致异常"
|
|
|
|
async def test_batch_delete_sql_injection(self, authenticated_client):
|
|
"""
|
|
SEC-SQL-06: 批量删除SQL注入测试
|
|
|
|
验证点:
|
|
1. 批量删除ID列表防注入
|
|
2. 删除操作参数化
|
|
"""
|
|
user_api = UserAPI(authenticated_client)
|
|
|
|
malicious_ids = [
|
|
"1,2,3; DROP TABLE users",
|
|
"1 OR 1=1",
|
|
"1; DELETE FROM users WHERE 1=1"
|
|
]
|
|
|
|
for ids in malicious_ids:
|
|
try:
|
|
response = await user_api.batch_delete_users(ids)
|
|
|
|
assert response.status_code in [400, 404, 422], \
|
|
f"批量删除注入 '{ids}' 应被拒绝"
|
|
except Exception:
|
|
pass
|
|
|
|
async def test_filter_sql_injection(self, authenticated_client):
|
|
"""
|
|
SEC-SQL-07: 过滤条件SQL注入测试
|
|
|
|
验证点:
|
|
1. 过滤参数防注入
|
|
2. 复杂查询条件安全
|
|
"""
|
|
user_api = UserAPI(authenticated_client)
|
|
|
|
injection_filters = {
|
|
"status": "1 OR 1=1",
|
|
"email": "test@example.com' OR '1'='1",
|
|
"phone": "13800138000' OR '1'='1"
|
|
}
|
|
|
|
for field, value in injection_filters.items():
|
|
response = await user_api.get_users_by_page(
|
|
page=0,
|
|
size=10,
|
|
**{field: value}
|
|
)
|
|
|
|
assert response.status_code in [200, 400], \
|
|
f"过滤注入 '{field}={value}' 导致异常"
|
|
|
|
async def test_time_based_sql_injection(self, authenticated_client):
|
|
"""
|
|
SEC-SQL-08: 时间盲注测试
|
|
|
|
验证点:
|
|
1. SLEEP函数被过滤
|
|
2. WAITFOR命令被过滤
|
|
3. 时间盲注无效
|
|
"""
|
|
user_api = UserAPI(authenticated_client)
|
|
|
|
time_based_payloads = [
|
|
"admin' AND SLEEP(5)--",
|
|
"admin' AND (SELECT * FROM (SELECT(SLEEP(5)))a)--",
|
|
"admin'; WAITFOR DELAY '0:0:5'; --",
|
|
"admin' AND BENCHMARK(5000000,SHA1('test'))--"
|
|
]
|
|
|
|
import time
|
|
|
|
for payload in time_based_payloads:
|
|
start_time = time.time()
|
|
|
|
response = await user_api.get_users_by_page(
|
|
page=0,
|
|
size=10,
|
|
username=payload
|
|
)
|
|
|
|
elapsed_time = time.time() - start_time
|
|
|
|
assert elapsed_time < 6, \
|
|
f"时间盲注 '{payload}' 可能成功,响应时间: {elapsed_time}秒"
|
|
|
|
assert response.status_code in [200, 400]
|
|
|
|
async def test_union_based_sql_injection(self, authenticated_client):
|
|
"""
|
|
SEC-SQL-09: UNION注入测试
|
|
|
|
验证点:
|
|
1. UNION SELECT被阻止
|
|
2. 列数探测无效
|
|
"""
|
|
user_api = UserAPI(authenticated_client)
|
|
|
|
union_payloads = [
|
|
"admin' UNION SELECT NULL--",
|
|
"admin' UNION SELECT NULL,NULL--",
|
|
"admin' UNION SELECT NULL,NULL,NULL--",
|
|
"admin' UNION SELECT username,password,email FROM users--",
|
|
"admin' UNION ALL SELECT 1,2,3,4,5,6,7,8,9,10--"
|
|
]
|
|
|
|
for payload in union_payloads:
|
|
response = await user_api.get_users_by_page(
|
|
page=0,
|
|
size=10,
|
|
username=payload
|
|
)
|
|
|
|
assert response.status_code in [200, 400], \
|
|
f"UNION注入 '{payload}' 导致异常"
|
|
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
content = data.get("content", [])
|
|
|
|
for item in content:
|
|
assert isinstance(item, dict), \
|
|
f"UNION注入可能成功,返回了非预期数据: {item}"
|