当前位置: 首页 > ds >正文

LlamaIndex 工作流 并发执行

除了循环和分支之外,工作流还可以并发地执行步骤。当你有多个可以相互独立运行的步骤,并且这些步骤中包含需要等待的耗时操作时,这种并发执行的方式就非常有用,因为它允许其他步骤并行运行。

触发多个事件

到目前为止,在我们的示例中,每个步骤只触发了一个事件。但在很多情况下,你可能希望并行执行多个步骤。要做到这一点,你需要触发多个事件。你可以通过 send_event 来实现这一点:

import asyncio
import randomfrom llama_index.core.workflow import Workflow, step, Context, StartEvent, Event, StopEvent
from llama_index.utils.workflow import draw_all_possible_flowsclass StepTowEvent(Event):payload: strquery: strclass ParallelFlow(Workflow):@stepasync def step_one(self, ctx: Context, ev: StartEvent)-> StepTowEvent:print(ev.payload)ctx.send_event(StepTowEvent(payload="跳转到第二步", query="查询 1"))ctx.send_event(StepTowEvent(payload="跳转到第二步", query="查询 2"))ctx.send_event(StepTowEvent(payload="跳转到第二步", query="查询 3"))return None@step(num_workers=4)async def step_tow(self, ctx: Context, ev: StepTowEvent) -> StopEvent:print(ev.query)await asyncio.sleep(random.randint(1, 5))return StopEvent(result="结束!")async def run_workflow():w = ParallelFlow(timeout=60, verbose=True)result = await w.run(payload="开始工作流...")print(result)if __name__ == '__main__':asyncio.run(run_workflow())# 工作流可视化工具
draw_all_possible_flows(ParallelFlow, filename="parallel_workflow.html")

运行结果:

Running step step_one
开始工作流...
Step step_one produced no event
Running step step_tow
查询 1
Running step step_tow
查询 2
Running step step_tow
查询 3
Step step_tow produced event StopEvent
结束!
parallel_workflow.html

流程图:

在这个示例中,我们的起始步骤触发了三个 StepTwoEvents。step_two 步骤使用了 num_workers=4 进行装饰,这告诉工作流最多可以同时运行 4 个该步骤的实例(这也是默认值)。

收集事件

如果你执行前面的示例,你会发现工作流会在第一个完成的查询后停止。有时候这种行为是有用的,但在其他情况下,你可能希望在继续执行下一步之前,等待所有耗时操作都完成。你可以通过 collect_events 来实现这一点:

import asyncio
import randomfrom llama_index.core.workflow import Workflow, Event, step, StartEvent, Context, StopEventfrom llama_index.utils.workflow import draw_all_possible_flowsclass StepTwoEvent(Event):payload: strquery: strclass StepThreeEvent(Event):payload: strclass ConcurrentFlow(Workflow):@stepasync def step_one(self, ctx: Context, ev: StartEvent)-> StepTwoEvent:print(ev.payload)ctx.send_event(StepTwoEvent(payload="", query="查询 1"))ctx.send_event(StepTwoEvent(payload="", query="查询 2"))ctx.send_event(StepTwoEvent(payload="", query="查询 3"))return None@step(num_workers=4)async def step_two(self, ctx: Context, ev: StepTwoEvent)-> StepThreeEvent:print("第二步...", ev.query)await asyncio.sleep(random.randint(1, 5))return StepThreeEvent(payload="来自第二步")@stepasync def step_three(self, ctx: Context, ev: StepThreeEvent)-> StopEvent:print("运行第三步...")# 等待直到我们接收到 3 个事件result = ctx.collect_events(ev, [StepThreeEvent] * 3)if result is None:return None# 将这三个结果一并进行处理print("将这三个结果一并进行处理", result)return StopEvent(result="Done")async def run_workflow():w = ConcurrentFlow(timeout=60, verbose=True)result = await w.run(payload="开始工作流..")print(result)if __name__ == '__main__':asyncio.run(run_workflow())# 工作流可视化工具
draw_all_possible_flows(ConcurrentFlow, filename="concurrent_workflow.html")

运行结果:

Running step step_one
开始工作流..
Step step_one produced no event
Running step step_two
第二步... 查询 1
Running step step_two
第二步... 查询 2
Running step step_two
第二步... 查询 3
Step step_two produced event StepThreeEvent
Step step_two produced event StepThreeEvent
Step step_two produced event StepThreeEvent
Running step step_three
运行第三步...
Step step_three produced no event
Running step step_three
运行第三步...
Step step_three produced no event
Running step step_three
运行第三步...
将这三个结果一并进行处理 [StepThreeEvent(payload='来自第二步'), StepThreeEvent(payload='来自第二步'), StepThreeEvent(payload='来自第二步')]
Step step_three produced event StopEvent
Done
concurrent_workflow.html

流程图:

collect_events 方法位于 Context 上,它接收触发当前步骤的事件以及一个需要等待的事件类型数组作为参数。在本例中,我们正在等待 3 个相同类型的 StepThreeEvent 事件。

