08ea5fbe98
添加用户管理视图、API和状态管理文件
289 lines
8.3 KiB
Python
289 lines
8.3 KiB
Python
"""
|
|
Scheduled-task scheduler module.

Provides creation, scheduling, execution and management of scheduled tasks.
|
|
"""
|
|
|
|
import time
|
|
import threading
|
|
import uuid
|
|
from typing import Any, Callable, Dict, List, Optional
|
|
from dataclasses import dataclass, field
|
|
from enum import Enum
|
|
import heapq
|
|
|
|
|
|
class TaskStatus(Enum):
    """Lifecycle states a task can be in."""

    # Waiting in the queue for its next run time.
    PENDING = "pending"
    # Picked up by the worker and currently executing.
    RUNNING = "running"
    # Finished successfully and will not run again.
    COMPLETED = "completed"
    # Cancelled by the caller.
    CANCELLED = "cancelled"
    # Finished with a failure.
    ERROR = "error"
|
|
|
|
|
|
class SchedulerState(Enum):
    """Operating states of the scheduler."""

    # The worker is active and picks up due tasks.
    RUNNING = "running"
    # The worker is alive but parked until resumed.
    PAUSED = "paused"
    # No worker is processing tasks.
    STOPPED = "stopped"
|
|
|
|
|
|
@dataclass
class Task:
    """Definition and runtime state of a schedulable task.

    Instances are orderable with ``<`` so they can be stored directly in a
    ``heapq`` priority queue: the task with the earliest ``next_run_time``
    sorts first, and ties are broken in favour of the higher ``priority``.
    """

    name: str
    func: Callable
    interval: float = 0  # seconds between runs
    delay: float = 0  # seconds to wait before the first run
    repeat: bool = False  # whether the task runs repeatedly
    priority: int = 5  # 1-10; a larger number means higher priority
    on_error: Optional[Callable[[Exception], None]] = None
    max_retries: int = 0

    # Internal bookkeeping fields
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    status: TaskStatus = TaskStatus.PENDING
    next_run_time: float = 0
    execution_count: int = 0
    error_count: int = 0

    def __post_init__(self):
        """Derive the first run time from ``delay`` unless one was supplied."""
        # 0 is the "not set" marker for next_run_time.
        if self.next_run_time == 0:
            self.next_run_time = time.time() + self.delay

    def __lt__(self, other):
        """Order tasks for a priority queue.

        Returns:
            True when this task should run before ``other``; ``NotImplemented``
            when ``other`` is not a ``Task`` so Python can try the reflected
            operation or raise ``TypeError``.
        """
        if not isinstance(other, Task):
            return NotImplemented
        if self.next_run_time != other.next_run_time:
            return self.next_run_time < other.next_run_time
        # Same due time: the higher priority task sorts first.
        return self.priority > other.priority
|
|
|
|
|
|
@dataclass
class TaskExecutionRecord:
    """Outcome of a single task execution."""

    # Identity of the task that ran.
    task_id: str
    task_name: str

    # Start and end timestamps of the run.
    start_time: float
    end_time: float

    # Whether the run succeeded, and the error text when one was recorded.
    success: bool
    error: Optional[str] = None
