/**
 * Workflow executor.
 *
 * Runs business-process workflows step by step, with per-step retry and
 * timeout, optional rollback of completed steps, and structured error
 * reporting.
 */
import { BusinessWorkflow, WorkflowStep } from './business-workflows.js';
import { TestLogger } from './test-logger.js';

/** Outcome of a single workflow run, as returned by `WorkflowExecutor.execute`. */
export interface WorkflowExecutionResult {
  /** True only when the preconditions, every step and the postconditions all passed. */
  success: boolean;
  /** Name of the workflow that was run. */
  workflowName: string;
  /** Names of the steps that finished successfully, in execution order. */
  completedSteps: string[];
  /** Name of the step that failed; only set when the failure happened inside a step. */
  failedStep?: string;
  /** The error that caused the run to fail. */
  error?: Error;
  /** Wall-clock duration in milliseconds (`endTime - startTime`). */
  executionTime: number;
  startTime: Date;
  endTime: Date;
}

/** Tuning knobs accepted by the executor; every field is optional. */
export interface WorkflowExecutionOptions {
  /** Maximum number of attempts per step; a step's own `retryCount` takes precedence. */
  maxRetries?: number;
  /** Pause between two attempts of the same step, in milliseconds. */
  retryDelay?: number;
  /** Read by `executeBatch` only: keep running the remaining workflows after a failure. */
  continueOnError?: boolean;
  /** When a step fails, roll back the steps that had already completed. */
  enableRollback?: boolean;
  /** Per-attempt timeout in milliseconds; a step's own `timeout` takes precedence. */
  timeout?: number;
}

export class WorkflowExecutor {
  private testLogger: TestLogger;

  /** Defaults merged underneath the caller-supplied options in `execute`. */
  private defaultOptions: WorkflowExecutionOptions = {
    maxRetries: 3,
    retryDelay: 1000,
    continueOnError: false,
    enableRollback: true,
    timeout: 300000 // default timeout: 5 minutes
  };

  constructor(testLogger: TestLogger) {
    this.testLogger = testLogger;
  }

  /**
   * Runs a single workflow: preconditions, then each step in order (with
   * retry and timeout), then postconditions.
   *
   * Failures are reported through the returned result rather than thrown.
   *
   * - Step failure: the steps completed so far are rolled back in reverse
   *   order (when `enableRollback` is set) and the result carries `failedStep`.
   * - Precondition/postcondition failure: `workflow.cleanup` is invoked if
   *   present; errors from cleanup are logged and ignored.
   *
   * NOTE(review): `workflow.cleanup` is NOT invoked when a step fails or when
   * the workflow succeeds — confirm whether that is intended.
   *
   * @param workflow The workflow to run.
   * @param options Overrides for the executor defaults.
   * @returns A result describing success or failure, completed steps and timing.
   */
  async execute(
    workflow: BusinessWorkflow,
    options?: WorkflowExecutionOptions
  ): Promise<WorkflowExecutionResult> {
    const mergedOptions = { ...this.defaultOptions, ...options };
    const startTime = new Date();
    const completedSteps: string[] = [];
    const executedSteps: WorkflowStep[] = [];

    this.testLogger.info(`🚀 开始执行工作流: ${workflow.name}`);
    this.testLogger.info(`📝 工作流描述: ${workflow.description}`);

    try {
      // Precondition check.
      if (workflow.preconditions) {
        this.testLogger.info('🔍 检查前置条件');
        const preconditionsMet = await workflow.preconditions();
        if (!preconditionsMet) {
          throw new Error('前置条件未满足');
        }
        this.testLogger.success('前置条件检查通过');
      }

      // Run the steps sequentially; the first failing step ends the run.
      for (const step of workflow.steps) {
        const stepStartTime = Date.now();

        try {
          this.testLogger.startStep(step.name);

          // Run the step (with retry and timeout).
          await this.executeStepWithRetry(step, mergedOptions);

          executedSteps.push(step);
          completedSteps.push(step.name);

          const stepDuration = Date.now() - stepStartTime;
          this.testLogger.endStep(step.name, 'passed', stepDuration);
        } catch (error) {
          const stepError = WorkflowExecutor.toError(error);
          const stepDuration = Date.now() - stepStartTime;
          this.testLogger.endStep(step.name, 'failed', stepDuration);
          this.testLogger.error(`步骤执行失败: ${step.name}`, stepError);

          // Undo the steps that completed before this one, if requested.
          // The failed step itself is not in `executedSteps`.
          if (mergedOptions.enableRollback) {
            await this.rollback(executedSteps);
          }

          const endTime = new Date();
          return {
            success: false,
            workflowName: workflow.name,
            completedSteps,
            failedStep: step.name,
            error: stepError,
            executionTime: endTime.getTime() - startTime.getTime(),
            startTime,
            endTime
          };
        }
      }

      // Postcondition check.
      if (workflow.postconditions) {
        this.testLogger.info('🔍 检查后置条件');
        const postconditionsMet = await workflow.postconditions();
        if (!postconditionsMet) {
          throw new Error('后置条件未满足');
        }
        this.testLogger.success('后置条件检查通过');
      }

      const endTime = new Date();
      const executionTime = endTime.getTime() - startTime.getTime();

      this.testLogger.success(`✅ 工作流执行成功: ${workflow.name} (${executionTime}ms)`);

      return {
        success: true,
        workflowName: workflow.name,
        completedSteps,
        executionTime,
        startTime,
        endTime
      };
    } catch (error) {
      // endTime is captured before cleanup, so cleanup time is not counted.
      const endTime = new Date();

      // Best-effort cleanup: a failing cleanup is logged, never rethrown.
      if (workflow.cleanup) {
        this.testLogger.info('🧹 执行清理操作');
        try {
          await workflow.cleanup();
        } catch (cleanupError) {
          this.testLogger.error('清理操作失败', WorkflowExecutor.toError(cleanupError));
        }
      }

      return {
        success: false,
        workflowName: workflow.name,
        completedSteps,
        error: WorkflowExecutor.toError(error),
        executionTime: endTime.getTime() - startTime.getTime(),
        startTime,
        endTime
      };
    }
  }

