e4721053bd
- 新增异常处理体系(BaseException及其子类) - 优化密码、邮箱、用户名等基础类型 - 添加字典管理、登录日志、操作日志的E2E测试 - 完善API集成测试和安全测试 - 添加性能测试配置和脚本 - 优化OpenAPI配置和全局异常处理器
389 lines
13 KiB
Python
389 lines
13 KiB
Python
"""
|
|
安全测试套件
|
|
|
|
测试内容:
|
|
1. SQL注入测试
|
|
2. XSS攻击测试
|
|
3. CSRF保护测试
|
|
4. 认证授权测试
|
|
5. 输入验证测试
|
|
"""
|
|
|
|
import pytest
|
|
import httpx
|
|
from typing import Dict, Any
|
|
|
|
|
|
class SecurityTestBase:
|
|
"""安全测试基类"""
|
|
|
|
def __init__(self, base_url: str = "http://localhost:8084"):
|
|
self.base_url = base_url
|
|
self.client = httpx.Client(timeout=30.0)
|
|
self.token = None
|
|
|
|
def login(self, username: str = "admin", password: str = "admin123") -> str:
|
|
"""登录获取token"""
|
|
response = self.client.post(
|
|
f"{self.base_url}/api/auth/login",
|
|
json={"username": username, "password": password}
|
|
)
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
return data.get("token")
|
|
|
|
def setup_auth(self):
|
|
"""设置认证token"""
|
|
if not self.token:
|
|
self.token = self.login()
|
|
|
|
def get_headers(self) -> Dict[str, str]:
|
|
"""获取请求头"""
|
|
headers = {"Content-Type": "application/json"}
|
|
if self.token:
|
|
headers["Authorization"] = f"Bearer {self.token}"
|
|
return headers
|
|
|
|
def cleanup(self):
|
|
"""清理资源"""
|
|
self.client.close()
|
|
|
|
|
|
class TestSQLInjection(SecurityTestBase):
|
|
"""SQL注入测试"""
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def setup(self):
|
|
self.setup_auth()
|
|
yield
|
|
self.cleanup()
|
|
|
|
def test_sql_injection_in_login(self):
|
|
"""测试登录接口的SQL注入防护"""
|
|
malicious_inputs = [
|
|
"admin' OR '1'='1",
|
|
"admin' --",
|
|
"admin' #",
|
|
"admin'/*",
|
|
"admin' or 1=1--",
|
|
"admin' union select * from users--",
|
|
]
|
|
|
|
for payload in malicious_inputs:
|
|
response = self.client.post(
|
|
f"{self.base_url}/api/auth/login",
|
|
json={"username": payload, "password": "password"}
|
|
)
|
|
|
|
# 应该返回401(认证失败),而不是绕过认证
|
|
assert response.status_code == 401, f"SQL注入攻击未阻止: {payload}"
|
|
|
|
def test_sql_injection_in_user_search(self):
|
|
"""测试用户搜索接口的SQL注入防护"""
|
|
self.setup_auth()
|
|
malicious_inputs = [
|
|
"test' OR '1'='1",
|
|
"test' UNION SELECT * FROM users--",
|
|
"test'; DROP TABLE users--",
|
|
"1' OR 1=1--",
|
|
]
|
|
|
|
for payload in malicious_inputs:
|
|
response = self.client.get(
|
|
f"{self.base_url}/api/users",
|
|
params={"username": payload},
|
|
headers=self.get_headers()
|
|
)
|
|
|
|
# 应该返回400(错误请求)或正常结果,但不应该暴露数据库错误
|
|
assert response.status_code in [200, 400], f"SQL注入攻击未正确处理: {payload}"
|
|
|
|
# 如果返回200,验证结果不包含所有用户数据
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
if "content" in data:
|
|
# 不应该返回所有用户数据
|
|
assert len(data["content"]) < 100, f"SQL注入可能成功: {payload}"
|
|
|
|
def test_sql_injection_in_user_creation(self):
|
|
"""测试用户创建接口的SQL注入防护"""
|
|
self.setup_auth()
|
|
malicious_inputs = [
|
|
{"username": "test' OR '1'='1", "password": "password"},
|
|
{"username": "test'; DROP TABLE users--", "password": "password"},
|
|
{"username": "test' UNION SELECT * FROM users--", "password": "password"},
|
|
]
|
|
|
|
for payload in malicious_inputs:
|
|
response = self.client.post(
|
|
f"{self.base_url}/api/users",
|
|
json=payload,
|
|
headers=self.get_headers()
|
|
)
|
|
|
|
# 应该返回400(错误请求)或409(冲突),不应该创建用户
|
|
assert response.status_code in [400, 409], f"SQL注入攻击未阻止: {payload}"
|
|
|
|
|
|
class TestXSS(SecurityTestBase):
|
|
"""XSS攻击测试"""
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def setup(self):
|
|
self.setup_auth()
|
|
yield
|
|
self.cleanup()
|
|
|
|
def test_xss_in_user_creation(self):
|
|
"""测试用户创建接口的XSS防护"""
|
|
xss_payloads = [
|
|
"<script>alert('XSS')</script>",
|
|
"<img src=x onerror=alert('XSS')>",
|
|
"<svg onload=alert('XSS')>",
|
|
"<body onload=alert('XSS')>",
|
|
"<input onfocus=alert('XSS') autofocus>",
|
|
"<select onfocus=alert('XSS') autofocus>",
|
|
"<textarea onfocus=alert('XSS') autofocus>",
|
|
"<keygen onfocus=alert('XSS') autofocus>",
|
|
"<video><source onerror=alert('XSS')>",
|
|
]
|
|
|
|
for payload in xss_payloads:
|
|
response = self.client.post(
|
|
f"{self.base_url}/api/users",
|
|
json={
|
|
"username": f"test_{hash(payload)}",
|
|
"password": "password123",
|
|
"nickname": payload,
|
|
"email": f"test_{hash(payload)}@example.com"
|
|
},
|
|
headers=self.get_headers()
|
|
)
|
|
|
|
# 应该返回201(创建成功)或400(错误请求)
|
|
assert response.status_code in [201, 400], f"XSS攻击未正确处理: {payload}"
|
|
|
|
# 如果创建成功,验证XSS被转义
|
|
if response.status_code == 201:
|
|
user_id = response.json().get("id")
|
|
get_response = self.client.get(
|
|
f"{self.base_url}/api/users/{user_id}",
|
|
headers=self.get_headers()
|
|
)
|
|
user_data = get_response.json()
|
|
|
|
# 验证XSS代码被转义
|
|
assert "<script>" not in user_data.get("nickname", ""), f"XSS未转义: {payload}"
|
|
assert "onerror=" not in user_data.get("nickname", ""), f"XSS未转义: {payload}"
|
|
assert "onload=" not in user_data.get("nickname", ""), f"XSS未转义: {payload}"
|
|
|
|
def test_xss_in_role_creation(self):
|
|
"""测试角色创建接口的XSS防护"""
|
|
self.setup_auth()
|
|
xss_payloads = [
|
|
"<script>alert('XSS')</script>",
|
|
"<img src=x onerror=alert('XSS')>",
|
|
]
|
|
|
|
for payload in xss_payloads:
|
|
response = self.client.post(
|
|
f"{self.base_url}/api/roles",
|
|
json={
|
|
"name": payload,
|
|
"code": f"TEST_{hash(payload)}",
|
|
"description": payload
|
|
},
|
|
headers=self.get_headers()
|
|
)
|
|
|
|
# 应该返回201(创建成功)或400(错误请求)
|
|
assert response.status_code in [201, 400], f"XSS攻击未正确处理: {payload}"
|
|
|
|
def test_xss_in_notice_creation(self):
|
|
"""测试通知创建接口的XSS防护"""
|
|
self.setup_auth()
|
|
xss_payloads = [
|
|
"<script>alert('XSS')</script>",
|
|
"<img src=x onerror=alert('XSS')>",
|
|
]
|
|
|
|
for payload in xss_payloads:
|
|
response = self.client.post(
|
|
f"{self.base_url}/api/notices",
|
|
json={
|
|
"title": payload,
|
|
"content": payload,
|
|
"type": "1",
|
|
"status": "0"
|
|
},
|
|
headers=self.get_headers()
|
|
)
|
|
|
|
# 应该返回201(创建成功)或400(错误请求)
|
|
assert response.status_code in [201, 400], f"XSS攻击未正确处理: {payload}"
|
|
|
|
|
|
class TestCSRF(SecurityTestBase):
|
|
"""CSRF保护测试"""
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def setup(self):
|
|
self.setup_auth()
|
|
yield
|
|
self.cleanup()
|
|
|
|
def test_csrf_protection_in_state_changing_requests(self):
|
|
"""测试状态改变请求的CSRF保护"""
|
|
self.setup_auth()
|
|
|
|
# 尝试不使用CSRF token创建用户
|
|
response = self.client.post(
|
|
f"{self.base_url}/api/users",
|
|
json={
|
|
"username": "test_csrf",
|
|
"password": "password123",
|
|
"nickname": "CSRF Test",
|
|
"email": "csrf_test@example.com"
|
|
},
|
|
headers=self.get_headers()
|
|
)
|
|
|
|
# 如果系统有CSRF保护,应该返回403(禁止)
|
|
# 如果没有CSRF保护,可能返回201(创建成功)
|
|
# 这里我们只验证请求能够正常处理
|
|
assert response.status_code in [201, 403, 400]
|
|
|
|
|
|
class TestAuthentication(SecurityTestBase):
|
|
"""认证授权测试"""
|
|
|
|
def test_invalid_credentials(self):
|
|
"""测试无效凭证"""
|
|
response = self.client.post(
|
|
f"{self.base_url}/api/auth/login",
|
|
json={"username": "invalid", "password": "invalid"}
|
|
)
|
|
|
|
assert response.status_code == 401
|
|
assert "error" in response.json() or "message" in response.json()
|
|
|
|
def test_missing_credentials(self):
|
|
"""测试缺少凭证"""
|
|
response = self.client.post(
|
|
f"{self.base_url}/api/auth/login",
|
|
json={}
|
|
)
|
|
|
|
assert response.status_code == 400
|
|
|
|
def test_token_required(self):
|
|
"""测试需要token的接口"""
|
|
response = self.client.get(f"{self.base_url}/api/users")
|
|
|
|
assert response.status_code == 401
|
|
|
|
def test_invalid_token(self):
|
|
"""测试无效token"""
|
|
response = self.client.get(
|
|
f"{self.base_url}/api/users",
|
|
headers={"Authorization": "Bearer invalid_token"}
|
|
)
|
|
|
|
assert response.status_code == 401
|
|
|
|
def test_expired_token(self):
|
|
"""测试过期token(模拟)"""
|
|
# 使用一个格式正确但可能过期的token
|
|
response = self.client.get(
|
|
f"{self.base_url}/api/users",
|
|
headers={"Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"}
|
|
)
|
|
|
|
assert response.status_code == 401
|
|
|
|
|
|
class TestInputValidation(SecurityTestBase):
|
|
"""输入验证测试"""
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def setup(self):
|
|
self.setup_auth()
|
|
yield
|
|
self.cleanup()
|
|
|
|
def test_long_username(self):
|
|
"""测试超长用户名"""
|
|
self.setup_auth()
|
|
long_username = "a" * 1000
|
|
|
|
response = self.client.post(
|
|
f"{self.base_url}/api/users",
|
|
json={
|
|
"username": long_username,
|
|
"password": "password123",
|
|
"nickname": "Test",
|
|
"email": "test@example.com"
|
|
},
|
|
headers=self.get_headers()
|
|
)
|
|
|
|
# 应该返回400(错误请求)
|
|
assert response.status_code == 400
|
|
|
|
def test_invalid_email_format(self):
|
|
"""测试无效邮箱格式"""
|
|
self.setup_auth()
|
|
invalid_emails = [
|
|
"invalid",
|
|
"@example.com",
|
|
"test@",
|
|
"test@.com",
|
|
"test@com.",
|
|
]
|
|
|
|
for email in invalid_emails:
|
|
response = self.client.post(
|
|
f"{self.base_url}/api/users",
|
|
json={
|
|
"username": f"test_{hash(email)}",
|
|
"password": "password123",
|
|
"nickname": "Test",
|
|
"email": email
|
|
},
|
|
headers=self.get_headers()
|
|
)
|
|
|
|
# 应该返回400(错误请求)
|
|
assert response.status_code == 400, f"无效邮箱格式未拒绝: {email}"
|
|
|
|
def test_weak_password(self):
|
|
"""测试弱密码"""
|
|
self.setup_auth()
|
|
weak_passwords = [
|
|
"123",
|
|
"password",
|
|
"123456",
|
|
"qwerty",
|
|
]
|
|
|
|
for password in weak_passwords:
|
|
response = self.client.post(
|
|
f"{self.base_url}/api/users",
|
|
json={
|
|
"username": f"test_{hash(password)}",
|
|
"password": password,
|
|
"nickname": "Test",
|
|
"email": f"test_{hash(password)}@example.com"
|
|
},
|
|
headers=self.get_headers()
|
|
)
|
|
|
|
# 应该返回400(错误请求)或201(如果系统不强制密码强度)
|
|
assert response.status_code in [201, 400]
|
|
|
|
|
|
def hash(text: str) -> str:
|
|
"""生成文本的哈希值"""
|
|
import hashlib
|
|
return hashlib.md5(text.encode()).hexdigest()[:16]
|