""" 本地并发控制模块测试 - TDD Red阶段 测试本地并发控制的各种机制,包括信号量、读写锁、限流器等。 """ import pytest import allure import threading import time from typing import Any, Optional @allure.epic("核心框架") @allure.feature("本地并发控制 - TDD Red阶段") class TestConcurrencyControl: """本地并发控制测试类 - TDD Red阶段(期望失败)""" @allure.title("测试信号量并发控制 - TDD Red阶段") @allure.description("验证信号量限制并发数量 - 期望失败(Red)") @allure.severity(allure.severity_level.CRITICAL) @pytest.mark.smoke def test_semaphore_control(self) -> None: """ TDD Red阶段: 测试信号量并发控制 预期结果: - 信号量限制同时执行的线程数 - 超出限制的线程等待 - 释放后其他线程可以继续 """ from core.concurrency_control import SemaphoreControl with allure.step("Step 1: 创建允许3个并发线程的信号量"): semaphore = SemaphoreControl(max_concurrent=3) allure.attach("✅ 创建信号量(max_concurrent=3)", "步骤1", allure.attachment_type.TEXT) with allure.step("Step 2: 多线程并发执行"): active_count = [0] max_active = [0] lock = threading.Lock() errors = [] def worker(): try: with semaphore.acquire(): with lock: active_count[0] += 1 max_active[0] = max(max_active[0], active_count[0]) time.sleep(0.2) # 模拟工作 with lock: active_count[0] -= 1 except Exception as e: errors.append(str(e)) threads = [threading.Thread(target=worker) for _ in range(10)] for t in threads: t.start() for t in threads: t.join() allure.attach(f"最大并发数: {max_active[0]}", "步骤2", allure.attachment_type.TEXT) assert len(errors) == 0, f"执行出错: {errors}" assert max_active[0] <= 3, f"并发数超过限制: {max_active[0]}" @allure.title("测试读写锁 - TDD Red阶段") @allure.description("验证读写锁的读共享写独占特性 - 期望失败(Red)") @allure.severity(allure.severity_level.CRITICAL) @pytest.mark.smoke def test_read_write_lock(self) -> None: """ TDD Red阶段: 测试读写锁 预期结果: - 多个读线程可以同时获取读锁 - 写锁独占,其他读写线程等待 - 写锁释放后读线程可以继续 """ from core.concurrency_control import ReadWriteLock with allure.step("Step 1: 创建读写锁"): rw_lock = ReadWriteLock() allure.attach("✅ 创建读写锁", "步骤1", allure.attachment_type.TEXT) with allure.step("Step 2: 测试多线程读共享"): read_count = [0] max_concurrent_reads = [0] lock = threading.Lock() def reader(): with rw_lock.read_lock(): with lock: read_count[0] += 1 max_concurrent_reads[0] = max(max_concurrent_reads[0], read_count[0]) time.sleep(0.1) with lock: read_count[0] -= 1 threads = [threading.Thread(target=reader) for _ in range(5)] for t in threads: t.start() for t in threads: t.join() allure.attach(f"最大并发读数: {max_concurrent_reads[0]}", "步骤2", allure.attachment_type.TEXT) assert max_concurrent_reads[0] > 1, "读锁应该支持并发" with allure.step("Step 3: 测试写锁独占"): write_active = [False] read_active = [False] violations = [] def writer(): with rw_lock.write_lock(): if read_active[0]: violations.append("写锁获取时读锁仍活跃") write_active[0] = True time.sleep(0.1) write_active[0] = False def reader_check(): with rw_lock.read_lock(): if write_active[0]: violations.append("读锁获取时写锁仍活跃") read_active[0] = True time.sleep(0.05) read_active[0] = False t1 = threading.Thread(target=writer) t2 = threading.Thread(target=reader_check) t1.start() time.sleep(0.01) t2.start() t1.join() t2.join() allure.attach(f"违反规则数: {len(violations)}", "步骤3", allure.attachment_type.TEXT) assert len(violations) == 0, f"读写锁规则违反: {violations}" @allure.title("测试限流器 - TDD Red阶段") @allure.description("验证限流器控制请求速率 - 期望失败(Red)") @allure.severity(allure.severity_level.CRITICAL) @pytest.mark.smoke def test_rate_limiter(self) -> None: """ TDD Red阶段: 测试限流器 预期结果: - 限流器限制单位时间内的请求数 - 超出限制的请求被拒绝或等待 - 时间窗口后限制重置 """ from core.concurrency_control import RateLimiter with allure.step("Step 1: 创建限流器(每秒5个请求)"): limiter = RateLimiter(max_requests=5, time_window=1) allure.attach("✅ 创建限流器(max_requests=5, time_window=1)", "步骤1", allure.attachment_type.TEXT) with allure.step("Step 2: 测试正常请求"): allowed = 0 for i in range(5): if limiter.allow_request(): allowed += 1 allure.attach(f"前5个请求通过: {allowed}", "步骤2", allure.attachment_type.TEXT) assert allowed == 5, f"前5个请求应该全部通过,实际: {allowed}" with allure.step("Step 3: 测试限流"): blocked = 0 for i in range(3): if not limiter.allow_request(): blocked += 1 allure.attach(f"超出限制被阻止: {blocked}", "步骤3", allure.attachment_type.TEXT) assert blocked == 3, f"超出限制的请求应该被阻止,实际: {blocked}" with allure.step("Step 4: 等待时间窗口重置"): time.sleep(1.1) reset_allowed = 0 for i in range(3): if limiter.allow_request(): reset_allowed += 1 allure.attach(f"重置后通过: {reset_allowed}", "步骤4", allure.attachment_type.TEXT) assert reset_allowed == 3, f"重置后请求应该通过,实际: {reset_allowed}" @allure.title("测试分布式锁(本地模拟) - TDD Red阶段") @allure.description("验证本地模拟的分布式锁 - 期望失败(Red)") @allure.severity(allure.severity_level.NORMAL) @pytest.mark.regression def test_distributed_lock_local(self) -> None: """ TDD Red阶段: 测试本地模拟的分布式锁 预期结果: - 同一时刻只有一个线程获取锁 - 锁超时后自动释放 - 支持可重入 """ from core.concurrency_control import LocalDistributedLock with allure.step("Step 1: 创建分布式锁"): lock = LocalDistributedLock("test_resource") allure.attach("✅ 创建分布式锁", "步骤1", allure.attachment_type.TEXT) with allure.step("Step 2: 测试互斥性"): acquired_count = [0] lock_obj = threading.Lock() def try_acquire(): if lock.acquire(timeout=0.1): with lock_obj: acquired_count[0] += 1 time.sleep(0.2) lock.release() threads = [threading.Thread(target=try_acquire) for _ in range(5)] for t in threads: t.start() for t in threads: t.join() allure.attach(f"成功获取锁次数: {acquired_count[0]}", "步骤2", allure.attachment_type.TEXT) assert acquired_count[0] >= 1, "应该有线程成功获取锁" with allure.step("Step 3: 测试超时释放"): # 获取锁不释放,模拟超时 lock2 = LocalDistributedLock("test_resource2", expire_seconds=1) assert lock2.acquire(), "应该成功获取锁" time.sleep(1.2) # 锁应该已过期,可以重新获取 assert lock2.acquire(), "超时后应该可以重新获取锁" lock2.release() allure.attach("✅ 超时释放测试通过", "步骤3", allure.attachment_type.TEXT) @allure.title("测试并发计数器 - TDD Red阶段") @allure.description("验证线程安全的计数器 - 期望失败(Red)") @allure.severity(allure.severity_level.NORMAL) @pytest.mark.regression def test_concurrent_counter(self) -> None: """ TDD Red阶段: 测试并发计数器 预期结果: - 多线程并发增减计数准确 - 原子操作保证数据一致性 """ from core.concurrency_control import ConcurrentCounter with allure.step("Step 1: 创建并发计数器"): counter = ConcurrentCounter(initial_value=0) allure.attach("✅ 创建并发计数器", "步骤1", allure.attachment_type.TEXT) with allure.step("Step 2: 多线程并发递增"): def increment_worker(): for _ in range(100): counter.increment() threads = [threading.Thread(target=increment_worker) for _ in range(10)] for t in threads: t.start() for t in threads: t.join() final_value = counter.get_value() allure.attach(f"最终计数值: {final_value}", "步骤2", allure.attachment_type.TEXT) assert final_value == 1000, f"计数值错误,期望: 1000, 实际: {final_value}" with allure.step("Step 3: 多线程并发递减"): def decrement_worker(): for _ in range(50): counter.decrement() threads = [threading.Thread(target=decrement_worker) for _ in range(5)] for t in threads: t.start() for t in threads: t.join() final_value = counter.get_value() allure.attach(f"递减后计数值: {final_value}", "步骤3", allure.attachment_type.TEXT) assert final_value == 750, f"计数值错误,期望: 750, 实际: {final_value}" @allure.title("测试屏障同步 - TDD Red阶段") @allure.description("验证屏障同步多个线程 - 期望失败(Red)") @allure.severity(allure.severity_level.NORMAL) @pytest.mark.regression def test_barrier_sync(self) -> None: """ TDD Red阶段: 测试屏障同步 预期结果: - 屏障等待指定数量的线程 - 所有线程到达后同时放行 - 可以重复使用 """ from core.concurrency_control import ThreadBarrier with allure.step("Step 1: 创建屏障(等待3个线程)"): barrier = ThreadBarrier(parties=3) allure.attach("✅ 创建屏障(parties=3)", "步骤1", allure.attachment_type.TEXT) with allure.step("Step 2: 测试屏障同步"): arrival_times = [] lock = threading.Lock() def worker(): time.sleep(0.05) # 模拟一些工作 barrier.wait() with lock: arrival_times.append(time.time()) threads = [threading.Thread(target=worker) for _ in range(3)] start_time = time.time() for t in threads: t.start() for t in threads: t.join() max_diff = max(arrival_times) - min(arrival_times) allure.attach(f"最大到达时间差: {max_diff:.4f}s", "步骤2", allure.attachment_type.TEXT) assert max_diff < 0.1, f"屏障同步失败,时间差过大: {max_diff}" @allure.title("测试任务队列 - TDD Red阶段") @allure.description("验证有界任务队列 - 期望失败(Red)") @allure.severity(allure.severity_level.NORMAL) @pytest.mark.regression def test_bounded_task_queue(self) -> None: """ TDD Red阶段: 测试有界任务队列 预期结果: - 队列有容量限制 - 满时put阻塞或超时 - 支持优先级 """ from core.concurrency_control import BoundedTaskQueue with allure.step("Step 1: 创建容量为3的任务队列"): queue = BoundedTaskQueue(max_size=3) allure.attach("✅ 创建任务队列(max_size=3)", "步骤1", allure.attachment_type.TEXT) with allure.step("Step 2: 正常添加任务"): for i in range(3): queue.put(f"task_{i}") allure.attach("✅ 添加3个任务", "步骤2", allure.attachment_type.TEXT) with allure.step("Step 3: 测试队列满"): try: queue.put("overflow_task", timeout=0.1) assert False, "应该抛出超时异常" except Exception: allure.attach("✅ 队列满时正确阻塞", "步骤3", allure.attachment_type.TEXT) with allure.step("Step 4: 消费任务"): tasks = [] for _ in range(3): tasks.append(queue.get(timeout=0.1)) allure.attach(f"消费任务: {tasks}", "步骤4", allure.attachment_type.TEXT) assert len(tasks) == 3, f"消费任务数错误: {len(tasks)}" @allure.title("测试并发控制器管理器 - TDD Red阶段") @allure.description("验证并发控制器管理器 - 期望失败(Red)") @allure.severity(allure.severity_level.NORMAL) @pytest.mark.regression def test_concurrency_manager(self) -> None: """ TDD Red阶段: 测试并发控制器管理器 预期结果: - 单例模式 - 可以管理多种并发控制组件 - 支持命名访问 """ from core.concurrency_control import ConcurrencyManager with allure.step("Step 1: 获取管理器实例"): manager1 = ConcurrencyManager() manager2 = ConcurrencyManager() assert manager1 is manager2, "应该是单例模式" allure.attach("✅ 单例模式验证通过", "步骤1", allure.attachment_type.TEXT) with allure.step("Step 2: 创建命名信号量"): semaphore = manager1.create_semaphore("api_limit", max_concurrent=5) allure.attach("✅ 创建命名信号量", "步骤2", allure.attachment_type.TEXT) with allure.step("Step 3: 获取已创建的组件"): retrieved = manager1.get_semaphore("api_limit") assert retrieved is semaphore, "应该获取到相同的信号量" allure.attach("✅ 获取命名组件成功", "步骤3", allure.attachment_type.TEXT) with allure.step("Step 4: 获取统计信息"): stats = manager1.get_all_stats() allure.attach(f"统计信息: {stats}", "步骤4", allure.attachment_type.TEXT) assert "semaphores" in stats, "统计应包含信号量信息"