LlamaIndex 工作流 分支和循环
工作流的一个关键特性是,它们能够以比基于图的方法更简单和灵活的方式实现分支和循环逻辑。
工作流中的循环
要创建一个循环,我们将沿用上一教程中的示例 MyWorkflow,并添加一个新的自定义事件类型。我们将其命名为 LoopEvent,当然你也可以根据需要随意命名。
from asyncio import Eventclass LoopEvent(Event):loop_output: str
现在,我们将导入 random 模块,并修改我们的 step_one 函数,使其随机决定是循环还是继续执行:
import asyncio
import randomfrom llama_index.core.workflow import (StartEvent,StopEvent,Workflow,step,Event,
)
from llama_index.utils.workflow import draw_all_possible_flowsclass LoopEvent(Event):loop_output: strclass FirstEvent(Event):first_output: strclass SecondEvent(Event):second_output: strclass MyWorkflow(Workflow):@stepasync def step_one(self, ev: StartEvent | LoopEvent)-> FirstEvent | LoopEvent:"""第一步"""if random.randint(0, 1) == 0:print("发生了不好的事情")return LoopEvent(loop_output="返回到第一步...")else:print("开始第一步...")return FirstEvent(first_output="完成第一步...")@stepasync def step_two(self, ev: FirstEvent)-> SecondEvent:"""第二步"""print(ev.first_output)return SecondEvent(second_output="完成第二步...")@stepasync def step_three(self, ev: SecondEvent)-> StopEvent:"""第三步"""print(ev.second_output)return StopEvent(result="工作流完成!")# 运行工作流
async def run_workflow():w = MyWorkflow(timeout=10, verbose=False)result = await w.run(first_input="开始工作流...")print(result)if __name__ == '__main__':asyncio.run(run_workflow())# 工作流可视化工具
draw_all_possible_flows(MyWorkflow, filename="multi_step_workflow.html")
让我们来可视化这个过程:
你可以通过定义适当的事件类型和返回类型,从任何步骤到任何其他步骤创建一个循环。
循环工作流示例:
import asyncio
import randomfrom llama_index.core.workflow import (StartEvent,StopEvent,Workflow,step,Event,
)
from llama_index.utils.workflow import draw_all_possible_flowsclass LoopEvent(Event):payload: strclass FirstEvent(Event):payload: strclass SecondEvent(Event):payload: strclass MyWorkflow(Workflow):@stepasync def start(self, ev: StartEvent)-> FirstEvent:"""开始工作流"""return FirstEvent(payload=f"{ev.first_input}--->执行下一个节点:step_one")@stepasync def step_one(self, ev: FirstEvent | LoopEvent)-> SecondEvent | LoopEvent:"""第一步"""if random.randint(0, 1) == 0:return LoopEvent(payload=f"{ev.payload}--->循环")else:return SecondEvent(payload=f"{ev.payload}--->执行下一个节点:step_two")@stepasync def step_two(self, ev: SecondEvent)-> StopEvent:"""第二步"""return StopEvent(result=f"{ev.payload}--->工作流完成!")# 运行工作流
async def run_workflow():w = MyWorkflow(timeout=10, verbose=False)result = await w.run(first_input="开始工作流")print(result)if __name__ == '__main__':asyncio.run(run_workflow())# 工作流可视化工具
draw_all_possible_flows(MyWorkflow, filename="multi_step_workflow.html")
运行结果:
开始工作流--->执行下一个节点:step_one--->循环--->循环--->循环--->循环--->执行下一个节点:step_two--->工作流完成!
流程图:
工作流中的分支
与循环密切相关的是分支。正如你已经看到的,你可以根据条件返回不同的事件。下面我们来看一个会分支成两条不同路径的工作流示例:
import asyncio
import randomfrom llama_index.core.workflow import (StartEvent,StopEvent,Workflow,step,Event,
)
from llama_index.utils.workflow import draw_all_possible_flowsclass BranchA1Event(Event):"""分支 A1"""payload: strclass BranchA2Event(Event):"""分支A2"""payload: strclass BranchB1Event(Event):"""分支 B1"""payload: strclass BranchB2Event(Event):"""分支 B2"""payload: strclass BranchWorkflow(Workflow):"""分支工作流"""@stepasync def start(self, ev: StartEvent) -> BranchA1Event | BranchB1Event:"""开始...工作流"""if random.randint(0, 1) == 0:#print(f"{ev.first_input}--->分支A")return BranchA1Event(payload=f"{ev.first_input}--->分支A")else:#print(f"{ev.first_input}--->分支B")return BranchB1Event(payload=f"{ev.first_input}--->分支B")@stepasync def step_a1(self, ev: BranchA1Event) -> BranchA2Event:#print(ev.payload)return BranchA2Event(payload=f"{ev.payload}--->分支A1")@stepasync def step_b1(self, ev: BranchB1Event) -> BranchB2Event:#print(ev.payload)return BranchB2Event(payload=f"{ev.payload}--->分支B1")@stepasync def step_a2(self, ev: BranchA2Event) -> StopEvent:#print(ev.payload)return StopEvent(result=f"{ev.payload}--->分支A完成")@stepasync def step_b2(self, ev: BranchB2Event) -> StopEvent:#print(ev.payload)return StopEvent(result=f"{ev.payload}--->分支B完成")# 运行工作流
async def run_workflow():w = BranchWorkflow(timeout=10, verbose=False)result = await w.run(first_input="开始工作流")print(result)if __name__ == '__main__':asyncio.run(run_workflow())# 工作流可视化工具
draw_all_possible_flows(BranchWorkflow, filename="multi_step_workflow.html")
运行结果:
开始工作流--->分支B--->分支B1--->分支B完成
我们导入的内容与之前相同,但我们创建了4种新的事件类型。start 函数会随机决定选择哪一条分支,然后在每条分支中执行多个步骤以完成整个工作流。让我们来可视化这个过程:
当然,你可以以任意顺序组合分支和循环,以满足应用程序的需求。在本教程的后续部分,你将学习如何使用 send_event 并行运行多个分支,并使用 collect_events 对它们进行同步。
接下来,我们将学习如何使用上下文(Context)来维护状态。