""" 文件上传下载功能模块 提供文件上传、下载、验证和管理功能。 """ import os import re import uuid import shutil from typing import Any, Dict, List, Optional, BinaryIO from dataclasses import dataclass, field from pathlib import Path @dataclass class UploadResult: """上传结果""" success: bool file_id: Optional[str] = None file_path: Optional[str] = None filename: Optional[str] = None size: int = 0 message: str = "" @dataclass class DownloadResult: """下载结果""" success: bool content: Optional[bytes] = None filename: Optional[str] = None message: str = "" class FileTypeValidator: """文件类型验证器""" def __init__(self, allowed_extensions: Optional[List[str]] = None): """ 初始化文件类型验证器 Args: allowed_extensions: 允许的文件扩展名列表 """ self._allowed_extensions = allowed_extensions or [] def validate(self, filename: str) -> bool: """ 验证文件类型 Args: filename: 文件名 Returns: 是否允许 """ if not self._allowed_extensions: return True ext = Path(filename).suffix.lower() return ext in self._allowed_extensions class FileSizeValidator: """文件大小验证器""" def __init__(self, max_size: int): """ 初始化文件大小验证器 Args: max_size: 最大文件大小(字节) """ self._max_size = max_size def validate(self, size: int) -> bool: """ 验证文件大小 Args: size: 文件大小(字节) Returns: 是否允许 """ return size <= self._max_size class FilenameSanitizer: """文件名净化器""" # 危险字符 DANGEROUS_CHARS = r'[;|&$<>\`\\]' def sanitize(self, filename: str) -> str: """ 净化文件名 Args: filename: 原始文件名 Returns: 安全的文件名 """ # 移除路径遍历 filename = os.path.basename(filename) # 移除危险字符 filename = re.sub(self.DANGEROUS_CHARS, '', filename) # 移除连续的点 filename = re.sub(r'\.{2,}', '.', filename) # 确保不为空 if not filename or filename == '.': filename = 'unnamed_file' return filename class FileStorageManager: """文件存储管理器""" def __init__(self, storage_dir: str): """ 初始化存储管理器 Args: storage_dir: 存储目录 """ self._storage_dir = storage_dir self._metadata: Dict[str, Dict[str, Any]] = {} # 创建存储目录 os.makedirs(storage_dir, exist_ok=True) def save( self, content: bytes, filename: str, metadata: Optional[Dict[str, Any]] = None ) -> str: """ 保存文件 Args: content: 文件内容 filename: 文件名 metadata: 元数据 Returns: 文件ID """ file_id = str(uuid.uuid4()) file_path = os.path.join(self._storage_dir, file_id) with open(file_path, 'wb') as f: f.write(content) # 保存元数据 self._metadata[file_id] = { 'filename': filename, 'size': len(content), 'metadata': metadata or {}, } return file_id def get(self, file_id: str) -> Optional[bytes]: """ 获取文件内容 Args: file_id: 文件ID Returns: 文件内容 """ file_path = os.path.join(self._storage_dir, file_id) if not os.path.exists(file_path): return None with open(file_path, 'rb') as f: return f.read() def get_metadata(self, file_id: str) -> Optional[Dict[str, Any]]: """ 获取文件元数据 Args: file_id: 文件ID Returns: 元数据 """ meta = self._metadata.get(file_id) if meta: return meta.get('metadata') return None def delete(self, file_id: str) -> bool: """ 删除文件 Args: file_id: 文件ID Returns: 是否成功 """ file_path = os.path.join(self._storage_dir, file_id) if os.path.exists(file_path): os.unlink(file_path) if file_id in self._metadata: del self._metadata[file_id] return True return False class FileUploader: """文件上传器""" def __init__( self, upload_dir: str, type_validator: Optional[FileTypeValidator] = None, size_validator: Optional[FileSizeValidator] = None, filename_sanitizer: Optional[FilenameSanitizer] = None ): """ 初始化文件上传器 Args: upload_dir: 上传目录 type_validator: 文件类型验证器 size_validator: 文件大小验证器 filename_sanitizer: 文件名净化器 """ self._upload_dir = upload_dir self._type_validator = type_validator or FileTypeValidator() self._size_validator = size_validator or FileSizeValidator(max_size=10 * 1024 * 1024) # 10MB self._filename_sanitizer = filename_sanitizer or FilenameSanitizer() self._storage = FileStorageManager(upload_dir) def upload(self, file_obj: BinaryIO, filename: str) -> UploadResult: """ 上传文件 Args: file_obj: 文件对象 filename: 文件名 Returns: 上传结果 """ # 净化文件名 safe_filename = self._filename_sanitizer.sanitize(filename) # 验证文件类型 if not self._type_validator.validate(safe_filename): return UploadResult( success=False, message=f"不支持的文件类型: {safe_filename}" ) # 读取文件内容 content = file_obj.read() # 验证文件大小 if not self._size_validator.validate(len(content)): return UploadResult( success=False, message=f"文件大小超过限制" ) # 保存文件 file_id = self._storage.save(content, safe_filename) file_path = os.path.join(self._upload_dir, file_id) return UploadResult( success=True, file_id=file_id, file_path=file_path, filename=safe_filename, size=len(content) ) def upload_batch(self, file_paths: List[str]) -> List[UploadResult]: """ 批量上传文件 Args: file_paths: 文件路径列表 Returns: 上传结果列表 """ results = [] for file_path in file_paths: try: with open(file_path, 'rb') as f: filename = os.path.basename(file_path) result = self.upload(f, filename) results.append(result) except Exception as e: results.append(UploadResult( success=False, message=str(e) )) return results class FileDownloader: """文件下载器""" def __init__(self, storage_manager: Optional[FileStorageManager] = None): """ 初始化文件下载器 Args: storage_manager: 存储管理器 """ self._storage = storage_manager def download(self, file_id: str) -> DownloadResult: """ 下载文件 Args: file_id: 文件ID Returns: 下载结果 """ if self._storage is None: return DownloadResult( success=False, message="存储管理器未设置" ) content = self._storage.get(file_id) if content is None: return DownloadResult( success=False, message="文件不存在" ) return DownloadResult( success=True, content=content )