""" 数据库连接池管理模块测试 - TDD Red阶段 测试数据库连接池的创建、管理和监控功能。 """ import pytest import allure import threading import time from typing import Any, Optional @allure.epic("核心框架") @allure.feature("数据库连接池管理 - TDD Red阶段") class TestConnectionPool: """数据库连接池管理测试类 - TDD Red阶段(期望失败)""" @allure.title("测试连接池基本操作 - TDD Red阶段") @allure.description("验证连接池的基本CRUD操作 - 期望失败(Red)") @allure.severity(allure.severity_level.CRITICAL) @pytest.mark.smoke def test_pool_basic_operations(self) -> None: """ TDD Red阶段: 测试连接池基本操作 预期结果: - 可以创建连接池 - 可以获取连接 - 可以释放连接 - 可以关闭连接池 """ from core.connection_pool import ConnectionPool with allure.step("Step 1: 创建连接池"): pool = ConnectionPool( min_connections=2, max_connections=5, host="localhost", port=3306, database="test_db", user="test_user", password="test_pass" ) allure.attach("✅ 连接池创建成功", "步骤1", allure.attachment_type.TEXT) with allure.step("Step 2: 获取连接"): conn = pool.get_connection() assert conn is not None, "应该能获取到连接" allure.attach("✅ 获取连接成功", "步骤2", allure.attachment_type.TEXT) with allure.step("Step 3: 释放连接"): pool.release_connection(conn) allure.attach("✅ 释放连接成功", "步骤3", allure.attachment_type.TEXT) with allure.step("Step 4: 关闭连接池"): pool.close() allure.attach("✅ 连接池关闭成功", "步骤4", allure.attachment_type.TEXT) @allure.title("测试连接池容量限制 - TDD Red阶段") @allure.description("验证连接池的容量限制和等待机制 - 期望失败(Red)") @allure.severity(allure.severity_level.CRITICAL) @pytest.mark.smoke def test_pool_capacity_limit(self) -> None: """ TDD Red阶段: 测试连接池容量限制 预期结果: - 连接数不超过最大值 - 超出时等待或报错 - 释放后可以继续获取 """ from core.connection_pool import ConnectionPool with allure.step("Step 1: 创建容量为3的连接池"): pool = ConnectionPool( min_connections=1, max_connections=3, host="localhost", port=3306, database="test_db", user="test_user", password="test_pass" ) allure.attach("✅ 创建容量为3的连接池", "步骤1", allure.attachment_type.TEXT) with allure.step("Step 2: 获取3个连接"): conn1 = pool.get_connection() conn2 = pool.get_connection() conn3 = pool.get_connection() allure.attach("✅ 获取3个连接", "步骤2", allure.attachment_type.TEXT) with allure.step("Step 3: 验证第4个连接需要等待"): start_time = time.time() try: conn4 = pool.get_connection(timeout=1) elapsed = time.time() - start_time allure.attach(f"⚠️ 第4个连接获取成功,耗时: {elapsed:.2f}s", "步骤3", allure.attachment_type.TEXT) except Exception as e: elapsed = time.time() - start_time allure.attach(f"✅ 第4个连接获取超时,耗时: {elapsed:.2f}s", "步骤3", allure.attachment_type.TEXT) with allure.step("Step 4: 释放连接后继续获取"): pool.release_connection(conn1) conn4 = pool.get_connection() assert conn4 is not None, "释放后应该能获取到连接" allure.attach("✅ 释放后成功获取连接", "步骤4", allure.attachment_type.TEXT) with allure.step("Step 5: 清理"): pool.release_connection(conn2) pool.release_connection(conn3) pool.release_connection(conn4) pool.close() @allure.title("测试连接池统计信息 - TDD Red阶段") @allure.description("验证连接池统计信息正确收集 - 期望失败(Red)") @allure.severity(allure.severity_level.NORMAL) @pytest.mark.regression def test_pool_statistics(self) -> None: """ TDD Red阶段: 测试连接池统计信息 预期结果: - 总连接数统计正确 - 空闲连接数统计正确 - 活跃连接数统计正确 - 等待次数统计正确 """ from core.connection_pool import ConnectionPool with allure.step("Step 1: 创建连接池"): pool = ConnectionPool( min_connections=2, max_connections=5, host="localhost", port=3306, database="test_db", user="test_user", password="test_pass" ) allure.attach("✅ 连接池创建成功", "步骤1", allure.attachment_type.TEXT) with allure.step("Step 2: 获取统计信息"): stats = pool.get_stats() allure.attach(f"初始统计: {stats}", "步骤2", allure.attachment_type.TEXT) with allure.step("Step 3: 验证统计信息"): assert "total_connections" in stats, "统计信息应包含total_connections" assert "idle_connections" in stats, "统计信息应包含idle_connections" assert "active_connections" in stats, "统计信息应包含active_connections" assert stats["total_connections"] >= 2, "总连接数应至少为min_connections" with allure.step("Step 4: 获取连接后验证统计变化"): conn = pool.get_connection() stats_after_get = pool.get_stats() allure.attach(f"获取连接后统计: {stats_after_get}", "步骤4", allure.attachment_type.TEXT) pool.release_connection(conn) pool.close() @allure.title("测试连接池健康检查 - TDD Red阶段") @allure.description("验证连接池健康检查功能 - 期望失败(Red)") @allure.severity(allure.severity_level.CRITICAL) @pytest.mark.smoke def test_pool_health_check(self) -> None: """ TDD Red阶段: 测试连接池健康检查 预期结果: - 可以检查连接健康状态 - 不健康连接被自动替换 - 健康检查定期执行 """ from core.connection_pool import ConnectionPool with allure.step("Step 1: 创建带健康检查的连接池"): pool = ConnectionPool( min_connections=2, max_connections=5, host="localhost", port=3306, database="test_db", user="test_user", password="test_pass", health_check_interval=1 ) allure.attach("✅ 创建带健康检查的连接池", "步骤1", allure.attachment_type.TEXT) with allure.step("Step 2: 执行健康检查"): is_healthy = pool.health_check() allure.attach(f"健康状态: {is_healthy}", "步骤2", allure.attachment_type.TEXT) with allure.step("Step 3: 获取健康统计"): health_stats = pool.get_health_stats() allure.attach(f"健康统计: {health_stats}", "步骤3", allure.attachment_type.TEXT) assert "healthy_connections" in health_stats, "健康统计应包含healthy_connections" assert "unhealthy_connections" in health_stats, "健康统计应包含unhealthy_connections" with allure.step("Step 4: 清理"): pool.close() @allure.title("测试连接池线程安全 - TDD Red阶段") @allure.description("验证连接池在多线程环境下的安全性 - 期望失败(Red)") @allure.severity(allure.severity_level.CRITICAL) @pytest.mark.smoke def test_pool_thread_safety(self) -> None: """ TDD Red阶段: 测试连接池线程安全 预期结果: - 多线程并发获取连接不会出错 - 连接分配正确 - 不会出现竞态条件 """ from core.connection_pool import ConnectionPool with allure.step("Step 1: 创建连接池"): pool = ConnectionPool( min_connections=2, max_connections=10, host="localhost", port=3306, database="test_db", user="test_user", password="test_pass" ) allure.attach("✅ 连接池创建成功", "步骤1", allure.attachment_type.TEXT) with allure.step("Step 2: 多线程并发获取连接"): errors = [] connections = [] lock = threading.Lock() def get_connection_task(thread_id: int): try: conn = pool.get_connection(timeout=5) with lock: connections.append(conn) time.sleep(0.1) # 模拟使用 pool.release_connection(conn) except Exception as e: with lock: errors.append(str(e)) threads = [] for i in range(10): t = threading.Thread(target=get_connection_task, args=(i,)) threads.append(t) t.start() for t in threads: t.join() allure.attach(f"线程数: 10, 错误数: {len(errors)}", "步骤2", allure.attachment_type.TEXT) assert len(errors) == 0, f"并发获取连接出现错误: {errors}" with allure.step("Step 3: 清理"): pool.close() @allure.title("测试连接池管理器 - TDD Red阶段") @allure.description("验证连接池管理器的单例模式和多池管理 - 期望失败(Red)") @allure.severity(allure.severity_level.NORMAL) @pytest.mark.regression def test_pool_manager(self) -> None: """ TDD Red阶段: 测试连接池管理器 预期结果: - 单例模式正确 - 可以管理多个连接池 - 可以获取指定连接池 """ from core.connection_pool import ConnectionPoolManager with allure.step("Step 1: 获取连接池管理器实例"): manager1 = ConnectionPoolManager() manager2 = ConnectionPoolManager() assert manager1 is manager2, "应该是单例模式" allure.attach("✅ 单例模式验证通过", "步骤1", allure.attachment_type.TEXT) with allure.step("Step 2: 创建多个命名连接池"): user_pool = manager1.create_pool( "user_db", min_connections=2, max_connections=5, host="localhost", port=3306, database="user_db", user="user", password="pass" ) order_pool = manager1.create_pool( "order_db", min_connections=2, max_connections=5, host="localhost", port=3306, database="order_db", user="order", password="pass" ) allure.attach("✅ 创建2个命名连接池", "步骤2", allure.attachment_type.TEXT) with allure.step("Step 3: 获取指定连接池"): retrieved_user_pool = manager1.get_pool("user_db") retrieved_order_pool = manager1.get_pool("order_db") assert retrieved_user_pool is user_pool, "应该获取到相同的user_db连接池" assert retrieved_order_pool is order_pool, "应该获取到相同的order_db连接池" allure.attach("✅ 获取指定连接池成功", "步骤3", allure.attachment_type.TEXT) with allure.step("Step 4: 获取所有连接池统计"): all_stats = manager1.get_all_stats() allure.attach(f"所有连接池统计: {all_stats}", "步骤4", allure.attachment_type.TEXT) assert "user_db" in all_stats, "统计应包含user_db" assert "order_db" in all_stats, "统计应包含order_db" with allure.step("Step 5: 清理"): manager1.close_all() @allure.title("测试连接池自动扩容 - TDD Red阶段") @allure.description("验证连接池根据负载自动扩容 - 期望失败(Red)") @allure.severity(allure.severity_level.NORMAL) @pytest.mark.regression def test_pool_auto_scaling(self) -> None: """ TDD Red阶段: 测试连接池自动扩容 预期结果: - 负载增加时自动扩容 - 负载降低时自动缩容 - 扩容不超过最大值 """ from core.connection_pool import ConnectionPool with allure.step("Step 1: 创建可自动扩容的连接池"): pool = ConnectionPool( min_connections=1, max_connections=5, host="localhost", port=3306, database="test_db", user="test_user", password="test_pass", auto_scale=True ) allure.attach("✅ 创建可自动扩容的连接池", "步骤1", allure.attachment_type.TEXT) with allure.step("Step 2: 获取初始统计"): initial_stats = pool.get_stats() initial_total = initial_stats["total_connections"] allure.attach(f"初始连接数: {initial_total}", "步骤2", allure.attachment_type.TEXT) with allure.step("Step 3: 模拟高负载"): connections = [] for i in range(4): conn = pool.get_connection() connections.append(conn) high_load_stats = pool.get_stats() allure.attach(f"高负载连接数: {high_load_stats['total_connections']}", "步骤3", allure.attachment_type.TEXT) with allure.step("Step 4: 释放连接"): for conn in connections: pool.release_connection(conn) time.sleep(0.5) # 等待缩容 low_load_stats = pool.get_stats() allure.attach(f"低负载连接数: {low_load_stats['total_connections']}", "步骤4", allure.attachment_type.TEXT) with allure.step("Step 5: 清理"): pool.close() @allure.title("测试连接池连接验证 - TDD Red阶段") @allure.description("验证连接池获取的连接是有效的 - 期望失败(Red)") @allure.severity(allure.severity_level.CRITICAL) @pytest.mark.smoke def test_connection_validation(self) -> None: """ TDD Red阶段: 测试连接验证 预期结果: - 获取的连接是有效的 - 无效连接被自动替换 - 可以执行简单查询验证 """ from core.connection_pool import ConnectionPool with allure.step("Step 1: 创建连接池"): pool = ConnectionPool( min_connections=2, max_connections=5, host="localhost", port=3306, database="test_db", user="test_user", password="test_pass" ) allure.attach("✅ 连接池创建成功", "步骤1", allure.attachment_type.TEXT) with allure.step("Step 2: 获取连接并验证"): conn = pool.get_connection() is_valid = conn.is_valid() allure.attach(f"连接有效: {is_valid}", "步骤2", allure.attachment_type.TEXT) assert is_valid, "获取的连接应该是有效的" with allure.step("Step 3: 执行简单查询"): try: result = conn.execute("SELECT 1") allure.attach(f"查询结果: {result}", "步骤3", allure.attachment_type.TEXT) except Exception as e: allure.attach(f"查询异常: {str(e)}", "步骤3", allure.attachment_type.TEXT) with allure.step("Step 4: 清理"): pool.release_connection(conn) pool.close()