FastGPT源码解析 Agent工作流编排后端详解
FastGPT 工作流后端实现分析
核心架构概览
FastGPT 工作流后端采用节点调度器模式,通过统一的调度引擎执行各种类型的节点,实现复杂的 AI 工作流编排。
1. 调度引擎核心 (dispatchWorkFlow
)
主要职责
- 节点执行调度: 按依赖关系执行节点
- 边状态管理: 控制数据流向
- 变量传递: 节点间数据共享
- 流式响应: 实时推送执行状态
- 错误处理: 异常捕获和恢复
核心流程
export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowResponse> {// 1. 初始化执行环境let { runtimeNodes, runtimeEdges, variables, histories } = data;// 2. 设置系统变量variables = {...getSystemVariable(data),...externalProvider.externalWorkflowVariables,...variables};// 3. 递归执行节点const entryNodes = runtimeNodes.filter(item => item.isEntry);await Promise.all(entryNodes.map(node => checkNodeCanRun(node)));// 4. 返回执行结果return {flowResponses: chatResponses,flowUsages: chatNodeUsages,assistantResponses: chatAssistantResponse,newVariables: removeSystemVariable(variables)};
}
2. 节点类型映射 (callbackMap
)
节点分发器
const callbackMap: Record<FlowNodeTypeEnum, Function> = {[FlowNodeTypeEnum.chatNode]: dispatchChatCompletion, // AI对话[FlowNodeTypeEnum.datasetSearchNode]: dispatchDatasetSearch, // 知识库搜索[FlowNodeTypeEnum.httpRequest468]: dispatchHttp468Request, // HTTP请求[FlowNodeTypeEnum.ifElseNode]: dispatchIfElse, // 条件判断[FlowNodeTypeEnum.code]: dispatchRunCode, // 代码执行[FlowNodeTypeEnum.loop]: dispatchLoop, // 循环控制[FlowNodeTypeEnum.tools]: dispatchRunTools, // 工具调用// ... 30+ 种节点类型
};
节点执行模式
- Active 模式: 正常执行节点逻辑
- Skip 模式: 跳过执行,传递默认值
- Wait 模式: 等待前置条件满足
3. 节点执行机制
节点运行状态检查
async function checkNodeCanRun(node: RuntimeNodeItemType): Promise<RuntimeNodeItemType[]> {// 1. 检查节点运行状态const status = checkNodeRunStatus({ node, runtimeEdges });// 2. 根据状态执行不同逻辑if (status === 'run') {return nodeRunWithActive(node);}if (status === 'skip') {return nodeRunWithSkip(node);}// 3. 递归执行下游节点const { nextStepActiveNodes } = nodeOutput(node, result);return Promise.all(nextStepActiveNodes.map(node => checkNodeCanRun(node)));
}
节点参数注入
function getNodeRunParams(node: RuntimeNodeItemType) {const params: Record<string, any> = {};node.inputs.forEach(input => {// 1. 变量替换 {{$xx.xx$}} 和 {{xx}}let value = replaceEditorVariable({text: input.value,nodes: runtimeNodes,variables});// 2. 引用变量解析value = getReferenceVariableValue({value,nodes: runtimeNodes,variables});// 3. 类型格式化params[input.key] = valueTypeFormat(value, input.valueType);});return params;
}
4. 数据流管理
边状态控制
// 边的三种状态
type EdgeStatus = 'waiting' | 'active' | 'skipped';// 更新边状态
targetEdges.forEach(edge => {if (skipHandleId.includes(edge.sourceHandle)) {edge.status = 'skipped'; // 跳过分支} else {edge.status = 'active'; // 激活分支}
});
节点输出传递
function nodeOutput(node: RuntimeNodeItemType, result: Record<string, any>) {// 1. 更新节点输出值node.outputs.forEach(outputItem => {if (result[outputItem.key] !== undefined) {outputItem.value = result[outputItem.key];}});// 2. 获取下游节点const nextStepActiveNodes = runtimeNodes.filter(node => targetEdges.some(item => item.target === node.nodeId && item.status === 'active'));return { nextStepActiveNodes };
}
5. 特殊功能实现
流式响应处理
// SSE 流式响应配置
if (stream && res) {res.setHeader('Content-Type', 'text/event-stream;charset=utf-8');res.setHeader('Access-Control-Allow-Origin', '*');res.setHeader('Cache-Control', 'no-cache, no-transform');
}// 推送执行状态
props.workflowStreamResponse?.({event: SseResponseEventEnum.flowNodeStatus,data: { status: 'running', name: node.name }
});
交互节点处理
// 处理用户交互节点(表单输入、用户选择等)
const interactiveResponse = nodeRunResult.result?.[DispatchNodeResponseKeyEnum.interactive];
if (interactiveResponse) {nodeInteractiveResponse = {entryNodeIds: [nodeRunResult.node.nodeId],interactiveResponse};// 暂停工作流,等待用户交互return [];
}
循环控制
// 循环节点实现
[FlowNodeTypeEnum.loop]: dispatchLoop,
[FlowNodeTypeEnum.loopStart]: dispatchLoopStart,
[FlowNodeTypeEnum.loopEnd]: dispatchLoopEnd,// 循环逻辑:通过边的连接实现循环回路
6. 错误处理机制
节点级错误处理
const dispatchRes = await (async () => {try {return await callbackMap[node.flowNodeType](dispatchData);} catch (error) {// 获取所有输出边const targetEdges = runtimeEdges.filter(item => item.source === node.nodeId);const skipHandleIds = targetEdges.map(item => item.sourceHandle);// 跳过所有边并返回错误return {[DispatchNodeResponseKeyEnum.nodeResponse]: {error: formatHttpError(error)},[DispatchNodeResponseKeyEnum.skipHandleId]: skipHandleIds};}
})();
工作流级错误恢复
- 深度限制: 防止无限递归 (最大20层)
- 运行次数控制: 避免死循环
- 资源清理: 连接关闭时自动清理
7. 性能优化
并发执行
// 并行执行无依赖的节点
const nextStepActiveNodesResults = (await Promise.all(nextStepActiveNodes.map(node => checkNodeCanRun(node)))
).flat();
内存管理
// 进程让步,避免阻塞
await surrenderProcess();// 移除系统变量,减少内存占用
newVariables: removeSystemVariable(variables, externalProvider.externalWorkflowVariables)
去重优化
// 确保节点只执行一次
nextStepActiveNodes = nextStepActiveNodes.filter((node, index, self) => self.findIndex(t => t.nodeId === node.nodeId) === index
);
8. 调试支持
Debug 模式
if (props.mode === 'debug') {debugNextStepRunNodes = debugNextStepRunNodes.concat([...nextStepActiveNodes,...nextStepSkipNodes]);// Debug 模式下不继续执行,返回调试信息return { nextStepActiveNodes: [], nextStepSkipNodes: [] };
}
执行追踪
return {debugResponse: {finishedNodes: runtimeNodes, // 已完成的节点finishedEdges: runtimeEdges, // 边的状态nextStepRunNodes: debugNextStepRunNodes // 下一步要执行的节点}
};
总结
FastGPT 工作流后端实现了一个高度可扩展的节点调度引擎,具备以下特点:
- 统一调度: 通过
callbackMap
实现节点类型的统一分发 - 状态管理: 精确控制节点和边的执行状态
- 数据流控制: 支持条件分支、循环、并行等复杂逻辑
- 实时响应: SSE 流式推送执行状态和结果
- 错误恢复: 完善的异常处理和资源清理机制
- 性能优化: 并发执行、内存管理、去重优化
- 调试支持: 完整的调试模式和执行追踪
这套架构为 FastGPT 提供了强大的 AI 工作流编排能力,支持复杂的业务逻辑实现。