LangGraph 快速入门
目录
- LangGraph简介
- LangGraph&LangChain
- 快速开始
- LangGraph基础知识
- Build a basic chatbot
- 第1步:创建 StateGraph
- Annotated&reducer
- 第2步:添加一个节点(Node)
- 第3步:添加入口节点(Entry Point)
- 第4步:编译图(Compile the Graph)
- 第5步:可视化图结构(可选)
- 第6步: 运行聊天机器人
- Add tools
- Add memory
- Add human-in-the-loop controls
- human-in-the-loop controls的意义
- Customize state
- Time travel
- 理解"时间旅行"
- 预构建Agent
- 运行 Agent
- Streaming
- Models
- 工具调用支持(Tool calling support)
- 通过名称指定模型(Specifying a model by name)
- 使用 `init_chat_model`(Using init\_chat\_model)
- 使用特定厂商模型(Using provider-specific LLMs)
- 禁用流式输出(Disable streaming)
- 添加模型回退机制(Adding model fallbacks)
- 工具(Tools)
- 定义简单工具(Define simple tools)
- 自定义工具行为(Customize tools)
- 隐藏模型不可见参数(Hide arguments from the model)
- 禁用并行工具调用(Disable parallel tool calling)
- 直接返回工具结果(Return tool results directly)
- 强制使用某个工具(Force tool use)
- 处理工具错误(Handle tool errors)
- 使用记忆(Working with memory)
- 使用预构建工具(Prebuilt tools)
- MCP 集成
- 上下文(Context)
- 上下文(Context)
- 提供运行时上下文
- 使用上下文自定义提示词
- 在工具中访问上下文
- 记忆(Memory)
- 短期记忆
- 管理消息历史
- 在工具中读取状态
- 在工具中写入状态
- 长期记忆
- 读取数据(Read)
- 写入数据(Write)
- 语义搜索(Semantic search)
- 人类介入(Human-in-the-loop)
- 工具调用审查(Review tool calls)
- 与 Agent Inbox 一起使用
- 多智能体
- Supervisor
- Swarm
- 交接(Handoffs)
- Evals(评估)&部署(Deployment) & UI
LangGraph简介
LangGraph 是由 LangChain 团队开发的开源 MIT 许可框架,专为构建具有状态管理和多智能体协作能力的应用而设计。它通过图结构(StateGraph)组织任务流程,使开发者能够精确控制执行逻辑、状态更新和任务调度。(知乎专栏)
🔑 核心概念
- State(状态):应用的核心数据结构,通常使用 TypedDict 或 Pydantic 模型定义,存储所有上下文信息,如用户输入、历史对话、工具输出等。
- Node(节点):表示处理步骤的函数,接收当前状态作为输入,执行操作后返回更新的状态。
- Edge(边):定义节点之间的连接关系和执行顺序,包括普通边和条件边(conditional edge),后者根据状态动态选择下一步执行的节点。(CSDN博客, CSDN博客)
🚀 核心优势
- 持久化执行:支持长时间运行的任务,能够在中断后从上次状态恢复,确保流程的连续性和可靠性。
- 人工干预:允许在执行过程中插入人工审核节点,实现人机协作,适用于敏感操作或复杂决策场景。
- 全面记忆:通过状态管理机制,实现短期和长期记忆,支持多轮对话和上下文保持。
- 可视化调试:与 LangSmith 集成,提供执行路径追踪、状态转换捕获和运行时指标,帮助开发者深入理解智能体行为。
- 生产级部署:支持与 LangGraph Platform 配合使用,提供可扩展的部署平台,满足企业级应用需求。(LangGraph)
🛠️ 应用场景
- 多智能体协作:构建需要多个智能体协同工作的系统,如自动化客服、智能推荐等。
- 复杂工作流管理:实现包含循环、条件分支和并发执行的复杂任务流程,如文档处理、数据分析等。
- 人机协作系统:设计需要人工审核或干预的应用,如内容审核、决策支持等。
- 长期任务管理:处理需要长时间运行并保持状态的任务,如自动化测试、持续集成等。(CSDN博客)
LangGraph 的设计灵感来源于 Pregel 和 Apache Beam,其公共接口借鉴了 NetworkX。它由 LangChain Inc 创建,既可以独立使用,也能与 LangChain 等其他工具无缝集成,为开发者提供构建智能体应用的强大支持。(LangGraph)
安装 LangGraph:
pip install -U langgraph
然后,使用预构建组件创建一个智能体:
# 安装模型调用依赖
# pip install -qU "langchain[anthropic]"from langgraph.prebuilt import create_react_agentdef get_weather(city: str) -> str:"""获取指定城市的天气。"""return f"{city} 总是阳光明媚!"agent = create_react_agent(model="anthropic:claude-3-7-sonnet-latest",tools=[get_weather],prompt="你是一个乐于助人的助手"
)# 运行智能体
agent.invoke({"messages": [{"role": "user", "content": "旧金山的天气怎么样?"}]}
)
LangGraph 为任何长期运行、有状态的工作流或智能体提供低级支持基础设施。它不抽象提示词或架构,具有以下主要优势:
- 持久执行:构建能够经受失败并可持续运行的智能体,自动从中断点恢复执行。
- 人工干预:无缝集成人工监督,允许在任何执行阶段检查和修改智能体状态。
- 全面记忆:支持短期工作记忆与跨会话的长期持久记忆,打造真正有状态的智能体。
- LangSmith 调试:通过可视化工具追踪执行路径、捕获状态转换,并提供详细的运行时指标,深入洞察复杂智能体行为。
- 生产级部署:提供可扩展基础设施,轻松部署复杂且长期运行的有状态智能体系统,满足生产需求。
LangGraph 可以独立使用,也能无缝集成任何 LangChain 产品,为开发者提供构建智能体的完整工具套件。为提升 LLM 应用开发,建议配合使用:
- LangSmith —— 智能体评估与可观测性工具。调试性能不佳的 LLM 应用,评估智能体执行轨迹,获得生产环境可视化,持续优化性能。
- LangGraph 平台 —— 专为长期运行、有状态工作流设计的部署平台,支持智能体的发现、复用、配置与共享,结合 LangGraph Studio 实现可视化快速迭代。
- LangChain —— 提供集成和可组合组件,简化 LLM 应用开发。
LangGraph&LangChain
LangChain 在构建基于大型语言模型(LLM)的应用方面提供了许多便利,但在实际开发中,尤其是面对复杂工作流时,开发者逐渐发现其存在一些明显的局限性。这些问题促使了 LangGraph 的诞生,作为对 LangChain 的补充和改进。以下是对 LangChain 不足之处的直观总结,以及 LangGraph 如何应对这些挑战的原因。
LangChain 的主要不足
-
过度抽象,导致开发复杂性增加
LangChain 的设计引入了多层抽象,如链(Chain)、代理(Agent)、工具(Tool)等,虽然提供了灵活性,但也增加了理解和调试的难度。开发者常常需要深入底层,才能实现特定的自定义逻辑。
-
状态管理薄弱,难以处理多轮对话
在多轮对话或需要保持上下文的应用中,LangChain 的状态管理能力有限。开发者需要手动处理状态的保存和传递,增加了开发负担。
-
工具调用不稳定,缺乏可预测性
LangChain 的代理机制在调用外部工具时,缺乏明确的执行顺序和条件控制,导致行为不可预测,尤其在复杂工作流中问题更为突出。
-
性能瓶颈,难以扩展
由于依赖于顺序处理和外部服务,LangChain 在处理大型数据集或高并发请求时,容易出现性能瓶颈,限制了其在生产环境中的应用。
-
文档不完善,学习曲线陡峭
LangChain 的文档存在不完整和不一致的问题,缺乏清晰的指导和示例,增加了新手上手的难度。
针对上述 LangChain 的不足,LangGraph 提供了以下改进:
-
图结构工作流,增强可控性
LangGraph 采用有向图(DAG)结构,明确了各个节点(任务)的执行顺序和依赖关系,使工作流更加清晰和可控。
-
内置状态管理,支持复杂对话
LangGraph 提供了内置的状态管理机制,方便开发者在多轮对话或复杂交互中维护上下文,减少了手动处理的需求。
-
明确的工具调用机制
通过图结构,LangGraph 可以精确控制工具的调用时机和条件,避免了 LangChain 中工具调用的不确定性。
-
更好的性能和可扩展性
LangGraph 的设计优化了任务的并行处理能力,减少了对外部服务的依赖,提高了整体性能,适合处理大规模数据和高并发请求。
-
更清晰的文档和示例
虽然 LangGraph 相对较新,但其文档和示例更加注重实用性,帮助开发者更快地理解和应用框架。
上文提到LangGraph 可以独立使用,也能无缝集成任何 LangChain 产品,那是因为LangGraph的Node是一个Callable继而被LangGraph编排,但既然有LangChain存在,那么很多功能就没必要重复造轮子,比如文档切割,我们既可以用LangChain的Spilter作为LangGraph的Node,也可以自己写一个简单的split函数(可调用)作为Node。
总体而言,如果你的应用场景具有预定义的顺序工作流程,如需要从API获取数据并进行总结的简单任务,或者你更倾向于一个直接、模块化的框架,LangChain将是不错的选择。当你的项目需要动态的、有状态的交互,例如构建一个多智能体协作的任务管理系统,或者需要在复杂的工作流程中保持持久的上下文时,LangGraph会更加适合。
快速开始
让我们使用 LangGraph 提供的预构建、可复用组件,这些组件旨在帮助你快速、可靠地构建智能体系统(agentic systems)!
请确保你已经具备以下条件:
- 一个 Anthropic API 密钥
如果你还没有安装,请先安装 LangGraph 和 LangChain:
pip install -U langgraph "langchain[anthropic]"
📌说明:
LangChain 被安装是为了让智能体可以调用模型。
要创建智能体,可以使用 create_react_agent
:
from langgraph.prebuilt import create_react_agentdef get_weather(city: str) -> str:"""获取指定城市的天气。"""return f"It's always sunny in {city}!"agent = create_react_agent(model="anthropic:claude-3-7-sonnet-latest", # 模型名称tools=[get_weather], # 提供的工具列表prompt="You are a helpful assistant" # 智能体行为的提示词
)# 运行智能体
agent.invoke({"messages": [{"role": "user", "content": "what is the weather in sf"}]}
)
如果你需要为模型设置特定参数(如温度 temperature),可以使用 init_chat_model
:
from langchain.chat_models import init_chat_model
from langgraph.prebuilt import create_react_agentmodel = init_chat_model("anthropic:claude-3-7-sonnet-latest",temperature=0
)agent = create_react_agent(model=model,tools=[get_weather],
)
提示词用于指示 LLM 应如何响应。你可以添加以下两种类型的提示词:
- 静态(Static):使用字符串形式,作为系统消息处理。
- 动态(Dynamic):在运行时生成,由输入或配置决定。
🧩 示例:
from langgraph.prebuilt import create_react_agentagent = create_react_agent(model="anthropic:claude-3-7-sonnet-latest",tools=[get_weather],prompt="Never answer questions about the weather." # 固定不变的提示词
)agent.invoke({"messages": [{"role": "user", "content": "what is the weather in sf"}]}
)
为了让智能体支持多轮对话(multi-turn conversations),你需要提供一个持久化组件(checkpointer),在创建智能体时启用它。同时,在运行时需提供包含 thread_id
的配置,它是对话的唯一标识符。
from langgraph.prebuilt import create_react_agent
from langgraph.checkpoint.memory import InMemorySavercheckpointer = InMemorySaver()agent = create_react_agent(model="anthropic:claude-3-7-sonnet-latest",tools=[get_weather],checkpointer=checkpointer # 启用记忆保存器
)# 启动智能体(带对话 ID)
config = {"configurable": {"thread_id": "1"}}sf_response = agent.invoke({"messages": [{"role": "user", "content": "what is the weather in sf"}]},config
)ny_response = agent.invoke({"messages": [{"role": "user", "content": "what about new york?"}]},config
)
📌启用 checkpointer 后,智能体的状态会在每个步骤自动保存到 checkpointer 数据库(此例为内存)。再次调用时,如 thread_id
相同,之前的历史会被自动包含在上下文中。
配置结构化输出(Structured Output)
如果希望智能体返回符合结构(schema)定义的响应,可以使用 response_format
参数。该 schema 可以用 Pydantic 模型或 TypedDict 定义。结果会以 structured_response
字段返回。
from pydantic import BaseModel
from langgraph.prebuilt import create_react_agentclass WeatherResponse(BaseModel):conditions: stragent = create_react_agent(model="anthropic:claude-3-7-sonnet-latest",tools=[get_weather],response_format=WeatherResponse # 输出结构定义
)response = agent.invoke({"messages": [{"role": "user", "content": "what is the weather in sf"}]}
)# 获取结构化结果
response["structured_response"]
📌结构化输出会额外调用一次 LLM,用于格式化响应以符合 schema。
LangGraph基础知识
开发者选择 LangGraph,是因为它具备以下优势:
✅ 可靠性与可控性
通过内容审查机制和人工审批(human-in-the-loop)来引导智能体的行为。
LangGraph 能够持久化上下文,即使在长时间运行的工作流中,也能保持智能体在正确的轨道上。
✅ 低层级设计、可扩展性强
使用**完全描述性的底层原语(primitives)**构建自定义智能体,
摆脱限制性的抽象层,让定制化不再受限。
你可以设计可扩展的多智能体系统,让每个智能体根据你的用例承担不同角色,精准服务于任务目标。
✅ 一流的流式支持
支持逐 token 的流式输出和中间步骤的流式反馈,
让用户实时看到智能体的推理过程与操作路径,提升可观察性和透明度。
为了快速上手 LangGraph 的核心概念和功能,建议完成以下基础教程系列:
- 构建一个基础聊天机器人
- 添加工具支持
- 添加记忆功能
- 加入人工审批流程(human-in-the-loop)
- 自定义状态结构
- 回溯历史并探索其他对话路径(时间旅行)
Build a basic chatbot
第1步:创建 StateGraph
StateGraph
是一个对象,它将我们的聊天机器人结构定义为一个“状态机”。
我们将添加:
- 节点(nodes):表示大语言模型(LLM)和机器人可调用的函数
- 边(edges):定义各个功能之间的状态转移逻辑
from typing import Annotated
from typing_extensions import TypedDictfrom langgraph.graph import StateGraph, START
from langgraph.graph.message import add_messagesclass State(TypedDict):# messages 是一个列表类型。# 通过注解 `add_messages` 指定该键的更新方式:# 它会将新消息追加到现有的列表中,而不是覆盖。messages: Annotated[list, add_messages]# 创建状态图构建器
graph_builder = StateGraph(State)
到目前为止,我们的图(Graph)可以处理两个关键任务:
- 每个节点都可以接收当前的
State
作为输入,并返回更新后的State
。 - 借助内建的
add_messages
函数(与Annotated
搭配使用),对messages
的更新将追加到原有列表中,而不是覆盖原内容。
在定义图(Graph)时,第一步是定义其 State
。State
定义了图的状态结构(schema),以及用于处理状态更新的reducer 函数。
在示例中,State
是一个包含单个键 messages
的 TypedDict
。
其中使用了 add_messages
reducer 函数,它会将新消息追加到消息列表中,而不是覆盖原有内容。
如果某个键没有使用 reducer 注解,它在更新时会直接覆盖之前的值。
为什么注释可以改变行为?
Python 原生的 Annotated[...]
本质上只是一个类型提示工具,在运行时默认是不会影响行为的 —— 这是 typing.Annotated
的基本用途。
但在 LangGraph 中,Annotated
被赋予了“行为意义”——LangGraph 显式读取并解释 Annotated
中的元信息,从而根据你传进去的 reducer(如 add_messages
)控制状态的更新方式。
这是因为:
LangGraph 在内部会显式地读取
State
类型定义,并通过反射(introspection)提取Annotated[...]
中的元信息,例如其中的add_messages
函数,并在运行时使用这个函数作为 reducer。
messages: Annotated[list, add_messages]
这个语句在语法层面只是类型标注,但 LangGraph 会执行如下逻辑:
- 扫描你提供的
TypedDict
类型 - 识别出被
Annotated
修饰的字段 - 提取
add_messages
函数(这是你提供的 reducer) - 在状态更新(state merge)时,调用该 reducer 控制该字段的合并方式
因此:
Annotated[list, add_messages]
≠list
,而是一个“有 reducer 的 list”- LangGraph 利用这个 reducer,实现了非破坏式更新(append 而不是 overwrite)
假设你没有加 add_messages
:
class State(TypedDict):messages: list # 默认行为:每次更新直接覆盖原有值
然后你某个节点返回:
return {"messages": ["hello"]}
→ 原有 messages 被整个替换掉了。
而如果你用了:
messages: Annotated[list, add_messages]
然后返回同样的数据:
return {"messages": ["hello"]}
→ LangGraph 会帮你把 "hello"
追加到原有的 messages 列表中。
Annotated&reducer
Python 提供了反射机制(introspection),可以在运行时访问类型注解。特别是,typing.Annotated
在 Python 3.9+ 中的行为可以通过 get_type_hints
+ __metadata__
来解析。
from typing import Annotated, get_type_hints
from typing_extensions import TypedDict# 假设这个是你的 reducer 函数
def add_messages(old, new):return old + newclass State(TypedDict):messages: Annotated[list, add_messages]# 使用反射来读取注解
hints = get_type_hints(State, include_extras=True)
print(hints)
输出:
{'messages': Annotated[list, <function add_messages at 0x...>]
}
你可以进一步解析这个 Annotated
:
from typing import get_args, get_origin, Annotatedann = hints['messages']
if get_origin(ann) is Annotated:base_type, *metadata = get_args(ann)print("Base type:", base_type) # listprint("Metadata (reducers):", metadata) # [add_messages]
LangGraph 内部就是用了类似的方法,在构建 StateGraph
时遍历所有字段的注解,把 reducer 信息存下来,然后在运行时合并状态时自动使用它。
什么是 reducer?它在 LangGraph 中的作用是什么?
Reducer 是一种用于聚合多个状态值的函数,它的输入是旧值和新值,输出是合并后的值。
典型形式:
new_state = reducer(old_state, new_update)
最常见的例子就是:
def add_messages(old, new):return old + new # 把两个列表连接起来
LangGraph 中就是用 reducer 控制状态的合并行为。
在状态状态图(StateGraph)中,每个节点都会对全局状态做局部更新。LangGraph 允许多个节点顺序更新同一个 key,那么它需要一种机制来决定多个更新如何组合 —— 这就是 reducer 的作用。
比如:
- 对于
messages
字段,你希望是“追加”→ 用add_messages
- 对于
count
字段,你可能希望是“覆盖”→ 默认就用 overwrite(无 reducer)
LangGraph 的默认行为是覆盖(overwrite),但你可以通过 Annotated 指定 reducer 改变这个行为。
第2步:添加一个节点(Node)
接下来,我们添加一个名为 "chatbot"
的节点。
节点是 LangGraph 中的工作单元(unit of work),通常是一个普通的 Python 函数。
首先选择一个聊天模型(chat model),以 OpenAI 为例:
pip install -U "langchain[openai]"
import os
from langchain.chat_models import init_chat_modelos.environ["OPENAI_API_KEY"] = "sk-..." # 替换为你的 API 密钥llm = init_chat_model("openai:gpt-4.1")
现在我们将该聊天模型封装进一个简单的节点函数中:
def chatbot(state: State):return {"messages": [llm.invoke(state["messages"])]}
然后将该函数注册为节点:
# 第一个参数是节点的唯一名称
# 第二个参数是函数或对象,每当该节点被调用时会执行它
graph_builder.add_node("chatbot", chatbot)
chatbot
函数以当前的State
作为输入,返回一个字典,包含更新后的messages
。- 这个结构是 所有 LangGraph 节点函数的基本模式。
- 此处的
add_messages
reducer 将确保返回的消息会被追加到已有的消息列表中。
这里如果没有模型可以去外部的一些平台去获取公共的模型能力:
import os
from openai import OpenAIos.environ["OPENAI_API_KEY"] = "xxx"
os.environ["OPENAI_BASE_URL"] = "https://api.siliconflow.cn/v1"
client = OpenAI()response = client.chat.completions.create(model="Qwen/Qwen3-8B",messages=[{"role": "user","content": "你是谁",}],temperature=0.7,max_tokens=4096
)
# message=ChatCompletionMessage(
# content='\n\n我是通义千问,是通义实验室开发的超大规模语言模型。
# 我能够帮助您回答问题、创作文字、编程、分析数据等,支持多语言交流。
# 我的训练数据截止到2024年,能够理解和生成各种类型的文本。无论是日常对话、学习工作,还是创意写作,我都可以为您提供帮助。
# 有什么问题或需要 assistance 的吗?',
# )
print(response)
第3步:添加入口节点(Entry Point)
添加一个入口节点,用于指明每次运行图时应从哪里开始执行:
graph_builder.add_edge(START, "chatbot")
第4步:编译图(Compile the Graph)
在运行图之前,需要先进行编译。调用 compile()
方法即可完成编译。这将生成一个 CompiledGraph
,你可以对其传入状态进行调用:
graph = graph_builder.compile()
第5步:可视化图结构(可选)
你可以使用 get_graph
方法配合各种 draw
方法(例如 draw_ascii
或 draw_png
)来可视化整个图结构。这些可视化方法依赖额外的依赖项。
# pip install ipython
from IPython.display import Image, displaytry:display(Image(graph.get_graph().draw_mermaid_png()))
except Exception:# 这一步是可选的,运行失败通常是缺少一些依赖pass
注意这段代码,我们要在Jupyter中运行,
第6步: 运行聊天机器人
现在来运行你构建的聊天机器人吧!
def stream_graph_updates(user_input: str):for event in graph.stream({"messages": [{"role": "user", "content": user_input}]}):for value in event.values():print("Assistant:", value["messages"][-1].content)while True:try:user_input = input("User: ")if user_input.lower() in ["quit", "exit", "q"]:print("Goodbye!")breakstream_graph_updates(user_input)except:# 如果 input() 无法使用,使用备用输入user_input = "What do you know about LangGraph?"print("User: " + user_input)stream_graph_updates(user_input)break
输出示例:
Assistant: LangGraph 是一个用于构建具备状态管理的多智能体应用的库。它提供了创建工作流和状态机的工具,用于协调多个 AI 智能体或语言模型的交互。LangGraph 基于 LangChain 构建,利用其组件,同时加入了基于图的协调能力。它特别适合开发那些超越简单问答交互的复杂状态型 AI 应用。
Goodbye!
🎉 恭喜! 你已经使用 LangGraph 构建了第一个聊天机器人。这个机器人能够接受用户输入并使用 LLM 生成回复。
本案例所有代码如下:
import os
from langchain.chat_models import init_chat_model
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START
from langgraph.graph.message import add_messagesos.environ["OPENAI_API_KEY"] = "your api key"
os.environ["OPENAI_BASE_URL"] = "https://api.siliconflow.cn/v1"
llm = init_chat_model("openai:Qwen/Qwen3-8B")class State(TypedDict):# messages 是一个列表类型。# 通过注解 `add_messages` 指定该键的更新方式:# 它会将新消息追加到现有的列表中,而不是覆盖。messages: Annotated[list, add_messages]def chatbot(state: State):return {"messages": [llm.invoke(state["messages"])]}# 创建状态图构建器
graph_builder = StateGraph(State)
graph_builder.add_node("chatbot", chatbot)
graph_builder.add_edge(START, "chatbot")
graph = graph_builder.compile()for event in graph.stream({"messages": [{"role": "user", "content": input(">>>")}]}):for value in event.values():print("Assistant:", value["messages"][-1].content)
Add tools
To handle queries you chatbot can’t answer “from memory”, integrate a web search tool. The chatbot can use this tool to find relevant information and provide better responses.
在开始本教程之前,请确保你具备以下条件:
- 一个 Tavily 搜索引擎的 API 密钥
pip install langchain-tavily
配置环境变量:
os.environ["TAVILY_API_KEY"] = "your key"
定义一个web search工具:
from langchain_tavily import TavilySearchtool = TavilySearch(max_results=2)
tools = [tool]
tool.invoke("What's a 'node' in LangGraph?")
这些结果是网页摘要,供我们的聊天机器人用来回答问题。
{'query': "What's a 'node' in LangGraph?",
'follow_up_questions': None,
'answer': None,
'images': [],
'results': [{'title': "Introduction to LangGraph: A Beginner's Guide - Medium",
'url': 'https://medium.com/@cplog/introduction-to-langgraph-a-beginners-guide-14f9be027141',
'content': 'Stateful Graph: LangGraph revolves around the concept of a stateful graph, where each node in the graph represents a step in your computation, and the graph maintains a state that is passed around and updated as the computation progresses. LangGraph supports conditional edges, allowing you to dynamically determine the next node to execute based on the current state of the graph. We define nodes for classifying the input, handling greetings, and handling search queries. def classify_input_node(state): LangGraph is a versatile tool for building complex, stateful applications with LLMs. By understanding its core concepts and working through simple examples, beginners can start to leverage its power for their projects. Remember to pay attention to state management, conditional edges, and ensuring there are no dead-end nodes in your graph.',
'score': 0.7065353,
'raw_content': None},
{'title': 'LangGraph Tutorial: What Is LangGraph and How to Use It?',
'url': 'https://www.datacamp.com/tutorial/langgraph-tutorial',
'content': 'LangGraph is a library within the LangChain ecosystem that provides a framework for defining, coordinating, and executing multiple LLM agents (or chains) in a structured and efficient manner. By managing the flow of data and the sequence of operations, LangGraph allows developers to focus on the high-level logic of their applications rather than the intricacies of agent coordination. Whether you need a chatbot that can handle various types of user requests or a multi-agent system that performs complex tasks, LangGraph provides the tools to build exactly what you need. LangGraph significantly simplifies the development of complex LLM applications by providing a structured framework for managing state and coordinating agent interactions.',
'score': 0.5008063,
'raw_content': None}],
'response_time': 1.38}
在第一个教程中创建的 StateGraph
,在 LLM 上添加 bind_tools
。这会让 LLM 知道,如果它要使用搜索引擎,应使用哪种正确的 JSON 格式。
我们先选择要使用的 LLM:
from typing import Annotatedfrom typing_extensions import TypedDictfrom langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messagesclass State(TypedDict):messages: Annotated[list, add_messages]graph_builder = StateGraph(State)# Modification: tell the LLM which tools it can call
# highlight-next-line
llm_with_tools = llm.bind_tools(tools)def chatbot(state: State):return {"messages": [llm_with_tools.invoke(state["messages"])]}graph_builder.add_node("chatbot", chatbot)
现在,创建一个函数来在工具被调用时执行它们。通过将工具添加到一个名为 BasicToolNode
的新节点中实现。该节点会检查状态中的最新消息,并在消息包含 tool_calls
时调用工具。这个过程依赖于 LLM 的工具调用(tool-calling)支持,目前在 Anthropic、OpenAI、Google Gemini 和其他一些 LLM 提供商中可用。
import json
from langchain_core.messages import ToolMessageclass BasicToolNode:"""一个在最后一条 AI 消息中运行请求的工具的节点。"""def __init__(self, tools: list) -> None:self.tools_by_name = {tool.name: tool for tool in tools}def __call__(self, inputs: dict):if messages := inputs.get("messages", []):message = messages[-1]else:raise ValueError("输入中未找到消息")outputs = []for tool_call in message.tool_calls:tool_result = self.tools_by_name[tool_call["name"]].invoke(tool_call["args"])outputs.append(ToolMessage(content=json.dumps(tool_result),name=tool_call["name"],tool_call_id=tool_call["id"],))return {"messages": outputs}tool_node = BasicToolNode(tools=[tool])
graph_builder.add_node("tools", tool_node)
💡 注意
如果你以后不想自己实现这个功能,可以使用 LangGraph 提供的预构建工具节点ToolNode
。
添加完工具节点后,现在可以定义 条件边。
边用于将控制流从一个节点路由到下一个节点。条件边从一个节点出发,通常包含 if
判断,根据当前图的状态路由到不同的节点。这些函数接收当前状态并返回一个字符串或字符串列表,用于指示下一个要调用的节点名。
接下来定义一个名为 route_tools
的路由函数,该函数检查 chatbot
输出中是否存在 tool_calls
。通过 add_conditional_edges
将这个函数添加到图中,告诉图在 chatbot
节点完成后调用此函数来判断下一步的跳转。
如果存在工具调用,则跳转到 tools
;否则跳转到 END
。因为该条件函数可能返回 END
,所以这次不需要显式设置终点节点。
def route_tools(state: State):"""在条件边中使用:如果最后一条消息包含工具调用,则路由到 ToolNode,否则路由到 END。"""if isinstance(state, list):ai_message = state[-1]elif messages := state.get("messages", []):ai_message = messages[-1]else:raise ValueError(f"tool_edge 中输入状态无消息: {state}")if hasattr(ai_message, "tool_calls") and len(ai_message.tool_calls) > 0:return "tools"return END# 如果 chatbot 请求使用工具,返回 "tools",否则返回 "END"
# 这个条件路由构成了代理的主循环逻辑
graph_builder.add_conditional_edges("chatbot",route_tools,{"tools": "tools",END: END,}
)# 每次调用工具后,返回 chatbot 节点决定下一步
graph_builder.add_edge("tools", "chatbot")
graph_builder.add_edge(START, "chatbot")
graph = graph_builder.compile()
你可以使用预构建的 tools_condition 来替代上述条件判断函数,使代码更简洁。
你可以使用 get_graph
方法和一种绘图方法(如 draw_ascii
或 draw_png
)来可视化图结构。这些绘图方法需要额外依赖。
from IPython.display import Image, displaytry:display(Image(graph.get_graph().draw_mermaid_png()))
except Exception:# 可选功能,缺少依赖时忽略pass
现在,你可以向 chatbot 提问它训练数据之外的问题:
def stream_graph_updates(user_input: str):for event in graph.stream({"messages": [{"role": "user", "content": user_input}]}):for value in event.values():print("Assistant:", value["messages"][-1].content)while True:try:user_input = input("User: ")if user_input.lower() in ["quit", "exit", "q"]:print("Goodbye!")breakstream_graph_updates(user_input)except:# 如果 input() 无法使用,使用备用输入user_input = "What do you know about LangGraph?"print("User: " + user_input)stream_graph_updates(user_input)break
输出示例:
Assistant: [{'text': "为了为你提供关于 LangGraph 的准确信息,我需要搜索最新内容。让我来为你查找。", 'type': 'text'}, {'id': 'toolu_01Q588CszHaSvvP2MxRq9zRD', 'input': {'query': 'LangGraph AI tool information'}, 'name': 'tavily_search_results_json', 'type': 'tool_use'}]
Assistant: [{"url": "https://www.langchain.com/langgraph", "content": "LangGraph 为我们构建和扩展 AI 工作负载奠定了基础——从对话代理、复杂任务自动化,到“开箱即用”的自定义 LLM 体验。..."}, {"url": "https://github.com/langchain-ai/langgraph", "content": "LangGraph 是一个用于构建具有状态、多参与者的 LLM 应用的库..."}]
Assistant: 基于搜索结果,我可以为你提供以下 LangGraph 的信息:1. **用途**:LangGraph 是一个用于构建具有状态和多参与者(multi-actor)的大语言模型(LLM)应用的库,特别适合用于创建代理和多代理工作流。2. **开发者**:LangGraph 由 LangChain 开发,LangChain 是一个专注于 AI 和 LLM 工具的公司。3. **主要特性**:- **循环支持**:允许定义包含循环的流程,这对大多数代理架构至关重要。- **可控性**:增强对应用流程的控制。- **持久性**:支持状态维护和持久化。4. **应用场景**:- 对话代理- 复杂任务自动化- 定制的 LLM 支持体验5. **集成**:LangGraph 可与 LangChain 的另一个工具 LangSmith 搭配使用,提供构建复杂、可投入生产的 LLM 应用的完整解决方案。6. **意义**:LangGraph 相比其他框架,在处理循环、控制性和状态维护方面具有独特优势。
为了更方便使用,你可以将代码替换为 LangGraph 提供的预构建组件。这些组件内置了一些功能,例如并行调用 API。
- 用预构建的
ToolNode
替换BasicToolNode
- 用预构建的
tools_condition
替换route_tools
import os
from langchain.chat_models import init_chat_modelos.environ["OPENAI_API_KEY"] = "sk-..."llm = init_chat_model("openai:gpt-4.1")from typing import Annotated
from langchain_tavily import TavilySearch
from langchain_core.messages import BaseMessage
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_conditionclass State(TypedDict):messages: Annotated[list, add_messages]graph_builder = StateGraph(State)tool = TavilySearch(max_results=2)
tools = [tool]
llm_with_tools = llm.bind_tools(tools)def chatbot(state: State):return {"messages": [llm_with_tools.invoke(state["messages"])]}graph_builder.add_node("chatbot", chatbot)
tool_node = ToolNode(tools=[tool])
graph_builder.add_node("tools", tool_node)graph_builder.add_conditional_edges("chatbot", tools_condition)
graph_builder.add_edge("tools", "chatbot")
graph_builder.add_edge(START, "chatbot")graph = graph_builder.compile()
🎉 你已经使用 LangGraph 构建了一个可以调用搜索引擎的对话代理。它现在可以处理更多样化的用户问题。
现在,我们这样做:
# 创建状态图构建器
graph_builder = StateGraph(State)
graph_builder.add_node("chatbot", chatbot)
tool_node = ToolNode(tools=[tool])
graph_builder.add_node("tools", tool_node)
# 如果 chatbot 请求使用工具,返回 "tools",否则返回 "END"
# 这个条件路由构成了代理的主循环逻辑
graph_builder.add_conditional_edges("chatbot", tools_condition)
# 每次调用工具后,返回 chatbot 节点决定下一步
graph_builder.add_edge("tools", "chatbot")
graph_builder.add_edge(START, "chatbot")
graph = graph_builder.compile()for event in graph.stream({"messages": [{"role": "user", "content": input(">>>")}]}):for value in event.values():print("Assistant:", value["messages"][-1].content)
点击进去tools_condition
,发现他和我们的tool_router做的是一样的工作:
if isinstance(state, list):ai_message = state[-1]elif isinstance(state, dict) and (messages := state.get(messages_key, [])):ai_message = messages[-1]elif messages := getattr(state, messages_key, []):ai_message = messages[-1]else:raise ValueError(f"No messages found in input state to tool_edge: {state}")if hasattr(ai_message, "tool_calls") and len(ai_message.tool_calls) > 0:return "tools"return "__end__"
检查llm最后一条返回值,如果里面有tool_calls则返回"tools",而"tools"则是ToolNode内置的name,也就是把请求导向了ToolNode,
def __init__(self,tools: Sequence[Union[BaseTool, Callable]],*,name: str = "tools",tags: Optional[list[str]] = None,handle_tool_errors: Union[bool, str, Callable[..., str], tuple[type[Exception], ...]] = True,messages_key: str = "messages",) -> None:
然后invoke或者ainvoke触发ToolNode启动线程池或eventLoop去处理工具调用,再将工具调用信息追加到状态机返回。
Add memory
目前,chatbot 已经可以使用工具来回答用户问题,但它无法记住之前对话的上下文,这限制了它进行连贯、多轮对话的能力。
LangGraph 通过持久化检查点(persistent checkpointing)机制解决了这个问题。如果你在编译图时提供了一个 checkpointer
,并在调用图时提供了一个 thread_id
,LangGraph 会在每一步之后自动保存状态。当你再次使用相同的 thread_id
调用图时,它会加载之前保存的状态,从而让 chatbot 能够从上次对话的上下文继续。
稍后你会看到,checkpointing 的功能远比简单的聊天记忆更强大。它不仅能保存和恢复复杂的状态,还支持诸如错误恢复、人类介入工作流(human-in-the-loop)、时间旅行式交互等高级场景。但我们首先从实现多轮对话的 checkpointing 开始。
先创建一个 MemorySaver
类型的检查点保存器(checkpointer):
from langgraph.checkpoint.memory import MemorySavermemory = MemorySaver()
这是一种基于内存的 checkpointer,非常适合教程演示使用。但在生产环境中,你更可能使用 SqliteSaver
或 PostgresSaver
并连接到一个数据库,以实现持久化存储。
使用提供的 checkpointer
编译图,这样图在执行每个节点时会自动保存状态(State):
graph = graph_builder.compile(checkpointer=memory)
可选:可视化图结构(依赖额外库):
from IPython.display import Image, displaytry:display(Image(graph.get_graph().draw_mermaid_png()))
except Exception:# 这依赖一些额外组件,非必需pass
现在你可以开始与 chatbot 交互了!
选择一个线程 ID 作为这次对话的标识符:
config = {"configurable": {"thread_id": "1"}}
调用 chatbot:
user_input = "Hi there! My name is Will."# 注意:config 是 stream() 或 invoke() 的**第二个位置参数**
events = graph.stream({"messages": [{"role": "user", "content": user_input}]},config,stream_mode="values",
)for event in events:event["messages"][-1].pretty_print()
输出:
================================ Human Message =================================
Hi there! My name is Will.
================================== Ai Message ==================================
Hello Will! It's nice to meet you. How can I assist you today? Is there anything specific you'd like to know or discuss?
💡 注意:
config 是调用图时的第二个位置参数,而不是包含在 graph 输入的{'messages': []}
内部。
继续提问:
user_input = "Remember my name?"events = graph.stream({"messages": [{"role": "user", "content": user_input}]},config,stream_mode="values",
)for event in events:event["messages"][-1].pretty_print()
输出:
================================ Human Message =================================
Remember my name?
================================== Ai Message ==================================
Of course, I remember your name, Will. I always try to pay attention to important details that users share with me. Is there anything else you'd like to talk about or any questions you have? I'm here to help with a wide range of topics or tasks.
注意我们没有使用外部变量来保存上下文信息:全部由 checkpointer
自动管理!
换一个 thread_id 试试
我们只改变 thread_id
为 “2”:
events = graph.stream({"messages": [{"role": "user", "content": user_input}]},{"configurable": {"thread_id": "2"}},stream_mode="values",
)for event in events:event["messages"][-1].pretty_print()
输出:
================================ Human Message =================================
Remember my name?
================================== Ai Message ==================================
I apologize, but I don't have any previous context or memory of your name. As an AI assistant, I don't retain information from past conversations. Each interaction starts fresh. Could you please tell me your name so I can address you properly in this conversation?
我们唯一的更改就是把 thread_id
从 “1” 改为了 “2”。
查看状态快照
现在我们已经在两个不同的线程中创建了多个检查点。那么,一个检查点里到底包含了什么?
你可以通过调用 get_state(config)
来查看图在某个配置下的状态:
snapshot = graph.get_state(config)
snapshot
示例输出包括:
- 所有
messages
历史 - 对应的
config
信息 - 当前执行状态(
next
) - 执行元数据(如使用的模型、token 数等)
如果图已结束,next
会是空的:
snapshot.next # 空表示已经执行完
🎉 恭喜!你的 chatbot 现在已经能够跨会话保持对话状态,这要归功于 LangGraph 的 checkpointing 系统。它不仅支持自然、具上下文的对话交互,还能应对复杂状态恢复,比起传统聊天记忆更强大灵活。
上文所有代码:
memory = MemorySaver()# 创建状态图构建器
graph_builder = StateGraph(State)
graph_builder.add_node("chatbot", chatbot)
tool_node = ToolNode(tools=[tool])
graph_builder.add_node("tools", tool_node)
# 如果 chatbot 请求使用工具,返回 "tools",否则返回 "END"
# 这个条件路由构成了代理的主循环逻辑
graph_builder.add_conditional_edges("chatbot", tools_condition)
# 每次调用工具后,返回 chatbot 节点决定下一步
graph_builder.add_edge("tools", "chatbot")
graph_builder.add_edge(START, "chatbot")
graph = graph_builder.compile(checkpointer=memory)config = {"configurable": {"thread_id": "1"}}while True:user: str = input(">>>")for event in graph.stream({"messages": [{"role": "user", "content": user}]},config,stream_mode="values",):event["messages"][-1].pretty_print()
这里需要注意,stream_mode="values"
和 不加 stream_mode
(即使用默认模式) 是有区别的,它们影响了你从 graph.stream()
或 graph.invoke()
得到的输出形式和语义:
✅ stream_mode="values"
- 返回的是每个节点处理后的“State 值”(如
{"messages": [...]}
) - 适合直接处理用户关心的内容,例如你只关心最终生成的聊天消息内容。
- 每次迭代时返回的是一个字典,代表当前状态值。
for event in graph.stream(inputs, config, stream_mode="values"):print(event) # event 是 dict,如 {"messages": [...]}
❌ 不加 stream_mode
(默认行为)
-
默认
stream_mode=None
,返回的是完整的 执行事件(execution event),包含:- 执行的节点名(
"node_name"
) - 当前状态值(
"output"
) - 上下文信息(如 metadata、timing、errors 等)
- 执行的节点名(
-
更适合用于调试、日志记录、监控流程图执行过程
for event in graph.stream(inputs, config): # 默认模式print(event)# event 是形如 {"node_name": ..., "output": ..., "metadata": ...} 的结构体
示例:
# 假设 chatbot 返回 {"messages": [...]}# values 模式
event = {"messages": [...]} # 更像是普通的状态快照# 默认模式
event = {"node_name": "chatbot","output": {"messages": [...]},"metadata": {...}
}
其次,在生产环境中,需要将 thread_id
动态地管理。
用用户ID + 会话ID 拼成 thread_id
LangGraph 的状态管理是以 thread_id
为主键进行持久化的。因此,只要每次请求传入相同的 thread_id
,就能恢复这个用户对应的对话上下文。
# 假设你有这些信息:
user_id = "user_abc123"
session_id = "sess_001"# 构造唯一 thread_id(注意不能冲突)
thread_id = f"{user_id}:{session_id}"# 配置传入
config = {"configurable": {"thread_id": thread_id}}# 每次调用都传这个 config
graph.stream({"messages": [{"role": "user", "content": "Hello"}]}, config)
用 UUID 或 数据库自动生成的 thread_id
如果你要为每个新会话动态生成一个新的对话线程,可用随机生成或数据库分配:
import uuid# 新会话生成新线程ID
new_thread_id = str(uuid.uuid4())
config = {"configurable": {"thread_id": new_thread_id}}
你需要把这个 thread_id
保存到数据库或者客户端 cookie/session 中,后续用户请求时带回来,才能恢复状态。
用数据库或缓存管理用户会话
建议你在服务端有个简单的 session 数据表:
user_id | session_id | thread_id | created_at |
---|---|---|---|
user_abc | sess_001 | user_abc:sess_001 | 2025-05-25 10:00 |
user_abc | sess_002 | user_abc:sess_002 | 2025-05-25 10:10 |
这样你可以:
- 给每次用户新会话生成 thread_id
- 用户每次发消息,前端带上 session_id 或 thread_id
- 后端就能用它从 checkpoint 恢复上下文
搭配持久化存储(MemorySaver → Sqlite/Postgres)
内存存储 MemorySaver
仅适合调试或开发。生产中应换成持久化存储:
from langgraph.checkpoint.sqlite import SqliteSavercheckpointer = SqliteSaver.from_path("checkpoints.db")
graph = graph_builder.compile(checkpointer=checkpointer)
或者 PostgreSQL:
from langgraph.checkpoint.postgres import PostgresSavercheckpointer = PostgresSaver.from_conn_string("postgresql://user:pass@localhost/dbname")
这样 thread_id
对应的状态会持久化在数据库中,支持服务重启、分布式部署等场景。
Add human-in-the-loop controls
代理可能并不总是可靠,可能需要人为输入才能成功完成任务。类似地,对于某些操作,你可能希望在执行前要求人工审批,以确保一切按预期运行。
LangGraph 的持久化层支持“人类参与”(human-in-the-loop)工作流,允许根据用户反馈暂停和恢复执行。该功能的主要接口是 interrupt
函数。在一个节点内部调用 interrupt
会暂停执行。随后可以通过传入一个 Command
来恢复执行,并携带来自人类的新输入。interrupt
的用法在操作上类似于 Python 内置的 input()
,但存在一些注意事项。
在“为聊天机器人添加记忆”教程的现有代码基础上,向聊天机器人中添加 human_assistance
工具。该工具使用 interrupt
来接收来自人类的信息。
from typing import Annotatedfrom langchain_tavily import TavilySearch
from langchain_core.tools import tool
from typing_extensions import TypedDictfrom langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_conditionfrom langgraph.types import Command, interruptclass State(TypedDict):messages: Annotated[list, add_messages]graph_builder = StateGraph(State)@tool
def human_assistance(query: str) -> str:"""Request assistance from a human."""human_response = interrupt({"query": query})return human_response["data"]tool = TavilySearch(max_results=2)
tools = [tool, human_assistance]
llm_with_tools = llm.bind_tools(tools)def chatbot(state: State):message = llm_with_tools.invoke(state["messages"])# Because we will be interrupting during tool execution,# we disable parallel tool calling to avoid repeating any# tool invocations when we resume.assert len(message.tool_calls) <= 1return {"messages": [message]}graph_builder.add_node("chatbot", chatbot)tool_node = ToolNode(tools=tools)
graph_builder.add_node("tools", tool_node)graph_builder.add_conditional_edges("chatbot",tools_condition,
)
graph_builder.add_edge("tools", "chatbot")
graph_builder.add_edge(START, "chatbot")
像之前一样,我们使用 checkpointer 编译图:
memory = MemorySaver()graph = graph_builder.compile(checkpointer=memory)
现在,向聊天机器人输入一个问题,使其触发 human_assistance
工具:
user_input = "我需要一些关于构建 AI 代理的专家指导,你能为我请求帮助吗?"
config = {"configurable": {"thread_id": "1"}}events = graph.stream({"messages": [{"role": "user", "content": user_input}]},config,stream_mode="values",
)for event in events:if "messages" in event:event["messages"][-1].pretty_print()
输出内容如下:
================================ 用户消息 =================================我需要一些关于构建 AI 代理的专家指导,你能为我请求帮助吗?================================ AI 回复 ==================================[{"text": "当然可以!我很乐意为您请求专家协助。为此,我将使用 human_assistance 工具转达您的请求,现在就为您操作。","type": "text"},{"id": "toolu_01ABUqneqnuHNuo1vhfDFQCW","input": {"query": "一位用户请求关于构建 AI 代理的专家指导。您能否就此主题提供一些专业建议或资源?"},"name": "human_assistance","type": "tool_use"}
]
工具调用信息:
调用工具:human_assistance (toolu_01ABUqneqnuHNuo1vhfDFQCW)
参数:query: 一位用户请求关于构建 AI 代理的专家指导。您能否就此主题提供一些专业建议或资源?
此时,聊天机器人已生成工具调用,但执行被中断。查看图的状态,会发现其停在了 tools
节点:
snapshot = graph.get_state(config)
snapshot.next
# 输出: ('tools',)
类似于 Python 的 input()
函数,在工具内部调用 interrupt()
会暂停执行。进度将通过 checkpointer 保持;因此如果使用的是 PostgreSQL 持久化存储,只要数据库存活就可以随时恢复。本例中使用的是内存持久化方式(MemorySaver),在 Python 内核运行期间可以随时恢复。
为了恢复执行,传入一个包含工具所期望数据的 Command
对象。数据格式可按需自定义;本例中使用包含 "data"
键的字典:
human_response = ("我们专家团队很乐意提供帮助!我们建议您查看 LangGraph,它比简单的自治代理更加可靠且可扩展。"
)human_command = Command(resume={"data": human_response})events = graph.stream(human_command, config, stream_mode="values")for event in events:if "messages" in event:event["messages"][-1].pretty_print()
输出内容如下:
================================ AI 回复 ==================================当然可以!我很乐意为您请求专家协助。为此,我将使用 human_assistance 工具转达您的请求,现在就为您操作。调用工具:human_assistance
参数:query: 一位用户请求关于构建 AI 代理的专家指导。您能否就此主题提供一些专业建议或资源?================================ 工具回复 =================================我们专家团队很乐意提供帮助!我们建议您查看 LangGraph,它比简单的自治代理更加可靠且可扩展。================================ AI 回复 =================================感谢您的耐心等待。我已收到专家关于构建 AI 代理的指导建议。以下是他们的推荐:专家建议您查看 LangGraph 来构建 AI 代理。他们表示,LangGraph 比简单的自治代理更可靠、可扩展。LangGraph 可能是一个专门用于构建 AI 代理的框架或库,具备以下优势:1. **可靠性**:比起简单的代理方法,LangGraph 更加稳定,可能具有更强的错误处理能力或一致性表现。2. **可扩展性**:架构灵活,易于添加新功能或根据需要调整现有逻辑。3. **先进能力**:相较于“简单的自治代理”,LangGraph 可能提供更复杂的工具与构建能力。建议您:
- 查找专门面向 LangGraph 的教程或指南;
- 加入社区论坛,与其他开发者交流。如果您还想了解更详细的信息,请告诉我,我可以继续请求专家协助。
✅ 你已成功使用
interrupt
将人类协作功能集成到你的聊天机器人中!
这使得你可以在需要时引入人工干预与审批。这也为你创建 AI 系统的 UI 打开了更多可能。
由于已经添加了 checkpointer,只要底层持久化层运行中,图可以无限期暂停并随时恢复,就像什么都没发生过一样。
本章示例代码:
searcher = TavilySearch(max_results=2)@tool
def human_assistance(query: str) -> str:"""Request assistance from a human."""human_response = interrupt({"query": query})return human_response["data"]tools = [searcher, human_assistance]
llm = init_chat_model("openai:Qwen/Qwen3-8B").bind_tools(tools)class State(TypedDict):# messages 是一个列表类型。# 通过注解 `add_messages` 指定该键的更新方式:# 它会将新消息追加到现有的列表中,而不是覆盖。messages: Annotated[list, add_messages]def route_tools(state: State):"""在条件边中使用:如果最后一条消息包含工具调用,则路由到 ToolNode,否则路由到 END。"""if isinstance(state, list):ai_message = state[-1]elif messages := state.get("messages", []):ai_message = messages[-1]else:raise ValueError(f"tool_edge 中输入状态无消息: {state}")if hasattr(ai_message, "tool_calls") and len(ai_message.tool_calls) > 0:return "tools"return ENDdef chatbot(state: State):return {"messages": [llm.invoke(state["messages"])]}memory = MemorySaver()# 创建状态图构建器
graph_builder = StateGraph(State)
graph_builder.add_node("chatbot", chatbot)
tool_node = ToolNode(tools=tools)
graph_builder.add_node("tools", tool_node)
# 如果 chatbot 请求使用工具,返回 "tools",否则返回 "END"
# 这个条件路由构成了代理的主循环逻辑
graph_builder.add_conditional_edges("chatbot", tools_condition)
# 每次调用工具后,返回 chatbot 节点决定下一步
graph_builder.add_edge("tools", "chatbot")
graph_builder.add_edge(START, "chatbot")
graph = graph_builder.compile(checkpointer=memory)config = {"configurable": {"thread_id": "1"}}while True:user_input = input(">>>")# 判断是否是 resume 响应,比如用户输入以 "!人工:" 开头if user_input.startswith("!人工"):human_reply = user_input.removeprefix("!人工")human_command = Command(resume={"data": "我们专家团队很乐意提供帮助!我们建议您查看 LangGraph,它比简单的自治代理更加可靠且可扩展。"})stream_input = human_commandelse:stream_input = {"messages": [{"role": "user", "content": user_input}]}for event in graph.stream(stream_input, config, stream_mode="values"):if "messages" in event:event["messages"][-1].pretty_print()
这里比较难理解的是Command和interrupt的配合:
interrupt({"query": query})
这里传入的 {"query": query}
是 中断上下文信息 —— 用来告诉外部系统 “我为什么要中断?”、“我需要你人工帮我处理什么?”
你是“人类操作员”,你看到的会是:
{"query": "用户正在请求构建 AI agent 的专家建议"
}
你可以把它想象成:interrupt({"query": query})
就像前端弹窗里的一段提示话术:
❓ 用户请求构建 AI Agent 的专家指导,请输入建议内容。
然后人类填了一句建议点“提交” → resume → LangGraph 恢复执行。
至于, human_assistance(query: str)
的入参是怎么来的?
它的入参 query
是由 LLM 的工具调用生成的。也就是说,当大语言模型判断“需要调用 human_assistance
工具”时,它会自动构造这个函数的入参。
假设你让机器人处理这个用户输入:
user_input = "我需要一些专家建议来构建 AI agent"
大语言模型(如 GPT-4)会自动构造如下 工具调用(tool call):
{"name": "human_assistance","arguments": {"query": "我需要一些专家建议来构建 AI agent"}
}
此时,LangGraph 调用你注册的工具:
@tool
def human_assistance(query: str) -> str:...
框架自动把 "arguments"
中的 "query"
这个字段取出来,传进了你的函数 query: str
。
至于:
human_response = interrupt({"query": query})
return human_response["data"]
和
Command(resume={"data": "我们专家团队很乐意提供帮助!我们建议您查看 LangGraph,它比简单的自治代理更加可靠且可扩展。"})
相当于human_response =input("显示的内容")
,human_response 的内容是Command传输的resume字典!
我们的调试程序是这样的:
while True:user_input = input(">>>")# 判断是否是 resume 响应,比如用户输入以 "!人工:" 开头if user_input.startswith("!人工"):human_reply = user_input.removeprefix("!人工")human_command = Command(resume={"data": "我们专家团队很乐意提供帮助!我们建议您查看 LangGraph,它比简单的自治代理更加可靠且可扩展。"})stream_input = human_commandelse:stream_input = {"messages": [{"role": "user", "content": user_input}]}for event in graph.stream(stream_input, config, stream_mode="values"):if "messages" in event:event["messages"][-1].pretty_print()
当程序遇到interrupt时,for event循环打印出提示信息后进入下一次输入,这时候因为程序要求你输入一个Command,那就应该将input改为Command,当然你也可以继续输入普通内容:
而你触发Command时,对话会变成这样:
human-in-the-loop controls的意义
Human-in-the-loop (HITL) 是指:
在自动化系统(如 Agent)运行过程中,保留人类参与决策的能力。
在 LangGraph 中,它体现在你可以使用 interrupt(...)
主动中断代理流程,并等待 人工输入/决策后再恢复执行。
为什么需要 HITL?
现实中,完全自治的 Agent 容易出错。HITL 提供了一个机制,让系统“学会反问”或“等待人工干预”,从而:
- 提高 可靠性(避免 AI 做出不当操作)
- 保证 安全性(在关键节点需人类审批)
- 增强 可控性(人工随时插手/修正 agent 的行为)
举例:实际应用场景
✅ 1. 自动回复客服系统
- 用户投诉某产品功能问题;
- Agent 回答不确定是否属于严重 bug,调用
human_assistance(query=...)
; - 人工客服介入并提供专业回复;
- Agent 将人工回复加入对话继续处理后续问题。
✅ 2. 内部问答机器人(如公司知识库)
- 用户提问:“公司今年的财报中哪些指标下降了?”
- LLM 模型查不到答案、或结果可能涉及敏感信息;
- 自动触发
interrupt(...)
请求人工审核; - 审核员查看上下文后输入答复,再继续对话。
✅ 3. Agent 工作流执行系统
比如自动化执行某些流程(更新配置、生成代码、操作数据库):
- Agent 在执行某条关键命令(如:删除表)前触发中断;
- 提示:“是否确认执行该操作?”
- 人工点击确认或拒绝后再继续执行。
✅ 4. 训练与评估阶段辅助标注
- Agent 自动处理 NLP 或图像任务;
- 遇到不确定的标签请求人工打标;
- 人类标注的结果可反馈给模型微调。
有些人可能理解为“AI反问用户、等待用户答复”,而“AI反问用户、等待用户答复”其实是 正常对话流程(模型自动生成消息、用户回复) ,interrupt()
是一种更强的、流程级别的“打断执行”机制,适用于:
✅ 引入第三方人工输入
✅ 需要人为决策或审查
✅ 流程不能继续,必须等待外部指令
类型 | 描述 |
---|---|
✅ 普通对话反问 | 模型主动说:“你想了解哪方面?”;用户回复;模型继续处理。 |
✅ interrupt() 中断控制 | 模型不再自己回复,而是“停住”、发出请求等待外部(人类)输入决策。 |
就像这样:
👤 用户问:“给我部署下线上服务”
🤖 模型判断这个请求比较敏感,不能随便答应
⛔ 它不直接答复你、也不继续处理,而是执行:
interrupt({"query": "用户请求部署服务,是否允许?"})
这时你可以:
- 在另一个界面看到这个请求(比如 web 审批界面)
- 人工填入:“允许” 或 “拒绝”
- LangGraph 将人类的输入作为
resume={"data": ...}
传回,继续流程
AI 反问的实现不需要 interrupt()
,只要让模型生成“反问”的 prompt 即可:
messages = [{"role": "user", "content": "langGraph怎么样?"},{"role": "assistant", "content": "你想了解它的哪一方面?性能、扩展性、还是上手体验?"},{"role": "user", "content": "我想知道框架性能和评分"},...
]
这套流程是 用户 ↔ 模型自洽对话;而 interrupt()
是插入了 第三方“人类审批” 的逻辑。
下面给出一些实际场景:
用户在网页联系客服,AI 自动初步回答问题,但遇到模糊或高风险问题时中断,请人类客服接入。
实际案例:京东/淘宝/Apple 支持网页客服系统
用户对话:
用户:我升级后手机频繁死机,怎么回事?
AI:这个问题可能属于系统兼容问题,是否需要我们请专家进一步确认?
[👉 请人工客服接入](按钮)
LangGraph 实现思路:
@tool
def human_assistance(query: str) -> str:"""Request human support."""return interrupt({"query": query})["data"]# 触发中断时:
human_response = interrupt({"query": "用户反馈 iOS 升级后设备死机"})
# 人类客服在界面上输入答复后返回:
# { "data": "您好,我们确认这是 iOS 17.2 的已知问题,建议恢复出厂设置..." }
前端页面(客服后台):
- 会弹出一个人工协助请求面板,显示用户上下文;
- 人工客服输入答复后点“提交”,LangGraph 会 resume 执行流程。
企业内部员工向 AI 问公司战略、业务数据、HR 政策等,但某些内容模型不能随便回答,需人工审核。
实际产品参考: Slack + LangChain 实现的内部问答机器人
交互示意:
员工:今年Q1我们公司的利润同比下降了吗?
AI:这个问题涉及财务数据,我将请求一位负责人提供权威答复。
[👉 通知财务团队]
LangGraph 代码概念:
@tool
def human_review(query: str) -> str:return interrupt({"query": query})["data"]
前端页面:
- 财务审核员在 Slack 或内部审批系统看到“审批请求”;
- 填写答复(或拒绝)后继续推进对话流程。
综上所述,interrupt(...)
本质上就是一个特殊的“工具调用(Tool Call)”,但这个工具不是自动执行的,而是专门设计给“人类介入”的!
Customize state
本章节,我们将为状态(state)添加额外的字段,以定义更复杂的行为,而不是仅依赖消息列表。聊天机器人将使用其搜索工具查找特定信息,并将其提交给人工审核。
通过在状态中添加 name
和 birthday
字段来更新聊天机器人,实现查找实体生日的功能:
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph.message import add_messagesclass State(TypedDict):messages: Annotated[list, add_messages]name: strbirthday: str
将这些信息添加到状态中,可以让图中其他节点(比如后续存储或处理信息的节点)轻松访问这些字段,同时也便于图的持久化层访问。
接下来,在 human_assistance
工具中填充状态字段。在将信息存入状态之前,请人工审核。使用 Command
对象在工具中发起状态更新:
from langchain_core.messages import ToolMessage
from langchain_core.tools import InjectedToolCallId, tool
from langgraph.types import Command, interrupt@tool
def human_assistance(name: str, birthday: str, tool_call_id: Annotated[str, InjectedToolCallId]
) -> str:"""Request assistance from a human."""human_response = interrupt({"question": "这些信息正确吗?","name": name,"birthday": birthday,},)if human_response.get("correct", "").lower().startswith("y"):verified_name = nameverified_birthday = birthdayresponse = "Correct"else:verified_name = human_response.get("name", name)verified_birthday = human_response.get("birthday", birthday)response = f"Made a correction: {human_response}"state_update = {"name": verified_name,"birthday": verified_birthday,"messages": [ToolMessage(response, tool_call_id=tool_call_id)],}return Command(update=state_update)
图的其他部分保持不变。
向聊天机器人提问,让机器人查找 LangGraph 库的“发布日期”,并在查找完成后调用 human_assistance
工具进行人工审核。通过设置工具的参数 name
和 birthday
,引导模型生成这些字段的候选值:
user_input = ("Can you look up when LangGraph was released? ""When you have the answer, use the human_assistance tool for review."
)
config = {"configurable": {"thread_id": "1"}}events = graph.stream({"messages": [{"role": "user", "content": user_input}]},config,stream_mode="values",
)
for event in events:if "messages" in event:event["messages"][-1].pretty_print()
输出显示模型先使用 tavily_search_results_json
搜索,然后调用 human_assistance
工具请求审核。
如果聊天机器人未能查到正确日期,你可以手动提供:
human_command = Command(resume={"name": "LangGraph","birthday": "Jan 17, 2024",},
)events = graph.stream(human_command, config, stream_mode="values")
for event in events:if "messages" in event:event["messages"][-1].pretty_print()
机器人将更新状态并输出最终信息:
LangGraph was initially released on January 17, 2024...
你可以验证状态已更新:
snapshot = graph.get_state(config)
{k: v for k, v in snapshot.values.items() if k in ("name", "birthday")}
# 输出:{'name': 'LangGraph', 'birthday': 'Jan 17, 2024'}
此外,LangGraph 允许你在任何时刻手动更新状态,包括中断期间:
graph.update_state(config, {"name": "LangGraph (library)"})
你可以再次查看状态变化:
snapshot = graph.get_state(config)
{k: v for k, v in snapshot.values.items() if k in ("name", "birthday")}
# 输出:{'name': 'LangGraph (library)', 'birthday': 'Jan 17, 2024'}
手动状态更新会在 LangSmith 中留下 trace,如果愿意,也可以用于控制人工审核流程。不过,通常推荐使用 interrupt
函数,它能更清晰地区分数据交互与状态更新逻辑。
示例代码:
searcher = TavilySearch(max_results=2)@tool
def human_assistance(name: str, birthday: str, tool_call_id: Annotated[str, InjectedToolCallId]
) -> str:"""Request assistance from a human."""human_response = interrupt({"question": "这些信息正确吗?","name": name,"birthday": birthday,},)if human_response.get("correct", "").lower().startswith("y"):verified_name = nameverified_birthday = birthdayresponse = "Correct"else:verified_name = human_response.get("name", name)verified_birthday = human_response.get("birthday", birthday)response = f"Made a correction: {human_response}"state_update = {"name": verified_name,"birthday": verified_birthday,"messages": [ToolMessage(response, tool_call_id=tool_call_id)],}return Command(update=state_update)tools = [human_assistance]
llm = init_chat_model("openai:Qwen/Qwen3-8B").bind_tools(tools)class State(TypedDict):# messages 是一个列表类型。# 通过注解 `add_messages` 指定该键的更新方式:# 它会将新消息追加到现有的列表中,而不是覆盖。messages: Annotated[list, add_messages]# 用户信息name: strbirthday: strdef route_tools(state: State):"""在条件边中使用:如果最后一条消息包含工具调用,则路由到 ToolNode,否则路由到 END。"""if isinstance(state, list):ai_message = state[-1]elif messages := state.get("messages", []):ai_message = messages[-1]else:raise ValueError(f"tool_edge 中输入状态无消息: {state}")if hasattr(ai_message, "tool_calls") and len(ai_message.tool_calls) > 0:return "tools"return ENDdef chatbot(state: State):return {"messages": [llm.invoke(state["messages"])]}memory = MemorySaver()# 创建状态图构建器
graph_builder = StateGraph(State)
graph_builder.add_node("chatbot", chatbot)
tool_node = ToolNode(tools=tools)
graph_builder.add_node("tools", tool_node)
# 如果 chatbot 请求使用工具,返回 "tools",否则返回 "END"
# 这个条件路由构成了代理的主循环逻辑
graph_builder.add_conditional_edges("chatbot", tools_condition)
# 每次调用工具后,返回 chatbot 节点决定下一步
graph_builder.add_edge("tools", "chatbot")
graph_builder.add_edge(START, "chatbot")
graph = graph_builder.compile(checkpointer=memory)config = {"configurable": {"thread_id": "1"}}while True:user_input = input(">>>")if user_input.lower() in ["quit", "exit", "q"]:print("Goodbye!")break# 判断是否是 resume 响应,比如用户输入以 "!人工:" 开头if user_input.startswith("!人工"):human_reply = user_input.removeprefix("!人工")human_command = Command(resume={"name": "LangGraph","birthday": "Jan 17, 2024",},)stream_input = human_commandelse:stream_input = {"messages": [{"role": "user", "content": user_input}]}for event in graph.stream(stream_input, config, stream_mode="values"):if "messages" in event:event["messages"][-1].pretty_print()snapshot = graph.get_state(config)print({k: v for k, v in snapshot.values.items() if k in ("name", "birthday")})# 输出:{'name': 'LangGraph', 'birthday': 'Jan 17, 2024'}
自定义状态的意义远不只是“改内容”那么简单,它带来的核心价值包括:
让状态结构显式化(结构化 memory)
默认的 LangGraph 状态(messages
)是对话消息列表。如果你不自定义状态,那整个系统只能靠消息上下文推理。例如:
{"messages": [{"role": "user", "content": "LangGraph 是什么时候发布的?"}]}
系统并不知道你想追踪 name
和 birthday
—— 它只是一堆文本。
而通过添加结构化字段(比如 name
/ birthday
),LangGraph 里的节点就可以“以编程方式”访问这些字段,避免反复解析 messages 内容。
支持复杂流程编排(Workflow Composition)
结构化状态的最大价值是支持工作流中的自动决策与跳转。比如:
- 如果
birthday
字段为空,就走search
分支; - 如果
birthday
有值,但verified
是 False,就走human_assistance
; - 如果验证完毕,就走
store_to_db
。
这些流程的条件判断,无法通过仅仅依赖对话历史完成,而必须通过显式的字段值控制。
状态字段可用于持久化、恢复、中断点追踪
LangGraph 本质是个「状态机」+「事件流」,每个中间状态都可以 checkpoint:
snapshot = graph.get_state(config)
graph.update_state(config, {"verified": True})
这意味着:
- 你可以在某个流程中断处保存状态
- 之后从这个状态恢复执行
- 也可以在外部系统(如数据库、缓存)持久化状态字段用于日志、追踪、审计等
便于“模型无感知”的后台逻辑更新
比如你想更新 name
字段,不用让 LLM 重新生成对话、也不用发提示词。直接:
graph.update_state(config, {"name": "LangGraph (library)"})
这个更新动作对模型是透明的,但对整体系统却是确定性的。
支持人类中断/修正控制(Interrupt + Command)
你看到例子中 interrupt(...)
是靠状态字段把数据传给人类的:
Command(update=state_update)
这种人类回环(human-in-the-loop)能力,非常依赖结构化状态,否则很难以结构化方式交互。
在 LangGraph 中,每个 Node 就是一个函数:接收 State,返回新的 State。
def my_node(state: YourStateType) -> YourStateType:# ...处理逻辑...return new_state
也可以写成 async 函数(如果你要查数据库、调 API)
async def my_node(state: YourStateType) -> YourStateType:...return new_state
Node 一般要做什么?
- 从
state
中拿信息 - 做业务逻辑(如 LLM 调用、解析、API 查询)
- 把结果写回新的
state
因此,state里面的信息就是我们要在workflow中修改的内容!
最常见的几种 Node 类型:
1️⃣ 提取信息的 Node(抽取设备名、用户名等)
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.runnables import RunnableLambdadef collect_info_node(state: SupportState) -> SupportState:# 用 LLM 提取用户输入中的信息latest_user_msg = state["messages"][-1].contentprompt = f"请从这句话中提取用户名和设备名称:'{latest_user_msg}'"resp = call_llm(prompt)# 假设 resp = {"username": "张三", "device": "空调"}return {**state,"username": resp["username"],"device": resp["device"],"messages": state["messages"] + [AIMessage(content=prompt), AIMessage(content=str(resp))]}
2️⃣ 判断逻辑的 Node(是否需要人工处理)
def decide_node(state: SupportState) -> SupportState:human_needed = not state["warranty_valid"]return {**state,"human_needed": human_needed}
3️⃣ 接 LLM 的 Node(调用 ChatModel)
你可以直接用 langchain 的 Runnable:
from langchain_core.runnables import RunnableLambda, RunnableMap
from langchain_core.messages import HumanMessage
from langchain.chat_models import ChatOpenAIllm = ChatOpenAI(model="gpt-4")llm_node = RunnableLambda(lambda state: {**state,"messages": state["messages"] + [llm.invoke(state["messages"])]}
)
也可以封装一下:
def ask_llm_node(state: SupportState) -> SupportState:response = llm.invoke(state["messages"])return {**state,"messages": state["messages"] + [response]}
4️⃣ 人工中断的 Node(中止等待人工)
LangGraph 内置了 ToolNode
, HumanInput
, breakpoint
等机制:
from langgraph.prebuilt import ToolNodehuman_node = ToolNode(name="human", input_schema=SupportState)
或者你自定义:
def wait_human_node(state: SupportState) -> NoReturn:raise BreakpointException() # 停在这里,等人类继续
记住:Node 是纯函数 + 状态变更器 每个节点你就当它是个“处理器”:
输入:老状态 state
处理:干点事
输出:新状态 state'
只要你把所有 Node 函数写清楚,整个状态机就像装配流水线一样工作。
Time travel
在典型的聊天机器人工作流中,用户会与机器人进行一次或多次交互以完成某项任务。内存(Memory)和人工干预(Human-in-the-loop)机制允许我们在图状态中设置检查点,并控制未来的响应。
如果你希望用户能从之前的某个响应出发去探索不同的结果呢?
又或者你希望用户能够“回退”聊天机器人的执行过程,以修复错误或尝试不同策略呢?你可以使用 LangGraph 内置的“时间旅行”功能 来创建这些类型的交互体验。
你可以通过图的 get_state_history
方法获取状态历史中的检查点,然后从该时间点恢复执行。
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_conditionfrom langchain_tavily import TavilySearch
from langchain_core.messages import BaseMessage
from typing import Annotated
from typing_extensions import TypedDictclass State(TypedDict):messages: Annotated[list, add_messages]graph_builder = StateGraph(State)# 初始化 LLM + 工具
tool = TavilySearch(max_results=2)
tools = [tool]
llm_with_tools = llm.bind_tools(tools)def chatbot(state: State):return {"messages": [llm_with_tools.invoke(state["messages"])]}graph_builder.add_node("chatbot", chatbot)
tool_node = ToolNode(tools=[tool])
graph_builder.add_node("tools", tool_node)# 添加边与分支条件
graph_builder.add_conditional_edges("chatbot", tools_condition)
graph_builder.add_edge("tools", "chatbot")
graph_builder.add_edge(START, "chatbot")# 使用内存型检查点机制
memory = MemorySaver()
graph = graph_builder.compile(checkpointer=memory)
每次图执行的步骤都会被记录在状态历史中。
config = {"configurable": {"thread_id": "1"}}
events = graph.stream({"messages": [{"role": "user","content": ("我正在学习 LangGraph,""你能帮我搜索一些相关信息吗?"),},],},config,stream_mode="values",
)for event in events:if "messages" in event:event["messages"][-1].pretty_print()
输出类似如下(简略):
Human: 我正在学习 LangGraph,能帮我研究下吗?
AI: 我将使用 Tavily 搜索引擎帮你查找最新信息……
Tool 调用: tavily_search_results_json
Tool 返回内容: [...]
AI 总结: LangGraph 是 LangChain 生态中的一部分,最近更新包括……
继续添加更多对话:
events = graph.stream({"messages": [{"role": "user","content": ("这很有帮助,也许我可以用它构建一个自动代理(autonomous agent)!"),},],},config,stream_mode="values",
)
现在你已经为聊天机器人添加了步骤,你可以回放完整的状态历史,以查看整个执行过程中的所有事件。
to_replay = None
for state in graph.get_state_history(config):print("消息数量: ", len(state.values["messages"]), "下一步: ", state.next)print("-" * 80)if len(state.values["messages"]) == 6:to_replay = state
输出(部分):
Num Messages: 8 Next: ()
--------------------------------------------------------------------------------
Num Messages: 7 Next: ('chatbot',)
--------------------------------------------------------------------------------
Num Messages: 6 Next: ('tools',)
--------------------------------------------------------------------------------
Num Messages: 5 Next: ('chatbot',)
--------------------------------------------------------------------------------
Num Messages: 4 Next: ('__start__',)
--------------------------------------------------------------------------------
Num Messages: 4 Next: ()
--------------------------------------------------------------------------------
Num Messages: 3 Next: ('chatbot',)
--------------------------------------------------------------------------------
Num Messages: 2 Next: ('tools',)
--------------------------------------------------------------------------------
Num Messages: 1 Next: ('chatbot',)
--------------------------------------------------------------------------------
Num Messages: 0 Next: ('__start__',)
--------------------------------------------------------------------------------
...
这些检查点会在每一步都自动保存,可以跨整个对话线程回滚。
从某个时间点恢复(Resume from a checkpoint)
从 to_replay
状态恢复,该状态位于第二次图调用中 chatbot 节点之后。从此处恢复将会接着调用 action 节点。
print(to_replay.next)
print(to_replay.config)
输出示例:
('tools',)
{'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1efd43e3-0c1f-6c4e-8006-891877d65740'}}
你可以使用这个 checkpoint_id
来恢复状态并继续执行:to_replay.config 中包含一个 checkpoint_id 时间戳。提供该 checkpoint_id 值后,LangGraph 的检查点管理器(checkpointer)会从该时间点加载对应的状态。
for event in graph.stream(None, to_replay.config, stream_mode="values"):if "messages" in event:event["messages"][-1].pretty_print()
输出:
AI: 构建一个自动代理确实是个好主意。让我来帮你查找一些相关的教程与示例……
Tool 调用:搜索 “用 LangGraph 构建 autonomous agent”
Tool 返回:两个链接
AI 总结:LangGraph 适合构建多步骤、多工具的自动智能体……
理解"时间旅行"
LangGraph 的“时间旅行”(time-travel checkpointing)确实是一个比较抽象、偏底层的概念。它的核心目的是:允许开发者在多步骤流程中“回到某一时刻的状态”,并从那里重新执行或探索不同的路径。
在使用 LangGraph 构建多步骤 AI 工作流(比如一个多轮问答机器人、一个自动化代理系统)时:
- 每一步都会记录状态(LangGraph 自带 checkpoint 机制);
- 你可以“回放”这些状态,跳回某个历史节点,比如跳过聊天机器人节点、从 tool 调用节点开始执行;
- 这就像你写代码调试时,在某个断点重新运行一段逻辑,不必整个重跑。
实际应用场景有哪些?这个机制对以下需求特别有用:
- 调试代理系统:你想测试同样的输入,在不同工具调用分支会怎么走。
- 交互式实验:你可以改变某个节点的逻辑,再从那里重新跑一次看看效果。
- 复盘问题:一个复杂的执行流程出错了?你可以 rewind 到那个时间点看看上下文。
- 标注训练数据:模型在某节点判断错了,你回退、修正,再继续执行。
以下是一个关键代码片段(来自官方教程),从某个 checkpoint ID 开始 replay:
# 从历史某个状态重新执行
for event in graph.stream(None, to_replay.config, stream_mode="values"):if "messages" in event:print(event["messages"][-1]["text"])
其中 to_replay.config
包含了时间点信息,比如:
{"checkpoint_id": "1716732978"
}
我们常说checkpoint,那么checkpoint到底是什么呢?
Checkpoint 是 LangGraph 用来记录「某个节点执行之后的完整状态」的一份快照(snapshot)。
每次你的 graph 执行到一个 node,它会把:
- 当前对话上下文(如 LLM 输入/输出、memory、工具调用参数);
- 当前的 graph 状态;
- 哪个节点刚刚执行完、下一个节点是谁;
- 当前版本的配置;
全部记录下来,保存成一个 checkpoint。
就像游戏存档:
- 你走到某个 boss 关打输了 → 回到 checkpoint 再来一次;
- LangGraph 也是:你走到某个节点,发现执行错了 → 回退到 checkpoint 再执行一次。
一个 checkpoint 长什么样?
它是一个结构体(dict),大概像这样:
{"checkpoint_id": "1716732978","messages": [{"role": "user", "content": "推荐一个旅游路线"},{"role": "ai", "content": "你想去哪里?"}],"node": "chatbot","config": {...}
}
checkpoint_id
: 时间戳或唯一标识符;messages
: 当前上下文;node
: 当前处于哪个节点;config
: 控制执行行为的配置(比如stream
,interrupt
,stop_at_node
,to_replay
, 等等)。
综上,时间旅行(time-travel)本质上就是基于 checkpoint 实现的一种调试/开发工具。
预构建Agent
LangGraph 提供了构建 Agent 应用所需的底层原语和高级预构建组件。这些可复用的高级组件,它们能帮助你快速稳定地构建 Agent 系统,而无需从零实现调度、内存或人类反馈机制。
什么是 Agent?
一个 Agent 由三部分组成:
- 大语言模型(LLM)
- 可调用的工具集合(tools)
- 包含指令的提示词(prompt)
LLM 在一个循环中运行。在每轮中,模型:
- 选择一个工具进行调用
- 提供输入
- 获取工具返回的结果(即 observation)
- 根据这个结果决定下一步操作
这个循环持续进行,直到满足停止条件,通常是 Agent 已获得足够信息来回应用户请求。
核心特性
LangGraph 提供多项功能,帮助你构建强健、可部署的 Agent 系统:
- 内存集成:原生支持短期(会话级)与长期(跨会话)内存,便于创建有状态的聊天助手。
- 人类反馈控制(Human-in-the-loop):执行过程可无限期暂停,等待人工反馈——突破 WebSocket 实时交互的限制,支持异步审批、修改或干预。
- 流式支持:实时流式输出 Agent 状态、模型 token、工具调用结果,或多种组合输出。
- 部署工具:
- LangGraph 提供无需额外基础设施的本地部署工具;
- LangGraph Platform 支持测试、调试与线上部署。
- Studio 可视化 IDE:支持工作流结构的可视化查看与调试。
- 支持多种部署模式,适合生产环境。
高级构建模块:
LangGraph 提供一组预构建组件,用于实现常见的 Agent 行为与工作流。这些封装基于 LangGraph 框架构建,可加快上线速度,同时保留灵活定制的能力。使用 LangGraph 构建 Agent,意味着你可以专注于应用逻辑和行为,而不是从零管理状态、内存与人类反馈等基础设施。
生态组件:这些高层组件被组织为多个功能明确的包,每个包专注于特定领域。
包名 | 描述 | 安装命令 |
---|---|---|
langgraph-prebuilt | 构建 Agent 的预构建组件(LangGraph 自带) | pip install -U langgraph langchain |
langgraph-supervisor | 用于构建 Supervisor Agent(多 Agent 协调) | pip install -U langgraph-supervisor |
langgraph-swarm | 构建多智能体群体系统(Swarm) | pip install -U langgraph-swarm |
langchain-mcp-adapters | 连接 MCP 服务器的工具/资源适配器 | pip install -U langchain-mcp-adapters |
langmem | Agent 的短期/长期内存管理工具 | pip install -U langmem |
agentevals | Agent 性能评估工具集 | pip install -U agentevals |
可视化 Agent 图结构
你可以使用工具将由 create_react_agent()
创建的 Agent 工作流图形化展示,同时查看核心结构包括:
- tools:Agent 可调用的工具列表(函数、API 等)
- pre_model_hook:模型调用前的处理钩子(如压缩上下文)
- post_model_hook:模型调用后的处理钩子(如加防护网、接人类反馈等)
- response_format:约束最终输出的数据结构(如 pydantic BaseModel)
示例代码:创建一个 React Agent 并可视化
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from pydantic import BaseModelmodel = ChatOpenAI("o4-mini")def tool() -> None:"""Testing tool."""...def pre_model_hook() -> None:"""Pre-model hook."""...def post_model_hook() -> None:"""Post-model hook."""...class ResponseFormat(BaseModel):"""Response format for the agent."""result: stragent = create_react_agent(model,tools=[tool],pre_model_hook=pre_model_hook,post_model_hook=post_model_hook,response_format=ResponseFormat,
)agent.get_graph().draw_mermaid_png()
create_react_agent
是 LangGraph 提供的一个 高阶封装函数,它可以基于一套默认的 Agent 行为和结构,快速生成一张 LangGraph 图(Graph)用于执行。你可以把它理解为一个 “官方内建模板”,对用户手动构建的图(Graph)做了简化与封装。
特点:
- 自动生成节点和边,你只需要提供 LLM 模型和工具(tools)即可;
- 默认内置了 “ReAct” 模式(Reasoning + Acting),即 LLM 观察环境、选择工具、使用结果、再推理下一步;
- 自动处理了很多 状态管理、流控制、停止条件、异常处理;
- 可直接可视化(支持
agent.get_graph().draw_mermaid_png()
); - 易上手、适合快速原型和大多数标准 Agent 应用场景。
使用示例:
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAIagent = create_react_agent(model=ChatOpenAI("gpt-4o"),tools=[search_tool, calculator_tool],
)
graph = agent.get_graph()
手动构建图(自定义 Graph)
特点:
- 你需要显式定义 所有节点、边、状态转移逻辑;
- 灵活控制任何行为:如嵌套 agent、并发执行、条件跳转、人类中断点等;
- 可构建非 ReAct 模式,比如工具优先执行、分布式 agent、集群 swarm 等;
- 适合构建复杂、非标准、强定制化的 AI 系统。
使用示例(简略):
from langgraph.graph import StateGraph, ENDgraph = StateGraph(StateType)
graph.add_node("llm", llm_node)
graph.add_node("tool", tool_node)
graph.add_edge("llm", "tool")
graph.set_entry_point("llm")
graph.set_finish_point("tool")
⚖️ 总结对比
特性 | create_react_agent | 手动构建图 |
---|---|---|
是否快速上手 | ✅ 是 | ❌ 否 |
是否支持高度自定义 | ❌ 限制较多 | ✅ 非常灵活 |
适合 ReAct 结构 | ✅ 最佳场景 | ✅ 也可以做 |
状态管理 / 工具调用 | 自动封装 | 需自行管理 |
适合构建简单或中等复杂度 Agent | ✅ | ✅ |
适合构建复杂工作流 / 多阶段决策系统 | ❌ 不够灵活 | ✅ 最佳选择 |
如果你是第一次构建 Agent 或想快速验证一个工具集成场景,建议用 create_react_agent
起步。
如果你要构建:
- 一个具有人类中断审批机制的自动客服系统,
- 一个多轮递归工具调用的智能分析系统,
- 一个并发搜索与聚合的多智能体系统,
那就建议你手动定义图结构,使用底层 API 构建 Graph。
需要的话,我可以演示一个 create_react_agent
和一个自定义 Graph 的对比代码。
运行 Agent
代理(Agents)支持同步和异步两种执行方式,可分别使用 .invoke()
/ await .ainvoke()
获取完整响应,或使用 .stream()
/ .astream()
实现增量流式输出。
代理可以通过两种主要模式执行:
- 同步:使用
.invoke()
或.stream()
- 异步:使用
await .ainvoke()
或async for
配合.astream()
同步调用
from langgraph.prebuilt import create_react_agentagent = create_react_agent(...)response = agent.invoke({"messages": [{"role": "user", "content": "what is the weather in sf"}]})
异步调用
from langgraph.prebuilt import create_react_agentagent = create_react_agent(...)
response = await agent.ainvoke({"messages": [{"role": "user", "content": "what is the weather in sf"}]
})
代理使用语言模型,其期望的输入是一个消息列表,因此代理的输入输出都以 messages
这个键存储于代理状态中。
代理输入必须是一个包含 messages
键的字典。支持的格式包括:
格式 | 示例 |
---|---|
字符串 | {"messages": "Hello"} —— 被解释为一个 HumanMessage |
消息字典 | {"messages": {"role": "user", "content": "Hello"}} |
消息列表 | {"messages": [{"role": "user", "content": "Hello"}]} |
带自定义状态 | {"messages": [{"role": "user", "content": "Hello"}], "user_name": "Alice"} —— 如果使用了自定义的 state_schema |
消息会被自动转换为 LangChain 内部的消息格式。
你可以在输入字典中直接提供由代理的 state_schema
定义的附加字段。这样可以根据运行时数据或之前工具的输出动态控制行为。
这句话的意思是:
如果你定义了一个包含额外字段的 state_schema
(也就是代理的状态结构),那么你在调用代理时,就可以在输入的字典里添加这些字段,而不仅仅是 messages
。这样做的好处是:
- 可以根据当前运行时的上下文数据(比如用户身份、调用历史、外部工具返回值等),动态地影响代理的行为。
- 这些附加字段会随着代理执行流动和更新,作为“上下文状态”参与决策或提示模板的生成。
假设你定义的代理有以下状态结构(state_schema
):
{"messages": List[Message],"user_name": str,"location": str
}
那么你可以这样调用代理:
agent.invoke({"messages": [{"role": "user", "content": "What's the weather like?"}],"user_name": "Alice","location": "San Francisco"
})
然后代理就可以在内部使用这些状态字段,比如在提示中自动加入:
“Hi Alice, you’re asking about the weather in San Francisco…”
注意
字符串类型的messages
输入会被转换为HumanMessage
。这个行为和create_react_agent
中的prompt
参数不同,后者若传入字符串会被解释为SystemMessage
。
代理输出是一个包含以下内容的字典:
messages
: 执行期间交换的所有消息(用户输入、助手回复、工具调用等)- (可选)
structured_response
:如果配置了结构化输出 - 如果使用了自定义的
state_schema
,输出中还可能包含对应字段的键,这些键保存了工具执行或提示逻辑更新的状态值。
代理支持流式响应,用于更具响应性的应用场景,包括:
- 每一步后的进度更新
- LLM 生成 token 的过程
- 执行期间自定义工具消息
流式输出可用于同步和异步模式:
同步流式
for chunk in agent.stream({"messages": [{"role": "user", "content": "what is the weather in sf"}]},stream_mode="updates"
):print(chunk)
异步流式
async for chunk in agent.astream({"messages": [{"role": "user", "content": "what is the weather in sf"}]},stream_mode="updates"
):print(chunk)
为控制代理执行,避免无限循环,可以设置递归限制(recursion limit)。该值定义了代理最多能执行的步骤数,超出将抛出 GraphRecursionError
。
你可以在运行时或通过 .with_config()
在定义代理时设置该限制:
运行时:
from langgraph.errors import GraphRecursionError
from langgraph.prebuilt import create_react_agentmax_iterations = 3
recursion_limit = 2 * max_iterations + 1
agent = create_react_agent(model="anthropic:claude-3-5-haiku-latest",tools=[get_weather]
)try:response = agent.invoke({"messages": [{"role": "user", "content": "what's the weather in sf"}]},{"recursion_limit": recursion_limit},)
except GraphRecursionError:print("Agent stopped due to max iterations.")
with_config:
from langgraph.errors import GraphRecursionError
from langgraph.prebuilt import create_react_agentmax_iterations = 3
recursion_limit = 2 * max_iterations + 1
agent = create_react_agent(model="anthropic:claude-3-5-haiku-latest",tools=[get_weather]
)
agent_with_recursion_limit = agent.with_config(recursion_limit=recursion_limit)try:response = agent_with_recursion_limit.invoke({"messages": [{"role": "user", "content": "what's the weather in sf"}]},)
except GraphRecursionError:print("Agent stopped due to max iterations.")
Streaming
流式输出是构建高响应应用的关键。你可能希望流式传输以下几种类型的数据:
- 代理进度:每个代理图中的节点执行后获取更新。
- LLM token:语言模型生成的 token 逐个流式输出。
- 自定义更新:工具执行期间发送自定义数据(例如:“Fetched 10/100 records”)。
你可以同时流式传输多种类型的数据。
要流式获取代理执行的进度信息,可使用 stream()
或 astream()
方法,并设置 stream_mode="updates"
。这将在每一步代理操作后发出事件。
例如,如果代理只调用一个工具,你会依次看到以下更新:
- LLM 节点:AI 消息(包含工具调用请求)
- 工具节点:工具消息(包含执行结果)
- LLM 节点:最终 AI 回复
同步示例
agent = create_react_agent(model="anthropic:claude-3-7-sonnet-latest",tools=[get_weather],
)
for chunk in agent.stream({"messages": [{"role": "user", "content": "what is the weather in sf"}]},stream_mode="updates"
):print(chunk)print("\n")
异步示例
agent = create_react_agent(model="anthropic:claude-3-7-sonnet-latest",tools=[get_weather],
)
async for chunk in agent.astream({"messages": [{"role": "user", "content": "what is the weather in sf"}]},stream_mode="updates"
):print(chunk)print("\n")
要在 token 被语言模型生成时逐个流式获取,可以使用 stream_mode="messages"
。
同步示例
agent = create_react_agent(model="anthropic:claude-3-7-sonnet-latest",tools=[get_weather],
)
for token, metadata in agent.stream({"messages": [{"role": "user", "content": "what is the weather in sf"}]},stream_mode="messages"
):print("Token", token)print("Metadata", metadata)print("\n")
异步示例
agent = create_react_agent(model="anthropic:claude-3-7-sonnet-latest",tools=[get_weather],
)
async for token, metadata in agent.astream({"messages": [{"role": "user", "content": "what is the weather in sf"}]},stream_mode="messages"
):print("Token", token)print("Metadata", metadata)print("\n")
工具更新
要在工具执行过程中获取其发送的流式更新,可以使用 get_stream_writer
。
如果你想让工具在运行中实时输出一些进度信息(比如“正在拉取数据”、“已处理 50 条”等),可以在工具里调用 get_stream_writer()
获得一个“写入器”,这个写入器支持将消息发射出去,LangGraph 会监听并展示它。
get_stream_writer()
是 LangGraph 提供的“上下文绑定”能力,它只能在 LangGraph 的 stream 执行中调用。
如果你单独调用这个工具函数(例如在普通 Python 程序中直接 get_weather("sf")
),由于没有 LangGraph 的上下文支持,它会报错或者无法正常运行。
举个例子:
def get_weather(city: str) -> str:writer = get_stream_writer() # <- 只能在 LangGraph 中用writer(f"查找城市: {city}") # 实时发射进度信息return f"{city} 天气晴朗"
这个函数可以在 LangGraph 中以 stream 方式运行,并输出中间信息;但如果你这样单独运行:
print(get_weather("Beijing"))
它会因为找不到上下文而失败或异常。
内容 | 解释 |
---|---|
get_stream_writer() | LangGraph 提供的工具,用于工具函数内发射自定义流数据 |
作用 | 可以实现像:“正在处理第 x 条记录”的进度推送 |
限制 | 只能在 LangGraph 的 stream() 或 astream() 之中使用,否则无法运行 |
你可以把 get_stream_writer()
理解为一种“只在 LangGraph 环境中才能用的管道”,普通运行时它是“失效”的。
同步示例:
from langgraph.config import get_stream_writerdef get_weather(city: str) -> str:"""Get weather for a given city."""writer = get_stream_writer()# stream any arbitrary datawriter(f"Looking up data for city: {city}")return f"It's always sunny in {city}!"agent = create_react_agent(model="anthropic:claude-3-7-sonnet-latest",tools=[get_weather],
)for chunk in agent.stream({"messages": [{"role": "user", "content": "what is the weather in sf"}]},stream_mode="custom"
):print(chunk)print("\n")
异步示例
from langgraph.config import get_stream_writerdef get_weather(city: str) -> str:"""获取指定城市的天气信息。"""writer = get_stream_writer()# 发送任意自定义流数据writer(f"Looking up data for city: {city}")return f"It's always sunny in {city}!"agent = create_react_agent(model="anthropic:claude-3-7-sonnet-latest",tools=[get_weather],
)for chunk in agent.stream({"messages": [{"role": "user", "content": "what is the weather in sf"}]},stream_mode="custom"
):print(chunk)print("\n")
注意
如果你在工具函数中使用了get_stream_writer
,则无法在 LangGraph 执行上下文外部调用该工具。
同时启用多种流式模式:
你可以通过将 stream_mode
设为列表来同时启用多种模式,例如 stream_mode=["updates", "messages", "custom"]
:
同步示例
agent = create_react_agent(model="anthropic:claude-3-7-sonnet-latest",tools=[get_weather],
)for stream_mode, chunk in agent.stream({"messages": [{"role": "user", "content": "what is the weather in sf"}]},stream_mode=["updates", "messages", "custom"]
):print(chunk)print("\n")
异步示例
agent = create_react_agent(model="anthropic:claude-3-7-sonnet-latest",tools=[get_weather],
)async for stream_mode, chunk in agent.astream({"messages": [{"role": "user", "content": "what is the weather in sf"}]},stream_mode=["updates", "messages", "custom"]
):print(chunk)print("\n")
在某些应用中,你可能需要关闭某个模型的 token 流式输出功能。这在多代理系统中尤其有用,可以控制哪些代理启用流式,哪些禁用。
详细内容见 模型指南(Models guide)。
Models
工具调用支持(Tool calling support)
要启用支持工具调用的代理,底层的大语言模型(LLM)必须具备工具调用能力。
兼容的模型可以在 LangChain 的集成目录中找到。
通过名称指定模型(Specifying a model by name)
你可以使用模型名称字符串为代理配置模型:
- OpenAI
- Anthropic
- Azure
- Google Gemini
- AWS Bedrock
import os
from langgraph.prebuilt import create_react_agentos.environ["OPENAI_API_KEY"] = "sk-..."agent = create_react_agent(model="openai:gpt-4.1",# 其他参数
)
使用 init_chat_model
(Using init_chat_model)
init_chat_model
工具可以通过可配置参数简化模型初始化流程:
支持以下提供商:
- OpenAI
- Anthropic
- Azure
- Google Gemini
- AWS Bedrock
pip install -U "langchain[openai]"
import os
from langchain.chat_models import init_chat_modelos.environ["OPENAI_API_KEY"] = "sk-..."model = init_chat_model("openai:gpt-4.1",temperature=0,# 其他参数
)
高级参数请参考 API 文档。
使用特定厂商模型(Using provider-specific LLMs)
如果某个模型提供商未通过 init_chat_model
提供,你可以直接实例化该厂商的模型类。
模型必须实现 BaseChatModel
接口,并支持工具调用:
API 文档:ChatAnthropic
| create_react_agent
from langchain_anthropic import ChatAnthropic
from langgraph.prebuilt import create_react_agentmodel = ChatAnthropic(model="claude-3-7-sonnet-latest",temperature=0,max_tokens=2048
)agent = create_react_agent(model=model,# 其他参数
)
📌 说明示例
上述示例使用了 ChatAnthropic
,虽然该模型也已被 init_chat_model
支持,但此处示范的目的是说明如何手动实例化那些尚未被 init_chat_model
支持的模型。
禁用流式输出(Disable streaming)
若你希望禁用模型生成时的逐 token 流式输出,
可在模型初始化时设置 disable_streaming=True
:
from langchain.chat_models import init_chat_modelmodel = init_chat_model("anthropic:claude-3-7-sonnet-latest",disable_streaming=True
)
或from langchain_anthropic import ChatAnthropicmodel = ChatAnthropic(model="claude-3-7-sonnet-latest",disable_streaming=True
)
添加模型回退机制(Adding model fallbacks)
你可以使用 model.with_fallbacks([...])
为模型添加回退项,以便在当前模型不可用时自动切换到备用模型或不同提供商的模型:
from langchain.chat_models import init_chat_modelmodel_with_fallbacks = (init_chat_model("anthropic:claude-3-5-haiku-latest").with_fallbacks([init_chat_model("openai:gpt-4.1-mini"),])
)
工具(Tools)
工具是一种将函数及其输入模式封装的方式,能够传递给支持工具调用的聊天模型。
这允许模型使用指定输入请求执行该函数。
你可以自定义工具,也可以使用 LangChain 提供的预构建集成工具。
定义简单工具(Define simple tools)
你可以将一个普通函数传递给 create_react_agent
来作为工具使用:
from langgraph.prebuilt import create_react_agentdef multiply(a: int, b: int) -> int:"""两个数相乘。"""return a * bcreate_react_agent(model="anthropic:claude-3-7-sonnet",tools=[multiply]
)
create_react_agent
会自动将普通函数转换为 LangChain 工具。
自定义工具行为(Customize tools)
若想更细致地控制工具行为,可以使用 @tool
装饰器:
from langchain_core.tools import tool@tool("multiply_tool", parse_docstring=True)
def multiply(a: int, b: int) -> int:"""两个数相乘。参数:a: 第一个操作数b: 第二个操作数"""return a * b
你也可以通过 Pydantic 定义自定义输入模式:
from pydantic import BaseModel, Fieldclass MultiplyInputSchema(BaseModel):"""两个数相乘"""a: int = Field(description="第一个操作数")b: int = Field(description="第二个操作数")@tool("multiply_tool", args_schema=MultiplyInputSchema)
def multiply(a: int, b: int) -> int:return a * b
隐藏模型不可见参数(Hide arguments from the model)
有些工具需要运行时参数(如用户 ID 或会话上下文),这些参数不应由模型控制。
你可以将这些参数放入代理的 state
或 config
中,并在工具内访问:
from langgraph.prebuilt import InjectedState
from langgraph.prebuilt.chat_agent_executor import AgentState
from langchain_core.runnables import RunnableConfigdef my_tool(tool_arg: str, # 由 LLM 填充state: Annotated[AgentState, InjectedState], # 动态上下文config: RunnableConfig, # 静态配置信息
) -> str:"""我的工具"""do_something_with_state(state["messages"])do_something_with_config(config)...
禁用并行工具调用(Disable parallel tool calling)
部分模型提供商支持并行调用多个工具,你可以通过如下方式禁用:
from langchain.chat_models import init_chat_modeldef add(a: int, b: int) -> int:return a + bdef multiply(a: int, b: int) -> int:return a * bmodel = init_chat_model("anthropic:claude-3-5-sonnet-latest", temperature=0)
tools = [add, multiply]agent = create_react_agent(model=model.bind_tools(tools, parallel_tool_calls=False),tools=tools
)agent.invoke({"messages": [{"role": "user", "content": "what's 3 + 5 and 4 * 7?"}]})
直接返回工具结果(Return tool results directly)
使用 return_direct=True
,可在工具执行完立即返回结果并终止代理循环:
from langchain_core.tools import tool@tool(return_direct=True)
def add(a: int, b: int) -> int:return a + bagent = create_react_agent(model="anthropic:claude-3-7-sonnet-latest",tools=[add]
)agent.invoke({"messages": [{"role": "user", "content": "what's 3 + 5?"}]})
强制使用某个工具(Force tool use)
你可以使用 tool_choice
强制代理使用某个指定工具:
from langchain_core.tools import tool@tool(return_direct=True)
def greet(user_name: str) -> str:return f"Hello {user_name}!"tools = [greet]agent = create_react_agent(model=model.bind_tools(tools, tool_choice={"type": "tool", "name": "greet"}),tools=tools
)agent.invoke({"messages": [{"role": "user", "content": "Hi, I am Bob"}]})
⚠️ 警告:强制使用工具但未设置终止条件,可能导致无限循环。请使用以下方式防护:
- 使用
return_direct=True
标记工具,使其在执行后立即结束; - 使用
recursion_limit
限制最大执行步数。
处理工具错误(Handle tool errors)
默认情况下,代理会捕捉工具调用中抛出的所有异常,并作为工具消息返回给 LLM。
要自定义错误处理,可通过 create_react_agent
内部的 ToolNode
配置 handle_tool_errors
参数。
默认:
from langgraph.prebuilt import create_react_agentdef multiply(a: int, b: int) -> int:if a == 42:raise ValueError("The ultimate error")return a * bagent = create_react_agent(model="anthropic:claude-3-7-sonnet-latest",tools=[multiply]
)agent.invoke({"messages": [{"role": "user", "content": "what's 42 x 7?"}]})
禁止处理异常:
from langgraph.prebuilt import create_react_agent, ToolNodedef multiply(a: int, b: int) -> int:"""Multiply two numbers."""if a == 42:raise ValueError("The ultimate error")return a * btool_node = ToolNode([multiply],handle_tool_errors=False
)
agent_no_error_handling = create_react_agent(model="anthropic:claude-3-7-sonnet-latest",tools=tool_node
)
agent_no_error_handling.invoke({"messages": [{"role": "user", "content": "what's 42 x 7?"}]}
)
自定义异常:
from langgraph.prebuilt import create_react_agent, ToolNodedef multiply(a: int, b: int) -> int:"""Multiply two numbers."""if a == 42:raise ValueError("The ultimate error")return a * btool_node = ToolNode([multiply],handle_tool_errors=("Can't use 42 as a first operand, you must switch operands!" )
)
agent_custom_error_handling = create_react_agent(model="anthropic:claude-3-7-sonnet-latest",tools=tool_node
)
agent_custom_error_handling.invoke({"messages": [{"role": "user", "content": "what's 42 x 7?"}]}
)
使用记忆(Working with memory)
LangGraph 工具可以访问代理的短期记忆与长期记忆。
使用预构建工具(Prebuilt tools)
你可以通过 tools
参数传入工具规范(字典格式),使用模型厂商提供的内置工具。
例如,使用 OpenAI 的 web_search_preview
工具:
from langgraph.prebuilt import create_react_agentagent = create_react_agent(model="openai:gpt-4o-mini", tools=[{"type": "web_search_preview"}]
)response = agent.invoke({"messages": ["What was a positive news story from today?"]}
)
LangChain 还支持多种预构建工具集成,涵盖 API、数据库、文件系统、网页数据等。
常见工具分类包括:
- 搜索:Bing、SerpAPI、Tavily
- 代码执行器:Python REPL、Node.js REPL
- 数据库:SQL、MongoDB、Redis
- 网页数据:网页抓取与浏览
- 第三方 API:OpenWeatherMap、NewsAPI 等
这些工具均可通过上述 tools
参数方式添加到代理中。
以下是 MCP 集成 相关内容的翻译:
MCP 集成
Model Context Protocol (MCP) 是一个开放协议,用于标准化应用程序向语言模型提供工具与上下文的方式。LangGraph 的 agent 可以通过 langchain-mcp-adapters
库使用 MCP 服务器上定义的工具。
若要在 LangGraph 中使用 MCP 工具,请先安装相关库:
pip install langchain-mcp-adapters
langchain-mcp-adapters
包允许 agent 使用一个或多个 MCP 服务器上定义的工具。
示例:使用 MCP 服务器上的工具
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import create_react_agentclient = MultiServerMCPClient({"math": {"command": "python",# 替换为你本地 math_server.py 文件的绝对路径"args": ["/path/to/math_server.py"],"transport": "stdio",},"weather": {# 确保你已在 8000 端口启动了天气服务"url": "http://localhost:8000/mcp","transport": "streamable_http",}}
)tools = await client.get_tools()agent = create_react_agent("anthropic:claude-3-7-sonnet-latest",tools
)# 调用数学工具
math_response = await agent.ainvoke({"messages": [{"role": "user", "content": "what's (3 + 5) x 12?"}]}
)# 调用天气工具
weather_response = await agent.ainvoke({"messages": [{"role": "user", "content": "what is the weather in nyc?"}]}
)
你也可以使用 mcp
库来创建自己的 MCP 工具服务器。该库提供了一种简单方式来定义工具并以服务器形式运行。
pip install mcp
以下是参考实现,可用于测试你的 agent 与 MCP 工具服务器的集成:
示例:数学工具服务器(使用 stdio 传输)
from mcp.server.fastmcp import FastMCPmcp = FastMCP("Math")@mcp.tool()
def add(a: int, b: int) -> int:"""加法运算"""return a + b@mcp.tool()
def multiply(a: int, b: int) -> int:"""乘法运算"""return a * bif __name__ == "__main__":mcp.run(transport="stdio")
示例:天气工具服务器(使用可流式 HTTP 传输)
from mcp.server.fastmcp import FastMCPmcp = FastMCP("Weather")@mcp.tool()
async def get_weather(location: str) -> str:"""获取指定位置天气"""return "纽约永远是晴天"if __name__ == "__main__":mcp.run(transport="streamable-http")
上下文(Context)
以下是该文档的翻译版本,采用段落式 Markdown 排版:
上下文(Context)
代理(Agents)往往不仅仅需要一组消息列表来有效运行,还需要上下文信息。
上下文是指消息列表之外的任何可以影响代理行为或工具执行的数据,例如:
- 运行时传入的信息,如
user_id
或 API 凭证 - 在多步推理过程中更新的内部状态
- 来自先前交互的持久记忆或事实
LangGraph 提供三种主要方式来提供上下文:
类型 | 描述 | 可变? | 生命周期 |
---|---|---|---|
Config | 运行开始时传入的数据 | ❌ | 每次运行 |
State | 在执行过程中可能变化的动态数据 | ✅ | 每次运行或对话期间 |
长期记忆(Store) | 可在对话之间共享的数据 | ✅ | 跨对话 |
你可以使用上下文来:
- 调整模型看到的系统提示词
- 向工具提供必要的输入
- 在持续的对话中跟踪事实
提供运行时上下文
在你需要在运行时向代理注入数据时使用此方式。
Config(静态上下文)
Config
用于不可变的数据,例如用户元数据或 API 密钥。适用于在运行过程中不会变化的值。
使用一个名为 configurable
的保留键来指定配置:
agent.invoke({"messages": [{"role": "user", "content": "hi!"}]},config={"configurable": {"user_id": "user_123"}}
)
State(可变上下文)
State
是运行期间的短期记忆。它包含可能在执行中演化的数据,例如工具返回值或 LLM 输出。
class CustomState(AgentState):user_name: stragent = create_react_agent(# 其他参数...state_schema=CustomState,
)agent.invoke({"messages": "hi!","user_name": "Jane"
})
启用记忆功能
详见 Memory 指南,以了解如何启用记忆。这是一个强大的功能,允许你在多个调用之间持久化代理状态。否则,state
的作用范围仅限于一次运行。
长期记忆(跨对话上下文)
对于跨会话或会话持续存在的上下文,LangGraph 提供了访问长期记忆(store)的能力。这可以用于读取或更新持久性事实(如用户资料、偏好、过往交互记录)。详见 Memory 指南。
使用上下文自定义提示词
提示词决定代理的行为。你可以根据代理的状态或配置动态生成提示词来引入运行时上下文。
常见用途:
- 个性化(Personalization)
- 角色或目标定制
- 条件行为(如:用户是否为管理员)
使用 Config
from langchain_core.messages import AnyMessage
from langchain_core.runnables import RunnableConfig
from langgraph.prebuilt import create_react_agent
from langgraph.prebuilt.chat_agent_executor import AgentStatedef prompt(state: AgentState,config: RunnableConfig,
) -> list[AnyMessage]:user_name = config["configurable"].get("user_name")system_msg = f"You are a helpful assistant. User's name is {user_name}"return [{"role": "system", "content": system_msg}] + state["messages"]agent = create_react_agent(model="anthropic:claude-3-7-sonnet-latest",tools=[get_weather],prompt=prompt
)agent.invoke(...,config={"configurable": {"user_name": "John Smith"}}
)
适用State
from langchain_core.messages import AnyMessage
from langchain_core.runnables import RunnableConfig
from langgraph.prebuilt import create_react_agent
from langgraph.prebuilt.chat_agent_executor import AgentStateclass CustomState(AgentState):user_name: strdef prompt(state: CustomState
) -> list[AnyMessage]:user_name = state["user_name"]system_msg = f"You are a helpful assistant. User's name is {user_name}"return [{"role": "system", "content": system_msg}] + state["messages"]agent = create_react_agent(model="anthropic:claude-3-7-sonnet-latest",tools=[...],state_schema=CustomState,prompt=prompt
)agent.invoke({"messages": "hi!","user_name": "John Smith"
})
在工具中访问上下文
工具可以通过特殊参数注解访问上下文。
- 使用
RunnableConfig
访问config
- 使用
Annotated[StateSchema, InjectedState]
注入state
💡 提示: 这些注解会防止 LLM 试图填充这些值。这些参数对 LLM 是不可见的。
使用 Config
def get_user_info(config: RunnableConfig,
) -> str:"""Look up user info."""user_id = config["configurable"].get("user_id")return "User is John Smith" if user_id == "user_123" else "Unknown user"agent = create_react_agent(model="anthropic:claude-3-7-sonnet-latest",tools=[get_user_info],
)agent.invoke({"messages": [{"role": "user", "content": "look up user information"}]},config={"configurable": {"user_id": "user_123"}}
)
使用 State
from typing import Annotated
from langgraph.prebuilt import InjectedStateclass CustomState(AgentState):user_id: strdef get_user_info(state: Annotated[CustomState, InjectedState]
) -> str:"""Look up user info."""user_id = state["user_id"]return "User is John Smith" if user_id == "user_123" else "Unknown user"agent = create_react_agent(model="anthropic:claude-3-7-sonnet-latest",tools=[get_user_info],state_schema=CustomState,
)agent.invoke({"messages": "look up user information","user_id": "user_123"
})
工具可以在执行过程中更新代理上下文(包括 state
和长期记忆)。这对于保存中间结果或将信息传递给后续工具或提示非常有用。详见 Memory 指南。
记忆(Memory)
LangGraph 支持两种构建对话代理所必需的记忆类型:
- 短期记忆:在一个会话内记录对话历史。
- 长期记忆:在多个会话之间存储用户特定或应用级别的数据。
注意:短期和长期记忆都需要持久化存储,以在多个 LLM 调用之间保持连续性。在生产环境中,这些数据通常存储在数据库中。
在 LangGraph 中:
- 短期记忆也称为“线程级记忆(thread-level memory)”
- 长期记忆也称为“跨线程记忆(cross-thread memory)”
- 线程(thread) 表示由同一个
thread_id
分组的一系列相关运行
短期记忆
短期记忆让代理能够跟踪多轮对话。要使用它,你需要:
- 在创建代理时提供
checkpointer
,用于持久化代理状态。 - 在运行代理时提供
thread_id
,这是每个会话的唯一标识符。
示例:
from langgraph.prebuilt import create_react_agent
from langgraph.checkpoint.memory import InMemorySavercheckpointer = InMemorySaver()def get_weather(city: str) -> str:return f"It's always sunny in {city}!"agent = create_react_agent(model="anthropic:claude-3-7-sonnet-latest",tools=[get_weather],checkpointer=checkpointer
)config = {"configurable": {"thread_id": "1"}}sf_response = agent.invoke({"messages": [{"role": "user", "content": "what is the weather in sf"}]},config
)ny_response = agent.invoke({"messages": [{"role": "user", "content": "what about new york?"}]},config
)
第二次调用时使用相同 thread_id
,代理将自动包含历史消息,从而理解用户的上下文。
管理消息历史
长时间的对话可能会超出大语言模型(LLM)的上下文窗口限制。常见的解决方案包括:
- 摘要(Summarization):维护一份对话的运行中摘要。
- 修剪(Trimming):移除最早或最近的 N 条消息
这些方法可以帮助代理在不超过 LLM 上下文窗口限制的情况下,持续跟踪对话进程。
若要管理消息历史,可使用 pre_model_hook
参数 —— 这是一个在调用语言模型之前始终执行的函数(节点)。
若要对消息历史进行摘要,可以使用 pre_model_hook 配合预构建的 SummarizationNode 来实现。
使用摘要功能:
from langchain_anthropic import ChatAnthropic
from langmem.short_term import SummarizationNode
from langchain_core.messages.utils import count_tokens_approximately
from langgraph.prebuilt import create_react_agent
from langgraph.prebuilt.chat_agent_executor import AgentState
from langgraph.checkpoint.memory import InMemorySaver
from typing import Anymodel = ChatAnthropic(model="claude-3-7-sonnet-latest")summarization_node = SummarizationNode( token_counter=count_tokens_approximately,model=model,max_tokens=384,max_summary_tokens=128,output_messages_key="llm_input_messages",
)class State(AgentState):# NOTE: we're adding this key to keep track of previous summary information# to make sure we're not summarizing on every LLM callcontext: dict[str, Any] checkpointer = InMemorySaver() agent = create_react_agent(model=model,tools=tools,pre_model_hook=summarization_node, state_schema=State, checkpointer=checkpointer,
)
使用修剪功能:
要裁剪消息历史,可以使用 pre_model_hook
搭配 trim_messages
函数来实现。
from langchain_core.messages.utils import (trim_messages,count_tokens_approximately
)
from langgraph.prebuilt import create_react_agent# This function will be called every time before the node that calls LLM
def pre_model_hook(state):trimmed_messages = trim_messages(state["messages"],strategy="last",token_counter=count_tokens_approximately,max_tokens=384,start_on="human",end_on=("human", "tool"),)return {"llm_input_messages": trimmed_messages}checkpointer = InMemorySaver()
agent = create_react_agent(model,tools,pre_model_hook=pre_model_hook,checkpointer=checkpointer,
)
在工具中读取状态
LangGraph 允许代理在工具中访问其短期记忆(state)。
from typing import Annotated
from langgraph.prebuilt import InjectedState, create_react_agentclass CustomState(AgentState):user_id: strdef get_user_info(state: Annotated[CustomState, InjectedState]
) -> str:"""Look up user info."""user_id = state["user_id"]return "User is John Smith" if user_id == "user_123" else "Unknown user"agent = create_react_agent(model="anthropic:claude-3-7-sonnet-latest",tools=[get_user_info],state_schema=CustomState,
)agent.invoke({"messages": "look up user information","user_id": "user_123"
})
在工具中写入状态
要在执行过程中修改代理的短期记忆(state),可以直接从工具中返回状态更新。这对于持久化中间结果或让后续工具或提示能够访问相关信息非常有用。
from typing import Annotated
from langchain_core.tools import InjectedToolCallId
from langchain_core.runnables import RunnableConfig
from langchain_core.messages import ToolMessage
from langgraph.prebuilt import InjectedState, create_react_agent
from langgraph.prebuilt.chat_agent_executor import AgentState
from langgraph.types import Commandclass CustomState(AgentState):user_name: strdef update_user_info(tool_call_id: Annotated[str, InjectedToolCallId],config: RunnableConfig
) -> Command:"""Look up and update user info."""user_id = config["configurable"].get("user_id")name = "John Smith" if user_id == "user_123" else "Unknown user"return Command(update={"user_name": name,# update the message history"messages": [ToolMessage("Successfully looked up user information",tool_call_id=tool_call_id)]})def greet(state: Annotated[CustomState, InjectedState]
) -> str:"""Use this to greet the user once you found their info."""user_name = state["user_name"]return f"Hello {user_name}!"agent = create_react_agent(model="anthropic:claude-3-7-sonnet-latest",tools=[update_user_info, greet],state_schema=CustomState
)agent.invoke({"messages": [{"role": "user", "content": "greet the user"}]},config={"configurable": {"user_id": "user_123"}}
)
长期记忆
使用长期记忆可以在多轮对话中存储用户特定或应用特定的数据。这对于希望记住用户偏好或其他信息的应用(如聊天机器人)非常有用。
要使用长期记忆,你需要:
- 配置一个 store,用于在多次调用之间持久化数据。
- 使用
get_store
函数,在工具或提示中访问该存储。
读取数据(Read)
定义一个工具供代理查找用户信息:
from langchain_core.runnables import RunnableConfig
from langgraph.config import get_store
from langgraph.prebuilt import create_react_agent
from langgraph.store.memory import InMemoryStorestore = InMemoryStore() store.put( ("users",), "user_123", {"name": "John Smith","language": "English",}
)def get_user_info(config: RunnableConfig) -> str:"""查找用户信息"""store = get_store() # 与 create_react_agent 中配置一致user_id = config["configurable"].get("user_id")user_info = store.get(("users",), user_id) return str(user_info.value) if user_info else "Unknown user"agent = create_react_agent(model="anthropic:claude-3-7-sonnet-latest",tools=[get_user_info],store=store
)# 执行代理调用
agent.invoke({"messages": [{"role": "user", "content": "look up user information"}]},config={"configurable": {"user_id": "user_123"}}
)
写入数据(Write)
定义一个工具,用于更新用户信息:
from typing_extensions import TypedDict
from langgraph.config import get_store
from langgraph.prebuilt import create_react_agent
from langgraph.store.memory import InMemoryStorestore = InMemoryStore() class UserInfo(TypedDict): name: strdef save_user_info(user_info: UserInfo, config: RunnableConfig) -> str: """保存用户信息"""store = get_store() user_id = config["configurable"].get("user_id")store.put(("users",), user_id, user_info) return "Successfully saved user info."agent = create_react_agent(model="anthropic:claude-3-7-sonnet-latest",tools=[save_user_info],store=store
)# 执行代理调用
agent.invoke({"messages": [{"role": "user", "content": "My name is John Smith"}]},config={"configurable": {"user_id": "user_123"}}
)# 你可以直接访问 store 获取该值
store.get(("users",), "user_123").value
语义搜索(Semantic search)
LangGraph 还支持通过语义相似度,在长期记忆中进行搜索。
LangMem 是一个由 LangChain 维护的库,提供用于管理代理长期记忆的工具。可参考 LangMem 文档了解使用示例。
人类介入(Human-in-the-loop)
为了让代理在执行工具调用时支持人工审查、编辑和批准,LangGraph 提供了内建的 Human-In-the-Loop(HIL)特性,核心是 interrupt()
原语。
LangGraph 支持无限期地暂停代理执行 —— 可以是几分钟、几小时,甚至几天 —— 直到接收到人类输入。
这得益于代理状态被 checkpoint 到数据库中,从而支持上下文持久化,并在稍后恢复流程时从中断处继续执行。
工具调用审查(Review tool calls)
要给某个工具添加人工审批步骤:
- 使用
interrupt()
在工具中暂停执行。 - 接收到人工输入后,使用
Command(resume=...)
继续执行。
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import interrupt
from langgraph.prebuilt import create_react_agent# An example of a sensitive tool that requires human review / approval
def book_hotel(hotel_name: str):"""Book a hotel"""response = interrupt( f"Trying to call `book_hotel` with args {{'hotel_name': {hotel_name}}}. ""Please approve or suggest edits.")if response["type"] == "accept":passelif response["type"] == "edit":hotel_name = response["args"]["hotel_name"]else:raise ValueError(f"Unknown response type: {response['type']}")return f"Successfully booked a stay at {hotel_name}."checkpointer = InMemorySaver() agent = create_react_agent(model="anthropic:claude-3-5-sonnet-latest",tools=[book_hotel],checkpointer=checkpointer,
)
使用 stream()
方法运行代理,传入 config
对象以指定线程 ID。这使得代理能够在后续调用时恢复同一个对话。
config = {"configurable": {"thread_id": "1"}
}for chunk in agent.stream({"messages": [{"role": "user", "content": "book a stay at McKittrick hotel"}]},config
):print(chunk)print("\n")
你会看到代理运行直到遇到 interrupt()
调用,此时它会暂停并等待人工输入。
通过 Command(resume=...)
恢复代理,以根据人工输入继续执行。
from langgraph.types import Commandfor chunk in agent.stream(Command(resume={"type": "accept"}), # Command(resume={"type": "edit", "args": {"hotel_name": "McKittrick Hotel"}}),config
):print(chunk)print("\n")
与 Agent Inbox 一起使用
你可以创建一个包装器,为任何工具添加中断功能。
下面的示例提供了一个参考实现,兼容Agent Inbox UI 和 Agent 聊天界面。
from typing import Callable
from langchain_core.tools import BaseTool, tool as create_tool
from langchain_core.runnables import RunnableConfig
from langgraph.types import interrupt
from langgraph.prebuilt.interrupt import HumanInterruptConfig, HumanInterruptdef add_human_in_the_loop(tool: Callable | BaseTool,*,interrupt_config: HumanInterruptConfig = None,
) -> BaseTool:"""Wrap a tool to support human-in-the-loop review.""" if not isinstance(tool, BaseTool):tool = create_tool(tool)if interrupt_config is None:interrupt_config = {"allow_accept": True,"allow_edit": True,"allow_respond": True,}@create_tool( tool.name,description=tool.description,args_schema=tool.args_schema)def call_tool_with_interrupt(config: RunnableConfig, **tool_input):request: HumanInterrupt = {"action_request": {"action": tool.name,"args": tool_input},"config": interrupt_config,"description": "Please review the tool call"}response = interrupt([request])[0] # approve the tool callif response["type"] == "accept":tool_response = tool.invoke(tool_input, config)# update tool call argselif response["type"] == "edit":tool_input = response["args"]["args"]tool_response = tool.invoke(tool_input, config)# respond to the LLM with user feedbackelif response["type"] == "response":user_feedback = response["args"]tool_response = user_feedbackelse:raise ValueError(f"Unsupported interrupt response type: {response['type']}")return tool_responsereturn call_tool_with_interrupt
你可以使用 add_human_in_the_loop 包装器,将 interrupt() 添加到任何工具中,而无需在工具内部添加它:
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.prebuilt import create_react_agentcheckpointer = InMemorySaver()def book_hotel(hotel_name: str):"""Book a hotel"""return f"Successfully booked a stay at {hotel_name}."agent = create_react_agent(model="anthropic:claude-3-5-sonnet-latest",tools=[add_human_in_the_loop(book_hotel), ],checkpointer=checkpointer,
)config = {"configurable": {"thread_id": "1"}}# Run the agent
for chunk in agent.stream({"messages": [{"role": "user", "content": "book a stay at McKittrick hotel"}]},config
):print(chunk)print("\n")
你会看到代理运行到调用 interrupt() 的位置时暂停,等待人工输入。
然后使用 Command(resume=…) 恢复代理,根据人工输入继续执行。
from langgraph.types import Command for chunk in agent.stream(Command(resume=[{"type": "accept"}]),# Command(resume=[{"type": "edit", "args": {"args": {"hotel_name": "McKittrick Hotel"}}}]),config
):print(chunk)print("\n")
“Using with Agent Inbox” 这部分讲什么?
这部分内容是在说,你可以用一个“包装器”(wrapper)来给任意工具(tool)添加“人工干预”功能,也就是加上中断(interrupt),让执行过程中可以暂停等待人工确认或修改。
为什么要这么做?
- 有时候工具调用比较敏感,需要人工审核才能继续执行,比如预订酒店、转账等操作。
- 不想在每个工具代码里都写中断逻辑,这样很麻烦。
- 用包装器可以统一给所有工具加上人工审核能力,代码更整洁,也方便和界面(Agent Inbox UI、Agent Chat UI)配合使用。
它具体做了什么?
-
定义了一个叫
add_human_in_the_loop
的函数,这个函数:- 接收一个工具(函数或类)作为参数。
- 返回一个新的工具,这个新工具在调用原工具前,会先触发一个
interrupt()
中断,等待人工确认或修改。 - 根据人工反馈,决定是继续调用原工具,还是修改参数后再调用,或者直接给出用户反馈。
-
这个包装器可以和 Agent Inbox UI 和 Agent Chat UI 配合使用,方便人工在界面上审查和操作工具调用。
用法举例
- 你有个预订酒店的工具
book_hotel
。 - 用
add_human_in_the_loop(book_hotel)
包装后,调用这个包装后的工具时,会先暂停等待人工确认。 - 人工确认后,工具继续运行,完成预订。
总结一下:
- “Using with Agent Inbox” 是说你可以通过这个包装器,轻松给任意工具加上人工审核功能。
- 这样工具调用就能暂停,等待人确认,再继续。
- 对敏感操作非常有用。
多智能体
如果单个智能体需要在多个领域专业化或管理许多工具,可能会遇到困难。为了解决这个问题,你可以将智能体拆分成更小的、独立的智能体,并将它们组合成一个多智能体系统。
在多智能体系统中,智能体之间需要相互通信。它们通过“移交”(handoffs)来实现——这是一种原语,用来描述将控制权交给哪个智能体以及发送给该智能体的负载内容。
两种最流行的多智能体架构是:
- 主管架构(supervisor):由一个中央主管智能体协调各个独立智能体。主管控制所有的通信流程和任务分配,根据当前的上下文和任务需求决定调用哪个智能体。
- 群体架构(swarm):智能体根据各自的专长动态地将控制权相互移交。系统会记住最后活跃的智能体,确保后续的交互可以从该智能体继续对话。
Supervisor
Use langgraph-supervisor library to create a supervisor multi-agent system:
pip install langgraph-supervisor
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from langgraph_supervisor import create_supervisordef book_hotel(hotel_name: str):"""Book a hotel"""return f"Successfully booked a stay at {hotel_name}."def book_flight(from_airport: str, to_airport: str):"""Book a flight"""return f"Successfully booked a flight from {from_airport} to {to_airport}."flight_assistant = create_react_agent(model="openai:gpt-4o",tools=[book_flight],prompt="You are a flight booking assistant",name="flight_assistant"
)hotel_assistant = create_react_agent(model="openai:gpt-4o",tools=[book_hotel],prompt="You are a hotel booking assistant",name="hotel_assistant"
)supervisor = create_supervisor(agents=[flight_assistant, hotel_assistant],model=ChatOpenAI(model="gpt-4o"),prompt=("You manage a hotel booking assistant and a""flight booking assistant. Assign work to them.")
).compile()for chunk in supervisor.stream({"messages": [{"role": "user","content": "book a flight from BOS to JFK and a stay at McKittrick Hotel"}]}
):print(chunk)print("\n")
Swarm
Use langgraph-swarm library to create a swarm multi-agent system:
pip install langgraph-swarm
from langgraph.prebuilt import create_react_agent
from langgraph_swarm import create_swarm, create_handoff_tooltransfer_to_hotel_assistant = create_handoff_tool(agent_name="hotel_assistant",description="Transfer user to the hotel-booking assistant.",
)
transfer_to_flight_assistant = create_handoff_tool(agent_name="flight_assistant",description="Transfer user to the flight-booking assistant.",
)flight_assistant = create_react_agent(model="anthropic:claude-3-5-sonnet-latest",tools=[book_flight, transfer_to_hotel_assistant],prompt="You are a flight booking assistant",name="flight_assistant"
)
hotel_assistant = create_react_agent(model="anthropic:claude-3-5-sonnet-latest",tools=[book_hotel, transfer_to_flight_assistant],prompt="You are a hotel booking assistant",name="hotel_assistant"
)swarm = create_swarm(agents=[flight_assistant, hotel_assistant],default_active_agent="flight_assistant"
).compile()for chunk in swarm.stream({"messages": [{"role": "user","content": "book a flight from BOS to JFK and a stay at McKittrick Hotel"}]}
):print(chunk)print("\n")
交接(Handoffs)
多智能体交互中常见的模式是交接(handoffs),即一个智能体将控制权交给另一个智能体。交接允许你指定:
- 目标(destination):要切换到的目标智能体
- 负载(payload):传递给目标智能体的信息
这既适用于 langgraph-supervisor(主管将控制权交给各个独立智能体),也适用于 langgraph-swarm(单个智能体可以将控制权交给其他智能体)。
要在 create_react_agent
中实现交接,你需要:
-
创建一个特殊工具,用于将控制权转移给另一个智能体
def transfer_to_bob():"""转移控制权给 bob。"""return Command(# 目标智能体(节点)名称goto="bob",# 发送给目标智能体的数据update={"messages": [...]},# 告诉 LangGraph 需要导航到父图中的智能体节点graph=Command.PARENT,)
-
创建可使用交接工具的各个智能体:
flight_assistant = create_react_agent(..., tools=[book_flight, transfer_to_hotel_assistant] ) hotel_assistant = create_react_agent(..., tools=[book_hotel, transfer_to_flight_assistant] )
-
定义一个包含各个智能体节点的父图:
from langgraph.graph import StateGraph, MessagesStatemulti_agent_graph = (StateGraph(MessagesState).add_node(flight_assistant).add_node(hotel_assistant)... )
-
组合起来,你可以实现一个简单的多智能体系统,包含航班预订助手和酒店预订助手:
from typing import Annotated from langchain_core.tools import tool, InjectedToolCallId from langgraph.prebuilt import create_react_agent, InjectedState from langgraph.graph import StateGraph, START, MessagesState from langgraph.types import Commanddef create_handoff_tool(*, agent_name: str, description: str | None = None):name = f"transfer_to_{agent_name}"description = description or f"Transfer to {agent_name}"@tool(name, description=description)def handoff_tool(state: Annotated[MessagesState, InjectedState], tool_call_id: Annotated[str, InjectedToolCallId],) -> Command:tool_message = {"role": "tool","content": f"Successfully transferred to {agent_name}","name": name,"tool_call_id": tool_call_id,}return Command(goto=agent_name,update={"messages": state["messages"] + [tool_message]},graph=Command.PARENT,)return handoff_tool# 创建交接工具 transfer_to_hotel_assistant = create_handoff_tool(agent_name="hotel_assistant",description="Transfer user to the hotel-booking assistant.", ) transfer_to_flight_assistant = create_handoff_tool(agent_name="flight_assistant",description="Transfer user to the flight-booking assistant.", )# 简单工具函数 def book_hotel(hotel_name: str):"""预订酒店"""return f"Successfully booked a stay at {hotel_name}."def book_flight(from_airport: str, to_airport: str):"""预订航班"""return f"Successfully booked a flight from {from_airport} to {to_airport}."# 定义智能体 flight_assistant = create_react_agent(model="anthropic:claude-3-5-sonnet-latest",tools=[book_flight, transfer_to_hotel_assistant],prompt="You are a flight booking assistant",name="flight_assistant" ) hotel_assistant = create_react_agent(model="anthropic:claude-3-5-sonnet-latest",tools=[book_hotel, transfer_to_flight_assistant],prompt="You are a hotel booking assistant",name="hotel_assistant" )# 定义多智能体图 multi_agent_graph = (StateGraph(MessagesState).add_node(flight_assistant).add_node(hotel_assistant).add_edge(START, "flight_assistant").compile() )# 运行多智能体图 for chunk in multi_agent_graph.stream({"messages": [{"role": "user","content": "book a flight from BOS to JFK and a stay at McKittrick Hotel"}]} ):print(chunk)print("\n")
注意
该交接实现假设:
- 每个智能体输入的是多智能体系统中的整体消息历史(包含所有智能体的对话记录)
- 每个智能体的输出消息会追加到整个多智能体系统的消息历史中
Evals(评估)&部署(Deployment) & UI
这三个功能是需要langSmith的,而langSmith属于平台服务,提供收费功能。
因此部署我们采用 langfuse + FastAPI + langGraph + SSE…,也就是如下划分:
- vllm+Model 提供模型能力
- LangGraph + FastAPI提供Agent能力
- SSE用于前后端交互
- Langfuse作为Dashboard追踪LLM回答链路
- docker提供镜像能力
- k8s提供分布式部署能力