263 lines
8.9 KiB
Python
263 lines
8.9 KiB
Python
"""
|
|
JWT安全测试套件
|
|
|
|
测试范围:
|
|
1. JWT Token生成与验证
|
|
2. Token过期处理
|
|
3. Token签名验证
|
|
4. Token刷新机制
|
|
5. 密钥安全性验证
|
|
|
|
作者: 张翔
|
|
日期: 2026-04-01
|
|
"""
|
|
|
|
import pytest
|
|
import time
|
|
import jwt
|
|
from datetime import datetime, timedelta
|
|
from api.auth_api import AuthAPI
|
|
from api.user_api import UserAPI
|
|
from config.settings import settings
|
|
|
|
|
|
@pytest.mark.security
@pytest.mark.asyncio
class TestJWTSecurity:
    """JWT security tests.

    Each test logs in through ``AuthAPI`` with the admin credentials below,
    then either inspects the returned token locally or replays a modified
    token against ``UserAPI`` and checks the response status.
    """

    # Account every test logs in with.
    _USERNAME = "admin"
    _PASSWORD = "admin123"

    # Upper bound accepted for ``exp - iat`` (24 hours).
    _MAX_TOKEN_LIFETIME_SECONDS = 24 * 60 * 60

    # Signature algorithms the issued token is allowed to declare.
    _ALLOWED_ALGORITHMS = ("HS256", "HS384", "HS512", "RS256", "RS384", "RS512")

    # Well-known HMAC secrets that must NOT verify the issued token.
    _WEAK_SECRETS = (
        "secret",
        "password",
        "123456",
        "admin",
        "your-256-bit-secret",
        "your-secret-key",
    )

    async def _login_token(self, auth_api):
        """Log in with the admin credentials and return the issued token.

        Asserts that the login answered 200 and that the JSON body carries a
        ``token`` entry, so a broken login fails here with a clear message
        instead of surfacing later as a decode error on ``None``.

        Args:
            auth_api: the ``AuthAPI`` instance used to perform the login.

        Returns:
            The value of the ``token`` key of the login response.
        """
        login_response = await auth_api.login(self._USERNAME, self._PASSWORD)
        assert login_response.status_code == 200

        token = login_response.json().get("token")
        assert token is not None, "未获取到Token"
        return token

    @staticmethod
    def _client_with_token(authenticated_client, token):
        """Build a new client of the fixture's class that sends ``token`` as bearer.

        Args:
            authenticated_client: fixture client; only its class is reused.
            token: string placed after ``Bearer `` in the Authorization header.

        Returns:
            A new client instance pointed at ``settings.API_BASE_URL``.
        """
        # NOTE(review): the created client is never closed by the tests. The
        # client class is not visible from this file, so no close call is
        # made here - confirm whether it needs explicit cleanup.
        return authenticated_client.__class__(
            base_url=settings.API_BASE_URL,
            headers={"Authorization": f"Bearer {token}"},
        )

    async def test_jwt_token_structure(self, authenticated_client):
        """SEC-JWT-01: JWT token structure.

        Checks that the (unverified) payload of the issued token contains:
        1. ``sub``
        2. ``exp``
        3. ``iat``
        4. a user id claim, either ``userId`` or ``user_id``
        """
        token = await self._login_token(AuthAPI(authenticated_client))

        # Only the claim set is inspected here, so the signature is skipped.
        decoded = jwt.decode(token, options={"verify_signature": False})

        assert "sub" in decoded, "Token缺少subject声明"
        assert "exp" in decoded, "Token缺少过期时间"
        assert "iat" in decoded, "Token缺少签发时间"
        assert "userId" in decoded or "user_id" in decoded, "Token缺少用户ID"

    async def test_jwt_token_expiration(self, authenticated_client):
        """SEC-JWT-02: JWT token lifetime.

        Checks that:
        1. the token carries ``exp`` and ``iat``
        2. ``exp`` lies after ``iat``
        3. the lifetime does not exceed 24 hours

        This test does not replay an expired token against the API.
        """
        token = await self._login_token(AuthAPI(authenticated_client))
        decoded = jwt.decode(token, options={"verify_signature": False})

        assert "exp" in decoded, "Token缺少过期时间"
        assert "iat" in decoded, "Token缺少签发时间"

        # Subtract the raw epoch values: converting them to naive local
        # datetimes first would skew the result across a DST transition.
        lifetime_seconds = decoded["exp"] - decoded["iat"]

        assert lifetime_seconds > 0, "Token过期时间无效"
        assert lifetime_seconds <= self._MAX_TOKEN_LIFETIME_SECONDS, "Token有效期不应超过24小时"

    async def test_jwt_signature_verification(self, authenticated_client):
        """SEC-JWT-03: JWT signature verification.

        Replaces the last five characters of a valid token and checks that a
        request carrying the altered token is answered with 401 or 403.
        """
        valid_token = await self._login_token(AuthAPI(authenticated_client))

        tampered_token = valid_token[:-5] + "XXXXX"
        # Guard against a vacuous test: the replayed token must really differ.
        assert tampered_token != valid_token, "篡改后的Token不应与原Token相同"

        tampered_client = self._client_with_token(authenticated_client, tampered_token)
        response = await UserAPI(tampered_client).get_users_by_page()

        assert response.status_code in [401, 403], "篡改的Token应该被拒绝"

    async def test_jwt_algorithm_security(self, authenticated_client):
        """SEC-JWT-04: JWT algorithm declared by the issued token.

        Checks that the ``alg`` header of the issued token:
        1. is not ``none`` (in any letter case)
        2. is one of the allowed HS*/RS* algorithms

        This test inspects the issued token only; it does not send a forged
        token to the API.
        """
        token = await self._login_token(AuthAPI(authenticated_client))

        algorithm = jwt.get_unverified_header(token).get("alg")

        # A missing "alg" fails with the same message instead of a KeyError.
        assert algorithm is not None and algorithm.lower() != "none", "不应允许none算法"
        assert algorithm in self._ALLOWED_ALGORITHMS, "应使用安全的签名算法"

    async def test_jwt_claims_validation(self, authenticated_client):
        """SEC-JWT-05: JWT claim values.

        Checks that:
        1. ``sub``, ``exp`` and ``iat`` are present
        2. ``sub`` equals the login user name
        3. ``exp`` lies in the future
        """
        token = await self._login_token(AuthAPI(authenticated_client))
        decoded = jwt.decode(token, options={"verify_signature": False})

        for claim in ("sub", "exp", "iat"):
            assert claim in decoded, f"Token缺少必要声明: {claim}"

        assert decoded["sub"] == self._USERNAME, "Subject应该是用户名"
        assert decoded["exp"] > time.time(), "Token不应已过期"

    async def test_jwt_token_in_validation(self, authenticated_client):
        """SEC-JWT-06: rejection of invalid tokens.

        Sends each of the following as bearer token and checks that the
        response status is 400, 401 or 403:
        1. an empty string
        2. a three-segment string that is not a JWT
        3. a JWT header followed by a single invalid segment

        This test does not cover expired tokens.
        """
        invalid_tokens = [
            "",
            "invalid.token.format",
            "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.invalid",
        ]

        for invalid_token in invalid_tokens:
            invalid_client = self._client_with_token(authenticated_client, invalid_token)
            response = await UserAPI(invalid_client).get_users_by_page()

            assert response.status_code in [401, 403, 400], \
                f"无效Token '{invalid_token}' 应该被拒绝"

    async def test_jwt_token_refresh_mechanism(self, authenticated_client):
        """SEC-JWT-07: token refresh.

        Checks that, when the refresh call answers 200:
        1. the response carries a token
        2. that token differs from the original one

        The test is skipped when the refresh call raises or answers with a
        status other than 200. Whether the old token stays valid is not
        checked.
        """
        auth_api = AuthAPI(authenticated_client)
        original_token = await self._login_token(auth_api)

        # Only the refresh call itself is best-effort. The assertions below
        # stay outside the try block: AssertionError is an Exception, so
        # catching it here would turn every real failure into a skip.
        try:
            refresh_response = await auth_api.refresh_token(original_token)
        except Exception as e:
            pytest.skip(f"Token刷新机制测试跳过: {str(e)}")

        if refresh_response.status_code != 200:
            pytest.skip("系统未实现Token刷新机制")

        new_token = refresh_response.json().get("token")
        assert new_token is not None, "刷新应返回新Token"
        assert new_token != original_token, "新Token应不同于原Token"

    async def test_jwt_key_strength(self, authenticated_client):
        """SEC-JWT-08: JWT signing key is not a well-known weak secret.

        For tokens signed with an HS* algorithm, checks that none of the
        secrets in ``_WEAK_SECRETS`` verifies the signature. Tokens using any
        other algorithm are skipped. Key length is not measured.
        """
        token = await self._login_token(AuthAPI(authenticated_client))

        algorithm = jwt.get_unverified_header(token)["alg"]
        if not algorithm.startswith("HS"):
            # The dictionary check only makes sense for shared-secret signatures.
            pytest.skip("非HMAC签名算法，跳过弱密钥检查")

        # Claim validation is switched off so that the outcome depends on the
        # signature alone; otherwise a matching weak key could surface as an
        # unrelated claim error instead of the failure below.
        signature_only = {
            "verify_exp": False,
            "verify_nbf": False,
            "verify_iat": False,
            "verify_aud": False,
            "verify_iss": False,
        }

        for weak_secret in self._WEAK_SECRETS:
            try:
                jwt.decode(
                    token,
                    weak_secret,
                    algorithms=[algorithm],
                    options=signature_only,
                )
            except jwt.InvalidSignatureError:
                # Expected: this candidate is not the signing key.
                continue
            pytest.fail(f"使用了弱密钥: {weak_secret}")

    async def test_jwt_user_impersonation_prevention(self, authenticated_client):
        """SEC-JWT-09: cross-user access with the admin token.

        Looks up a user other than the admin in the first users page and
        requests that user's record with the admin token; the response must
        be 200 or 403. The test is skipped when no other user is listed.

        This test does not forge or modify a token.
        """
        user_api = UserAPI(authenticated_client)
        token = await self._login_token(AuthAPI(authenticated_client))

        users_response = await user_api.get_users_by_page()
        assert users_response.status_code == 200

        users = users_response.json().get("content", [])
        other_user = next(
            (u for u in users if u.get("username") != self._USERNAME),
            None,
        )
        if not other_user:
            pytest.skip("未找到其他用户，无法验证跨用户访问")

        admin_client = self._client_with_token(authenticated_client, token)
        response = await UserAPI(admin_client).get_user_by_id(other_user["id"])

        assert response.status_code in [200, 403], "应正确处理跨用户访问"