AI Agent开发学习系列 - LangGraph(7): 带有条件判断的Conditional Graph
在这个LangGraph条件图示例中,通过使用add_conditional_edges方法实现动态路由机制:定义一个决策函数decide_next_node来根据输入状态(如操作符operation)动态选择下一个执行节点,然后通过条件边映射表将不同的决策结果连接到对应的处理节点(加法节点或减法节点),最后使用START和END常量来定义图的入口和出口点,从而实现了一个能够根据输入条件智能选择执行路径的灵活工作流。
from typing import TypedDict
from langgraph.graph import StateGraph, START, ENDclass AgentState(TypedDict):number1: intoperation: strnumber2: intfinalNumber: intdef adder(state: AgentState) -> AgentState:"""This node adds the 2 numbers"""state["finalNumber"] = state["number1"] + state["number2"]return statedef subtractor(state: AgentState) -> AgentState:"""This node subtracts the 2 numbers"""state["finalNumber"] = state["number1"] - state["number2"]return statedef decide_next_node(state: AgentState) -> AgentState:"""THis node will select the next node of the graph"""if state["operation"] == "+":return "addtion_operation"elif state["operation"] == "-":return "subtraction_operation"graph = StateGraph(AgentState)graph.add_node("add_node", adder)
graph.add_node("subtract_node", subtractor)
graph.add_node("router", lambda state:state) # passthrough funcitongraph.add_edge(START, "router")graph.add_conditional_edges("router",decide_next_node,{# Edge: Node"addtion_operation": "add_node","subtraction_operation": "subtract_node",}
)graph.add_edge("add_node", END)
graph.add_edge("subtract_node", END)app = graph.compile()from IPython.display import Image, display
display(Image(app.get_graph().draw_mermaid_png()))result = app.invoke({"number1": 1, "operation": "+", "number2": 2})
print(result["finalNumber"])
运行结果:
3
add_conditional_edges 是 LangGraph 中用于创建条件分支的核心方法,它允许根据状态动态选择下一个执行节点。
与 add_edge 的区别
- add_edge: 固定顺序连接
- add_conditional_edges: 根据状态动态选择路径
这种方法使得工作流能够根据输入数据智能地选择不同的处理路径,非常适合需要条件逻辑的复杂应用场景。