Files
novalon-manage-system/test-suite/tests/security/test_jwt_security.py
T
张翔 1e3dc11d59 refactor(test): 重构测试套件结构并优化测试配置
feat(test-suite): 新增测试套件模块,包含API测试客户端和测试配置
fix(api): 修复数据库实体和仓库的删除操作返回值
style(api): 统一数据库表名和字段命名
perf(api): 添加缓存注解提升配置查询性能
test(api): 添加H2测试数据库配置支持
chore: 清理旧的测试文件和脚本
2026-04-01 20:57:24 +08:00

263 lines
8.9 KiB
Python

"""
JWT安全测试套件
测试范围:
1. JWT Token生成与验证
2. Token过期处理
3. Token签名验证
4. Token刷新机制
5. 密钥安全性验证
作者: 张翔
日期: 2026-04-01
"""
import pytest
import time
import jwt
from datetime import datetime, timedelta
from api.auth_api import AuthAPI
from api.user_api import UserAPI
from config.settings import settings
@pytest.mark.security
@pytest.mark.asyncio
class TestJWTSecurity:
"""JWT安全测试类"""
async def test_jwt_token_structure(self, authenticated_client):
"""
SEC-JWT-01: JWT Token结构验证
验证点:
1. Token包含正确的Header
2. Token包含正确的Payload
3. Token使用正确的签名算法
"""
auth_api = AuthAPI(authenticated_client)
login_response = await auth_api.login("admin", "admin123")
assert login_response.status_code == 200
token = login_response.json().get("token")
assert token is not None, "未获取到Token"
decoded = jwt.decode(token, options={"verify_signature": False})
assert "sub" in decoded, "Token缺少subject声明"
assert "exp" in decoded, "Token缺少过期时间"
assert "iat" in decoded, "Token缺少签发时间"
assert "userId" in decoded or "user_id" in decoded, "Token缺少用户ID"
async def test_jwt_token_expiration(self, authenticated_client):
"""
SEC-JWT-02: JWT Token过期验证
验证点:
1. Token有过期时间
2. 过期时间在合理范围内
3. 过期Token无法使用
"""
auth_api = AuthAPI(authenticated_client)
login_response = await auth_api.login("admin", "admin123")
token = login_response.json().get("token")
decoded = jwt.decode(token, options={"verify_signature": False})
exp_time = datetime.fromtimestamp(decoded["exp"])
iat_time = datetime.fromtimestamp(decoded["iat"])
time_diff = (exp_time - iat_time).total_seconds()
assert time_diff > 0, "Token过期时间无效"
assert time_diff <= 86400, "Token有效期不应超过24小时"
async def test_jwt_signature_verification(self, authenticated_client):
"""
SEC-JWT-03: JWT签名验证
验证点:
1. 篡改的Token被拒绝
2. 无效签名的Token被拒绝
"""
auth_api = AuthAPI(authenticated_client)
user_api = UserAPI(authenticated_client)
login_response = await auth_api.login("admin", "admin123")
valid_token = login_response.json().get("token")
tampered_token = valid_token[:-5] + "XXXXX"
client_with_tampered = authenticated_client.__class__(
base_url=settings.API_BASE_URL,
headers={"Authorization": f"Bearer {tampered_token}"}
)
user_api_tampered = UserAPI(client_with_tampered)
response = await user_api_tampered.get_users_by_page()
assert response.status_code in [401, 403], "篡改的Token应该被拒绝"
async def test_jwt_algorithm_security(self, authenticated_client):
"""
SEC-JWT-04: JWT算法安全验证
验证点:
1. 不允许使用none算法
2. 不允许算法混淆攻击
"""
auth_api = AuthAPI(authenticated_client)
login_response = await auth_api.login("admin", "admin123")
token = login_response.json().get("token")
header = jwt.get_unverified_header(token)
assert header["alg"] != "none", "不应允许none算法"
assert header["alg"] in ["HS256", "HS384", "HS512", "RS256", "RS384", "RS512"], \
"应使用安全的签名算法"
async def test_jwt_claims_validation(self, authenticated_client):
"""
SEC-JWT-05: JWT声明验证
验证点:
1. 必要的声明存在
2. 声明值有效
"""
auth_api = AuthAPI(authenticated_client)
login_response = await auth_api.login("admin", "admin123")
token = login_response.json().get("token")
decoded = jwt.decode(token, options={"verify_signature": False})
required_claims = ["sub", "exp", "iat"]
for claim in required_claims:
assert claim in decoded, f"Token缺少必要声明: {claim}"
assert decoded["sub"] == "admin", "Subject应该是用户名"
assert decoded["exp"] > time.time(), "Token不应已过期"
async def test_jwt_token_in_validation(self, authenticated_client):
"""
SEC-JWT-06: 无效Token验证
验证点:
1. 空Token被拒绝
2. 格式错误的Token被拒绝
3. 过期Token被拒绝
"""
user_api = UserAPI(authenticated_client)
invalid_tokens = [
"",
"invalid.token.format",
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.invalid",
None
]
for invalid_token in invalid_tokens:
if invalid_token is None:
continue
client_with_invalid = authenticated_client.__class__(
base_url=settings.API_BASE_URL,
headers={"Authorization": f"Bearer {invalid_token}"}
)
user_api_invalid = UserAPI(client_with_invalid)
response = await user_api_invalid.get_users_by_page()
assert response.status_code in [401, 403, 400], \
f"无效Token '{invalid_token}' 应该被拒绝"
async def test_jwt_token_refresh_mechanism(self, authenticated_client):
"""
SEC-JWT-07: Token刷新机制验证
验证点:
1. 支持Token刷新
2. 刷新后生成新Token
3. 旧Token失效或共存
"""
auth_api = AuthAPI(authenticated_client)
login_response = await auth_api.login("admin", "admin123")
original_token = login_response.json().get("token")
try:
refresh_response = await auth_api.refresh_token(original_token)
if refresh_response.status_code == 200:
new_token = refresh_response.json().get("token")
assert new_token is not None, "刷新应返回新Token"
assert new_token != original_token, "新Token应不同于原Token"
else:
pytest.skip("系统未实现Token刷新机制")
except Exception as e:
pytest.skip(f"Token刷新机制测试跳过: {str(e)}")
async def test_jwt_key_strength(self, authenticated_client):
"""
SEC-JWT-08: JWT密钥强度验证
验证点:
1. 密钥长度足够
2. 密钥不是弱密钥
"""
auth_api = AuthAPI(authenticated_client)
login_response = await auth_api.login("admin", "admin123")
token = login_response.json().get("token")
header = jwt.get_unverified_header(token)
if header["alg"].startswith("HS"):
weak_secrets = [
"secret", "password", "123456", "admin",
"your-256-bit-secret", "your-secret-key"
]
for weak_secret in weak_secrets:
try:
jwt.decode(token, weak_secret, algorithms=[header["alg"]])
pytest.fail(f"使用了弱密钥: {weak_secret}")
except jwt.InvalidSignatureError:
pass
async def test_jwt_user_impersonation_prevention(self, authenticated_client):
"""
SEC-JWT-09: 用户伪装防护验证
验证点:
1. 无法通过修改Token伪装其他用户
2. 用户ID与Token绑定
"""
auth_api = AuthAPI(authenticated_client)
user_api = UserAPI(authenticated_client)
login_response = await auth_api.login("admin", "admin123")
token = login_response.json().get("token")
decoded = jwt.decode(token, options={"verify_signature": False})
users_response = await user_api.get_users_by_page()
assert users_response.status_code == 200
users = users_response.json().get("content", [])
other_user = next((u for u in users if u.get("username") != "admin"), None)
if other_user:
client_with_admin_token = authenticated_client.__class__(
base_url=settings.API_BASE_URL,
headers={"Authorization": f"Bearer {token}"}
)
user_api_admin = UserAPI(client_with_admin_token)
response = await user_api_admin.get_user_by_id(other_user["id"])
assert response.status_code in [200, 403], "应正确处理跨用户访问"