Files
novalon-manage-system/api_integration_tests/tests/test_security.py
T
张翔 af44c23f21 refactor(security): 重构安全配置并优化测试环境
- 移除旧的测试套件和UAT测试文件
- 更新密码编码器配置使用BCrypt strength=12
- 添加用户角色关联表和相关服务
- 优化前端日期显示格式
- 清理无用资源和配置文件
- 增强测试数据管理和清理功能
2026-03-27 13:00:22 +08:00

526 lines
17 KiB
Python

"""
安全测试套件
测试内容:
1. SQL注入测试
2. XSS攻击测试
3. CSRF保护测试
4. 认证授权测试
5. 输入验证测试
"""
import pytest
import httpx
from typing import Dict, Any
class SecurityTestBase:
"""安全测试基类"""
def __init__(self, base_url: str = "http://localhost:8084"):
self.base_url = base_url
self.client = httpx.Client(timeout=30.0)
self.token = None
def login(self, username: str = "admin", password: str = "admin123") -> str:
"""登录获取token"""
response = self.client.post(
f"{self.base_url}/api/auth/login",
json={"username": username, "password": password}
)
assert response.status_code == 200
data = response.json()
return data.get("token")
def setup_auth(self):
"""设置认证token"""
if not self.token:
self.token = self.login()
def get_headers(self) -> Dict[str, str]:
"""获取请求头"""
headers = {"Content-Type": "application/json"}
if self.token:
headers["Authorization"] = f"Bearer {self.token}"
return headers
def cleanup(self):
"""清理资源"""
self.client.close()
class TestSQLInjection(SecurityTestBase):
"""SQL注入测试"""
@pytest.fixture(autouse=True)
def setup(self):
self.setup_auth()
yield
self.cleanup()
def test_sql_injection_in_login(self):
"""测试登录接口的SQL注入防护"""
malicious_inputs = [
"admin' OR '1'='1",
"admin' --",
"admin' #",
"admin'/*",
"admin' or 1=1--",
"admin' union select * from users--",
]
for payload in malicious_inputs:
response = self.client.post(
f"{self.base_url}/api/auth/login",
json={"username": payload, "password": "password"}
)
# 应该返回401(认证失败),而不是绕过认证
assert response.status_code == 401, f"SQL注入攻击未阻止: {payload}"
def test_sql_injection_in_user_search(self):
"""测试用户搜索接口的SQL注入防护"""
self.setup_auth()
malicious_inputs = [
"test' OR '1'='1",
"test' UNION SELECT * FROM users--",
"test'; DROP TABLE users--",
"1' OR 1=1--",
]
for payload in malicious_inputs:
response = self.client.get(
f"{self.base_url}/api/users",
params={"username": payload},
headers=self.get_headers()
)
# 应该返回400(错误请求)或正常结果,但不应该暴露数据库错误
assert response.status_code in [200, 400], f"SQL注入攻击未正确处理: {payload}"
class TestXSS(SecurityTestBase):
"""XSS攻击测试"""
@pytest.fixture(autouse=True)
def setup(self):
self.setup_auth()
yield
self.cleanup()
def test_xss_in_user_creation(self):
"""测试用户创建接口的XSS防护"""
xss_payloads = [
"<script>alert('XSS')</script>",
"<img src=x onerror=alert('XSS')>",
"<svg onload=alert('XSS')>",
"javascript:alert('XSS')",
"<body onload=alert('XSS')>",
"<iframe src='javascript:alert(1)'>",
"<object data='javascript:alert(1)'>"
]
for payload in xss_payloads:
user_data = {
"username": f"test_{int(time.time() * 1000)}",
"password": "Test123!@#",
"email": "test@example.com",
"nickname": payload
}
response = self.client.post(
f"{self.base_url}/api/users",
json=user_data,
headers=self.get_headers()
)
if response.status_code == 201:
user_id = response.json()["id"]
get_response = self.client.get(
f"{self.base_url}/api/users/{user_id}",
headers=self.get_headers()
)
user_info = get_response.json()
# 验证XSS payload被转义
assert "<script>" not in str(user_info), f"XSS payload {payload} 应该被转义"
assert "alert(" not in str(user_info), f"XSS payload {payload} 应该被转义"
assert "onerror=" not in str(user_info), f"XSS payload {payload} 应该被转义"
assert "onload=" not in str(user_info), f"XSS payload {payload} 应该被转义"
def test_xss_in_role_creation(self):
"""测试角色创建接口的XSS防护"""
xss_payload = "<script>alert('XSS')</script>"
role_data = {
"roleName": xss_payload,
"roleKey": f"test_role_{int(time.time() * 1000)}",
"roleSort": 1,
"status": 1
}
response = self.client.post(
f"{self.base_url}/api/roles",
json=role_data,
headers=self.get_headers()
)
if response.status_code == 201:
role_id = response.json()["id"]
get_response = self.client.get(
f"{self.base_url}/api/roles/{role_id}",
headers=self.get_headers()
)
role_info = get_response.json()
assert "<script>" not in str(role_info), "XSS payload应该被转义"
class TestInputValidation(SecurityTestBase):
"""输入验证测试"""
@pytest.fixture(autouse=True)
def setup(self):
self.setup_auth()
yield
self.cleanup()
def test_empty_required_fields(self):
"""测试必填字段为空"""
user_data = {
"username": "",
"password": "",
"email": ""
}
response = self.client.post(
f"{self.base_url}/api/users",
json=user_data,
headers=self.get_headers()
)
assert response.status_code in [400, 422], "空必填字段应该被拒绝"
def test_invalid_data_types(self):
"""测试无效数据类型"""
user_data = {
"username": "testuser",
"password": "Test123!@#",
"email": "test@example.com",
"status": "invalid_type"
}
response = self.client.post(
f"{self.base_url}/api/users",
json=user_data,
headers=self.get_headers()
)
assert response.status_code in [400, 422], "无效数据类型应该被拒绝"
def test_oversized_fields(self):
"""测试超长字段"""
user_data = {
"username": "a" * 1000,
"password": "Test123!@#",
"email": "test@example.com",
"nickname": "a" * 5000
}
response = self.client.post(
f"{self.base_url}/api/users",
json=user_data,
headers=self.get_headers()
)
assert response.status_code in [400, 422], "超长字段应该被拒绝"
# 如果返回200,验证结果不包含所有用户数据
if response.status_code == 200:
data = response.json()
if "content" in data:
# 不应该返回所有用户数据
assert len(data["content"]) < 100, f"SQL注入可能成功: {payload}"
def test_sql_injection_in_user_creation(self):
"""测试用户创建接口的SQL注入防护"""
self.setup_auth()
malicious_inputs = [
{"username": "test' OR '1'='1", "password": "password"},
{"username": "test'; DROP TABLE users--", "password": "password"},
{"username": "test' UNION SELECT * FROM users--", "password": "password"},
]
for payload in malicious_inputs:
response = self.client.post(
f"{self.base_url}/api/users",
json=payload,
headers=self.get_headers()
)
# 应该返回400(错误请求)或409(冲突),不应该创建用户
assert response.status_code in [400, 409], f"SQL注入攻击未阻止: {payload}"
class TestXSS(SecurityTestBase):
"""XSS攻击测试"""
@pytest.fixture(autouse=True)
def setup(self):
self.setup_auth()
yield
self.cleanup()
def test_xss_in_user_creation(self):
"""测试用户创建接口的XSS防护"""
xss_payloads = [
"<script>alert('XSS')</script>",
"<img src=x onerror=alert('XSS')>",
"<svg onload=alert('XSS')>",
"<body onload=alert('XSS')>",
"<input onfocus=alert('XSS') autofocus>",
"<select onfocus=alert('XSS') autofocus>",
"<textarea onfocus=alert('XSS') autofocus>",
"<keygen onfocus=alert('XSS') autofocus>",
"<video><source onerror=alert('XSS')>",
]
for payload in xss_payloads:
response = self.client.post(
f"{self.base_url}/api/users",
json={
"username": f"test_{hash(payload)}",
"password": "password123",
"nickname": payload,
"email": f"test_{hash(payload)}@example.com"
},
headers=self.get_headers()
)
# 应该返回201(创建成功)或400(错误请求)
assert response.status_code in [201, 400], f"XSS攻击未正确处理: {payload}"
# 如果创建成功,验证XSS被转义
if response.status_code == 201:
user_id = response.json().get("id")
get_response = self.client.get(
f"{self.base_url}/api/users/{user_id}",
headers=self.get_headers()
)
user_data = get_response.json()
# 验证XSS代码被转义
assert "<script>" not in user_data.get("nickname", ""), f"XSS未转义: {payload}"
assert "onerror=" not in user_data.get("nickname", ""), f"XSS未转义: {payload}"
assert "onload=" not in user_data.get("nickname", ""), f"XSS未转义: {payload}"
def test_xss_in_role_creation(self):
"""测试角色创建接口的XSS防护"""
self.setup_auth()
xss_payloads = [
"<script>alert('XSS')</script>",
"<img src=x onerror=alert('XSS')>",
]
for payload in xss_payloads:
response = self.client.post(
f"{self.base_url}/api/roles",
json={
"name": payload,
"code": f"TEST_{hash(payload)}",
"description": payload
},
headers=self.get_headers()
)
# 应该返回201(创建成功)或400(错误请求)
assert response.status_code in [201, 400], f"XSS攻击未正确处理: {payload}"
def test_xss_in_notice_creation(self):
"""测试通知创建接口的XSS防护"""
self.setup_auth()
xss_payloads = [
"<script>alert('XSS')</script>",
"<img src=x onerror=alert('XSS')>",
]
for payload in xss_payloads:
response = self.client.post(
f"{self.base_url}/api/notices",
json={
"title": payload,
"content": payload,
"type": "1",
"status": "0"
},
headers=self.get_headers()
)
# 应该返回201(创建成功)或400(错误请求)
assert response.status_code in [201, 400], f"XSS攻击未正确处理: {payload}"
class TestCSRF(SecurityTestBase):
"""CSRF保护测试"""
@pytest.fixture(autouse=True)
def setup(self):
self.setup_auth()
yield
self.cleanup()
def test_csrf_protection_in_state_changing_requests(self):
"""测试状态改变请求的CSRF保护"""
self.setup_auth()
# 尝试不使用CSRF token创建用户
response = self.client.post(
f"{self.base_url}/api/users",
json={
"username": "test_csrf",
"password": "password123",
"nickname": "CSRF Test",
"email": "csrf_test@example.com"
},
headers=self.get_headers()
)
# 如果系统有CSRF保护,应该返回403(禁止)
# 如果没有CSRF保护,可能返回201(创建成功)
# 这里我们只验证请求能够正常处理
assert response.status_code in [201, 403, 400]
class TestAuthentication(SecurityTestBase):
"""认证授权测试"""
def test_invalid_credentials(self):
"""测试无效凭证"""
response = self.client.post(
f"{self.base_url}/api/auth/login",
json={"username": "invalid", "password": "invalid"}
)
assert response.status_code == 401
assert "error" in response.json() or "message" in response.json()
def test_missing_credentials(self):
"""测试缺少凭证"""
response = self.client.post(
f"{self.base_url}/api/auth/login",
json={}
)
assert response.status_code == 400
def test_token_required(self):
"""测试需要token的接口"""
response = self.client.get(f"{self.base_url}/api/users")
assert response.status_code == 401
def test_invalid_token(self):
"""测试无效token"""
response = self.client.get(
f"{self.base_url}/api/users",
headers={"Authorization": "Bearer invalid_token"}
)
assert response.status_code == 401
def test_expired_token(self):
"""测试过期token(模拟)"""
# 使用一个格式正确但可能过期的token
response = self.client.get(
f"{self.base_url}/api/users",
headers={"Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"}
)
assert response.status_code == 401
class TestInputValidation(SecurityTestBase):
"""输入验证测试"""
@pytest.fixture(autouse=True)
def setup(self):
self.setup_auth()
yield
self.cleanup()
def test_long_username(self):
"""测试超长用户名"""
self.setup_auth()
long_username = "a" * 1000
response = self.client.post(
f"{self.base_url}/api/users",
json={
"username": long_username,
"password": "password123",
"nickname": "Test",
"email": "test@example.com"
},
headers=self.get_headers()
)
# 应该返回400(错误请求)
assert response.status_code == 400
def test_invalid_email_format(self):
"""测试无效邮箱格式"""
self.setup_auth()
invalid_emails = [
"invalid",
"@example.com",
"test@",
"test@.com",
"test@com.",
]
for email in invalid_emails:
response = self.client.post(
f"{self.base_url}/api/users",
json={
"username": f"test_{hash(email)}",
"password": "password123",
"nickname": "Test",
"email": email
},
headers=self.get_headers()
)
# 应该返回400(错误请求)
assert response.status_code == 400, f"无效邮箱格式未拒绝: {email}"
def test_weak_password(self):
"""测试弱密码"""
self.setup_auth()
weak_passwords = [
"123",
"password",
"123456",
"qwerty",
]
for password in weak_passwords:
response = self.client.post(
f"{self.base_url}/api/users",
json={
"username": f"test_{hash(password)}",
"password": password,
"nickname": "Test",
"email": f"test_{hash(password)}@example.com"
},
headers=self.get_headers()
)
# 应该返回400(错误请求)或201(如果系统不强制密码强度)
assert response.status_code in [201, 400]
def hash(text: str) -> str:
"""生成文本的哈希值"""
import hashlib
return hashlib.md5(text.encode()).hexdigest()[:16]