  /**
   * Runs one step, retrying on failure.
   *
   * The attempt limit is `step.retryCount`, else `options.maxRetries`, else 3.
   * At least one attempt is always made, even if the configured value is 0,
   * negative or NaN. Each attempt is bounded by `step.timeout`, else
   * `options.timeout`, else 30000 ms.
   *
   * NOTE(review): the 30000 ms fallback differs from the 300000 ms class
   * default; it only applies when the caller passes `timeout: undefined`.
   *
   * @throws The error of the last failed attempt, normalised to an `Error`.
   */
  private async executeStepWithRetry(
    step: WorkflowStep,
    options: WorkflowExecutionOptions
  ): Promise<void> {
    const requestedAttempts = step.retryCount ?? options.maxRetries ?? 3;
    // `>=` is false for NaN, so NaN also falls back to a single attempt.
    const maxRetries = requestedAttempts >= 1 ? requestedAttempts : 1;
    const retryDelay = options.retryDelay ?? 1000;
    const timeout = step.timeout ?? options.timeout ?? 30000;

    for (let attempt = 1; ; attempt++) {
      try {
        this.testLogger.info(`🔄 执行步骤: ${step.name} (尝试 ${attempt}/${maxRetries})`);
        await this.runWithTimeout(step, timeout);
        return; // success
      } catch (error) {
        this.testLogger.warn(`步骤执行失败 (尝试 ${attempt}/${maxRetries}): ${step.name}`);

        // Out of attempts: surface the last failure to the caller.
        if (attempt >= maxRetries) {
          throw WorkflowExecutor.toError(error);
        }

        this.testLogger.info(`⏳ 等待 ${retryDelay}ms 后重试...`);
        await this.delay(retryDelay);
      }
    }
  }

  /**
   * Races the step's action against a timeout and always clears the timer
   * afterwards, so a finished step does not leave a pending timer behind.
   *
   * The action itself is not cancelled on timeout; only the wait is abandoned.
   */
  private async runWithTimeout(step: WorkflowStep, timeoutMs: number): Promise<void> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    try {
      await Promise.race([
        step.action(),
        new Promise<never>((_, reject) => {
          timer = setTimeout(() => reject(new Error(`步骤超时: ${step.name}`)), timeoutMs);
        })
      ]);
    } finally {
      clearTimeout(timer);
    }
  }

  /**
   * Rolls back the given steps in reverse execution order.
   *
   * Steps without a `rollback` handler are skipped. A failing rollback is
   * logged and does not stop the remaining rollbacks.
   */
  private async rollback(executedSteps: WorkflowStep[]): Promise<void> {
    this.testLogger.info('⏪ 开始回滚操作');

    // Iterate over a reversed copy so the caller's array is left untouched.
    for (const step of [...executedSteps].reverse()) {
      if (!step.rollback) {
        continue;
      }
      try {
        this.testLogger.info(`⏪ 回滚步骤: ${step.name}`);
        await step.rollback();
        this.testLogger.success(`步骤回滚成功: ${step.name}`);
      } catch (error) {
        this.testLogger.error(`步骤回滚失败: ${step.name}`, WorkflowExecutor.toError(error));
      }
    }

    this.testLogger.info('⏪ 回滚操作完成');
  }

  /** Resolves after `ms` milliseconds. */
  private delay(ms: number): Promise<void> {
    return new Promise<void>(resolve => setTimeout(resolve, ms));
  }

  /** Converts an arbitrary thrown value into an `Error`; `Error` instances pass through. */
  private static toError(value: unknown): Error {
    return value instanceof Error ? value : new Error(String(value));
  }

  /**
   * Runs several workflows one after another.
   *
   * Stops after the first failed workflow unless `options.continueOnError`
   * is truthy, so the returned array may be shorter than `workflows`.
   *
   * @returns The results of the workflows that were actually run, in order.
   */
  async executeBatch(
    workflows: BusinessWorkflow[],
    options?: WorkflowExecutionOptions
  ): Promise<WorkflowExecutionResult[]> {
    this.testLogger.info(`📦 开始批量执行 ${workflows.length} 个工作流`);

    const results: WorkflowExecutionResult[] = [];

    for (const workflow of workflows) {
      const result = await this.execute(workflow, options);
      results.push(result);

      // Abort the batch on failure unless told to keep going.
      if (!result.success && !options?.continueOnError) {
        this.testLogger.error(`工作流执行失败,中断批量执行: ${workflow.name}`);
        break;
      }
    }

    const successCount = results.filter(r => r.success).length;
    this.testLogger.info(`📦 批量执行完成: ${successCount}/${workflows.length} 成功`);

    return results;
  }

  /**
   * Starts all workflows at once and waits for every one of them.
   *
   * @returns One result per workflow, in the same order as `workflows`.
   */
  async executeParallel(
    workflows: BusinessWorkflow[],
    options?: WorkflowExecutionOptions
  ): Promise<WorkflowExecutionResult[]> {
    this.testLogger.info(`⚡ 开始并行执行 ${workflows.length} 个工作流`);

    const promises = workflows.map(workflow => this.execute(workflow, options));
    const results = await Promise.all(promises);

    const successCount = results.filter(r => r.success).length;
    this.testLogger.info(`⚡ 并行执行完成: ${successCount}/${workflows.length} 成功`);

    return results;
  }
}