08ea5fbe98
添加用户管理视图、API和状态管理文件
359 lines
8.8 KiB
Python
359 lines
8.8 KiB
Python
"""
|
|
文件上传下载功能模块
|
|
|
|
提供文件上传、下载、验证和管理功能。
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import uuid
|
|
import shutil
|
|
from typing import Any, Dict, List, Optional, BinaryIO
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
|
|
|
|
@dataclass
|
|
class UploadResult:
|
|
"""上传结果"""
|
|
success: bool
|
|
file_id: Optional[str] = None
|
|
file_path: Optional[str] = None
|
|
filename: Optional[str] = None
|
|
size: int = 0
|
|
message: str = ""
|
|
|
|
|
|
@dataclass
|
|
class DownloadResult:
|
|
"""下载结果"""
|
|
success: bool
|
|
content: Optional[bytes] = None
|
|
filename: Optional[str] = None
|
|
message: str = ""
|
|
|
|
|
|
class FileTypeValidator:
|
|
"""文件类型验证器"""
|
|
|
|
def __init__(self, allowed_extensions: Optional[List[str]] = None):
|
|
"""
|
|
初始化文件类型验证器
|
|
|
|
Args:
|
|
allowed_extensions: 允许的文件扩展名列表
|
|
"""
|
|
self._allowed_extensions = allowed_extensions or []
|
|
|
|
def validate(self, filename: str) -> bool:
|
|
"""
|
|
验证文件类型
|
|
|
|
Args:
|
|
filename: 文件名
|
|
|
|
Returns:
|
|
是否允许
|
|
"""
|
|
if not self._allowed_extensions:
|
|
return True
|
|
|
|
ext = Path(filename).suffix.lower()
|
|
return ext in self._allowed_extensions
|
|
|
|
|
|
class FileSizeValidator:
|
|
"""文件大小验证器"""
|
|
|
|
def __init__(self, max_size: int):
|
|
"""
|
|
初始化文件大小验证器
|
|
|
|
Args:
|
|
max_size: 最大文件大小(字节)
|
|
"""
|
|
self._max_size = max_size
|
|
|
|
def validate(self, size: int) -> bool:
|
|
"""
|
|
验证文件大小
|
|
|
|
Args:
|
|
size: 文件大小(字节)
|
|
|
|
Returns:
|
|
是否允许
|
|
"""
|
|
return size <= self._max_size
|
|
|
|
|
|
class FilenameSanitizer:
|
|
"""文件名净化器"""
|
|
|
|
# 危险字符
|
|
DANGEROUS_CHARS = r'[;|&$<>\`\\]'
|
|
|
|
def sanitize(self, filename: str) -> str:
|
|
"""
|
|
净化文件名
|
|
|
|
Args:
|
|
filename: 原始文件名
|
|
|
|
Returns:
|
|
安全的文件名
|
|
"""
|
|
# 移除路径遍历
|
|
filename = os.path.basename(filename)
|
|
|
|
# 移除危险字符
|
|
filename = re.sub(self.DANGEROUS_CHARS, '', filename)
|
|
|
|
# 移除连续的点
|
|
filename = re.sub(r'\.{2,}', '.', filename)
|
|
|
|
# 确保不为空
|
|
if not filename or filename == '.':
|
|
filename = 'unnamed_file'
|
|
|
|
return filename
|
|
|
|
|
|
class FileStorageManager:
|
|
"""文件存储管理器"""
|
|
|
|
def __init__(self, storage_dir: str):
|
|
"""
|
|
初始化存储管理器
|
|
|
|
Args:
|
|
storage_dir: 存储目录
|
|
"""
|
|
self._storage_dir = storage_dir
|
|
self._metadata: Dict[str, Dict[str, Any]] = {}
|
|
|
|
# 创建存储目录
|
|
os.makedirs(storage_dir, exist_ok=True)
|
|
|
|
def save(
|
|
self,
|
|
content: bytes,
|
|
filename: str,
|
|
metadata: Optional[Dict[str, Any]] = None
|
|
) -> str:
|
|
"""
|
|
保存文件
|
|
|
|
Args:
|
|
content: 文件内容
|
|
filename: 文件名
|
|
metadata: 元数据
|
|
|
|
Returns:
|
|
文件ID
|
|
"""
|
|
file_id = str(uuid.uuid4())
|
|
file_path = os.path.join(self._storage_dir, file_id)
|
|
|
|
with open(file_path, 'wb') as f:
|
|
f.write(content)
|
|
|
|
# 保存元数据
|
|
self._metadata[file_id] = {
|
|
'filename': filename,
|
|
'size': len(content),
|
|
'metadata': metadata or {},
|
|
}
|
|
|
|
return file_id
|
|
|
|
def get(self, file_id: str) -> Optional[bytes]:
|
|
"""
|
|
获取文件内容
|
|
|
|
Args:
|
|
file_id: 文件ID
|
|
|
|
Returns:
|
|
文件内容
|
|
"""
|
|
file_path = os.path.join(self._storage_dir, file_id)
|
|
|
|
if not os.path.exists(file_path):
|
|
return None
|
|
|
|
with open(file_path, 'rb') as f:
|
|
return f.read()
|
|
|
|
def get_metadata(self, file_id: str) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
获取文件元数据
|
|
|
|
Args:
|
|
file_id: 文件ID
|
|
|
|
Returns:
|
|
元数据
|
|
"""
|
|
meta = self._metadata.get(file_id)
|
|
if meta:
|
|
return meta.get('metadata')
|
|
return None
|
|
|
|
def delete(self, file_id: str) -> bool:
|
|
"""
|
|
删除文件
|
|
|
|
Args:
|
|
file_id: 文件ID
|
|
|
|
Returns:
|
|
是否成功
|
|
"""
|
|
file_path = os.path.join(self._storage_dir, file_id)
|
|
|
|
if os.path.exists(file_path):
|
|
os.unlink(file_path)
|
|
if file_id in self._metadata:
|
|
del self._metadata[file_id]
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
class FileUploader:
|
|
"""文件上传器"""
|
|
|
|
def __init__(
|
|
self,
|
|
upload_dir: str,
|
|
type_validator: Optional[FileTypeValidator] = None,
|
|
size_validator: Optional[FileSizeValidator] = None,
|
|
filename_sanitizer: Optional[FilenameSanitizer] = None
|
|
):
|
|
"""
|
|
初始化文件上传器
|
|
|
|
Args:
|
|
upload_dir: 上传目录
|
|
type_validator: 文件类型验证器
|
|
size_validator: 文件大小验证器
|
|
filename_sanitizer: 文件名净化器
|
|
"""
|
|
self._upload_dir = upload_dir
|
|
self._type_validator = type_validator or FileTypeValidator()
|
|
self._size_validator = size_validator or FileSizeValidator(max_size=10 * 1024 * 1024) # 10MB
|
|
self._filename_sanitizer = filename_sanitizer or FilenameSanitizer()
|
|
self._storage = FileStorageManager(upload_dir)
|
|
|
|
def upload(self, file_obj: BinaryIO, filename: str) -> UploadResult:
|
|
"""
|
|
上传文件
|
|
|
|
Args:
|
|
file_obj: 文件对象
|
|
filename: 文件名
|
|
|
|
Returns:
|
|
上传结果
|
|
"""
|
|
# 净化文件名
|
|
safe_filename = self._filename_sanitizer.sanitize(filename)
|
|
|
|
# 验证文件类型
|
|
if not self._type_validator.validate(safe_filename):
|
|
return UploadResult(
|
|
success=False,
|
|
message=f"不支持的文件类型: {safe_filename}"
|
|
)
|
|
|
|
# 读取文件内容
|
|
content = file_obj.read()
|
|
|
|
# 验证文件大小
|
|
if not self._size_validator.validate(len(content)):
|
|
return UploadResult(
|
|
success=False,
|
|
message=f"文件大小超过限制"
|
|
)
|
|
|
|
# 保存文件
|
|
file_id = self._storage.save(content, safe_filename)
|
|
file_path = os.path.join(self._upload_dir, file_id)
|
|
|
|
return UploadResult(
|
|
success=True,
|
|
file_id=file_id,
|
|
file_path=file_path,
|
|
filename=safe_filename,
|
|
size=len(content)
|
|
)
|
|
|
|
def upload_batch(self, file_paths: List[str]) -> List[UploadResult]:
|
|
"""
|
|
批量上传文件
|
|
|
|
Args:
|
|
file_paths: 文件路径列表
|
|
|
|
Returns:
|
|
上传结果列表
|
|
"""
|
|
results = []
|
|
|
|
for file_path in file_paths:
|
|
try:
|
|
with open(file_path, 'rb') as f:
|
|
filename = os.path.basename(file_path)
|
|
result = self.upload(f, filename)
|
|
results.append(result)
|
|
except Exception as e:
|
|
results.append(UploadResult(
|
|
success=False,
|
|
message=str(e)
|
|
))
|
|
|
|
return results
|
|
|
|
|
|
class FileDownloader:
|
|
"""文件下载器"""
|
|
|
|
def __init__(self, storage_manager: Optional[FileStorageManager] = None):
|
|
"""
|
|
初始化文件下载器
|
|
|
|
Args:
|
|
storage_manager: 存储管理器
|
|
"""
|
|
self._storage = storage_manager
|
|
|
|
def download(self, file_id: str) -> DownloadResult:
|
|
"""
|
|
下载文件
|
|
|
|
Args:
|
|
file_id: 文件ID
|
|
|
|
Returns:
|
|
下载结果
|
|
"""
|
|
if self._storage is None:
|
|
return DownloadResult(
|
|
success=False,
|
|
message="存储管理器未设置"
|
|
)
|
|
|
|
content = self._storage.get(file_id)
|
|
|
|
if content is None:
|
|
return DownloadResult(
|
|
success=False,
|
|
message="文件不存在"
|
|
)
|
|
|
|
return DownloadResult(
|
|
success=True,
|
|
content=content
|
|
)
|