""" 数据库连接池管理模块 提供数据库连接池的创建、管理和监控功能。 """ import time import threading import uuid from typing import Any, Dict, List, Optional, Callable from dataclasses import dataclass, field from enum import Enum import queue class ConnectionStatus(Enum): """连接状态""" IDLE = "idle" ACTIVE = "active" CLOSED = "closed" UNHEALTHY = "unhealthy" @dataclass class Connection: """数据库连接封装""" id: str = field(default_factory=lambda: str(uuid.uuid4())) status: ConnectionStatus = ConnectionStatus.IDLE created_at: float = field(default_factory=time.time) last_used: float = field(default_factory=time.time) use_count: int = 0 host: str = "" port: int = 3306 database: str = "" user: str = "" password: str = "" def is_valid(self) -> bool: """检查连接是否有效""" return self.status in [ConnectionStatus.IDLE, ConnectionStatus.ACTIVE] def execute(self, query: str) -> Any: """执行查询(模拟)""" if not self.is_valid(): raise Exception("连接无效") return f"Result of: {query}" def close(self) -> None: """关闭连接""" self.status = ConnectionStatus.CLOSED class ConnectionPool: """ 数据库连接池 特性: - 支持最小/最大连接数配置 - 支持连接超时等待 - 支持健康检查 - 支持自动扩容 - 线程安全 """ def __init__( self, min_connections: int = 2, max_connections: int = 10, host: str = "localhost", port: int = 3306, database: str = "", user: str = "", password: str = "", connection_timeout: int = 30, health_check_interval: int = 60, auto_scale: bool = False ): """ 初始化连接池 Args: min_connections: 最小连接数 max_connections: 最大连接数 host: 数据库主机 port: 数据库端口 database: 数据库名 user: 用户名 password: 密码 connection_timeout: 连接超时时间(秒) health_check_interval: 健康检查间隔(秒) auto_scale: 是否自动扩容 """ self._min_connections = min_connections self._max_connections = max_connections self._host = host self._port = port self._database = database self._user = user self._password = password self._connection_timeout = connection_timeout self._health_check_interval = health_check_interval self._auto_scale = auto_scale # 连接池 self._idle_connections: queue.Queue[Connection] = queue.Queue() self._active_connections: Dict[str, Connection] = {} self._all_connections: Dict[str, Connection] = {} # 锁 self._lock = threading.RLock() self._condition = threading.Condition(self._lock) # 统计信息 self._stats = { "total_get_count": 0, "total_release_count": 0, "total_wait_count": 0, "total_wait_time": 0.0, "health_check_count": 0, "unhealthy_count": 0, } # 健康检查线程 self._health_check_thread: Optional[threading.Thread] = None self._shutdown = False # 初始化最小连接数 self._initialize_min_connections() # 启动健康检查 if health_check_interval > 0: self._start_health_check() def _initialize_min_connections(self) -> None: """初始化最小连接数""" for _ in range(self._min_connections): conn = self._create_connection() self._idle_connections.put(conn) self._all_connections[conn.id] = conn def _create_connection(self) -> Connection: """创建新连接""" conn = Connection( host=self._host, port=self._port, database=self._database, user=self._user, password=self._password ) return conn def get_connection(self, timeout: Optional[int] = None) -> Connection: """ 获取连接 Args: timeout: 超时时间(秒),None表示使用默认值 Returns: 数据库连接 Raises: Exception: 超时或连接池已关闭 """ if timeout is None: timeout = self._connection_timeout with self._condition: self._stats["total_get_count"] += 1 # 尝试获取空闲连接 while not self._shutdown: # 如果有空闲连接,直接返回 try: conn = self._idle_connections.get_nowait() if conn.is_valid(): conn.status = ConnectionStatus.ACTIVE conn.last_used = time.time() conn.use_count += 1 self._active_connections[conn.id] = conn return conn else: # 连接无效,移除 self._remove_connection(conn) except queue.Empty: pass # 如果没有空闲连接,尝试创建新连接 if len(self._all_connections) < self._max_connections: conn = self._create_connection() conn.status = ConnectionStatus.ACTIVE conn.last_used = time.time() conn.use_count += 1 self._active_connections[conn.id] = conn self._all_connections[conn.id] = conn return conn # 如果达到最大连接数,等待 self._stats["total_wait_count"] += 1 start_wait = time.time() if not self._condition.wait(timeout=timeout): raise Exception(f"获取连接超时({timeout}秒)") self._stats["total_wait_time"] += time.time() - start_wait raise Exception("连接池已关闭") def release_connection(self, conn: Connection) -> None: """ 释放连接 Args: conn: 要释放的连接 """ with self._condition: if conn.id in self._active_connections: del self._active_connections[conn.id] if conn.is_valid(): conn.status = ConnectionStatus.IDLE conn.last_used = time.time() self._idle_connections.put(conn) self._stats["total_release_count"] += 1 else: # 连接无效,移除 self._remove_connection(conn) # 通知等待的线程 self._condition.notify() def _remove_connection(self, conn: Connection) -> None: """移除连接""" conn.close() if conn.id in self._all_connections: del self._all_connections[conn.id] if conn.id in self._active_connections: del self._active_connections[conn.id] def close(self) -> None: """关闭连接池""" self._shutdown = True with self._condition: # 关闭所有连接 for conn in self._all_connections.values(): conn.close() self._all_connections.clear() self._active_connections.clear() # 清空空闲队列 while not self._idle_connections.empty(): try: self._idle_connections.get_nowait() except queue.Empty: break # 通知所有等待的线程 self._condition.notify_all() def get_stats(self) -> Dict[str, Any]: """ 获取统计信息 Returns: 统计信息字典 """ with self._lock: return { "total_connections": len(self._all_connections), "idle_connections": self._idle_connections.qsize(), "active_connections": len(self._active_connections), "min_connections": self._min_connections, "max_connections": self._max_connections, "total_get_count": self._stats["total_get_count"], "total_release_count": self._stats["total_release_count"], "total_wait_count": self._stats["total_wait_count"], "total_wait_time": self._stats["total_wait_time"], "health_check_count": self._stats["health_check_count"], "unhealthy_count": self._stats["unhealthy_count"], } def health_check(self) -> bool: """ 执行健康检查 Returns: 是否所有连接都健康 """ with self._lock: self._stats["health_check_count"] += 1 unhealthy_count = 0 for conn in list(self._all_connections.values()): if not conn.is_valid(): unhealthy_count += 1 self._remove_connection(conn) self._stats["unhealthy_count"] += unhealthy_count # 补充最小连接数 while len(self._all_connections) < self._min_connections: conn = self._create_connection() self._idle_connections.put(conn) self._all_connections[conn.id] = conn return unhealthy_count == 0 def get_health_stats(self) -> Dict[str, Any]: """ 获取健康统计 Returns: 健康统计字典 """ with self._lock: healthy = sum(1 for conn in self._all_connections.values() if conn.is_valid()) unhealthy = len(self._all_connections) - healthy return { "healthy_connections": healthy, "unhealthy_connections": unhealthy, "health_check_count": self._stats["health_check_count"], "total_unhealthy_count": self._stats["unhealthy_count"], } def _start_health_check(self) -> None: """启动健康检查线程""" def health_check_worker(): while not self._shutdown: time.sleep(self._health_check_interval) if not self._shutdown: self.health_check() self._health_check_thread = threading.Thread( target=health_check_worker, daemon=True ) self._health_check_thread.start() class ConnectionPoolManager: """ 连接池管理器 单例模式管理多个命名连接池 """ _instance = None _lock = threading.Lock() def __new__(cls): if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._pools = {} return cls._instance def create_pool( self, name: str, min_connections: int = 2, max_connections: int = 10, host: str = "localhost", port: int = 3306, database: str = "", user: str = "", password: str = "", **kwargs ) -> ConnectionPool: """ 创建命名连接池 Args: name: 连接池名称 min_connections: 最小连接数 max_connections: 最大连接数 host: 数据库主机 port: 数据库端口 database: 数据库名 user: 用户名 password: 密码 **kwargs: 其他参数 Returns: 连接池实例 """ if name in self._pools: return self._pools[name] pool = ConnectionPool( min_connections=min_connections, max_connections=max_connections, host=host, port=port, database=database, user=user, password=password, **kwargs ) self._pools[name] = pool return pool def get_pool(self, name: str) -> Optional[ConnectionPool]: """ 获取命名连接池 Args: name: 连接池名称 Returns: 连接池实例,不存在则返回None """ return self._pools.get(name) def remove_pool(self, name: str) -> bool: """ 移除连接池 Args: name: 连接池名称 Returns: 是否成功移除 """ if name in self._pools: self._pools[name].close() del self._pools[name] return True return False def close_all(self) -> None: """关闭所有连接池""" for pool in self._pools.values(): pool.close() self._pools.clear() def get_all_stats(self) -> Dict[str, Dict[str, Any]]: """ 获取所有连接池统计 Returns: 连接池名称到统计信息的映射 """ return {name: pool.get_stats() for name, pool in self._pools.items()} # 全局连接池管理器实例 pool_manager = ConnectionPoolManager()