每当接收到一个 StepThreeEvent 事件时,step_three 步骤就会被触发,但 collect_events 在接收到全部 3 个事件之前会一直返回 None。一旦收集到所有 3 个事件,该步骤将继续执行,并可以将这三个结果一并进行处理。

collect_events 返回的结果是一个按接收顺序排列的事件数组,其中包含所收集到的所有事件。

多种事件类型

当然,你并不要求必须等待相同类型的事件。你可以根据需要等待任意组合的事件,例如在以下示例中:

import asynciofrom llama_index.core.workflow import Event, Workflow, step, Context, StartEvent, StopEvent
from llama_index.utils.workflow import draw_all_possible_flowsclass StepAEvent(Event):query: str
class StepBEvent(Event):query: str
class StepCEvent(Event):query: strclass StepACompleteEvent(Event):query: str
class StepBCompleteEvent(Event):query: str
class StepCCompleteEvent(Event):query: strclass ConcurrentFlow(Workflow):@stepasync def step_one(self, ctx: Context, ev: StartEvent)-> StepAEvent | StepBEvent | StepCEvent:print(ev.payload)ctx.send_event(StepAEvent(query="A 查询..."))ctx.send_event(StepBEvent(query="B 查询..."))ctx.send_event(StepCEvent(query="C 查询..."))return None@stepasync def step_a(self, ctx: Context, ev: StepAEvent)-> StepACompleteEvent:print(ev.query)return StepACompleteEvent(query="A 查询...完成")@stepasync def step_b(self, ctx: Context, ev: StepBEvent)-> StepBCompleteEvent:print(ev.query)return StepBCompleteEvent(query="B 查询...完成")@stepasync def step_c(self, ctx: Context, ev: StepCEvent)-> StepCCompleteEvent:print(ev.query)return StepCCompleteEvent(query="C 查询...完成")@stepasync def step_tow(self,ctx: Context,ev: StepACompleteEvent | StepBCompleteEvent | StepCCompleteEvent,) -> StopEvent:print("接收到的事件 ", ev.query)# 等待直到我们接收到 3 个事件result = ctx.collect_events(ev, [StepCCompleteEvent, StepACompleteEvent, StepBCompleteEvent],)if result is None:return None# do something with all 3 results togetherreturn StopEvent(result="Done")async def run_workflow():w = ConcurrentFlow(timeout=60, verbose=True)result = await w.run(payload="开始工作流..")print(result)if __name__ == '__main__':asyncio.run(run_workflow())# 工作流可视化工具
draw_all_possible_flows(ConcurrentFlow, filename="concurrent_workflow.html")

运行结果:

Running step step_one
开始工作流..
Step step_one produced no event
Running step step_a
A 查询...
Step step_a produced event StepACompleteEvent
Running step step_b
B 查询...
Step step_b produced event StepBCompleteEvent
Running step step_c
C 查询...
Step step_c produced event StepCCompleteEvent
Running step step_tow
接收到的事件  A 查询...完成
Step step_tow produced no event
Running step step_tow
接收到的事件  B 查询...完成
Step step_tow produced no event
Running step step_tow
接收到的事件  C 查询...完成
Step step_tow produced event StopEvent
Done
concurrent_workflow.html

流程图:

http://www.xdnf.cn/news/14266.html

相关文章:

  • Day13_C语言基础项目实战
  • Java性能问题排查
  • 暴雨亮相EAC2025分享热管理液冷技术
  • Android 中 linux 命令查询设备信息
  • PyTorch框架-自动微分模块
  • 小知识点三、无刷电机闭环控制
  • 大模型_Ubuntu24.04安装RagFlow_使用hyper-v虚拟机_超级详细--人工智能工作笔记0251
  • 【C++】C++17之std::optional
  • 41.第二阶段x64游戏实战-封包-分析周围对象ID
  • qt信号与槽--01
  • 【论文解读】Agentic AI 遇见工业自动化:从“指令”到“意图”的嬗变
  • Tabulate - C++表格格式化库介绍与使用
  • MongoDB详细安装步骤(Windows 系统)
  • SHELL 编程正则表达式
  • js 查看字符串字节数
  • 快速幂算法详解:从暴力到优雅的数学优化
  • Python脚本开发入门:从基础到进阶技巧
  • SpringBoot ​@ControllerAdvice 处理异常
  • 鸿蒙app 开发中 如何 看 app 页面的ui结构
  • JS 数组转Object和Map
  • PHP基础-运算符
  • 【62 Pandas+Pyecharts | 智联招聘大数据岗位数据分析可视化】
  • 如何VMware虚拟机扩容磁盘,有很详细图文
  • Blazor Web Assembly - 使用Power Automate Desktop来跟踪一下Blazor页面的内存使用情况
  • 动态规划:求最长回文子串
  • OpenMMlab导出MaskFormer/Mask2Former实例分割模型并用onnxruntime和tensorrt推理
  • DB2连接池监控与挂起连接释放指南
  • Win32OpenSSL工具下载地址
  • Electron截取响应体
  • @Validation 的自定义校验实现, Spring Boot 和 java