08ea5fbe98
添加用户管理视图、API和状态管理文件
283 lines
6.2 KiB
Python
283 lines
6.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
定时任务调度器模块演示脚本
|
|
|
|
展示任务调度器的核心功能。
|
|
"""
|
|
|
|
import time
|
|
from core.task_scheduler import TaskScheduler, Task
|
|
|
|
|
|
def demo_task_creation():
|
|
"""演示任务创建和调度"""
|
|
print("\n" + "="*60)
|
|
print("演示1: 任务创建和调度")
|
|
print("="*60)
|
|
|
|
scheduler = TaskScheduler()
|
|
executed = [False]
|
|
|
|
def task_func():
|
|
executed[0] = True
|
|
print("✅ 任务执行了!")
|
|
|
|
task = Task(
|
|
name="test_task",
|
|
func=task_func,
|
|
interval=1
|
|
)
|
|
|
|
scheduler.schedule(task)
|
|
print("⏱️ 等待任务执行...")
|
|
time.sleep(1.5)
|
|
|
|
print(f"✅ 任务执行状态: {executed[0]}")
|
|
scheduler.stop()
|
|
|
|
|
|
def demo_periodic_task():
|
|
"""演示周期性任务"""
|
|
print("\n" + "="*60)
|
|
print("演示2: 周期性任务")
|
|
print("="*60)
|
|
|
|
scheduler = TaskScheduler()
|
|
execution_count = [0]
|
|
|
|
def periodic_task():
|
|
execution_count[0] += 1
|
|
print(f"✅ 周期性任务执行 #{execution_count[0]}")
|
|
|
|
task = Task(
|
|
name="periodic_task",
|
|
func=periodic_task,
|
|
interval=0.5,
|
|
repeat=True
|
|
)
|
|
|
|
scheduler.schedule(task)
|
|
print("⏱️ 等待执行多次...")
|
|
time.sleep(2)
|
|
|
|
print(f"✅ 总执行次数: {execution_count[0]}")
|
|
scheduler.stop()
|
|
|
|
|
|
def demo_task_cancellation():
|
|
"""演示任务取消"""
|
|
print("\n" + "="*60)
|
|
print("演示3: 任务取消")
|
|
print("="*60)
|
|
|
|
scheduler = TaskScheduler()
|
|
executed = [False]
|
|
|
|
def task_func():
|
|
executed[0] = True
|
|
|
|
task = Task(
|
|
name="cancellable_task",
|
|
func=task_func,
|
|
interval=2
|
|
)
|
|
|
|
task_id = scheduler.schedule(task)
|
|
scheduler.cancel(task_id)
|
|
print("✅ 任务已取消")
|
|
|
|
time.sleep(2.5)
|
|
print(f"✅ 任务执行状态: {executed[0]} (应该为False)")
|
|
scheduler.stop()
|
|
|
|
|
|
def demo_task_priority():
|
|
"""演示任务优先级"""
|
|
print("\n" + "="*60)
|
|
print("演示4: 任务优先级")
|
|
print("="*60)
|
|
|
|
scheduler = TaskScheduler()
|
|
execution_order = []
|
|
|
|
def high_priority_task():
|
|
execution_order.append("high")
|
|
print("✅ 高优先级任务执行")
|
|
|
|
def low_priority_task():
|
|
execution_order.append("low")
|
|
print("✅ 低优先级任务执行")
|
|
|
|
scheduler.schedule(Task(
|
|
name="low_task",
|
|
func=low_priority_task,
|
|
interval=0.1,
|
|
priority=1
|
|
))
|
|
scheduler.schedule(Task(
|
|
name="high_task",
|
|
func=high_priority_task,
|
|
interval=0.1,
|
|
priority=10
|
|
))
|
|
|
|
time.sleep(0.5)
|
|
print(f"✅ 执行顺序: {execution_order}")
|
|
scheduler.stop()
|
|
|
|
|
|
def demo_error_handling():
|
|
"""演示错误处理"""
|
|
print("\n" + "="*60)
|
|
print("演示5: 错误处理")
|
|
print("="*60)
|
|
|
|
scheduler = TaskScheduler()
|
|
error_handled = [False]
|
|
|
|
def error_task():
|
|
raise ValueError("测试异常")
|
|
|
|
def on_error(e):
|
|
error_handled[0] = True
|
|
print(f"✅ 错误被捕获: {e}")
|
|
|
|
task = Task(
|
|
name="error_task",
|
|
func=error_task,
|
|
interval=0.5,
|
|
on_error=on_error
|
|
)
|
|
|
|
scheduler.schedule(task)
|
|
time.sleep(1)
|
|
|
|
print(f"✅ 错误处理状态: {error_handled[0]}")
|
|
scheduler.stop()
|
|
|
|
|
|
def demo_task_statistics():
|
|
"""演示任务统计"""
|
|
print("\n" + "="*60)
|
|
print("演示6: 任务统计")
|
|
print("="*60)
|
|
|
|
scheduler = TaskScheduler()
|
|
|
|
def simple_task():
|
|
pass
|
|
|
|
task = Task(
|
|
name="stats_task",
|
|
func=simple_task,
|
|
interval=0.3,
|
|
repeat=True
|
|
)
|
|
|
|
scheduler.schedule(task)
|
|
time.sleep(1)
|
|
|
|
stats = scheduler.get_stats()
|
|
print(f"✅ 统计信息:")
|
|
print(f" 总执行次数: {stats['total_executions']}")
|
|
print(f" 总错误数: {stats['total_errors']}")
|
|
print(f" 待处理任务: {stats['pending_tasks']}")
|
|
print(f" 运行中任务: {stats['running_tasks']}")
|
|
print(f" 状态: {stats['state']}")
|
|
|
|
scheduler.stop()
|
|
|
|
|
|
def demo_delayed_task():
|
|
"""演示延迟任务"""
|
|
print("\n" + "="*60)
|
|
print("演示7: 延迟任务")
|
|
print("="*60)
|
|
|
|
scheduler = TaskScheduler()
|
|
executed = [False]
|
|
|
|
def delayed_task():
|
|
executed[0] = True
|
|
print("✅ 延迟任务执行了!")
|
|
|
|
task = Task(
|
|
name="delayed_task",
|
|
func=delayed_task,
|
|
delay=1.5
|
|
)
|
|
|
|
scheduler.schedule(task)
|
|
print("⏱️ 等待0.5秒...")
|
|
time.sleep(0.5)
|
|
print(f"✅ 0.5秒后执行状态: {executed[0]} (应该为False)")
|
|
|
|
print("⏱️ 再等待1.5秒...")
|
|
time.sleep(1.5)
|
|
print(f"✅ 延迟后执行状态: {executed[0]} (应该为True)")
|
|
|
|
scheduler.stop()
|
|
|
|
|
|
def demo_scheduler_state():
|
|
"""演示调度器状态管理"""
|
|
print("\n" + "="*60)
|
|
print("演示8: 调度器状态管理")
|
|
print("="*60)
|
|
|
|
scheduler = TaskScheduler()
|
|
execution_count = [0]
|
|
|
|
def counting_task():
|
|
execution_count[0] += 1
|
|
print(f"✅ 执行 #{execution_count[0]}")
|
|
|
|
task = Task(
|
|
name="counting_task",
|
|
func=counting_task,
|
|
interval=0.5,
|
|
repeat=True
|
|
)
|
|
|
|
scheduler.schedule(task)
|
|
time.sleep(1)
|
|
|
|
print(f"✅ 暂停前执行次数: {execution_count[0]}")
|
|
scheduler.pause()
|
|
print("✅ 调度器已暂停")
|
|
|
|
count_before = execution_count[0]
|
|
time.sleep(1)
|
|
print(f"✅ 暂停期间执行次数: {execution_count[0]} (应该不变)")
|
|
|
|
scheduler.resume()
|
|
print("✅ 调度器已恢复")
|
|
time.sleep(0.6)
|
|
print(f"✅ 恢复后执行次数: {execution_count[0]} (应该增加)")
|
|
|
|
scheduler.stop()
|
|
|
|
|
|
def main():
|
|
"""主函数"""
|
|
print("\n" + "="*60)
|
|
print("定时任务调度器模块演示")
|
|
print("="*60)
|
|
|
|
demo_task_creation()
|
|
demo_periodic_task()
|
|
demo_task_cancellation()
|
|
demo_task_priority()
|
|
demo_error_handling()
|
|
demo_task_statistics()
|
|
demo_delayed_task()
|
|
demo_scheduler_state()
|
|
|
|
print("\n" + "="*60)
|
|
print("✅ 所有演示完成!")
|
|
print("="*60)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|