Files
张翔 08ea5fbe98 feat(admin): 添加用户管理相关文件
添加用户管理视图、API和状态管理文件
2026-03-28 14:37:29 +08:00

453 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
数据库连接池管理模块
提供数据库连接池的创建、管理和监控功能。
"""
import time
import threading
import uuid
from typing import Any, Dict, List, Optional, Callable
from dataclasses import dataclass, field
from enum import Enum
import queue
class ConnectionStatus(Enum):
    """Lifecycle states a pooled connection can be in."""

    # Not checked out; available to be handed to a caller.
    IDLE = "idle"
    # Currently checked out by a caller.
    ACTIVE = "active"
    # Explicitly closed; must not be used again.
    CLOSED = "closed"
    # Flagged as broken; treated as invalid by Connection.is_valid().
    UNHEALTHY = "unhealthy"
@dataclass
class Connection:
    """
    Lightweight record describing one (simulated) database connection.

    The object only carries bookkeeping state and connection parameters;
    ``execute`` does not talk to a real database, it echoes the query back.
    """

    # Unique identifier, generated per instance (random UUID4 string).
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    # Current lifecycle state; new connections start out idle.
    status: ConnectionStatus = ConnectionStatus.IDLE
    # Wall-clock timestamps (time.time()) of creation and most recent use.
    created_at: float = field(default_factory=time.time)
    last_used: float = field(default_factory=time.time)
    # Usage counter; this class never changes it itself.
    use_count: int = 0
    # Connection parameters.
    host: str = ""
    port: int = 3306
    database: str = ""
    user: str = ""
    # Kept out of the generated __repr__ so that logging or printing a
    # Connection does not expose the credential in clear text.
    password: str = field(default="", repr=False)

    def is_valid(self) -> bool:
        """Return True while the connection is IDLE or ACTIVE."""
        return self.status in (ConnectionStatus.IDLE, ConnectionStatus.ACTIVE)

    def execute(self, query: str) -> Any:
        """
        Run a query (simulated).

        Args:
            query: The statement text.

        Returns:
            A string of the form ``"Result of: <query>"``.

        Raises:
            Exception: If the connection is not valid (closed or unhealthy).
        """
        if not self.is_valid():
            raise Exception("连接无效")
        return f"Result of: {query}"

    def close(self) -> None:
        """Mark the connection as CLOSED."""
        self.status = ConnectionStatus.CLOSED
class ConnectionPool:
    """
    Thread-safe pool of ``Connection`` objects.

    Idle connections are kept in a FIFO queue and checked-out connections are
    tracked by id. New connections are created on demand until
    ``max_connections`` is reached; after that, callers block on a condition
    variable until a connection is released, the pool is closed, or their
    timeout expires. When ``health_check_interval`` is greater than zero a
    daemon thread runs :meth:`health_check` periodically.

    Note: ``auto_scale`` is stored on the instance, but nothing in this class
    reads it.
    """

    def __init__(
        self,
        min_connections: int = 2,
        max_connections: int = 10,
        host: str = "localhost",
        port: int = 3306,
        database: str = "",
        user: str = "",
        password: str = "",
        connection_timeout: int = 30,
        health_check_interval: int = 60,
        auto_scale: bool = False
    ):
        """
        Create the pool and pre-open ``min_connections`` idle connections.

        Args:
            min_connections: Number of connections created up front and
                restored by health checks.
            max_connections: Upper bound on connections created on demand.
            host: Database host.
            port: Database port.
            database: Database name.
            user: User name.
            password: Password.
            connection_timeout: Default number of seconds ``get_connection``
                waits when the pool is exhausted.
            health_check_interval: Seconds between background health checks;
                no background thread is started when this is not positive.
            auto_scale: Stored only; not consulted by this class.
        """
        self._min_connections = min_connections
        self._max_connections = max_connections
        self._host = host
        self._port = port
        self._database = database
        self._user = user
        self._password = password
        self._connection_timeout = connection_timeout
        self._health_check_interval = health_check_interval
        self._auto_scale = auto_scale

        # Pool state: idle queue, checked-out connections, and the registry of
        # every live connection (idle + active), the latter two keyed by id.
        self._idle_connections: "queue.Queue[Connection]" = queue.Queue()
        self._active_connections: "Dict[str, Connection]" = {}
        self._all_connections: "Dict[str, Connection]" = {}

        # One re-entrant lock guards all pool state; the condition shares it
        # so waiters and notifiers synchronise on the same lock.
        self._lock = threading.RLock()
        self._condition = threading.Condition(self._lock)

        # Counters exposed through get_stats() / get_health_stats().
        self._stats = {
            "total_get_count": 0,
            "total_release_count": 0,
            "total_wait_count": 0,
            "total_wait_time": 0.0,
            "health_check_count": 0,
            "unhealthy_count": 0,
        }

        # Background health checking. The event mirrors ``_shutdown`` so the
        # worker can be woken immediately instead of sleeping out its interval.
        self._health_check_thread: Optional[threading.Thread] = None
        self._shutdown = False
        self._shutdown_event = threading.Event()

        self._initialize_min_connections()

        if health_check_interval > 0:
            self._start_health_check()

    def _initialize_min_connections(self) -> None:
        """Create ``min_connections`` connections and queue them as idle."""
        for _ in range(self._min_connections):
            self._add_idle_connection()

    def _add_idle_connection(self) -> None:
        """Create one connection, register it and put it on the idle queue."""
        conn = self._create_connection()
        self._idle_connections.put(conn)
        self._all_connections[conn.id] = conn

    def _create_connection(self) -> "Connection":
        """Build a new Connection from the pool's connection parameters."""
        return Connection(
            host=self._host,
            port=self._port,
            database=self._database,
            user=self._user,
            password=self._password
        )

    def _take_idle_connection(self) -> "Optional[Connection]":
        """
        Pop idle connections until a valid one is found.

        Invalid connections encountered on the way are removed from the pool.
        Returns None when the idle queue holds no valid connection.
        """
        while True:
            try:
                conn = self._idle_connections.get_nowait()
            except queue.Empty:
                return None
            if conn.is_valid():
                return conn
            self._remove_connection(conn)

    def _mark_active(self, conn: "Connection") -> None:
        """Record ``conn`` as checked out and update its usage bookkeeping."""
        conn.status = ConnectionStatus.ACTIVE
        conn.last_used = time.time()
        conn.use_count += 1
        self._active_connections[conn.id] = conn

    def get_connection(self, timeout: Optional[int] = None) -> "Connection":
        """
        Check a connection out of the pool.

        A valid idle connection is preferred; otherwise a new one is created
        if the pool is below ``max_connections``; otherwise the caller waits.

        Args:
            timeout: Maximum total number of seconds to wait for a connection.
                None means the pool's ``connection_timeout``.

        Returns:
            A connection marked ACTIVE.

        Raises:
            Exception: If no connection became available within ``timeout``,
                or if the pool has been closed.
        """
        if timeout is None:
            timeout = self._connection_timeout

        with self._condition:
            self._stats["total_get_count"] += 1
            # A fixed deadline keeps the total blocking time bounded by
            # ``timeout`` even if this thread is woken several times without
            # getting a connection.
            deadline = time.monotonic() + timeout

            while not self._shutdown:
                conn = self._take_idle_connection()
                if conn is None and len(self._all_connections) < self._max_connections:
                    conn = self._create_connection()
                    self._all_connections[conn.id] = conn
                if conn is not None:
                    self._mark_active(conn)
                    return conn

                # Pool exhausted: wait for a release, a health check or close().
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise Exception(f"获取连接超时({timeout}秒)")
                self._stats["total_wait_count"] += 1
                wait_started = time.monotonic()
                notified = self._condition.wait(timeout=remaining)
                # Account for the wait whether or not it ended in a timeout.
                self._stats["total_wait_time"] += time.monotonic() - wait_started
                if not notified:
                    raise Exception(f"获取连接超时({timeout}秒)")

            raise Exception("连接池已关闭")

    def release_connection(self, conn: "Connection") -> None:
        """
        Return a checked-out connection to the pool.

        Connections that are not currently registered as active (for example
        a double release, or a release after ``close()``) are ignored. A valid
        connection goes back on the idle queue; an invalid one is removed from
        the pool. One waiting thread is notified afterwards.

        Args:
            conn: The connection to release.
        """
        with self._condition:
            if conn.id not in self._active_connections:
                return
            del self._active_connections[conn.id]

            if conn.is_valid():
                conn.status = ConnectionStatus.IDLE
                conn.last_used = time.time()
                self._idle_connections.put(conn)
                self._stats["total_release_count"] += 1
            else:
                # Broken connection: drop it, which also frees capacity.
                self._remove_connection(conn)

            # Either an idle connection or spare capacity is now available.
            self._condition.notify()

    def _remove_connection(self, conn: "Connection") -> None:
        """Close ``conn`` and drop it from the registry and the active map."""
        conn.close()
        self._all_connections.pop(conn.id, None)
        self._active_connections.pop(conn.id, None)

    def _purge_idle_queue(self) -> None:
        """Drop invalid connections from the idle queue, keeping FIFO order."""
        survivors = []
        while True:
            try:
                conn = self._idle_connections.get_nowait()
            except queue.Empty:
                break
            if conn.is_valid():
                survivors.append(conn)
        for conn in survivors:
            self._idle_connections.put(conn)

    def close(self) -> None:
        """
        Shut the pool down.

        Marks the pool as shut down, closes and forgets every connection,
        empties the idle queue and wakes all waiting threads (which then fail
        with the "pool closed" error). Also signals the health-check thread.
        """
        with self._condition:
            self._shutdown = True
            # Wake the health-check worker so it exits without waiting out
            # the remainder of its interval.
            self._shutdown_event.set()

            for conn in self._all_connections.values():
                conn.close()
            self._all_connections.clear()
            self._active_connections.clear()

            while True:
                try:
                    self._idle_connections.get_nowait()
                except queue.Empty:
                    break

            self._condition.notify_all()

    def get_stats(self) -> Dict[str, Any]:
        """
        Return a snapshot of pool sizes, limits and counters.

        Returns:
            A dict with the current total/idle/active connection counts, the
            configured min/max, and the accumulated counters.
        """
        with self._lock:
            return {
                "total_connections": len(self._all_connections),
                "idle_connections": self._idle_connections.qsize(),
                "active_connections": len(self._active_connections),
                "min_connections": self._min_connections,
                "max_connections": self._max_connections,
                "total_get_count": self._stats["total_get_count"],
                "total_release_count": self._stats["total_release_count"],
                "total_wait_count": self._stats["total_wait_count"],
                "total_wait_time": self._stats["total_wait_time"],
                "health_check_count": self._stats["health_check_count"],
                "unhealthy_count": self._stats["unhealthy_count"],
            }

    def health_check(self) -> bool:
        """
        Remove invalid connections and top the pool back up to the minimum.

        Invalid connections are closed and dropped from the registry and from
        the idle queue. Unless the pool has been closed, new idle connections
        are then created until ``min_connections`` is reached. Waiting threads
        are notified when capacity was freed or connections were added.

        Returns:
            True if no invalid connection was found, otherwise False.
        """
        with self._condition:
            self._stats["health_check_count"] += 1

            unhealthy = [
                conn for conn in self._all_connections.values()
                if not conn.is_valid()
            ]
            for conn in unhealthy:
                self._remove_connection(conn)
            self._stats["unhealthy_count"] += len(unhealthy)

            if unhealthy:
                # Removed connections may still sit in the idle queue; purge
                # them so the reported idle count stays accurate.
                self._purge_idle_queue()

            replenished = False
            # A closed pool must stay empty, so only refill while running.
            if not self._shutdown:
                while len(self._all_connections) < self._min_connections:
                    self._add_idle_connection()
                    replenished = True

            if unhealthy or replenished:
                self._condition.notify_all()

            return not unhealthy

    def get_health_stats(self) -> Dict[str, Any]:
        """
        Return current health figures for the pool.

        Returns:
            A dict with the number of valid and invalid registered
            connections plus the accumulated health-check counters.
        """
        with self._lock:
            healthy = sum(1 for conn in self._all_connections.values() if conn.is_valid())
            unhealthy = len(self._all_connections) - healthy

            return {
                "healthy_connections": healthy,
                "unhealthy_connections": unhealthy,
                "health_check_count": self._stats["health_check_count"],
                "total_unhealthy_count": self._stats["unhealthy_count"],
            }

    def _start_health_check(self) -> None:
        """Start the daemon thread that runs health_check() periodically."""
        def health_check_worker() -> None:
            # Event.wait returns False when the interval elapsed and True as
            # soon as close() sets the event, so shutdown is noticed promptly.
            while not self._shutdown_event.wait(self._health_check_interval):
                if self._shutdown:
                    break
                self.health_check()

        self._health_check_thread = threading.Thread(
            target=health_check_worker,
            daemon=True
        )
        self._health_check_thread.start()
