LangGraph 历史追溯 人机协同(Human-in-the-loop,HITL)
本文用 get_state_history
、update_state
、interrupt
把工作流做成“可回退、可修补、可人工兜底”,通过完整可运行示例将常见坑位一网打尽:历史追溯(get_state_history)→ 从历史点继续(time-travel)→ 分叉管理(checkpoint / namespace)→ 历史篡改(update_state + values + as_node)→ 人机(interrupt + Command(resume))→ 调试与错误修复。
1、三个核心概念
- State(全局状态):节点间累计合并的字典。每个节点返回的片段会 merge 进全局 state。
- Checkpoint(检查点):每次写状态后持久化的“快照”;通过它实现历史追溯/恢复。在 LangGraph 里,触发检查点的时机是在节点执行完成、返回更新值之后。此时,LangGraph 会把当时的 state(节点执行完后的值) 存入 checkpointer(这里是 InMemorySaver)。也就是说,检查点记录的状态,代表了“某个节点执行完毕后,图在这个位置的完整上下文”。
- 历史与分叉:
get_state_history(config)
返回的历史快照按最新优先排序,并以迭代器形式提供。由于迭代器在遍历一次后将无法再次迭代,建议先使用list()
将其内容完整保存,再进行后续处理。- 从过去的 checkpoint 继续 或 update_state 都会生成新分支;旧时间线不被改写。
- 恢复/更新时传三件套最稳:
thread_id + checkpoint_id + checkpoint_ns
。
2、最小可运行示例(历史追溯 + HITL)
2.1 图结构与节点
step1
:写入{"step":1,"msg":"Hello"}
review
:interrupt 暂停等待人工决策step3
:根据用户给出的approved
给出最终结果
# -*- coding: utf-8 -*-
"""
LangGraph: time-travel + interrupt(HITL) 示例
- 节点: step1 -> review(人为确认, interrupt) -> step3
- 首次执行生成历史
- 回溯到 step1,可选修改 msg
- 从回溯点继续:先命中 review 的 interrupt 暂停
- 人工决策:再次 invoke 传入 True/False 继续
"""
from typing import Optional
from typing_extensions import TypedDict
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command # 关键# ---------- 1) 定义 State 和节点 ----------
class State(TypedDict):step: Optional[int]msg: Optional[str]approved: Optional[bool]def step1(state: State):print("[step1] input:", state)new_state = {"step": 1, "msg": "Hello"}print("[step1] output:", {**state, **new_state})return new_statedef review(state: State):"""Human-in-Loop:让人确认当前 msg,返回的布尔值作为审批结果。"""print("[review] input:", state)payload = {"question": "Approve current message?","current_msg": state.get("msg"),"hint": "Return True to approve, False to reject."}print("[review] interrupt payload:", payload) # ← 手动打印decision = interrupt(payload)# 这里的 decision 就是断点恢复时传给 graph.invoke(...) 的值new_state = {"approved": bool(decision), "step": 2}print("[review] output (pending human):", {**state, **new_state})return new_statedef step3(state: State):print("[step3] input:", state)suffix = "✅ Approved" if state.get("approved") else "❌ Rejected"new_state = {"step": 3, "msg": f"{state.get('msg') or ''} ::: {suffix}"}print("[step3] output:", {**state, **new_state})return new_state# ---------- 2) 构建图 ----------
builder = StateGraph(State)
builder.add_node("step1", step1)
builder.add_node("review", review)
builder.add_node("step3", step3)
builder.add_edge(START, "step1")
builder.add_edge("step1", "review")
builder.add_edge("review", "step3")
builder.add_edge("step3", END)# ---------- 3) 编译(启用 checkpoint) ----------
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)thread_id = "human-in-loop"
base_config = {"configurable": {"thread_id": thread_id}}print("\n=== [1] 首次完整执行(会在 review 处被 interrupt 暂停) ===")
try:# 第一次跑:会停在 review 的 interrupt 处并结束本次调用(不报错)graph.invoke({}, base_config)
except Exception as e:# 绝大多数情况下不会抛异常;这里只是保险打印print("First run exception:", repr(e))# 看历史(最新在最前)
history = list(graph.get_state_history(base_config))
print("\n=== 历史快照(最新在最前) ===")
for i, snap in enumerate(history):ck = snap.config["configurable"].get("checkpoint_id")print(f"[{i}] ck={ck}, values={snap.values}")# ---------- 4) 回溯到 step1(可选:修改),然后继续到 review(再次 interrupt) ----------
# 找到一个 step==1 的检查点
resume_from = next((s for s in history if s.values.get("step") == 1), None)
assert resume_from, "没有找到 step==1 的检查点"resume_ck_id = resume_from.config["configurable"]["checkpoint_id"]
resume_ck_ns = resume_from.config["configurable"]["checkpoint_ns"]# (可选)修改回溯点的 msg
print("\n=== [2] 回溯到 step1 并修改 msg='Hi'(新分支) ===")
graph.update_state({"configurable": {"thread_id": thread_id,"checkpoint_id": resume_ck_id,"checkpoint_ns": resume_ck_ns,}},values={"msg": "Hi"},as_node="step1", # 明确来源节点,避免下一步歧义
)# 重新定位到“被修改后的 step1”
history2 = list(graph.get_state_history(base_config))
new_step1 = next((s for s in history2 if s.values.get("step") == 1 and s.values.get("msg") == "Hi"),None
)
assert new_step1, "没有找到被修改后的 step1"
new_ck_id = new_step1.config["configurable"]["checkpoint_id"]
new_ck_ns = new_step1.config["configurable"]["checkpoint_ns"]# 从新 step1 继续,预期会卡在 review 的 interrupt
print("\n=== [3] 从修改后的 step1 继续(将在 review 处暂停) ===")
resume_cfg = {"configurable": {"thread_id": thread_id,"checkpoint_id": new_ck_id,"checkpoint_ns": new_ck_ns,}
}
graph.invoke(None, resume_cfg) # 不传人类决策,此次会在 review 的 interrupt 处停止def parse_bool(s) -> bool:"""把用户输入转换为布尔值;支持 bool / 字符串 / 数字。"""if isinstance(s, bool):return sif isinstance(s, (int, float)):return s != 0if isinstance(s, str):v = s.strip().lower()if v in {"true", "t", "yes", "y", "1", "ok", "approve", "approved"}:return Trueif v in {"false", "f", "no", "n", "0", "reject", "rejected"}:return Falseraise ValueError(f"无法识别为布尔值的输入:{s!r}")# ---------- 5) 人类介入:向 interrupt 提交答案并继续 ----------
# 示例:从命令行读取或从上层接口拿到用户输入
raw = input("Approve? (true/false/yes/no): ")
decision = parse_bool(raw)if decision:print("\n=== [4.1] 人工决策:批准=True,继续执行至 step3 ===")# 关键:将 True 作为本次 invoke 的输入;它会成为 review 节点 interrupt() 的返回值final_state = graph.invoke(Command(resume=decision), resume_cfg)print("[final]", final_state)print("[final after approval=True] ->", final_state)
else:# 也可以拒绝,输入Falseprint("\n=== [4.2] 人工决策:拒绝=False,继续执行至 step3(形成新分支) ===")# 先回到修改后的 step1(或 review 处的 checkpoint)再给 False# graph.invoke(None, resume_cfg) # 再次暂停到 reviewfinal_state2 = graph.invoke(Command(resume=decision), resume_cfg)print("[final after approval=False] ->", final_state2)# ---------- 6) 查看完整历史,能看到多个分支 ----------
print("\n=== [5] 回看历史(包含多分支,最新在最前) ===")
for i, snap in enumerate(graph.get_state_history(base_config)):ck = snap.config["configurable"].get("checkpoint_id")print(f"[{i}] ck={ck}, values={snap.values}")
2.2 运行结果
=== [1] 首次完整执行(会在 review 处被 interrupt 暂停) ===
[step1] input: {}
[step1] output: {'step': 1, 'msg': 'Hello'}
[review] input: {'step': 1, 'msg': 'Hello'}
[review] interrupt payload: {'question': 'Approve current message?', 'current_msg': 'Hello', 'hint': 'Return True to approve, False to reject.'}=== 历史快照(最新在最前) ===
[0] ck=1f076b28-70cb-6ec1-8001-293561814081, values={'step': 1, 'msg': 'Hello'}
[1] ck=1f076b28-70c7-6024-8000-5a2f923450f4, values={}
[2] ck=1f076b28-70c1-6cce-bfff-1ebc9302063e, values={}=== [2] 回溯到 step1 并修改 msg='Hi'(新分支) ====== [3] 从修改后的 step1 继续(将在 review 处暂停) ===
[review] input: {'step': 1, 'msg': 'Hi'}
[review] interrupt payload: {'question': 'Approve current message?', 'current_msg': 'Hi', 'hint': 'Return True to approve, False to reject.'}
Approve? (true/false/yes/no): yes=== [4.1] 人工决策:批准=True,继续执行至 step3 ===
[review] input: {'step': 1, 'msg': 'Hi'}
[review] interrupt payload: {'question': 'Approve current message?', 'current_msg': 'Hi', 'hint': 'Return True to approve, False to reject.'}
[review] output (pending human): {'step': 2, 'msg': 'Hi', 'approved': True}
[step3] input: {'step': 2, 'msg': 'Hi', 'approved': True}
[step3] output: {'step': 3, 'msg': 'Hi ::: ✅ Approved', 'approved': True}
[final] {'step': 3, 'msg': 'Hi ::: ✅ Approved', 'approved': True}
[final after approval=True] -> {'step': 3, 'msg': 'Hi ::: ✅ Approved', 'approved': True}=== [5] 回看历史(包含多分支,最新在最前) ===
[0] ck=1f076b28-947d-6eb3-8004-f22358062681, values={'step': 3, 'msg': 'Hi ::: ✅ Approved', 'approved': True}
[1] ck=1f076b28-9445-6b33-8003-10d527b2fdb9, values={'step': 2, 'msg': 'Hi', 'approved': True}
[2] ck=1f076b28-70cf-603d-8002-0a79ba894d1e, values={'step': 1, 'msg': 'Hi'}
[3] ck=1f076b28-70cb-6ec1-8001-293561814081, values={'step': 1, 'msg': 'Hello'}
[4] ck=1f076b28-70c7-6024-8000-5a2f923450f4, values={}
[5] ck=1f076b28-70c1-6cce-bfff-1ebc9302063e, values={}
- 第一次到
review
:打印payload
后暂停; - 回溯到
step1
并把msg
改成"Hi"
:生成新分支; - 从新分支继续到
review
:再次暂停; - 控制台输入
true/false
→ 用Command(resume=...)
恢复 → 到step3
输出True
:"Hi ::: ✅ Approved"
False
:"Hi ::: ❌ Rejected"
- 历史中会出现多个
step1/2/3
:代表不同分支的快照。
3、关键参数与易混点
3.1 三件套(回溯/更新/继续时务必带全)
thread_id
:标记对话/流程 IDcheckpoint_id
:具体历史点检查checkpoint_ns
:命名空间(分支 ID)
少
checkpoint_ns
,InMemorySaver.put()
会KeyError: 'checkpoint_ns'
。
3.2 values
(对历史做“局部覆盖”)
# 把历史快照中的 msg 覆盖为 "Hi",其他键保持不变
update_state(..., values={"msg":"Hi"})
3.3 as_node
(指定“这次更新发生在哪个节点之后”)
- 用于消除“下一步节点不唯一”:告诉 LangGraph 该历史状态等价于 某节点执行完的输出,用于消除歧义(比如一个汇入节点可能有多个入边,使用as_node能够指明是从哪个节点传来的)。
- 例如:
as_node="step1"
→ 下一步明确从step1 → ...
的出边继续。
3.4 interrupt(payload)
与 Command(resume)
-
interrupt(payload)
:暂停并把payload
存进事件/检查点(默认不会自动打印)。 -
继续执行时,不要
graph.invoke(True, cfg)
,而要:graph.invoke(Command(resume=True), cfg)
4、为什么日志里有时会“双打印” review?
# 先回到修改后的 step1(或 review 处的 checkpoint)再给 False
# graph.invoke(None, resume_cfg) # 再次暂停到 review
final_state2 = graph.invoke(Command(resume=decision), resume_cfg)
print("[final after approval=False] ->", final_state2)
(现已被注释掉)
这是正常现象,原因有两个:
- 我们在分支里又手动“再跑了一次到中断点”。在
else
分支里写了这行:
graph.invoke(None, resume_cfg) # 再次暂停到 review
可是在第 [3] 步里其实已经停在 review
的 interrupt()
处了。这里再调用一次会让 review
再执行一遍,因而打印一次:
[review] input: ...
[review] interrupt payload: ...
- 恢复时会“重放”节点函数(不是从函数内的断点继续),随后调用:
final_state2 = graph.invoke(Command(resume=False), resume_cfg)
LangGraph 的恢复策略是重新执行该节点函数,在 interrupt(...)
那一行把我们传入的 resume=False
作为返回值注入进去。因此函数从头再走一遍,所以会再次打印:
[review] input: ...
[review] interrupt payload: ...
然后 interrupt(...)
这次返回 False
,继续到 step3
。
总结原因有二:
- 在恢复前又调了一次
invoke(None, resume_cfg)
去“抵达中断点”; - 恢复时节点函数会重放一次(到
interrupt(...)
把决策注入)。
修复:恢复时直接 Command(resume=...)
,不要再多跑一次 invoke(None, ...)
。
5、历史里为何一开头会有两个 {}
空值?
LangGraph 会在:
-
线程初始化时存一次空快照;
-
第一个节点之前再存一次空快照;
便于从最早期恢复。
6、常见错误与定位
症状/报错 | 根因 | 修复 |
---|---|---|
AttributeError: 'StateSnapshot' has no attribute 'checkpoint_id' | 取错地方啦 | 用 snap.config["configurable"]["checkpoint_id"] |
历史被“读空” | 历史是迭代器,被消费掉了 | history = list(graph.get_state_history(...)) |
KeyError: 'checkpoint_ns' | 更新/继续没带命名空间 | 三件套带全:thread_id/checkpoint_id/checkpoint_ns |
“下一步节点不唯一” | 无法推断历史状态来源节点 | update_state(..., as_node="该节点名") |
InvalidUpdateError: Expected dict, got True | 把布尔当 state 传给 invoke 了 | 用 Command(resume=True/False) |
review 日志打印了两次 | 恢复前多跑了一次 + 恢复时重放 | 删掉那次多余的 invoke(None, ...) |
7、读者可进一步扩展
- 持久化:生产别用内存 checkpointer;换 SQLite / Postgres,历史可查可共享。
- 幂等/去重:恢复时节点会重放,外部 I/O/副作用要做幂等。
- 统一封装:把“回溯 →(可选)修补 → 继续”的套路封成 helper,减少重复代码。
- 可观测性:统一用日志/事件订阅打印
interrupt
的 payload 与分支 lineage。 - 命名规范:给节点/分支起直观名字,便于后期维护与排障。
8、小结
- LangGraph 的 checkpoint 是节点级别的,“在节点执行完之后”打点保存,而不是在节点与节点之间的边上打点。恢复时会从该节点之后的边继续执行。
- update_state 在不改写历史的前提下修补状态并开启新分支;
- interrupt/HITL 让关键环节由人拍板,而
Command(resume)
就是它的答复通道; - 记住三件套(
thread_id/checkpoint_id/checkpoint_ns
)、as_node
与values
的职责边界,那么我们的 LangGraph 工作流就具备了“可回退、可修补、可人工兜底”的工程弹性。
上述程序是由下面这个小demo进一步演化而来,只不过是在此基础上增加了人机交互的流程。读者朋友们可通过debug下面这个小程序来深刻体会“从历史中的某一点开始改变历史轨迹”的具体细节hh
# -*- coding: utf-8 -*-
"""
LangGraph 历史回溯(time-travel)+ 篡改历史后继续执行 的完整最小示例
- 首次执行:step1 -> step2 -> step3
- 查看历史(最新在最前)
- 从 step1 的历史检查点继续(不修改)
- 篡改 step1 的历史 state(msg="Hi")并从新分支继续
- 再次查看历史,看到分支
"""from typing import Optional
from typing_extensions import TypedDict
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.constants import START, END
from langgraph.graph import StateGraph# ---------- 1) 定义 State 和节点 ----------
class State(TypedDict):step: Optional[int]msg: Optional[str]def step1(state: State):print("[step1] in:", state)return {"step": 1, "msg": "Hello"}def step2(state: State):print("[step2] in:", state)return {"step": 2, "msg": (state.get("msg") or "") + " World"}def step3(state: State):print("[step3] in:", state)return {"step": 3, "msg": (state.get("msg") or "") + "!!!"}# ---------- 2) 构建图 ----------
builder = StateGraph(State)
builder.add_node("step1", step1)
builder.add_node("step2", step2)
builder.add_node("step3", step3)
builder.add_edge(START, "step1")
builder.add_edge("step1", "step2")
builder.add_edge("step2", "step3")
builder.add_edge("step3", END)# ---------- 3) 编译(启用 checkpoint) ----------
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)# 每条“对话/流程”的唯一标识
thread_id = "demo-thread"
base_config = {"configurable": {"thread_id": thread_id}}print("\n=== 首次完整执行 ===")
final_state = graph.invoke({}, base_config)
print("[final] ->", final_state)# ---------- 4) 查看历史(materialize 成 list,避免被消费) ----------
history = list(graph.get_state_history(base_config))
print("\n=== 历史状态(最新在最前) ===")
for i, snap in enumerate(history):ck_id = snap.config["configurable"].get("checkpoint_id")print(f"[{i}] ck_id={ck_id}, values={snap.values}")# ---------- 5) 找到 step==1 的检查点 ----------
resume_from = next((s for s in history if s.values.get("step") == 1), None)
if resume_from is None:raise RuntimeError("没有找到 step==1 的历史检查点,请检查上面的打印。")resume_ck_id = resume_from.config["configurable"]["checkpoint_id"]
resume_ck_ns = resume_from.config["configurable"]["checkpoint_ns"]# ---------- 6) 从该检查点继续(不修改历史) ----------
print("\n=== 从 step1 的历史时刻继续执行(不修改) ===")
resume_config = {"configurable": {"thread_id": thread_id,"checkpoint_id": resume_ck_id,"checkpoint_ns": resume_ck_ns,}
}
continued_state = graph.invoke(None, resume_config)
print("[continued from step1] ->", continued_state)# ---------- 7) 篡改该历史时刻的 state 并继续 ----------
print("\n=== 篡改 step1 的历史状态(msg='Hi')并继续 ===")
graph.update_state({"configurable": {"thread_id": thread_id,"checkpoint_id": resume_ck_id,"checkpoint_ns": resume_ck_ns,}},values={"msg": "Hi"},# 建议:明确来源节点,避免分支歧义as_node="step1",
)# 关键:篡改会生成“新”的检查点,需重新定位到 msg=='Hi' 的 step1
history_after = list(graph.get_state_history(base_config))
new_step1 = next((s for s in history_after if s.values.get("step") == 1 and s.values.get("msg") == "Hi"),None,
)
if new_step1 is None:raise RuntimeError("没有找到被篡改后的 step1(msg == 'Hi')!")new_ck_id = new_step1.config["configurable"]["checkpoint_id"]
new_ck_ns = new_step1.config["configurable"]["checkpoint_ns"]resume_config2 = {"configurable": {"thread_id": thread_id,"checkpoint_id": new_ck_id,"checkpoint_ns": new_ck_ns,}
}
tampered_then_continued = graph.invoke(None, resume_config2)
print("[tampered from step1] ->", tampered_then_continued)# ---------- 8) 再看一次历史(可见分支) ----------
print("\n=== 回看历史(包含分支,最新在最前) ===")
for i, snap in enumerate(graph.get_state_history(base_config)):ck_id = snap.config["configurable"].get("checkpoint_id")print(f"[{i}] ck_id={ck_id}, values={snap.values}")
上述程序运行结果如下:
=== 首次完整执行 ===
[step1] in: {}
[step2] in: {'step': 1, 'msg': 'Hello'}
[step3] in: {'step': 2, 'msg': 'Hello World'}
[final] -> {'step': 3, 'msg': 'Hello World!!!'}=== 历史状态(最新在最前) ===
[0] ck_id=1f076b54-32fc-6dce-8003-82c574150d29, values={'step': 3, 'msg': 'Hello World!!!'}
[1] ck_id=1f076b54-32fc-6dcd-8002-c1b0a96edd96, values={'step': 2, 'msg': 'Hello World'}
[2] ck_id=1f076b54-32f9-6626-8001-05789bc4c5d7, values={'step': 1, 'msg': 'Hello'}
[3] ck_id=1f076b54-32f5-6c76-8000-daca9d59b24b, values={}
[4] ck_id=1f076b54-32ef-6f96-bfff-d18b2b4b9133, values={}=== 从 step1 的历史时刻继续执行(不修改) ===
[step2] in: {'step': 1, 'msg': 'Hello'}
[step3] in: {'step': 2, 'msg': 'Hello World'}
[continued from step1] -> {'step': 3, 'msg': 'Hello World!!!'}=== 篡改 step1 的历史状态(msg='Hi')并继续 ===
[step2] in: {'step': 1, 'msg': 'Hi'}
[step3] in: {'step': 2, 'msg': 'Hi World'}
[tampered from step1] -> {'step': 3, 'msg': 'Hi World!!!'}=== 回看历史(包含分支,最新在最前) ===
[0] ck_id=1f076b54-330b-6d48-8004-4fc8f7df8fe1, values={'step': 3, 'msg': 'Hi World!!!'}
[1] ck_id=1f076b54-3308-6536-8003-921661f23777, values={'step': 2, 'msg': 'Hi World'}
[2] ck_id=1f076b54-3305-6cd9-8002-e29750bc79f5, values={'step': 1, 'msg': 'Hi'}
[3] ck_id=1f076b54-3302-6fd8-8003-6c2df57e5b3c, values={'step': 3, 'msg': 'Hello World!!!'}
[4] ck_id=1f076b54-3302-6fd7-8002-fe49584f39ca, values={'step': 2, 'msg': 'Hello World'}
[5] ck_id=1f076b54-32fc-6dce-8003-82c574150d29, values={'step': 3, 'msg': 'Hello World!!!'}
[6] ck_id=1f076b54-32fc-6dcd-8002-c1b0a96edd96, values={'step': 2, 'msg': 'Hello World'}
[7] ck_id=1f076b54-32f9-6626-8001-05789bc4c5d7, values={'step': 1, 'msg': 'Hello'}
[8] ck_id=1f076b54-32f5-6c76-8000-daca9d59b24b, values={}
[9] ck_id=1f076b54-32ef-6f96-bfff-d18b2b4b9133, values={}