08ea5fbe98
添加用户管理视图、API和状态管理文件
416 lines
16 KiB
Python
416 lines
16 KiB
Python
"""
|
|
数据库连接池管理模块测试 - TDD Red阶段
|
|
|
|
测试数据库连接池的创建、管理和监控功能。
|
|
"""
|
|
|
|
import pytest
|
|
import allure
|
|
import threading
|
|
import time
|
|
from typing import Any, Optional
|
|
|
|
|
|
@allure.epic("核心框架")
|
|
@allure.feature("数据库连接池管理 - TDD Red阶段")
|
|
class TestConnectionPool:
|
|
"""数据库连接池管理测试类 - TDD Red阶段(期望失败)"""
|
|
|
|
@allure.title("测试连接池基本操作 - TDD Red阶段")
|
|
@allure.description("验证连接池的基本CRUD操作 - 期望失败(Red)")
|
|
@allure.severity(allure.severity_level.CRITICAL)
|
|
@pytest.mark.smoke
|
|
def test_pool_basic_operations(self) -> None:
|
|
"""
|
|
TDD Red阶段: 测试连接池基本操作
|
|
|
|
预期结果:
|
|
- 可以创建连接池
|
|
- 可以获取连接
|
|
- 可以释放连接
|
|
- 可以关闭连接池
|
|
"""
|
|
from core.connection_pool import ConnectionPool
|
|
|
|
with allure.step("Step 1: 创建连接池"):
|
|
pool = ConnectionPool(
|
|
min_connections=2,
|
|
max_connections=5,
|
|
host="localhost",
|
|
port=3306,
|
|
database="test_db",
|
|
user="test_user",
|
|
password="test_pass"
|
|
)
|
|
allure.attach("✅ 连接池创建成功", "步骤1", allure.attachment_type.TEXT)
|
|
|
|
with allure.step("Step 2: 获取连接"):
|
|
conn = pool.get_connection()
|
|
assert conn is not None, "应该能获取到连接"
|
|
allure.attach("✅ 获取连接成功", "步骤2", allure.attachment_type.TEXT)
|
|
|
|
with allure.step("Step 3: 释放连接"):
|
|
pool.release_connection(conn)
|
|
allure.attach("✅ 释放连接成功", "步骤3", allure.attachment_type.TEXT)
|
|
|
|
with allure.step("Step 4: 关闭连接池"):
|
|
pool.close()
|
|
allure.attach("✅ 连接池关闭成功", "步骤4", allure.attachment_type.TEXT)
|
|
|
|
@allure.title("测试连接池容量限制 - TDD Red阶段")
|
|
@allure.description("验证连接池的容量限制和等待机制 - 期望失败(Red)")
|
|
@allure.severity(allure.severity_level.CRITICAL)
|
|
@pytest.mark.smoke
|
|
def test_pool_capacity_limit(self) -> None:
|
|
"""
|
|
TDD Red阶段: 测试连接池容量限制
|
|
|
|
预期结果:
|
|
- 连接数不超过最大值
|
|
- 超出时等待或报错
|
|
- 释放后可以继续获取
|
|
"""
|
|
from core.connection_pool import ConnectionPool
|
|
|
|
with allure.step("Step 1: 创建容量为3的连接池"):
|
|
pool = ConnectionPool(
|
|
min_connections=1,
|
|
max_connections=3,
|
|
host="localhost",
|
|
port=3306,
|
|
database="test_db",
|
|
user="test_user",
|
|
password="test_pass"
|
|
)
|
|
allure.attach("✅ 创建容量为3的连接池", "步骤1", allure.attachment_type.TEXT)
|
|
|
|
with allure.step("Step 2: 获取3个连接"):
|
|
conn1 = pool.get_connection()
|
|
conn2 = pool.get_connection()
|
|
conn3 = pool.get_connection()
|
|
allure.attach("✅ 获取3个连接", "步骤2", allure.attachment_type.TEXT)
|
|
|
|
with allure.step("Step 3: 验证第4个连接需要等待"):
|
|
start_time = time.time()
|
|
try:
|
|
conn4 = pool.get_connection(timeout=1)
|
|
elapsed = time.time() - start_time
|
|
allure.attach(f"⚠️ 第4个连接获取成功,耗时: {elapsed:.2f}s", "步骤3", allure.attachment_type.TEXT)
|
|
except Exception as e:
|
|
elapsed = time.time() - start_time
|
|
allure.attach(f"✅ 第4个连接获取超时,耗时: {elapsed:.2f}s", "步骤3", allure.attachment_type.TEXT)
|
|
|
|
with allure.step("Step 4: 释放连接后继续获取"):
|
|
pool.release_connection(conn1)
|
|
conn4 = pool.get_connection()
|
|
assert conn4 is not None, "释放后应该能获取到连接"
|
|
allure.attach("✅ 释放后成功获取连接", "步骤4", allure.attachment_type.TEXT)
|
|
|
|
with allure.step("Step 5: 清理"):
|
|
pool.release_connection(conn2)
|
|
pool.release_connection(conn3)
|
|
pool.release_connection(conn4)
|
|
pool.close()
|
|
|
|
@allure.title("测试连接池统计信息 - TDD Red阶段")
|
|
@allure.description("验证连接池统计信息正确收集 - 期望失败(Red)")
|
|
@allure.severity(allure.severity_level.NORMAL)
|
|
@pytest.mark.regression
|
|
def test_pool_statistics(self) -> None:
|
|
"""
|
|
TDD Red阶段: 测试连接池统计信息
|
|
|
|
预期结果:
|
|
- 总连接数统计正确
|
|
- 空闲连接数统计正确
|
|
- 活跃连接数统计正确
|
|
- 等待次数统计正确
|
|
"""
|
|
from core.connection_pool import ConnectionPool
|
|
|
|
with allure.step("Step 1: 创建连接池"):
|
|
pool = ConnectionPool(
|
|
min_connections=2,
|
|
max_connections=5,
|
|
host="localhost",
|
|
port=3306,
|
|
database="test_db",
|
|
user="test_user",
|
|
password="test_pass"
|
|
)
|
|
allure.attach("✅ 连接池创建成功", "步骤1", allure.attachment_type.TEXT)
|
|
|
|
with allure.step("Step 2: 获取统计信息"):
|
|
stats = pool.get_stats()
|
|
allure.attach(f"初始统计: {stats}", "步骤2", allure.attachment_type.TEXT)
|
|
|
|
with allure.step("Step 3: 验证统计信息"):
|
|
assert "total_connections" in stats, "统计信息应包含total_connections"
|
|
assert "idle_connections" in stats, "统计信息应包含idle_connections"
|
|
assert "active_connections" in stats, "统计信息应包含active_connections"
|
|
assert stats["total_connections"] >= 2, "总连接数应至少为min_connections"
|
|
|
|
with allure.step("Step 4: 获取连接后验证统计变化"):
|
|
conn = pool.get_connection()
|
|
stats_after_get = pool.get_stats()
|
|
allure.attach(f"获取连接后统计: {stats_after_get}", "步骤4", allure.attachment_type.TEXT)
|
|
pool.release_connection(conn)
|
|
pool.close()
|
|
|
|
@allure.title("测试连接池健康检查 - TDD Red阶段")
|
|
@allure.description("验证连接池健康检查功能 - 期望失败(Red)")
|
|
@allure.severity(allure.severity_level.CRITICAL)
|
|
@pytest.mark.smoke
|
|
def test_pool_health_check(self) -> None:
|
|
"""
|
|
TDD Red阶段: 测试连接池健康检查
|
|
|
|
预期结果:
|
|
- 可以检查连接健康状态
|
|
- 不健康连接被自动替换
|
|
- 健康检查定期执行
|
|
"""
|
|
from core.connection_pool import ConnectionPool
|
|
|
|
with allure.step("Step 1: 创建带健康检查的连接池"):
|
|
pool = ConnectionPool(
|
|
min_connections=2,
|
|
max_connections=5,
|
|
host="localhost",
|
|
port=3306,
|
|
database="test_db",
|
|
user="test_user",
|
|
password="test_pass",
|
|
health_check_interval=1
|
|
)
|
|
allure.attach("✅ 创建带健康检查的连接池", "步骤1", allure.attachment_type.TEXT)
|
|
|
|
with allure.step("Step 2: 执行健康检查"):
|
|
is_healthy = pool.health_check()
|
|
allure.attach(f"健康状态: {is_healthy}", "步骤2", allure.attachment_type.TEXT)
|
|
|
|
with allure.step("Step 3: 获取健康统计"):
|
|
health_stats = pool.get_health_stats()
|
|
allure.attach(f"健康统计: {health_stats}", "步骤3", allure.attachment_type.TEXT)
|
|
assert "healthy_connections" in health_stats, "健康统计应包含healthy_connections"
|
|
assert "unhealthy_connections" in health_stats, "健康统计应包含unhealthy_connections"
|
|
|
|
with allure.step("Step 4: 清理"):
|
|
pool.close()
|
|
|
|
@allure.title("测试连接池线程安全 - TDD Red阶段")
|
|
@allure.description("验证连接池在多线程环境下的安全性 - 期望失败(Red)")
|
|
@allure.severity(allure.severity_level.CRITICAL)
|
|
@pytest.mark.smoke
|
|
def test_pool_thread_safety(self) -> None:
|
|
"""
|
|
TDD Red阶段: 测试连接池线程安全
|
|
|
|
预期结果:
|
|
- 多线程并发获取连接不会出错
|
|
- 连接分配正确
|
|
- 不会出现竞态条件
|
|
"""
|
|
from core.connection_pool import ConnectionPool
|
|
|
|
with allure.step("Step 1: 创建连接池"):
|
|
pool = ConnectionPool(
|
|
min_connections=2,
|
|
max_connections=10,
|
|
host="localhost",
|
|
port=3306,
|
|
database="test_db",
|
|
user="test_user",
|
|
password="test_pass"
|
|
)
|
|
allure.attach("✅ 连接池创建成功", "步骤1", allure.attachment_type.TEXT)
|
|
|
|
with allure.step("Step 2: 多线程并发获取连接"):
|
|
errors = []
|
|
connections = []
|
|
lock = threading.Lock()
|
|
|
|
def get_connection_task(thread_id: int):
|
|
try:
|
|
conn = pool.get_connection(timeout=5)
|
|
with lock:
|
|
connections.append(conn)
|
|
time.sleep(0.1) # 模拟使用
|
|
pool.release_connection(conn)
|
|
except Exception as e:
|
|
with lock:
|
|
errors.append(str(e))
|
|
|
|
threads = []
|
|
for i in range(10):
|
|
t = threading.Thread(target=get_connection_task, args=(i,))
|
|
threads.append(t)
|
|
t.start()
|
|
|
|
for t in threads:
|
|
t.join()
|
|
|
|
allure.attach(f"线程数: 10, 错误数: {len(errors)}", "步骤2", allure.attachment_type.TEXT)
|
|
assert len(errors) == 0, f"并发获取连接出现错误: {errors}"
|
|
|
|
with allure.step("Step 3: 清理"):
|
|
pool.close()
|
|
|
|
@allure.title("测试连接池管理器 - TDD Red阶段")
|
|
@allure.description("验证连接池管理器的单例模式和多池管理 - 期望失败(Red)")
|
|
@allure.severity(allure.severity_level.NORMAL)
|
|
@pytest.mark.regression
|
|
def test_pool_manager(self) -> None:
|
|
"""
|
|
TDD Red阶段: 测试连接池管理器
|
|
|
|
预期结果:
|
|
- 单例模式正确
|
|
- 可以管理多个连接池
|
|
- 可以获取指定连接池
|
|
"""
|
|
from core.connection_pool import ConnectionPoolManager
|
|
|
|
with allure.step("Step 1: 获取连接池管理器实例"):
|
|
manager1 = ConnectionPoolManager()
|
|
manager2 = ConnectionPoolManager()
|
|
assert manager1 is manager2, "应该是单例模式"
|
|
allure.attach("✅ 单例模式验证通过", "步骤1", allure.attachment_type.TEXT)
|
|
|
|
with allure.step("Step 2: 创建多个命名连接池"):
|
|
user_pool = manager1.create_pool(
|
|
"user_db",
|
|
min_connections=2,
|
|
max_connections=5,
|
|
host="localhost",
|
|
port=3306,
|
|
database="user_db",
|
|
user="user",
|
|
password="pass"
|
|
)
|
|
|
|
order_pool = manager1.create_pool(
|
|
"order_db",
|
|
min_connections=2,
|
|
max_connections=5,
|
|
host="localhost",
|
|
port=3306,
|
|
database="order_db",
|
|
user="order",
|
|
password="pass"
|
|
)
|
|
allure.attach("✅ 创建2个命名连接池", "步骤2", allure.attachment_type.TEXT)
|
|
|
|
with allure.step("Step 3: 获取指定连接池"):
|
|
retrieved_user_pool = manager1.get_pool("user_db")
|
|
retrieved_order_pool = manager1.get_pool("order_db")
|
|
assert retrieved_user_pool is user_pool, "应该获取到相同的user_db连接池"
|
|
assert retrieved_order_pool is order_pool, "应该获取到相同的order_db连接池"
|
|
allure.attach("✅ 获取指定连接池成功", "步骤3", allure.attachment_type.TEXT)
|
|
|
|
with allure.step("Step 4: 获取所有连接池统计"):
|
|
all_stats = manager1.get_all_stats()
|
|
allure.attach(f"所有连接池统计: {all_stats}", "步骤4", allure.attachment_type.TEXT)
|
|
assert "user_db" in all_stats, "统计应包含user_db"
|
|
assert "order_db" in all_stats, "统计应包含order_db"
|
|
|
|
with allure.step("Step 5: 清理"):
|
|
manager1.close_all()
|
|
|
|
@allure.title("测试连接池自动扩容 - TDD Red阶段")
|
|
@allure.description("验证连接池根据负载自动扩容 - 期望失败(Red)")
|
|
@allure.severity(allure.severity_level.NORMAL)
|
|
@pytest.mark.regression
|
|
def test_pool_auto_scaling(self) -> None:
|
|
"""
|
|
TDD Red阶段: 测试连接池自动扩容
|
|
|
|
预期结果:
|
|
- 负载增加时自动扩容
|
|
- 负载降低时自动缩容
|
|
- 扩容不超过最大值
|
|
"""
|
|
from core.connection_pool import ConnectionPool
|
|
|
|
with allure.step("Step 1: 创建可自动扩容的连接池"):
|
|
pool = ConnectionPool(
|
|
min_connections=1,
|
|
max_connections=5,
|
|
host="localhost",
|
|
port=3306,
|
|
database="test_db",
|
|
user="test_user",
|
|
password="test_pass",
|
|
auto_scale=True
|
|
)
|
|
allure.attach("✅ 创建可自动扩容的连接池", "步骤1", allure.attachment_type.TEXT)
|
|
|
|
with allure.step("Step 2: 获取初始统计"):
|
|
initial_stats = pool.get_stats()
|
|
initial_total = initial_stats["total_connections"]
|
|
allure.attach(f"初始连接数: {initial_total}", "步骤2", allure.attachment_type.TEXT)
|
|
|
|
with allure.step("Step 3: 模拟高负载"):
|
|
connections = []
|
|
for i in range(4):
|
|
conn = pool.get_connection()
|
|
connections.append(conn)
|
|
|
|
high_load_stats = pool.get_stats()
|
|
allure.attach(f"高负载连接数: {high_load_stats['total_connections']}", "步骤3", allure.attachment_type.TEXT)
|
|
|
|
with allure.step("Step 4: 释放连接"):
|
|
for conn in connections:
|
|
pool.release_connection(conn)
|
|
|
|
time.sleep(0.5) # 等待缩容
|
|
low_load_stats = pool.get_stats()
|
|
allure.attach(f"低负载连接数: {low_load_stats['total_connections']}", "步骤4", allure.attachment_type.TEXT)
|
|
|
|
with allure.step("Step 5: 清理"):
|
|
pool.close()
|
|
|
|
@allure.title("测试连接池连接验证 - TDD Red阶段")
|
|
@allure.description("验证连接池获取的连接是有效的 - 期望失败(Red)")
|
|
@allure.severity(allure.severity_level.CRITICAL)
|
|
@pytest.mark.smoke
|
|
def test_connection_validation(self) -> None:
|
|
"""
|
|
TDD Red阶段: 测试连接验证
|
|
|
|
预期结果:
|
|
- 获取的连接是有效的
|
|
- 无效连接被自动替换
|
|
- 可以执行简单查询验证
|
|
"""
|
|
from core.connection_pool import ConnectionPool
|
|
|
|
with allure.step("Step 1: 创建连接池"):
|
|
pool = ConnectionPool(
|
|
min_connections=2,
|
|
max_connections=5,
|
|
host="localhost",
|
|
port=3306,
|
|
database="test_db",
|
|
user="test_user",
|
|
password="test_pass"
|
|
)
|
|
allure.attach("✅ 连接池创建成功", "步骤1", allure.attachment_type.TEXT)
|
|
|
|
with allure.step("Step 2: 获取连接并验证"):
|
|
conn = pool.get_connection()
|
|
is_valid = conn.is_valid()
|
|
allure.attach(f"连接有效: {is_valid}", "步骤2", allure.attachment_type.TEXT)
|
|
assert is_valid, "获取的连接应该是有效的"
|
|
|
|
with allure.step("Step 3: 执行简单查询"):
|
|
try:
|
|
result = conn.execute("SELECT 1")
|
|
allure.attach(f"查询结果: {result}", "步骤3", allure.attachment_type.TEXT)
|
|
except Exception as e:
|
|
allure.attach(f"查询异常: {str(e)}", "步骤3", allure.attachment_type.TEXT)
|
|
|
|
with allure.step("Step 4: 清理"):
|
|
pool.release_connection(conn)
|
|
pool.close()
|