class ConnectionPoolManager:
    """
    Registry of named connection pools.

    Every call to ``ConnectionPoolManager()`` returns the same shared
    instance. Access to the name -> pool mapping is serialised with a
    per-instance lock.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    # Finish initialising the object before publishing it, so
                    # a caller that takes the unlocked fast path can never see
                    # an instance that is missing its attributes.
                    instance = super().__new__(cls)
                    instance._pools = {}
                    instance._pools_lock = threading.RLock()
                    cls._instance = instance
        return cls._instance

    def create_pool(
        self,
        name: str,
        min_connections: int = 2,
        max_connections: int = 10,
        host: str = "localhost",
        port: int = 3306,
        database: str = "",
        user: str = "",
        password: str = "",
        **kwargs
    ) -> "ConnectionPool":
        """
        Create a named pool, or return the existing pool with that name.

        If ``name`` is already registered the existing pool is returned
        unchanged and the other arguments are ignored.

        Args:
            name: Pool name.
            min_connections: Minimum number of connections.
            max_connections: Maximum number of connections.
            host: Database host.
            port: Database port.
            database: Database name.
            user: User name.
            password: Password.
            **kwargs: Extra keyword arguments forwarded to ``ConnectionPool``.

        Returns:
            The pool registered under ``name``.
        """
        # Check and insert under one lock so two concurrent callers cannot
        # both build a pool for the same name (the loser would never be
        # closed).
        with self._pools_lock:
            existing = self._pools.get(name)
            if existing is not None:
                return existing

            pool = ConnectionPool(
                min_connections=min_connections,
                max_connections=max_connections,
                host=host,
                port=port,
                database=database,
                user=user,
                password=password,
                **kwargs
            )
            self._pools[name] = pool
            return pool

    def get_pool(self, name: str) -> "Optional[ConnectionPool]":
        """
        Look up a pool by name.

        Args:
            name: Pool name.

        Returns:
            The pool, or None if no pool is registered under ``name``.
        """
        with self._pools_lock:
            return self._pools.get(name)

    def remove_pool(self, name: str) -> bool:
        """
        Close and unregister a pool.

        Args:
            name: Pool name.

        Returns:
            True if a pool was registered under ``name`` and has been closed,
            False otherwise.
        """
        with self._pools_lock:
            pool = self._pools.pop(name, None)
        if pool is None:
            return False
        pool.close()
        return True

    def close_all(self) -> None:
        """Close every registered pool and empty the registry."""
        with self._pools_lock:
            pools = list(self._pools.values())
            self._pools.clear()
        for pool in pools:
            pool.close()

    def get_all_stats(self) -> Dict[str, Dict[str, Any]]:
        """
        Collect statistics from every registered pool.

        Returns:
            A mapping of pool name to that pool's ``get_stats()`` result.
        """
        # Snapshot the registry first so iteration cannot be disturbed by a
        # concurrent create/remove.
        with self._pools_lock:
            pools = list(self._pools.items())
        return {name: pool.get_stats() for name, pool in pools}
# Module-level connection pool manager instance. ConnectionPoolManager()
# returns its shared singleton, so this name refers to that same object.
pool_manager = ConnectionPoolManager()