第四章:任务工作流编排
Chapter 4: 任务工作流编排
从记忆库到交响乐团:如何让代理高效执行复杂任务?
在上一章文件存储后端,我们学会了如何让代理安全地保存文件。但就像一场交响乐需要指挥协调乐手一样,当代理需要同时处理“生成报告”和“发送邮件”等多个任务时,就需要一套“指挥系统”来管理任务顺序和资源分配。本章将教你如何用任务工作流编排让代理像专业乐团一样高效协作!
核心使命:让复杂任务像乐高一样组装
想象你的助手需要完成以下任务:
用户指令:“分析季度销售数据,生成报告,并将报告发送给团队成员。”
这需要:
- 分解任务:拆分出“数据清洗”、“图表生成”、“邮件发送”等步骤
- 协调执行:确保邮件发送在报告生成后进行
- 容错处理:若数据清洗失败,暂停后续步骤并提示错误
任务工作流编排就是这个“指挥系统”,它帮你:
✅ 自定义任务执行顺序
✅ 管理任务并行或串行执行
✅ 自动处理错误和重试
✅ 跟踪任务进度
关键概念拆解:工作流的四大支柱
1. 任务分解(Task Decomposition)
作用:把大任务拆解成可执行的“积木块”
示例:tasks = ["清洗销售数据", # 任务1"生成销售图表", # 任务2(依赖任务1)"发送邮件通知" # 任务3(依赖任务2) ]
2. 执行策略(Execution Strategy)
作用:决定任务如何“排队”或“并行”
示例:# 串行执行:任务2必须等任务1完成后才能开始 strategy = SerialExecution()# 并行执行:任务1和任务2同时运行(若资源允许) strategy = ParallelExecution(max_parallel=2)
3. 错误处理(Error Handling)
作用:像保险丝一样保护系统
示例:error_strategy = RetryStrategy(max_retries=3, # 最多重试3次fallback="发送错误通知邮件" )
4. 资源分配(Resource Management)
作用:避免“资源争夺战”
示例:resources = {"CPU": 2, # 分配2个CPU核心"Memory": "4GB" # 分配4GB内存 }
实战演练:生成报告并发送
目标:让代理自动完成:
- 从数据库提取销售数据
- 生成可视化图表
- 通过邮件发送最终报告
步骤1:定义任务流程
from forge.workflow import Workflow, Task, SerialExecutionworkflow = Workflow(name="季度报告生成",execution_strategy=SerialExecution(), # 串行执行error_strategy=RetryStrategy(max_retries=2) # 失败重试2次
)# 添加任务
workflow.add_task(Task("清洗数据", depends_on=None))
workflow.add_task(Task("生成图表", depends_on="清洗数据"))
workflow.add_task(Task("发送邮件", depends_on="生成图表"))
步骤2:启动工作流
# 执行工作流
result = workflow.execute()# 查看执行结果
print(result.status) # 成功/失败/进行中
print(result.progress) # 已完成3/3步骤
输出示例:
✅ 工作流完成!
3/3 任务执行成功:
1. 清洗数据 → 用时5秒
2. 生成图表 → 用时8秒
3. 发送邮件 → 用时2秒
内部运作:任务指挥家如何指挥?
以发送报告为例,流程如下:
关键代码片段(来自workflow.py
)
class Workflow:def execute(self):for task in self.sorted_tasks: # 按依赖关系排序try:task.run() # 执行任务self.update_progress() # 更新进度except Exception as e:self.handle_error(e) # 触发错误策略if not self.error_strategy.can_retry():return self._fail("任务链中断")return self._success()
扩展应用:并行执行提升效率
场景:数据清洗和邮件模板设计可以同时进行
workflow = Workflow(execution_strategy=ParallelExecution(max_parallel=2)
)workflow.add_task(Task("清洗数据", depends_on=None))
workflow.add_task(Task("设计邮件模板", depends_on=None)) # 可并行
workflow.add_task(Task("生成图表", depends_on="清洗数据"))
workflow.add_task(Task("发送邮件", depends_on=["生成图表", "设计邮件模板"]))
总结与展望
通过本章,你已掌握:
✅ 将复杂任务拆解为可管理的步骤
✅ 设计串行或并行执行策略
✅ 处理任务执行中的错误和资源问题
下一章我们将学习如何测试代理性能——基准测试框架,教你科学衡量代理的效率和可靠性!