|
|
|
|
|
|
class TaskScheduler:
    """Single-worker task scheduler.

    Features:
    - delayed and periodic tasks
    - task priorities
    - task cancellation
    - error handling through per-task callbacks
    - pause / resume

    Pending tasks are kept in a ``heapq`` min-heap, so their order is whatever
    the task objects' ``<`` comparison defines. One daemon thread pops due
    tasks and runs them one at a time, outside the scheduler lock.
    """

    # Longest time the worker sleeps before re-checking the queue and state.
    _IDLE_POLL_SECONDS = 0.1
    # How long stop() waits for the worker thread to exit.
    _STOP_JOIN_TIMEOUT = 5

    def __init__(self):
        """Create a stopped scheduler with no tasks."""
        # Every task handed to schedule(), keyed by task id.
        self._tasks: Dict[str, Task] = {}
        # heapq-managed queue of tasks waiting for their next run.
        self._task_queue: List[Task] = []
        # Re-entrant lock guarding all scheduler state; the condition shares it.
        self._lock = threading.RLock()
        self._condition = threading.Condition(self._lock)
        self._state = SchedulerState.STOPPED
        self._worker_thread: Optional[threading.Thread] = None
        self._execution_records: List[TaskExecutionRecord] = []
        self._total_executions = 0
        self._total_errors = 0

    def start(self) -> None:
        """Start the scheduler.

        Does nothing when already running. When the scheduler is paused and
        its worker thread is still alive, the existing worker is woken up
        instead of spawning a second one.
        """
        with self._lock:
            if self._state == SchedulerState.RUNNING:
                return

            worker = self._worker_thread
            if (
                self._state == SchedulerState.PAUSED
                and worker is not None
                and worker.is_alive()
            ):
                # The worker is parked in its pause wait; starting another
                # thread here would leave two workers draining one queue.
                self._state = SchedulerState.RUNNING
                self._condition.notify_all()
                return

            self._state = SchedulerState.RUNNING
            self._worker_thread = threading.Thread(
                target=self._worker_loop, daemon=True
            )
            self._worker_thread.start()

    def stop(self) -> None:
        """Stop the scheduler and wait briefly for the worker to exit.

        Safe to call from inside a running task: in that case the worker is
        not joined (a thread cannot join itself) and simply exits after the
        current task returns.
        """
        with self._lock:
            self._state = SchedulerState.STOPPED
            self._condition.notify_all()
            worker = self._worker_thread

        # Join outside the lock so the worker can acquire it and observe the
        # STOPPED state.
        if worker is None or worker is threading.current_thread():
            return
        if worker.is_alive():
            worker.join(timeout=self._STOP_JOIN_TIMEOUT)

    def pause(self) -> None:
        """Pause a running scheduler.

        Only a RUNNING scheduler can be paused. Pausing a stopped scheduler
        would leave it in a state where resume() reports RUNNING although no
        worker thread exists.
        """
        with self._lock:
            if self._state == SchedulerState.RUNNING:
                self._state = SchedulerState.PAUSED

    def resume(self) -> None:
        """Resume a paused scheduler; no effect in any other state."""
        with self._lock:
            if self._state == SchedulerState.PAUSED:
                self._state = SchedulerState.RUNNING
                self._condition.notify_all()

    def schedule(self, task: Task) -> str:
        """Register a task and queue it for execution.

        The scheduler is started automatically if it is currently stopped.

        Args:
            task: The task to schedule.

        Returns:
            The task's id.
        """
        with self._lock:
            self._tasks[task.id] = task
            heapq.heappush(self._task_queue, task)
            self._condition.notify()

        # Auto-start the scheduler
        if self._state == SchedulerState.STOPPED:
            self.start()

        return task.id

    def cancel(self, task_id: str) -> bool:
        """Cancel a task.

        Tasks that already reached COMPLETED or ERROR are left untouched so
        their final status is not overwritten.

        Args:
            task_id: Id of the task to cancel.

        Returns:
            True if the task is now cancelled; False if the id is unknown or
            the task had already finished.
        """
        with self._lock:
            task = self._tasks.get(task_id)
            if task is None:
                return False
            if task.status in (TaskStatus.COMPLETED, TaskStatus.ERROR):
                return False
            task.status = TaskStatus.CANCELLED
            return True

    def get_stats(self) -> Dict[str, Any]:
        """Return a snapshot of scheduler statistics.

        Returns:
            A dict with the keys ``total_executions``, ``total_errors``,
            ``pending_tasks``, ``running_tasks`` and ``state``.
        """
        with self._lock:
            statuses = [t.status for t in self._tasks.values()]
            return {
                "total_executions": self._total_executions,
                "total_errors": self._total_errors,
                "pending_tasks": statuses.count(TaskStatus.PENDING),
                "running_tasks": statuses.count(TaskStatus.RUNNING),
                "state": self._state.value,
            }

    def _worker_loop(self) -> None:
        """Worker thread body: run due tasks until the scheduler is stopped."""
        while True:
            with self._lock:
                if self._state == SchedulerState.STOPPED:
                    break

                # While paused, sleep until resume()/start()/stop() notifies.
                while self._state == SchedulerState.PAUSED:
                    self._condition.wait()
                    if self._state == SchedulerState.STOPPED:
                        return

                task = self._get_next_task()

                if task is None:
                    # Nothing is due. Sleep for the poll interval, or less if
                    # the head of the queue becomes due sooner.
                    timeout = self._IDLE_POLL_SECONDS
                    if self._task_queue:
                        until_due = self._task_queue[0].next_run_time - time.time()
                        timeout = min(timeout, max(0.0, until_due))
                    self._condition.wait(timeout=timeout)
                    continue

                if task.status == TaskStatus.CANCELLED:
                    continue

                task.status = TaskStatus.RUNNING

            # Run the task outside the lock so other threads are not blocked.
            self._execute_task(task)

    def _get_next_task(self) -> Optional[Task]:
        """Pop and return the next due task, or None if nothing is due.

        Cancelled tasks found at the head of the queue are discarded. The
        caller is expected to hold the scheduler lock.
        """
        now = time.time()

        while self._task_queue:
            task = heapq.heappop(self._task_queue)

            if task.status == TaskStatus.CANCELLED:
                continue

            if task.next_run_time <= now:
                return task

            # Not due yet: put it back and stop looking, since every other
            # queued task sorts after this one.
            heapq.heappush(self._task_queue, task)
            break

        return None

    def _execute_task(self, task: Task) -> None:
        """Run one task, record the outcome and reschedule it if periodic.

        A repeating task is queued again ``interval`` seconds from now while
        ``error_count <= max_retries``; ``max_retries == 0`` disables that
        limit. A task cancelled while it was running keeps its CANCELLED
        status and is not rescheduled.
        """
        # The task may have been cancelled after the worker picked it up.
        if task.status == TaskStatus.CANCELLED:
            return

        start_time = time.time()
        success = False
        error_msg = None

        try:
            task.func()
        except Exception as e:
            error_msg = str(e)
            task.error_count += 1

            if task.on_error:
                try:
                    task.on_error(e)
                except Exception:
                    # Deliberate best effort: a failing error callback must
                    # not take down the worker thread.
                    pass
        else:
            success = True
            task.execution_count += 1

        end_time = time.time()

        with self._lock:
            # Record the execution
            if not success:
                self._total_errors += 1
            self._total_executions += 1
            self._execution_records.append(
                TaskExecutionRecord(
                    task_id=task.id,
                    task_name=task.name,
                    start_time=start_time,
                    end_time=end_time,
                    success=success,
                    error=error_msg,
                )
            )

            if task.status == TaskStatus.CANCELLED:
                # Cancelled mid-run: keep the CANCELLED status.
                return

            if not task.repeat:
                task.status = TaskStatus.COMPLETED if success else TaskStatus.ERROR
            elif task.error_count <= task.max_retries or task.max_retries == 0:
                task.status = TaskStatus.PENDING
                task.next_run_time = time.time() + task.interval
                heapq.heappush(self._task_queue, task)
            else:
                task.status = TaskStatus.ERROR
|