Files
张翔 08ea5fbe98 feat(admin): 添加用户管理相关文件
添加用户管理视图、API和状态管理文件
2026-03-28 14:37:29 +08:00

416 lines
16 KiB
Python

"""
数据库连接池管理模块测试 - TDD Red阶段
测试数据库连接池的创建、管理和监控功能。
"""
import pytest
import allure
import threading
import time
from typing import Any, Optional
@allure.epic("核心框架")
@allure.feature("数据库连接池管理 - TDD Red阶段")
class TestConnectionPool:
"""数据库连接池管理测试类 - TDD Red阶段(期望失败)"""
@allure.title("测试连接池基本操作 - TDD Red阶段")
@allure.description("验证连接池的基本CRUD操作 - 期望失败(Red)")
@allure.severity(allure.severity_level.CRITICAL)
@pytest.mark.smoke
def test_pool_basic_operations(self) -> None:
"""
TDD Red阶段: 测试连接池基本操作
预期结果:
- 可以创建连接池
- 可以获取连接
- 可以释放连接
- 可以关闭连接池
"""
from core.connection_pool import ConnectionPool
with allure.step("Step 1: 创建连接池"):
pool = ConnectionPool(
min_connections=2,
max_connections=5,
host="localhost",
port=3306,
database="test_db",
user="test_user",
password="test_pass"
)
allure.attach("✅ 连接池创建成功", "步骤1", allure.attachment_type.TEXT)
with allure.step("Step 2: 获取连接"):
conn = pool.get_connection()
assert conn is not None, "应该能获取到连接"
allure.attach("✅ 获取连接成功", "步骤2", allure.attachment_type.TEXT)
with allure.step("Step 3: 释放连接"):
pool.release_connection(conn)
allure.attach("✅ 释放连接成功", "步骤3", allure.attachment_type.TEXT)
with allure.step("Step 4: 关闭连接池"):
pool.close()
allure.attach("✅ 连接池关闭成功", "步骤4", allure.attachment_type.TEXT)
@allure.title("测试连接池容量限制 - TDD Red阶段")
@allure.description("验证连接池的容量限制和等待机制 - 期望失败(Red)")
@allure.severity(allure.severity_level.CRITICAL)
@pytest.mark.smoke
def test_pool_capacity_limit(self) -> None:
"""
TDD Red阶段: 测试连接池容量限制
预期结果:
- 连接数不超过最大值
- 超出时等待或报错
- 释放后可以继续获取
"""
from core.connection_pool import ConnectionPool
with allure.step("Step 1: 创建容量为3的连接池"):
pool = ConnectionPool(
min_connections=1,
max_connections=3,
host="localhost",
port=3306,
database="test_db",
user="test_user",
password="test_pass"
)
allure.attach("✅ 创建容量为3的连接池", "步骤1", allure.attachment_type.TEXT)
with allure.step("Step 2: 获取3个连接"):
conn1 = pool.get_connection()
conn2 = pool.get_connection()
conn3 = pool.get_connection()
allure.attach("✅ 获取3个连接", "步骤2", allure.attachment_type.TEXT)
with allure.step("Step 3: 验证第4个连接需要等待"):
start_time = time.time()
try:
conn4 = pool.get_connection(timeout=1)
elapsed = time.time() - start_time
allure.attach(f"⚠️ 第4个连接获取成功,耗时: {elapsed:.2f}s", "步骤3", allure.attachment_type.TEXT)
except Exception as e:
elapsed = time.time() - start_time
allure.attach(f"✅ 第4个连接获取超时,耗时: {elapsed:.2f}s", "步骤3", allure.attachment_type.TEXT)
with allure.step("Step 4: 释放连接后继续获取"):
pool.release_connection(conn1)
conn4 = pool.get_connection()
assert conn4 is not None, "释放后应该能获取到连接"
allure.attach("✅ 释放后成功获取连接", "步骤4", allure.attachment_type.TEXT)
with allure.step("Step 5: 清理"):
pool.release_connection(conn2)
pool.release_connection(conn3)
pool.release_connection(conn4)
pool.close()
@allure.title("测试连接池统计信息 - TDD Red阶段")
@allure.description("验证连接池统计信息正确收集 - 期望失败(Red)")
@allure.severity(allure.severity_level.NORMAL)
@pytest.mark.regression
def test_pool_statistics(self) -> None:
"""
TDD Red阶段: 测试连接池统计信息
预期结果:
- 总连接数统计正确
- 空闲连接数统计正确
- 活跃连接数统计正确
- 等待次数统计正确
"""
from core.connection_pool import ConnectionPool
with allure.step("Step 1: 创建连接池"):
pool = ConnectionPool(
min_connections=2,
max_connections=5,
host="localhost",
port=3306,
database="test_db",
user="test_user",
password="test_pass"
)
allure.attach("✅ 连接池创建成功", "步骤1", allure.attachment_type.TEXT)
with allure.step("Step 2: 获取统计信息"):
stats = pool.get_stats()
allure.attach(f"初始统计: {stats}", "步骤2", allure.attachment_type.TEXT)
with allure.step("Step 3: 验证统计信息"):
assert "total_connections" in stats, "统计信息应包含total_connections"
assert "idle_connections" in stats, "统计信息应包含idle_connections"
assert "active_connections" in stats, "统计信息应包含active_connections"
assert stats["total_connections"] >= 2, "总连接数应至少为min_connections"
with allure.step("Step 4: 获取连接后验证统计变化"):
conn = pool.get_connection()
stats_after_get = pool.get_stats()
allure.attach(f"获取连接后统计: {stats_after_get}", "步骤4", allure.attachment_type.TEXT)
pool.release_connection(conn)
pool.close()
@allure.title("测试连接池健康检查 - TDD Red阶段")
@allure.description("验证连接池健康检查功能 - 期望失败(Red)")
@allure.severity(allure.severity_level.CRITICAL)
@pytest.mark.smoke
def test_pool_health_check(self) -> None:
"""
TDD Red阶段: 测试连接池健康检查
预期结果:
- 可以检查连接健康状态
- 不健康连接被自动替换
- 健康检查定期执行
"""
from core.connection_pool import ConnectionPool
with allure.step("Step 1: 创建带健康检查的连接池"):
pool = ConnectionPool(
min_connections=2,
max_connections=5,
host="localhost",
port=3306,
database="test_db",
user="test_user",
password="test_pass",
health_check_interval=1
)
allure.attach("✅ 创建带健康检查的连接池", "步骤1", allure.attachment_type.TEXT)
with allure.step("Step 2: 执行健康检查"):
is_healthy = pool.health_check()
allure.attach(f"健康状态: {is_healthy}", "步骤2", allure.attachment_type.TEXT)
with allure.step("Step 3: 获取健康统计"):
health_stats = pool.get_health_stats()
allure.attach(f"健康统计: {health_stats}", "步骤3", allure.attachment_type.TEXT)
assert "healthy_connections" in health_stats, "健康统计应包含healthy_connections"
assert "unhealthy_connections" in health_stats, "健康统计应包含unhealthy_connections"
with allure.step("Step 4: 清理"):
pool.close()
@allure.title("测试连接池线程安全 - TDD Red阶段")
@allure.description("验证连接池在多线程环境下的安全性 - 期望失败(Red)")
@allure.severity(allure.severity_level.CRITICAL)
@pytest.mark.smoke
def test_pool_thread_safety(self) -> None:
"""
TDD Red阶段: 测试连接池线程安全
预期结果:
- 多线程并发获取连接不会出错
- 连接分配正确
- 不会出现竞态条件
"""
from core.connection_pool import ConnectionPool
with allure.step("Step 1: 创建连接池"):
pool = ConnectionPool(
min_connections=2,
max_connections=10,
host="localhost",
port=3306,
database="test_db",
user="test_user",
password="test_pass"
)
allure.attach("✅ 连接池创建成功", "步骤1", allure.attachment_type.TEXT)
with allure.step("Step 2: 多线程并发获取连接"):
errors = []
connections = []
lock = threading.Lock()
def get_connection_task(thread_id: int):
try:
conn = pool.get_connection(timeout=5)
with lock:
connections.append(conn)
time.sleep(0.1) # 模拟使用
pool.release_connection(conn)
except Exception as e:
with lock:
errors.append(str(e))
threads = []
for i in range(10):
t = threading.Thread(target=get_connection_task, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
allure.attach(f"线程数: 10, 错误数: {len(errors)}", "步骤2", allure.attachment_type.TEXT)
assert len(errors) == 0, f"并发获取连接出现错误: {errors}"
with allure.step("Step 3: 清理"):
pool.close()
@allure.title("测试连接池管理器 - TDD Red阶段")
@allure.description("验证连接池管理器的单例模式和多池管理 - 期望失败(Red)")
@allure.severity(allure.severity_level.NORMAL)
@pytest.mark.regression
def test_pool_manager(self) -> None:
"""
TDD Red阶段: 测试连接池管理器
预期结果:
- 单例模式正确
- 可以管理多个连接池
- 可以获取指定连接池
"""
from core.connection_pool import ConnectionPoolManager
with allure.step("Step 1: 获取连接池管理器实例"):
manager1 = ConnectionPoolManager()
manager2 = ConnectionPoolManager()
assert manager1 is manager2, "应该是单例模式"
allure.attach("✅ 单例模式验证通过", "步骤1", allure.attachment_type.TEXT)
with allure.step("Step 2: 创建多个命名连接池"):
user_pool = manager1.create_pool(
"user_db",
min_connections=2,
max_connections=5,
host="localhost",
port=3306,
database="user_db",
user="user",
password="pass"
)
order_pool = manager1.create_pool(
"order_db",
min_connections=2,
max_connections=5,
host="localhost",
port=3306,
database="order_db",
user="order",
password="pass"
)
allure.attach("✅ 创建2个命名连接池", "步骤2", allure.attachment_type.TEXT)
with allure.step("Step 3: 获取指定连接池"):
retrieved_user_pool = manager1.get_pool("user_db")
retrieved_order_pool = manager1.get_pool("order_db")
assert retrieved_user_pool is user_pool, "应该获取到相同的user_db连接池"
assert retrieved_order_pool is order_pool, "应该获取到相同的order_db连接池"
allure.attach("✅ 获取指定连接池成功", "步骤3", allure.attachment_type.TEXT)
with allure.step("Step 4: 获取所有连接池统计"):
all_stats = manager1.get_all_stats()
allure.attach(f"所有连接池统计: {all_stats}", "步骤4", allure.attachment_type.TEXT)
assert "user_db" in all_stats, "统计应包含user_db"
assert "order_db" in all_stats, "统计应包含order_db"
with allure.step("Step 5: 清理"):
manager1.close_all()
@allure.title("测试连接池自动扩容 - TDD Red阶段")
@allure.description("验证连接池根据负载自动扩容 - 期望失败(Red)")
@allure.severity(allure.severity_level.NORMAL)
@pytest.mark.regression
def test_pool_auto_scaling(self) -> None:
"""
TDD Red阶段: 测试连接池自动扩容
预期结果:
- 负载增加时自动扩容
- 负载降低时自动缩容
- 扩容不超过最大值
"""
from core.connection_pool import ConnectionPool
with allure.step("Step 1: 创建可自动扩容的连接池"):
pool = ConnectionPool(
min_connections=1,
max_connections=5,
host="localhost",
port=3306,
database="test_db",
user="test_user",
password="test_pass",
auto_scale=True
)
allure.attach("✅ 创建可自动扩容的连接池", "步骤1", allure.attachment_type.TEXT)
with allure.step("Step 2: 获取初始统计"):
initial_stats = pool.get_stats()
initial_total = initial_stats["total_connections"]
allure.attach(f"初始连接数: {initial_total}", "步骤2", allure.attachment_type.TEXT)
with allure.step("Step 3: 模拟高负载"):
connections = []
for i in range(4):
conn = pool.get_connection()
connections.append(conn)
high_load_stats = pool.get_stats()
allure.attach(f"高负载连接数: {high_load_stats['total_connections']}", "步骤3", allure.attachment_type.TEXT)
with allure.step("Step 4: 释放连接"):
for conn in connections:
pool.release_connection(conn)
time.sleep(0.5) # 等待缩容
low_load_stats = pool.get_stats()
allure.attach(f"低负载连接数: {low_load_stats['total_connections']}", "步骤4", allure.attachment_type.TEXT)
with allure.step("Step 5: 清理"):
pool.close()
@allure.title("测试连接池连接验证 - TDD Red阶段")
@allure.description("验证连接池获取的连接是有效的 - 期望失败(Red)")
@allure.severity(allure.severity_level.CRITICAL)
@pytest.mark.smoke
def test_connection_validation(self) -> None:
"""
TDD Red阶段: 测试连接验证
预期结果:
- 获取的连接是有效的
- 无效连接被自动替换
- 可以执行简单查询验证
"""
from core.connection_pool import ConnectionPool
with allure.step("Step 1: 创建连接池"):
pool = ConnectionPool(
min_connections=2,
max_connections=5,
host="localhost",
port=3306,
database="test_db",
user="test_user",
password="test_pass"
)
allure.attach("✅ 连接池创建成功", "步骤1", allure.attachment_type.TEXT)
with allure.step("Step 2: 获取连接并验证"):
conn = pool.get_connection()
is_valid = conn.is_valid()
allure.attach(f"连接有效: {is_valid}", "步骤2", allure.attachment_type.TEXT)
assert is_valid, "获取的连接应该是有效的"
with allure.step("Step 3: 执行简单查询"):
try:
result = conn.execute("SELECT 1")
allure.attach(f"查询结果: {result}", "步骤3", allure.attachment_type.TEXT)
except Exception as e:
allure.attach(f"查询异常: {str(e)}", "步骤3", allure.attachment_type.TEXT)
with allure.step("Step 4: 清理"):
pool.release_connection(conn)
pool.close()