Files
everything-is-suitable/everything-is-suitable-test/python_e2e/core/security.py
T
张翔 08ea5fbe98 feat(admin): 添加用户管理相关文件
添加用户管理视图、API和状态管理文件
2026-03-28 14:37:29 +08:00

483 lines
12 KiB
Python

"""
安全测试模块
提供SQL注入、XSS、CSRF等安全防护功能。
"""
import re
import hashlib
import secrets
import time
from typing import Any, Dict, List, Optional
from dataclasses import dataclass, field
from enum import Enum
class ThreatLevel(Enum):
"""威胁等级"""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class DetectionResult:
"""检测结果"""
is_threat: bool
threat_type: str
level: ThreatLevel
details: str = ""
@dataclass
class SQLInjectionResult:
"""SQL注入检测结果"""
is_injection: bool = False
level: ThreatLevel = ThreatLevel.LOW
details: str = ""
@property
def is_threat(self) -> bool:
return self.is_injection
@property
def threat_type(self) -> str:
return "SQL_INJECTION"
@dataclass
class XSSResult:
"""XSS检测结果"""
is_xss: bool = False
level: ThreatLevel = ThreatLevel.LOW
details: str = ""
@property
def is_threat(self) -> bool:
return self.is_xss
@property
def threat_type(self) -> str:
return "XSS"
@dataclass
class PasswordStrengthResult:
"""密码强度结果"""
score: int
strength: str
suggestions: List[str] = field(default_factory=list)
@dataclass
class SecurityEvent:
"""安全事件"""
timestamp: float
event_type: str
source_ip: str
details: Dict[str, Any]
@dataclass
class SecurityReport:
"""安全扫描报告"""
total_scanned: int
threats: List[DetectionResult]
scan_time: float
class SQLInjectionDetector:
"""SQL注入检测器"""
# SQL注入特征模式
PATTERNS = [
r"(\%27)|(\')|(\-\-)|(\%23)|(#)", # 单引号、注释
r"((\%3D)|(=))[^\n]*((\%27)|(\')|(\-\-)|(\%3B)|(;))", # =后面跟引号或注释
r"\w*((\%27)|(\'))((\%6F)|o|(\%4F))((\%72)|r|(\%52))", # 'or
r"((\%27)|(\'))union", # 'union
r"exec(\s|\+)+(s|x)p\w+", # exec xp_
r"UNION\s+SELECT", # UNION SELECT
r"INSERT\s+INTO", # INSERT INTO
r"DELETE\s+FROM", # DELETE FROM
r"DROP\s+TABLE", # DROP TABLE
]
def __init__(self):
self._compiled_patterns = [re.compile(p, re.IGNORECASE) for p in self.PATTERNS]
def detect(self, input_str: str) -> SQLInjectionResult:
"""
检测SQL注入
Args:
input_str: 输入字符串
Returns:
检测结果
"""
for pattern in self._compiled_patterns:
if pattern.search(input_str):
return SQLInjectionResult(
is_injection=True,
level=ThreatLevel.HIGH,
details=f"匹配模式: {pattern.pattern}"
)
return SQLInjectionResult(is_injection=False, level=ThreatLevel.LOW)
class XSSDetector:
"""XSS检测器"""
# XSS攻击特征模式
PATTERNS = [
r"<script[^>]*>[\s\S]*?</script>", # <script>标签
r"javascript:", # javascript:协议
r"on\w+\s*=", # 事件处理器
r"<iframe", # iframe标签
r"<object", # object标签
r"<embed", # embed标签
r"<form", # form标签
r"<input[^>]*type\s*=\s*['\"]?hidden", # hidden input
r"expression\s*\(", # CSS expression
r"url\s*\(\s*['\"]?javascript:", # CSS url javascript
]
def __init__(self):
self._compiled_patterns = [re.compile(p, re.IGNORECASE) for p in self.PATTERNS]
def detect(self, input_str: str) -> XSSResult:
"""
检测XSS攻击
Args:
input_str: 输入字符串
Returns:
检测结果
"""
for pattern in self._compiled_patterns:
if pattern.search(input_str):
return XSSResult(
is_xss=True,
level=ThreatLevel.HIGH,
details=f"匹配模式: {pattern.pattern}"
)
return XSSResult(is_xss=False, level=ThreatLevel.LOW)
class CSRFProtector:
"""CSRF防护器"""
def __init__(self, token_expiry: int = 3600):
"""
初始化CSRF防护器
Args:
token_expiry: Token过期时间(秒)
"""
self._token_expiry = token_expiry
self._tokens: Dict[str, Dict[str, Any]] = {}
def generate_token(self, user_id: str) -> str:
"""
生成CSRF Token
Args:
user_id: 用户ID
Returns:
Token字符串
"""
token = secrets.token_urlsafe(32)
self._tokens[token] = {
"user_id": user_id,
"created_at": time.time(),
}
return token
def validate_token(self, user_id: str, token: str) -> bool:
"""
验证CSRF Token
Args:
user_id: 用户ID
token: Token字符串
Returns:
是否有效
"""
if token not in self._tokens:
return False
token_data = self._tokens[token]
# 检查用户ID
if token_data["user_id"] != user_id:
return False
# 检查是否过期
if time.time() - token_data["created_at"] > self._token_expiry:
del self._tokens[token]
return False
return True
def invalidate_token(self, token: str) -> None:
"""使Token失效"""
if token in self._tokens:
del self._tokens[token]
class InputSanitizer:
"""输入净化器"""
# HTML危险标签和属性
DANGEROUS_TAGS = [
"script", "iframe", "object", "embed", "form", "input",
"textarea", "button", "link", "meta", "style"
]
DANGEROUS_ATTRIBUTES = [
"onerror", "onload", "onclick", "onmouseover", "onmouseout",
"onkeydown", "onkeypress", "onkeyup", "onsubmit", "onchange",
"onfocus", "onblur", "onselect", "onreset"
]
def sanitize_html(self, html: str) -> str:
"""
净化HTML内容
Args:
html: HTML字符串
Returns:
净化后的HTML
"""
# 移除危险标签
for tag in self.DANGEROUS_TAGS:
pattern = f"<{tag}[^>]*>[\\s\\S]*?</{tag}>"
html = re.sub(pattern, "", html, flags=re.IGNORECASE)
pattern = f"<{tag}[^>]*/?>"
html = re.sub(pattern, "", html, flags=re.IGNORECASE)
# 移除危险属性
for attr in self.DANGEROUS_ATTRIBUTES:
pattern = f"\\s{attr}=[\"'][^\"']*[\"']"
html = re.sub(pattern, "", html, flags=re.IGNORECASE)
# 移除javascript:协议
html = re.sub(r"javascript:", "", html, flags=re.IGNORECASE)
return html
def sanitize_sql(self, input_str: str) -> str:
"""
净化SQL输入
Args:
input_str: 输入字符串
Returns:
净化后的字符串
"""
# 转义单引号
return input_str.replace("'", "''")
class PasswordStrengthChecker:
"""密码强度检查器"""
def check(self, password: str) -> PasswordStrengthResult:
"""
检查密码强度
Args:
password: 密码字符串
Returns:
强度结果
"""
score = 0
suggestions = []
# 长度检查
if len(password) >= 8:
score += 2
elif len(password) >= 6:
score += 1
else:
suggestions.append("密码长度至少8位")
# 包含小写字母
if re.search(r"[a-z]", password):
score += 1
else:
suggestions.append("应包含小写字母")
# 包含大写字母
if re.search(r"[A-Z]", password):
score += 1
else:
suggestions.append("应包含大写字母")
# 包含数字
if re.search(r"\d", password):
score += 1
else:
suggestions.append("应包含数字")
# 包含特殊字符
if re.search(r"[!@#$%^&*(),.?\":{}|<>]", password):
score += 2
else:
suggestions.append("应包含特殊字符")
# 确定强度等级
if score >= 7:
strength = "strong"
elif score >= 4:
strength = "medium"
else:
strength = "weak"
return PasswordStrengthResult(
score=score,
strength=strength,
suggestions=suggestions
)
class SecurityHeaders:
"""安全HTTP头部生成器"""
def get_headers(self) -> Dict[str, str]:
"""
获取安全HTTP头部
Returns:
安全头部字典
"""
return {
"X-Content-Type-Options": "nosniff",
"X-Frame-Options": "DENY",
"X-XSS-Protection": "1; mode=block",
"Strict-Transport-Security": "max-age=31536000; includeSubDomains",
"Content-Security-Policy": "default-src 'self'",
"Referrer-Policy": "strict-origin-when-cross-origin",
"Permissions-Policy": "geolocation=(), microphone=(), camera=()",
}
class SecurityAuditLogger:
"""安全审计日志器"""
def __init__(self):
self._events: List[SecurityEvent] = []
def log_event(
self,
event_type: str,
source_ip: str,
details: Dict[str, Any]
) -> None:
"""
记录安全事件
Args:
event_type: 事件类型
source_ip: 来源IP
details: 详细信息
"""
event = SecurityEvent(
timestamp=time.time(),
event_type=event_type,
source_ip=source_ip,
details=details
)
self._events.append(event)
def get_events(
self,
event_type: Optional[str] = None,
start_time: Optional[float] = None,
end_time: Optional[float] = None
) -> List[SecurityEvent]:
"""
获取安全事件
Args:
event_type: 事件类型过滤
start_time: 开始时间
end_time: 结束时间
Returns:
事件列表
"""
events = self._events
if event_type:
events = [e for e in events if e.event_type == event_type]
if start_time:
events = [e for e in events if e.timestamp >= start_time]
if end_time:
events = [e for e in events if e.timestamp <= end_time]
return events
def get_stats(self) -> Dict[str, Any]:
"""获取统计信息"""
event_types = {}
for event in self._events:
event_types[event.event_type] = event_types.get(event.event_type, 0) + 1
return {
"total_events": len(self._events),
"event_types": event_types,
}
class SecurityScanner:
"""综合安全扫描器"""
def __init__(self):
self._sql_detector = SQLInjectionDetector()
self._xss_detector = XSSDetector()
def scan(self, data: Dict[str, Any]) -> SecurityReport:
"""
扫描数据
Args:
data: 要扫描的数据
Returns:
扫描报告
"""
threats = []
start_time = time.time()
for key, value in data.items():
if isinstance(value, str):
# SQL注入检测
sql_result = self._sql_detector.detect(value)
if sql_result.is_injection:
threats.append(sql_result)
# XSS检测
xss_result = self._xss_detector.detect(value)
if xss_result.is_xss:
threats.append(xss_result)
scan_time = time.time() - start_time
return SecurityReport(
total_scanned=len(data),
threats=threats,
scan_time=scan_time
)