LangGraph 深度解析(二):掌握 LangGraph 函数式 API 的状态化 AI 工作流
第一部分:函数式范式:使用 LangGraph 构建应用的新方法
本部分将介绍函数式 API 背后的核心理念,阐明为何此范式是 LangGraph 生态系统中的一项重要进展。它被定位为一种直观的、符合 Python 风格的(Pythonic)方式,用于构建复杂的状态化应用,而无需承担正式图结构的认知开销。
1.1 函数式 API 简介:Pythonic 状态管理
LangGraph 的函数式 API 旨在将该框架最强大的功能——持久化、记忆、人机协同(human-in-the-loop)和流式处理——以对现有代码最小化改动的方式集成到应用程序中 。其核心价值在于,它允许开发者使用标准的 Python 控制流结构,如 if
语句、for
循环和函数调用,来定义和编排工作流,而无需显式地定义一个有向无环图(DAG)。
这种设计解决了传统无状态应用在处理长周期、多步骤任务时的局限性,同时也规避了其他数据编排框架可能带来的复杂性。许多框架要求开发者将代码重构为显式的管道或图,这不仅增加了学习成本,也限制了逻辑表达的灵活性。函数式 API 提供了一种更为符合工程直觉的解决方案,它降低了构建持久、长时运行的 AI 代理的门槛,使开发者能够专注于业务逻辑而非框架本身 。这一战略性的转变体现了 LangChain 对开发者体验和易用性的重视。它不仅仅是一个新功能,更是对声明式、基于图的范式对于习惯了命令式编程的开发者可能构成障碍的一种认知。通过提供一种更贴近传统编程习惯的 API,LangGraph 极大地拓宽了其适用范围,让更广泛的开发者能够利用其强大的运行时来构建复杂的代理。
1.2 函数式 API 与图 API:对比分析
在选择使用 LangGraph 的 API 时,开发者面临两种选择:函数式 API(Functional API)和图 API(Graph API)。理解它们之间的差异至关重要。首先需要明确的是,这两种 API 共享相同的底层 Pregel 运行时,这意味着它们是完全可互操作的 。开发者甚至可以在同一个应用中混合使用它们。因此,选择哪种 API 并非关乎其能力的强弱,而更多地是关乎编程风格、项目需求以及开发者的偏好。
以下表格详细对比了两种 API 的关键区别:
特性 | 函数式 API | 图 API |
---|---|---|
控制流 | 命令式(使用标准 Python if, for, 函数调用) | 声明式(显式定义节点和边) |
状态管理 | 隐式,函数作用域状态。无需显式定义状态类。 | 显式,图范围的 State 类是必需的。通过 Reducer 管理状态更新。 |
检查点机制 | 任务结果被保存到入口点关联的现有检查点中。 | 每个“超步”(super-step)之后都会生成一个新的检查点。 |
可视化 | 不支持(图是动态生成的)。 | 原生支持,易于将工作流结构可视化。 |
理想用例 | 快速为现有 Python 代码添加状态化能力,处理复杂条件逻辑的工作流,偏好命令式编程风格的开发者。 | 高度结构化的多步骤流程,可视化对于调试/协作至关重要的应用,偏好声明式编程风格的开发者。 |
这种对比清晰地揭示了两种 API 的设计哲学。图 API 提供了一种结构化的、可预见的方式来构建工作流,其可视化能力在调试复杂流程和团队协作时尤为宝贵 。然而,一些开发者反馈认为,图 API 的语法可能显得“笨拙”,并且会使程序变得“臃肿” 。
相比之下,函数式 API 允许开发者利用他们已有的 Python 知识来构建工作流,控制流逻辑(如复杂的条件分支和循环)可以直接用 Python 代码表达,这通常比定义复杂的条件边更为直观和简洁 。状态管理也变得更加隐式和自动化,开发者无需手动定义 State 类和 reducer 函数,因为状态的作用域被限定在函数内部 。检查点机制的细微差别也值得注意:函数式 API 将任务结果更新到当前检查点,而图 API 则为每个步骤创建新的快照,这反映了两种范式在状态演进模型上的不同处理方式 。
1.3 核心原语:@entrypoint 和 @task
函数式 API 的强大功能建立在两个核心的装饰器原语之上:@entrypoint
和 @task
。
@entrypoint:工作流的编排者
@entrypoint
装饰器用于标记一个函数,使其成为工作流的起点和逻辑容器 。它封装了整个工作流的主要逻辑,管理着执行流程。当与检查点(checkpointer)结合使用时,@entrypoint
便解锁了 LangGraph 所有的状态化功能,包括持久化和人机协同。从技术上讲,使用 @entrypoint
装饰一个函数会生成一个 Pregel 实例,这是一个可执行的图对象,可以通过 .invoke()
或 .stream()
等方法来运行 。
@task:工作的原子单元
@task
装饰器则用于标记一个函数,使其成为一个离散的、可被检查点记录的工作单元。这通常用于封装那些需要持久化结果的操作,例如调用外部 API、执行数据处理步骤或与大型语言模型(LLM)交互 。
@task
具有两个关键特性:
- 异步执行:调用一个被
@task
装饰的函数会立即返回一个类似 future 的对象,而不是阻塞等待结果。这使得并发执行多个任务成为可能,极大地提高了 I/O 密集型工作流的效率 。 - 检查点记录:
@task
的执行结果会被自动保存到检查点中。这意味着如果工作流在执行过程中中断(例如,由于错误或人为干预),在恢复时已经完成的任务无需重新运行,可以直接从检查点加载其结果,从而实现了容错和可恢复性 。
序列化要求
一个至关重要的技术要求是,@entrypoint
的输入和输出,以及 @task
的输出,都必须是 JSON 可序列化的。这是检查点机制能够正确工作的前提,因为 LangGraph 需要将工作流的状态持久化到存储中(如内存、SQLite 或 Postgres)。任何不可序列化的对象(如自定义类的实例、数据库连接等)都会导致运行时错误 。开发者在设计工作流时必须始终牢记这一约束。
第二部分:构建与编排工作流
本部分将从理论转向实践,从头开始构建工作流,演示如何组织代码、处理输入、并行执行任务,并通过组合不同的图和入口点来构建更大型的系统。
2.1 你的第一个函数式工作流:一个实践示例
让我们从一个简单的例子开始,以展示函数式 API 的基本结构和用法。我们将创建一个工作流,用于判断一个数字是奇数还是偶数,并格式化输出消息。
首先,需要定义执行具体逻辑的 @task
。在这个例子中,我们定义了两个任务:is_even
用于判断数字的奇偶性,format_message
用于构建最终的输出字符串。
from langgraph.func import task, entrypoint
from langgraph.checkpoint.memory import InMemorySaver# 定义任务
@task
def is_even(num: int) -> str:return "even" if num % 2 == 0 else "odd"@task
def format_message(num: int, parity: str) -> str:return f"The number {num} is {parity}."# 定义入口点
@entrypoint(checkpointer=InMemorySaver())
def workflow(num: int):# 调用任务,返回 future 对象parity_future = is_even(num)# 调用另一个任务,并等待第一个任务的结果#.result() 会阻塞直到 future 完成message_future = format_message(num, parity_future.result())return message_future.result()# 调用工作流
# 使用.invoke() 运行并获取最终结果
result = workflow.invoke(4)
print(result)
# 输出: The number 4 is even.
在这个例子中,workflow
函数被 @entrypoint
装饰,成为工作流的入口。在函数内部,我们按顺序调用了 is_even
和 format_message
任务。调用一个 @task
会立即返回一个 future 对象。为了获取任务的实际结果,我们调用了 .result()
方法,这个方法会阻塞执行,直到任务完成并返回其结果 。
处理多个输入
@entrypoint
装饰的函数只能接受一个位置参数作为输入。然而,在实际应用中,工作流通常需要多个输入参数。一个常见的模式是将所有输入打包成一个字典,并将其作为唯一的参数传递给入口点函数 。
@entrypoint(checkpointer=InMemorySaver())
def my_workflow(inputs: dict):num1 = inputs["number1"]num2 = inputs["number2"]#... 工作流逻辑...return {"result": num1 + num2}# 调用时传入一个字典
result = my_workflow.invoke({"number1": 10, "number2": 20})
print(result)
# 输出: {'result': 30}
2.2 通过并行执行解锁性能
函数式 API 的一个强大之处在于其对并发执行的天然支持。对于 I/O 密集型操作,例如同时调用多个 LLM 或外部 API,顺序执行会非常低效。由于 @task
调用会立即返回一个 future 对象,我们可以轻松地并行启动多个任务,然后等待它们全部完成 。
让我们看一个例子:并行生成一篇文章的多个段落。
import time
from langchain_openai import ChatOpenAI# 假设已设置 OpenAI API 密钥
model = ChatOpenAI(model="gpt-3.5-turbo")@task
def generate_paragraph(topic: str, section: str) -> str:print(f"Generating paragraph for section: {section}...")# 模拟 API 调用延迟time.sleep(1)response = model.invoke(f"Write a paragraph about {section} for an essay on {topic}.")print(f"Finished paragraph for section: {section}.")return response.content@entrypoint(checkpointer=InMemorySaver())
def compose_essay(topic: str, sections: list[str]):# 使用列表推导式并行启动所有任务paragraph_futures = [generate_paragraph(topic, sec) for sec in sections]# 等待所有 future 完成并收集结果paragraphs = [future.result() for future in paragraph_futures]return "\n\n".join(paragraphs)# 调用工作流
essay = compose_essay.invoke("The History of Artificial Intelligence",["Early Foundations", "The AI Winter", "Modern Breakthroughs"]
)
print(essay)
在这个例子中,generate_paragraph
任务在一个列表推导式中被多次调用,这会几乎同时启动所有段落的生成过程。然后,我们再次遍历 future 列表,通过调用 .result()
来收集结果。由于任务是并行执行的,总的执行时间将约等于最慢的那个任务的执行时间,而不是所有任务执行时间的总和 。这种设计并非偶然,而是 LangGraph 刻意利用了 Python 中 concurrent.futures
的熟悉模式。这使得并行化感觉像是语言的原生功能,而不是框架强加的复杂特性,从而降低了学习曲线,并鼓励开发者为 I/O 密集型工作流设计高效的执行路径。
2.3 高级组合:集成图与子工作流
函数式 API 的设计具有高度的可组合性,允许开发者构建模块化的、可重用的工作流组件。
调用其他入口点
一个 @entrypoint
工作流可以像调用普通函数一样调用另一个 @entrypoint
工作流。一个关键的特性是,当一个父工作流调用一个子工作流时,父工作流的检查点(checkpointer)会自动传递给子工作流。这确保了整个调用链共享一个统一的状态历史,所有任务的执行结果都会被记录在同一个线程的检查点中 。
@entrypoint(checkpointer=InMemorySaver())
def multiply(a: int, b: int) -> int:return a * b@entrypoint(checkpointer=InMemorySaver())
def main_workflow(x: int, y: int, z: int):# 调用子工作流product_future = multiply.invoke(x, y)#.invoke() 在这里返回一个 future,因为它是从另一个 entrypoint 内部调用的return product_future.result() + zresult = main_workflow.invoke({"x": 3, "y": 4, "z": 5},config={"configurable": {"thread_id": "thread-1"}}
)
print(result) # 输出: 17
调用 StateGraph 实例
函数式 API 和图 API 之间的互操作性是 LangGraph 架构统一性的体现。一个函数式 @entrypoint
可以无缝地调用一个预先编译好的 StateGraph 实例。这为开发者提供了极大的灵活性,他们可以根据不同模块的需求选择最合适的 API。例如,可以使用图 API 来构建一个结构清晰、易于可视化的多步骤代理,然后在一个更动态、逻辑更复杂的函数式工作流中调用这个代理 。
from typing import TypedDict
from langgraph.graph import StateGraph, START, END# 使用图 API 定义一个 StateGraph
class MyGraphState(TypedDict):value: intdef add_five(state: MyGraphState) -> MyGraphState:return {"value": state["value"] + 5}builder = StateGraph(MyGraphState)
builder.add_node("add_five", add_five)
builder.add_edge(START, "add_five")
builder.add_edge("add_five", END)
my_graph = builder.compile()# 在函数式 API 中调用这个图
@entrypoint(checkpointer=InMemorySaver())
def functional_workflow(initial_value: int):# 调用 StateGraph 实例graph_result = my_graph.invoke({"value": initial_value})return graph_result["value"]final_result = functional_workflow.invoke(10)
print(final_result) # 输出: 15
这种混合搭配的能力使团队能够利用两种范式的优点,为应用程序的不同部分选择最合适的工具,从而构建出既健壮又灵活的复杂系统。
第三部分:构建持久和弹性的应用
本部分将重点介绍使函数式 API 应用变得健壮和生产就绪的功能。我们将深入探讨检查点机制的原理,以及它如何实现错误恢复和性能优化。
3.1 状态的基础:检查点机制详解
检查点(Checkpointing)是 LangGraph 中实现所有状态化功能的核心机制,是理解其持久化、记忆和人机协同等高级特性的基石。
什么是检查点程序?
检查点程序(Checkpointer)是一个遵循 BaseCheckpointSaver
接口的对象,它是持久化的引擎 。当一个 @entrypoint
在定义时配置了检查点程序,该工作流的每一次状态变化都会被捕获并保存。这些保存的状态快照就是检查点。在我们的示例中,为了简单起见,我们一直使用 InMemorySaver
,它将检查点保存在内存中。但在生产环境中,通常会使用更持久的后端,如 SqliteSaver
(用于本地工作流)或 PostgresSaver
(用于生产级应用)。
线程与检查点
在 LangGraph 的持久化模型中,有两个核心概念:
- 线程(Thread):一个线程是与一个唯一 ID(thread_id)相关联的一系列运行记录。它代表了一个完整的、可能跨越多次调用的交互历史。例如,一个与用户的完整对话可以被视为一个线程 。
- 检查点(Checkpoint):一个检查点是在特定时间点,线程状态的一个快照。在函数式 API 中,当一个入口点被调用时,会创建一个初始检查点;之后,每个
@task
完成时,其结果都会被用来更新这个线程的当前检查点 。
这个检查点机制是 LangGraph 几乎所有高级功能的统一抽象。错误恢复、缓存、记忆和人机协同并非各自独立的功能,而是都建立在强大的持久化层之上的衍生能力。因此,理解检查点不仅是学习一个功能,而是掌握解锁 LangGraph 整个状态化架构的钥匙。
3.2 实现错误恢复与重试
检查点的最直接好处之一就是实现了工作流的容错性。
从错误中恢复
想象一个长时间运行的工作流,其中包含多个耗时的任务。如果其中一个任务失败,整个工作流就会中断。没有检查点,我们就必须从头开始重新运行所有任务。而有了检查点,情况就完全不同了。
让我们通过一个例子来演示。工作流中有两个任务,一个快一个慢,第二个任务会故意失败。
import time
import random@task
def slow_successful_task():print("Starting slow task...")time.sleep(3)print("Slow task finished.")return "Success"@task
def sometimes_fail_task():print("Starting potentially failing task...")if random.random() > 0.5:print("Task failed!")raise ValueError("Something went wrong")print("Task succeeded!")return "OK"@entrypoint(checkpointer=InMemorySaver())
def resilient_workflow():slow_future = slow_successful_task()slow_future.result() # 等待慢任务完成fail_future = sometimes_fail_task()fail_future.result()# 运行工作流,并指定一个线程 ID
config = {"configurable": {"thread_id": "resilience-test-1"}}
try:print("First attempt:")resilient_workflow.invoke(None, config)
except ValueError as e:print(f"Caught an error: {e}")print("\nResuming workflow...")# 使用相同的线程 ID 再次调用,以恢复执行resilient_workflow.invoke(None, config)
当第一次运行此工作流时,slow_successful_task
会成功执行并完成,其结果被记录到检查点中。当 sometimes_fail_task
失败时,工作流会抛出异常。然后,我们捕获这个异常,并使用完全相同的 config
(包含相同的 thread_id
)再次调用工作流。LangGraph 的运行时会检测到这个线程已经有一个检查点,它会加载该检查点,发现 slow_successful_task
已经成功完成,因此会跳过它的执行,直接从失败的 sometimes_fail_task
开始重试 。这对于构建可靠的长时运行代理至关重要。
配置 RetryPolicy
对于那些可能由于瞬时问题(如网络抖动)而失败的任务,我们可以配置一个重试策略,让 LangGraph 自动处理重试。这可以通过在 @task
装饰器上设置 retry_policy
参数来实现。
from langgraph.types import RetryPolicy@task(retry_policy=RetryPolicy(delay_s=1, # 重试间隔max_attempts=3, # 最大尝试次数errors=[ValueError] # 只对特定类型的错误进行重试)
)
def get_info_with_retry():print("Attempting to get info...")if random.random() > 0.3:raise ValueError("Network error")return "Data fetched"@entrypoint(checkpointer=InMemorySaver())
def workflow_with_retry():return get_info_with_retry().result()workflow_with_retry.invoke(None)
在这个例子中,get_info_with_retry
任务在遇到 ValueError
时会自动重试,最多尝试 3 次,每次间隔 1 秒 。这是一种处理瞬时故障的优雅方式,无需在工作流逻辑中编写复杂的重试代码。
3.3 使用缓存进行优化
对于那些计算成本高昂或耗时但结果确定的任务(即对于相同的输入总是产生相同的输出),我们可以使用缓存来避免不必要的重复执行。
CachePolicy
通过在 @task
装饰器上配置 CachePolicy
,并为工作流的 .compile()
方法提供一个缓存实例(如 InMemoryCache
),我们可以启用任务级别的缓存 。
from langgraph.cache.memory import InMemoryCache
from langgraph.types import CachePolicy@task(cache_policy=CachePolicy(ttl=120)) # 缓存结果 120 秒
def slow_add(a: int, b: int) -> int:print(f"Performing slow addition for {a} + {b}...")time.sleep(2)return a + b# 注意:为了使用缓存,我们需要手动编译入口点
@entrypoint(checkpointer=InMemorySaver())
def caching_workflow(x: int, y: int):# 编译时传入缓存实例graph = caching_workflow.compile(cache=InMemoryCache())print("First call:")start_time = time.time()result1 = graph.invoke({"x": x, "y": y})print(f"Result: {result1}, Time: {time.time() - start_time:.2f}s")print("\nSecond call (should be cached):")start_time = time.time()result2 = graph.invoke({"x": x, "y": y})print(f"Result: {result2}, Time: {time.time() - start_time:.2f}s")caching_workflow.invoke({"x": 10, "y": 5})
当第一次调用 caching_workflow
时,slow_add
任务会正常执行,耗时约 2 秒,其结果 (10, 5) -> 15
会被存入缓存。当第二次使用相同的输入再次调用时,LangGraph 会直接从缓存中返回结果,执行时间几乎为零 。这对于优化那些重复调用确定性、高成本函数的应用(例如,对相同文本进行嵌入计算)非常有效。
第四部分:高级状态管理与记忆
本部分将探讨如何构建能够记忆信息的应用程序,无论是在一个单一的对话中,还是跨越多个交互的长周期。
4.1 用于对话式 AI 的短期记忆
短期记忆,也称为线程级持久化,是构建多轮对话体验的基础。它的作用域被限定在一个 thread_id
内,用于追踪正在进行的对话状态 。
previous 参数
函数式 API 提供了一个强大的注入机制来实现短期记忆:previous
参数。当你在 @entrypoint
装饰的函数签名中声明一个名为 previous
的参数时,LangGraph 运行时会自动将该线程上一次调用保存到检查点的状态注入到这个参数中。如果这是该线程的第一次调用,previous
的值将为 None
。
构建一个聊天机器人
让我们构建一个完整的聊天机器人示例来演示这一概念。这个机器人将能够记住之前的对话历史。
from langchain_core.messages import HumanMessage, AIMessage
from langchain_openai import ChatOpenAI# 假设模型已初始化
model = ChatOpenAI(model="gpt-4o-mini")@task
def call_llm(messages: list):return model.invoke(messages)# 注意'previous'参数
@entrypoint(checkpointer=InMemorySaver())
def chatbot(user_message: str, previous: list | None = None):# 如果是第一次对话,初始化消息列表messages = previous or []# 添加当前用户消息messages.append(HumanMessage(content=user_message))# 调用 LLMai_response = call_llm(messages).result()# 添加 AI 的回应messages.append(ai_response)# 返回完整的消息历史,它将被保存为下一次的'previous'return messages# 进行一次对话
config = {"configurable": {"thread_id": "chatbot-thread-1"}}# 第一次交互
final_state_1 = chatbot.invoke("My name is John.", config=config)
print(final_state_1[-1].content) # 打印 AI 的回应# 第二次交互 (使用相同的 thread_id)
final_state_2 = chatbot.invoke("What is my name?", config=config)
print(final_state_2[-1].content) # AI 应该能回答 "Your name is John."
在这个例子中,chatbot
入口点接收 previous
消息列表。它将新的用户消息追加到列表中,调用 LLM,然后将 AI 的回应也追加进去。最后,它返回更新后的完整消息列表。由于配置了检查点程序,这个返回的列表会被保存下来。在下一次使用相同 thread_id
调用时,这个保存的列表就会作为 previous
参数被注入,从而实现了对话记忆 。
4.2 使用 entrypoint.final 进行精细的状态控制
有时,我们希望工作流返回给调用者的值与保存到检查点的状态有所不同。例如,在聊天机器人中,我们可能只想将最新的 AI 消息返回给用户,但需要将整个对话历史保存到检查点中以供下一轮使用。entrypoint.final
原语就是为此而设计的 。
entrypoint.final
接受两个参数:
value
: 将返回给调用者的值。save
: 将被保存到检查点、并在下一轮作为previous
注入的值。
让我们重构之前的聊天机器人来使用它:
from langgraph.func import entrypoint
from langchain_core.messages import AIMessage@entrypoint(checkpointer=InMemorySaver())
def improved_chatbot(user_message: str, previous: list | None = None):messages = previous or []messages.append(HumanMessage(content=user_message))ai_response = call_llm(messages).result()messages.append(ai_response)# 返回最新的 AI 消息,但保存完整的历史return entrypoint.final(value=ai_response.content, save=messages)config = {"configurable": {"thread_id": "chatbot-thread-2"}}# 调用时,返回的直接是字符串
ai_reply = improved_chatbot.invoke("I'm planning a trip to Paris.", config=config)
print(f"AI: {ai_reply}")# 下一轮调用
ai_reply_2 = improved_chatbot.invoke("What's the weather like there?", config=config)
print(f"AI: {ai_reply_2}") # AI 应该知道 "there" 指的是巴黎
通过使用 entrypoint.final
,我们改进了工作流的外部 API,使其更加简洁,同时在内部保持了完整的状态,实现了关注点分离。
4.3 长期记忆:跨会话持久化知识
短期记忆局限于单个线程(会话),而长期记忆则允许信息在不同线程之间共享。这通常用于存储特定于用户的信息(如姓名、偏好),以便在未来的任何对话中都能被访问 。LangGraph 的记忆模型优雅地反映了人类认知的区别:“短期记忆”(线程中的 previous
状态)类似于处理当前任务的工作记忆,而“长期记忆”(Store)则好比我们关于人和事实的持久知识库。这种概念上的一致性使得系统设计更加直观。
Store 接口
长期记忆是通过 Store
接口实现的。我们需要创建一个 Store
实例(例如 InMemoryStore
),并将其传递给 @entrypoint
装饰器。然后,store
对象就可以像 previous
一样,作为一个可注入的参数在入口点函数中使用 。
实现示例
让我们构建一个能够记住用户姓名并在新对话中使用的代理。
from langgraph.store.memory import InMemoryStore, BaseStore
import uuid# 创建一个 Store 实例
store = InMemoryStore()@task
def save_name(store: BaseStore, user_id: str, name: str):# 将姓名存入与 user_id 关联的命名空间store.put([(("user_names", user_id), "name")], {"name": name})@task
def get_name(store: BaseStore, user_id: str) -> str | None:# 从 store 中检索姓名items = store.get([("user_names", user_id)])return items.get("name") if items and items else None# 将 store 实例传递给 entrypoint
@entrypoint(checkpointer=InMemorySaver(), store=store)
def memory_agent(query: str, user_id: str, store: BaseStore, # 注入 storeprevious: list | None = None
):messages = previous or []messages.append(HumanMessage(content=query))# 检查是否是记忆指令if "my name is" in query.lower():name = query.split("my name is")[-1].strip()save_name(store, user_id, name).result()response = f"Nice to meet you, {name}! I'll remember that."else:# 尝试获取已存的姓名name = get_name(store, user_id).result()if "what is my name" in query.lower():response = f"Your name is {name}." if name else "I don't know your name yet."else:# 普通对话response = "How can I help you today?"if name:response += f" By the way, hello {name}!"messages.append(AIMessage(content=response))return entrypoint.final(value=response, save=messages)# 用户 1 的交互
user_1_config_1 = {"configurable": {"thread_id": "thread-user1-a", "user_id": "user-1"}}
user_1_config_2 = {"configurable": {"thread_id": "thread-user1-b", "user_id": "user-1"}}# 在第一个会话中告诉代理名字
print(memory_agent.invoke({"query": "Hi, my name is Alice.", "user_id": "user-1"}, config=user_1_config_1))# 在一个全新的会话中提问
print(memory_agent.invoke({"query": "What is my name?", "user_id": "user-1"}, config=user_1_config_2))
# 输出应为 "Your name is Alice."
在这个例子中,我们使用 user_id
来为存储中的信息创建命名空间。当 Alice 在第一个会话(thread-id-a)中介绍自己时,她的名字被保存到与 user-1
关联的 Store
中。之后,当她在第二个、完全独立的会话(thread-id-b)中提问时,代理能够从 Store
中检索到她的名字,并正确回答 。这展示了如何使用 Store
来构建具有跨会话持久记忆的个性化 AI 应用。
第五部分:交互式与代理模式
这是本教程的顶点部分,我们将结合之前的所有概念,构建真正动态和交互式的代理。
5.1 启用人机协同(HITL)工作流
人机协同(Human-in-the-loop, HITL)允许在工作流的关键节点暂停,以征求人类的审查、批准或输入。这是在 LLM 应用中确保安全性和准确性的重要机制。
使用 interrupt 暂停
通过在任务中调用 langgraph.func.interrupt()
函数,我们可以无限期地暂停工作流。这会创建一个特殊的“中断”检查点,等待外部干预 。
使用 Command 恢复
当工作流被中断后,用户可以通过使用相同的 thread_id
再次调用入口点来恢复它。此时,传递给 .invoke()
的值将被 langgraph.types.Command
原语接收,从而将人类的输入注入回工作流中 。
常见模式:审查工具调用
一个高级且实用的 HITL 模式是在执行工具调用前让用户进行审查。代理首先调用 LLM 生成工具调用请求,然后暂停,等待用户批准、拒绝或修改该请求。
from langgraph.func import interrupt
from langgraph.types import Command
from langchain_core.tools import tool
from langchain_core.messages import ToolMessage@tool
def search_web(query: str):"""Searches the web for a query."""return f"Search results for '{query}':..."tools = [search_web]
model_with_tools = model.bind_tools(tools)@task
def get_tool_calls(messages: list):return model_with_tools.invoke(messages).tool_calls@task
def execute_tool(tool_call: dict):tool_to_call = {t.name: t for t in tools}[tool_call["name"]]return ToolMessage(content=str(tool_to_call.invoke(tool_call["args"])),tool_call_id=tool_call["id"])@entrypoint(checkpointer=InMemorySaver())
def agent_with_review(query: str):messages = [HumanMessage(content=query)]while True:tool_calls = get_tool_calls(messages).result()if not tool_calls:# 如果没有工具调用,就结束breakprint("LLM wants to call the following tools:")for tc in tool_calls:print(f"- {tc['name']}({tc['args']})")# 暂停以等待人类反馈interrupt()# 接收人类的决定feedback = yieldif isinstance(feedback, Command) and feedback.resume == "approve":print("Tool calls approved. Executing...")tool_results = [execute_tool(tc) for tc in tool_calls]messages.extend([r.result() for r in tool_results])else:print("Tool calls rejected.")breakfinal_response = model.invoke(messages)return final_response.content# 运行并审查
config = {"configurable": {"thread_id": "review-agent-1"}}
graph = agent_with_review.compile()# 启动工作流
for chunk in graph.stream("What's the weather in SF?", config=config):print(chunk)# 工作流会暂停,等待输入。我们可以这样恢复它:
graph.invoke(Command(resume="approve"), config=config)
这个例子展示了一个完整的审查循环。工作流在生成工具调用后暂停,打印出计划执行的操作。然后,外部用户可以通过调用 .invoke(Command(resume="approve"),...)
来批准执行,或者发送其他指令来否决它 。
5.2 通过流式处理提供实时反馈
为了提供更具响应性的用户体验,LangGraph 支持流式传输工作流的实时更新。函数式 API 使用与图 API 相同的流式处理机制 。
流式处理模式
通过调用 .stream()
而不是 .invoke()
,我们可以获得一个事件流。流式处理有多种模式,例如 updates
(返回状态的增量更新)、messages
(专门用于流式传输 LLM token)和 custom
(用于自定义事件)。
实现示例
我们可以在任务内部使用 langgraph.config.get_stream_writer()
来发出自定义的进度消息。
from langgraph.config import get_stream_writer@task
def long_process():writer = get_stream_writer()writer.emit({"status": "Started processing..."})time.sleep(1)writer.emit({"status": "50% complete..."})time.sleep(1)writer.emit({"status": "Process finished."})return "Done"@entrypoint(checkpointer=InMemorySaver())
def streaming_workflow():return long_process().result()# 使用 stream() 并指定 stream_mode
for chunk in streaming_workflow.stream(None, config={"configurable": {"thread_id": "stream-1"}},stream_mode=["custom", "updates"]
):print(chunk)
运行此代码将实时打印出 long_process
任务发出的自定义状态消息,让用户能够实时了解工作流的进展情况 。
5.3 从零开始:构建一个 ReAct 代理
现在,我们将所有学到的知识整合在一起,从头开始构建一个完整的 ReAct(Reasoning and Acting)代理。这个代理将能够通过推理来决定何时以及如何使用工具来回答问题 。
ReAct 代理的核心循环如下:
- Reason:LLM 接收用户问题和历史记录,进行推理,并决定是回答问题还是使用工具。
- Act:如果决定使用工具,LLM 会生成工具调用。
- Observe:我们执行工具调用,并将结果作为观察(Observation)返回给 LLM。
重复此过程,直到 LLM 能够回答最初的问题。
# (复用 5.1 中的 tool, model_with_tools, execute_tool)@entrypoint(checkpointer=InMemorySaver())
def react_agent(query: str, previous: list | None = None):messages = previous or [HumanMessage(content=query)]while True:# 1. Reason: LLM 决定下一步行动response = model_with_tools.invoke(messages)messages.append(response)if not response.tool_calls:# 如果没有工具调用,说明 LLM 认为可以回答了break# 2. Act: 执行工具调用tool_futures = [execute_tool(tc) for tc in response.tool_calls]# 3. Observe: 收集工具结果tool_results = [f.result() for f in tool_futures]messages.extend(tool_results)# 返回最终的 AI 回应,并保存完整历史final_answer = next(m.content for m in reversed(messages) if isinstance(m, AIMessage) and not m.tool_calls)return entrypoint.final(value=final_answer, save=messages)# 测试 ReAct 代理
config = {"configurable": {"thread_id": "react-agent-1"}}
response = react_agent.invoke("What can you tell me about LangGraph?", config=config)
print(response)# 进行追问
response2 = react_agent.invoke("How does it compare to standard LangChain?", config=config)
print(response2)
这个 react_agent
实现了完整的 ReAct 循环,并且通过 previous
参数和 entrypoint.final
实现了对话记忆,使其能够处理多轮、有上下文的查询 。
5.4 规模化:构建多代理网络
函数式 API 非常适合用于编排多个独立的代理,构建复杂的多代理系统。我们可以将每个代理定义为一个 @task
,然后使用一个主 @entrypoint
作为“主管”或“路由器”来协调它们之间的工作 。
代理切换
主管入口点根据当前状态或前一个代理的输出来决定接下来调用哪个代理任务。一种常见的模式是使用特殊的工具来显式地请求将控制权移交给另一个代理。这些工具通常使用 @tool(return_direct=True)
来装饰,以便在被调用时立即中断当前代理的执行循环,将控制权交还给主管 。
这种设计模式在函数式 API 中尤其优雅,因为主管的路由逻辑可以用清晰、自然的 Python 代码(如 if/elif/else
结构)来表达。相比于图 API 中需要定义复杂的条件边,这种方式使得编排逻辑更易于编写、调试和维护。它将多代理系统中最复杂的部分——协调逻辑——直接映射到了开发者熟悉的编程结构上。
# 伪代码示例
@task
def research_agent(topic: str) -> dict:#... 执行研究...if "financial data" in result:return {"next_agent": "finance_agent", "data": result}return {"next_agent": "writer_agent", "data": result}@task
def finance_agent(data: dict) -> dict:#... 分析财务数据...return {"next_agent": "writer_agent", "data": processed_data}@task
def writer_agent(data: dict) -> str:#... 撰写报告...return final_report@entrypoint(checkpointer=InMemorySaver())
def supervisor(initial_topic: str):next_agent = "research_agent"data = {"topic": initial_topic}while next_agent!= "end":if next_agent == "research_agent":result = research_agent(data["topic"]).result()elif next_agent == "finance_agent":result = finance_agent(data).result()elif next_agent == "writer_agent":return writer_agent(data).result()next_agent = result.get("next_agent", "end")data = result.get("data")
这个伪代码展示了一个主管如何根据前一个代理的输出来动态地路由任务,从而实现一个协作式的多代理工作流。
第六部分:指导原则与最佳实践
本最后一部分将提炼出使用函数式 API 构建可靠应用所需的关键架构原则。
6.1 确保可靠性:确定性与幂等性
为何重要
为了让可恢复的功能(如 HITL 和错误恢复)正常工作,工作流在 @task
之外的部分必须是确定性的。这意味着,如果工作流从一个检查点恢复执行,它必须能够沿着完全相同的路径到达中断点。任何随机性或不确定性都可能导致恢复失败或行为异常 。
封装随机性
任何非确定性的操作,包括 LLM 调用、外部 API 调用、获取当前时间(datetime.now()
)或生成随机数,都必须被封装在 @task
内部。这样做可以确保这些操作的结果被记录在检查点中。当工作流恢复时,它将使用检查点中记录的确定性结果,而不是重新执行不确定的操作 。
正确做法:
@task
def get_random_number():return random.randint(1, 100)@entrypoint(...)
def my_workflow():num = get_random_number().result()if num > 50:...
错误做法:
@entrypoint(...)
def my_workflow():num = random.randint(1, 100) # 错误!非确定性操作在 entrypoint 中if num > 50:...
幂等性
在设计 @task
时,应尽可能使其具有幂等性。幂等性意味着使用相同的输入多次调用一个任务会产生相同的结果,并且不会产生意外的副作用。例如,一个向数据库插入记录的任务如果不是幂等的,在因网络超时而重试时可能会插入重复的记录。
6.2 常见陷阱与故障排除
- 处理副作用:避免在
@task
之外执行有副作用的操作(例如,写入文件、发送电子邮件、更新数据库)。因为工作流恢复可能会导致这些副作用被意外地执行多次 。应将所有副作用操作封装在幂等的@task
中。 - 序列化错误:这是最常见的错误来源之一。再次强调,所有传入/传出
@entrypoint
和传出@task
的数据都必须是 JSON 可序列化的。自定义对象实例、函数或其他不可序列化的类型都会导致失败。 - 忘记 thread_id:所有状态化功能(记忆、恢复、HITL)都依赖于在每次调用时传递一个一致的
config={"configurable": {"thread_id": "..."}}
。如果忘记传递或每次都使用新的 ID,工作流将表现为无状态的。
结论:Pythonic AI 开发的未来
LangGraph 的函数式 API 代表了构建复杂 AI 系统方法论的一次重要演进。它凭借其符合工程直觉的设计、强大的功能和高度的灵活性,极大地降低了开发者构建状态化、长时运行代理的门槛。
通过将持久化、记忆、人机协同等高级功能无缝集成到标准的 Python 编程范式中,函数式 API 使开发者能够专注于业务逻辑创新,而无需深陷于复杂的图定义和状态管理。它证明了复杂的代理模式可以变得比以往任何时候都更加易于实现。无论是构建简单的对话机器人,还是编排复杂的多代理网络,函数式 API 都为开发者提供了一套强大而优雅的工具,让他们能够舒适地使用熟悉的 Python 语言来驾驭未来 AI 应用的复杂性。