Files
张翔 08ea5fbe98 feat(admin): 添加用户管理相关文件
添加用户管理视图、API和状态管理文件
2026-03-28 14:37:29 +08:00

397 lines
10 KiB
Python

"""
Caffeine缓存管理模块
基于Caffeine的本地缓存管理实现。
"""
import time
import threading
from typing import Any, Dict, List, Optional, Callable
from dataclasses import dataclass, field
from collections import OrderedDict
@dataclass
class CacheEntry:
"""缓存条目"""
value: Any
expires_at: Optional[float] = None
created_at: float = field(default_factory=time.time)
access_count: int = field(default=0)
last_accessed: float = field(default_factory=time.time)
class CaffeineCache:
"""
Caffeine风格的本地缓存实现
特性:
- 支持TTL过期时间
- 支持最大容量限制(LRU淘汰)
- 支持统计信息
- 线程安全
- 批量操作支持
"""
def __init__(
self,
max_size: int = 1000,
default_expire_seconds: Optional[int] = None,
record_stats: bool = False
):
"""
初始化缓存
Args:
max_size: 最大缓存条目数
default_expire_seconds: 默认过期时间(秒)
record_stats: 是否记录统计信息
"""
self._max_size = max_size
self._default_expire_seconds = default_expire_seconds
self._record_stats = record_stats
self._data: Dict[str, CacheEntry] = {}
self._lock = threading.RLock()
# 统计信息
self._stats = {
"hit_count": 0,
"miss_count": 0,
"put_count": 0,
"delete_count": 0,
"eviction_count": 0,
}
def put(self, key: str, value: Any, expire_seconds: Optional[int] = None) -> None:
"""
添加缓存条目
Args:
key: 缓存键
value: 缓存值
expire_seconds: 过期时间(秒),None表示使用默认值,0表示永不过期
"""
with self._lock:
# 计算过期时间
if expire_seconds is not None:
expires_at = time.time() + expire_seconds if expire_seconds > 0 else None
elif self._default_expire_seconds is not None:
expires_at = time.time() + self._default_expire_seconds
else:
expires_at = None
# 创建缓存条目
entry = CacheEntry(
value=value,
expires_at=expires_at
)
# 检查是否需要淘汰
if key not in self._data and len(self._data) >= self._max_size:
self._evict_oldest()
# 存储数据
self._data[key] = entry
if self._record_stats:
self._stats["put_count"] += 1
def get(self, key: str) -> Optional[Any]:
"""
获取缓存值
Args:
key: 缓存键
Returns:
缓存值,不存在或已过期则返回None
"""
with self._lock:
entry = self._data.get(key)
if entry is None:
if self._record_stats:
self._stats["miss_count"] += 1
return None
# 检查是否过期
if entry.expires_at is not None and time.time() > entry.expires_at:
del self._data[key]
if self._record_stats:
self._stats["miss_count"] += 1
self._stats["eviction_count"] += 1
return None
# 更新访问信息
entry.access_count += 1
entry.last_accessed = time.time()
if self._record_stats:
self._stats["hit_count"] += 1
return entry.value
def exists(self, key: str) -> bool:
"""
检查键是否存在
Args:
key: 缓存键
Returns:
是否存在且未过期
"""
with self._lock:
entry = self._data.get(key)
if entry is None:
return False
# 检查是否过期
if entry.expires_at is not None and time.time() > entry.expires_at:
del self._data[key]
return False
return True
def delete(self, key: str) -> bool:
"""
删除缓存条目
Args:
key: 缓存键
Returns:
是否成功删除
"""
with self._lock:
if key in self._data:
del self._data[key]
if self._record_stats:
self._stats["delete_count"] += 1
return True
return False
def get_all(self, keys: List[str]) -> Dict[str, Optional[Any]]:
"""
批量获取缓存值
Args:
keys: 缓存键列表
Returns:
键值对字典
"""
result = {}
for key in keys:
result[key] = self.get(key)
return result
def put_all(self, data: Dict[str, Any], expire_seconds: Optional[int] = None) -> None:
"""
批量添加缓存条目
Args:
data: 键值对字典
expire_seconds: 过期时间(秒)
"""
for key, value in data.items():
self.put(key, value, expire_seconds)
def delete_all(self, keys: List[str]) -> int:
"""
批量删除缓存条目
Args:
keys: 缓存键列表
Returns:
成功删除的数量
"""
count = 0
for key in keys:
if self.delete(key):
count += 1
return count
def clear(self) -> None:
"""清空所有缓存"""
with self._lock:
self._data.clear()
def get_stats(self) -> Dict[str, Any]:
"""
获取缓存统计信息
Returns:
统计信息字典
"""
with self._lock:
stats = self._stats.copy()
stats["size"] = len(self._data)
stats["max_size"] = self._max_size
# 计算命中率
total_requests = stats["hit_count"] + stats["miss_count"]
if total_requests > 0:
stats["hit_rate"] = stats["hit_count"] / total_requests
else:
stats["hit_rate"] = 0.0
return stats
def _evict_oldest(self) -> None:
"""淘汰最久未使用的条目"""
if not self._data:
return
# 找到最久未访问的条目
oldest_key = min(
self._data.keys(),
key=lambda k: self._data[k].last_accessed
)
del self._data[oldest_key]
if self._record_stats:
self._stats["eviction_count"] += 1
def size(self) -> int:
"""
获取当前缓存大小
Returns:
缓存条目数
"""
with self._lock:
return len(self._data)
def keys(self) -> List[str]:
"""
获取所有缓存键
Returns:
缓存键列表
"""
with self._lock:
return list(self._data.keys())
def values(self) -> List[Any]:
"""
获取所有缓存值
Returns:
缓存值列表
"""
with self._lock:
return [entry.value for entry in self._data.values()]
def items(self) -> Dict[str, Any]:
"""
获取所有缓存项
Returns:
键值对字典
"""
with self._lock:
return {k: v.value for k, v in self._data.items()}
def cleanup_expired(self) -> int:
"""
清理过期条目
Returns:
清理的条目数
"""
with self._lock:
current_time = time.time()
expired_keys = [
key for key, entry in self._data.items()
if entry.expires_at is not None and current_time > entry.expires_at
]
for key in expired_keys:
del self._data[key]
if self._record_stats:
self._stats["eviction_count"] += 1
return len(expired_keys)
class CaffeineCacheManager:
"""
Caffeine缓存管理器
管理多个命名缓存实例
"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._caches = {}
return cls._instance
def get_cache(
self,
name: str,
max_size: int = 1000,
default_expire_seconds: Optional[int] = None,
record_stats: bool = False
) -> CaffeineCache:
"""
获取或创建命名缓存
Args:
name: 缓存名称
max_size: 最大缓存条目数
default_expire_seconds: 默认过期时间
record_stats: 是否记录统计信息
Returns:
缓存实例
"""
if name not in self._caches:
self._caches[name] = CaffeineCache(
max_size=max_size,
default_expire_seconds=default_expire_seconds,
record_stats=record_stats
)
return self._caches[name]
def remove_cache(self, name: str) -> bool:
"""
移除缓存
Args:
name: 缓存名称
Returns:
是否成功移除
"""
if name in self._caches:
del self._caches[name]
return True
return False
def clear_all(self) -> None:
"""清空所有缓存"""
for cache in self._caches.values():
cache.clear()
def get_all_stats(self) -> Dict[str, Dict[str, Any]]:
"""
获取所有缓存的统计信息
Returns:
缓存名称到统计信息的映射
"""
return {name: cache.get_stats() for name, cache in self._caches.items()}
# 全局缓存管理器实例
cache_manager = CaffeineCacheManager()