当前位置: 首页 > backend >正文

LangGraph 历史追溯 人机协同(Human-in-the-loop,HITL)

本文用 get_state_historyupdate_stateinterrupt 把工作流做成“可回退、可修补、可人工兜底”,通过完整可运行示例将常见坑位一网打尽:历史追溯(get_state_history)→ 从历史点继续(time-travel)→ 分叉管理(checkpoint / namespace)→ 历史篡改(update_state + values + as_node)→ 人机(interrupt + Command(resume))→ 调试与错误修复

1、三个核心概念

  • State(全局状态):节点间累计合并的字典。每个节点返回的片段会 merge 进全局 state。
  • Checkpoint(检查点):每次写状态后持久化的“快照”;通过它实现历史追溯/恢复。在 LangGraph 里,触发检查点的时机是在节点执行完成、返回更新值之后。此时,LangGraph 会把当时的 state(节点执行完后的值) 存入 checkpointer(这里是 InMemorySaver)。也就是说,检查点记录的状态,代表了“某个节点执行完毕后,图在这个位置的完整上下文”。
  • 历史与分叉
    • get_state_history(config) 返回的历史快照按最新优先排序,并以迭代器形式提供。由于迭代器在遍历一次后将无法再次迭代,建议先使用 list() 将其内容完整保存,再进行后续处理。
    • 从过去的 checkpoint 继续update_state 都会生成新分支;旧时间线不被改写。
    • 恢复/更新时传三件套最稳:thread_id + checkpoint_id + checkpoint_ns

2、最小可运行示例(历史追溯 + HITL)

2.1 图结构与节点

  • step1:写入 {"step":1,"msg":"Hello"}
  • reviewinterrupt 暂停等待人工决策
  • step3:根据用户给出的 approved 给出最终结果
# -*- coding: utf-8 -*-
"""
LangGraph: time-travel + interrupt(HITL) 示例
- 节点: step1 -> review(人为确认, interrupt) -> step3
- 首次执行生成历史
- 回溯到 step1,可选修改 msg
- 从回溯点继续:先命中 review 的 interrupt 暂停
- 人工决策:再次 invoke 传入 True/False 继续
"""
from typing import Optional
from typing_extensions import TypedDict
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command  # 关键# ---------- 1) 定义 State 和节点 ----------
class State(TypedDict):step: Optional[int]msg: Optional[str]approved: Optional[bool]def step1(state: State):print("[step1] input:", state)new_state = {"step": 1, "msg": "Hello"}print("[step1] output:", {**state, **new_state})return new_statedef review(state: State):"""Human-in-Loop:让人确认当前 msg,返回的布尔值作为审批结果。"""print("[review] input:", state)payload = {"question": "Approve current message?","current_msg": state.get("msg"),"hint": "Return True to approve, False to reject."}print("[review] interrupt payload:", payload)  # ← 手动打印decision = interrupt(payload)# 这里的 decision 就是断点恢复时传给 graph.invoke(...) 的值new_state = {"approved": bool(decision), "step": 2}print("[review] output (pending human):", {**state, **new_state})return new_statedef step3(state: State):print("[step3] input:", state)suffix = "✅ Approved" if state.get("approved") else "❌ Rejected"new_state = {"step": 3, "msg": f"{state.get('msg') or ''} ::: {suffix}"}print("[step3] output:", {**state, **new_state})return new_state# ---------- 2) 构建图 ----------
builder = StateGraph(State)
builder.add_node("step1", step1)
builder.add_node("review", review)
builder.add_node("step3", step3)
builder.add_edge(START, "step1")
builder.add_edge("step1", "review")
builder.add_edge("review", "step3")
builder.add_edge("step3", END)# ---------- 3) 编译(启用 checkpoint) ----------
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)thread_id = "human-in-loop"
base_config = {"configurable": {"thread_id": thread_id}}print("\n=== [1] 首次完整执行(会在 review 处被 interrupt 暂停) ===")
try:# 第一次跑:会停在 review 的 interrupt 处并结束本次调用(不报错)graph.invoke({}, base_config)
except Exception as e:# 绝大多数情况下不会抛异常;这里只是保险打印print("First run exception:", repr(e))# 看历史(最新在最前)
history = list(graph.get_state_history(base_config))
print("\n=== 历史快照(最新在最前) ===")
for i, snap in enumerate(history):ck = snap.config["configurable"].get("checkpoint_id")print(f"[{i}] ck={ck}, values={snap.values}")# ---------- 4) 回溯到 step1(可选:修改),然后继续到 review(再次 interrupt) ----------
# 找到一个 step==1 的检查点
resume_from = next((s for s in history if s.values.get("step") == 1), None)
assert resume_from, "没有找到 step==1 的检查点"resume_ck_id = resume_from.config["configurable"]["checkpoint_id"]
resume_ck_ns = resume_from.config["configurable"]["checkpoint_ns"]# (可选)修改回溯点的 msg
print("\n=== [2] 回溯到 step1 并修改 msg='Hi'(新分支) ===")
graph.update_state({"configurable": {"thread_id": thread_id,"checkpoint_id": resume_ck_id,"checkpoint_ns": resume_ck_ns,}},values={"msg": "Hi"},as_node="step1",  # 明确来源节点,避免下一步歧义
)# 重新定位到“被修改后的 step1”
history2 = list(graph.get_state_history(base_config))
new_step1 = next((s for s in history2 if s.values.get("step") == 1 and s.values.get("msg") == "Hi"),None
)
assert new_step1, "没有找到被修改后的 step1"
new_ck_id = new_step1.config["configurable"]["checkpoint_id"]
new_ck_ns = new_step1.config["configurable"]["checkpoint_ns"]# 从新 step1 继续,预期会卡在 review 的 interrupt
print("\n=== [3] 从修改后的 step1 继续(将在 review 处暂停) ===")
resume_cfg = {"configurable": {"thread_id": thread_id,"checkpoint_id": new_ck_id,"checkpoint_ns": new_ck_ns,}
}
graph.invoke(None, resume_cfg)  # 不传人类决策,此次会在 review 的 interrupt 处停止def parse_bool(s) -> bool:"""把用户输入转换为布尔值;支持 bool / 字符串 / 数字。"""if isinstance(s, bool):return sif isinstance(s, (int, float)):return s != 0if isinstance(s, str):v = s.strip().lower()if v in {"true", "t", "yes", "y", "1", "ok", "approve", "approved"}:return Trueif v in {"false", "f", "no", "n", "0", "reject", "rejected"}:return Falseraise ValueError(f"无法识别为布尔值的输入:{s!r}")# ---------- 5) 人类介入:向 interrupt 提交答案并继续 ----------
# 示例:从命令行读取或从上层接口拿到用户输入
raw = input("Approve? (true/false/yes/no): ")
decision = parse_bool(raw)if decision:print("\n=== [4.1] 人工决策:批准=True,继续执行至 step3 ===")# 关键:将 True 作为本次 invoke 的输入;它会成为 review 节点 interrupt() 的返回值final_state = graph.invoke(Command(resume=decision), resume_cfg)print("[final]", final_state)print("[final after approval=True] ->", final_state)
else:# 也可以拒绝,输入Falseprint("\n=== [4.2] 人工决策:拒绝=False,继续执行至 step3(形成新分支) ===")# 先回到修改后的 step1(或 review 处的 checkpoint)再给 False# graph.invoke(None, resume_cfg)  # 再次暂停到 reviewfinal_state2 = graph.invoke(Command(resume=decision), resume_cfg)print("[final after approval=False] ->", final_state2)# ---------- 6) 查看完整历史,能看到多个分支 ----------
print("\n=== [5] 回看历史(包含多分支,最新在最前) ===")
for i, snap in enumerate(graph.get_state_history(base_config)):ck = snap.config["configurable"].get("checkpoint_id")print(f"[{i}] ck={ck}, values={snap.values}")

2.2 运行结果

=== [1] 首次完整执行(会在 review 处被 interrupt 暂停) ===
[step1] input: {}
[step1] output: {'step': 1, 'msg': 'Hello'}
[review] input: {'step': 1, 'msg': 'Hello'}
[review] interrupt payload: {'question': 'Approve current message?', 'current_msg': 'Hello', 'hint': 'Return True to approve, False to reject.'}=== 历史快照(最新在最前) ===
[0] ck=1f076b28-70cb-6ec1-8001-293561814081, values={'step': 1, 'msg': 'Hello'}
[1] ck=1f076b28-70c7-6024-8000-5a2f923450f4, values={}
[2] ck=1f076b28-70c1-6cce-bfff-1ebc9302063e, values={}=== [2] 回溯到 step1 并修改 msg='Hi'(新分支) ====== [3] 从修改后的 step1 继续(将在 review 处暂停) ===
[review] input: {'step': 1, 'msg': 'Hi'}
[review] interrupt payload: {'question': 'Approve current message?', 'current_msg': 'Hi', 'hint': 'Return True to approve, False to reject.'}
Approve? (true/false/yes/no): yes=== [4.1] 人工决策:批准=True,继续执行至 step3 ===
[review] input: {'step': 1, 'msg': 'Hi'}
[review] interrupt payload: {'question': 'Approve current message?', 'current_msg': 'Hi', 'hint': 'Return True to approve, False to reject.'}
[review] output (pending human): {'step': 2, 'msg': 'Hi', 'approved': True}
[step3] input: {'step': 2, 'msg': 'Hi', 'approved': True}
[step3] output: {'step': 3, 'msg': 'Hi ::: ✅ Approved', 'approved': True}
[final] {'step': 3, 'msg': 'Hi ::: ✅ Approved', 'approved': True}
[final after approval=True] -> {'step': 3, 'msg': 'Hi ::: ✅ Approved', 'approved': True}=== [5] 回看历史(包含多分支,最新在最前) ===
[0] ck=1f076b28-947d-6eb3-8004-f22358062681, values={'step': 3, 'msg': 'Hi ::: ✅ Approved', 'approved': True}
[1] ck=1f076b28-9445-6b33-8003-10d527b2fdb9, values={'step': 2, 'msg': 'Hi', 'approved': True}
[2] ck=1f076b28-70cf-603d-8002-0a79ba894d1e, values={'step': 1, 'msg': 'Hi'}
[3] ck=1f076b28-70cb-6ec1-8001-293561814081, values={'step': 1, 'msg': 'Hello'}
[4] ck=1f076b28-70c7-6024-8000-5a2f923450f4, values={}
[5] ck=1f076b28-70c1-6cce-bfff-1ebc9302063e, values={}
  • 第一次到 review:打印 payload暂停
  • 回溯到 step1 并把 msg 改成 "Hi"生成新分支
  • 从新分支继续到 review:再次暂停
  • 控制台输入 true/false → 用 Command(resume=...) 恢复 → 到 step3 输出
    • True"Hi ::: ✅ Approved"
    • False"Hi ::: ❌ Rejected"
  • 历史中会出现多个 step1/2/3:代表不同分支的快照。

3、关键参数与易混点

3.1 三件套(回溯/更新/继续时务必带全)

  • thread_id:标记对话/流程 ID
  • checkpoint_id:具体历史点检查
  • checkpoint_ns:命名空间(分支 ID)

checkpoint_nsInMemorySaver.put()KeyError: 'checkpoint_ns'

3.2 values(对历史做“局部覆盖”)

# 把历史快照中的 msg 覆盖为 "Hi",其他键保持不变
update_state(..., values={"msg":"Hi"})

3.3 as_node(指定“这次更新发生在哪个节点之后”)

  • 用于消除“下一步节点不唯一”:告诉 LangGraph 该历史状态等价于 某节点执行完的输出,用于消除歧义(比如一个汇入节点可能有多个入边,使用as_node能够指明是从哪个节点传来的)。
  • 例如:as_node="step1" → 下一步明确从 step1 → ... 的出边继续。

3.4 interrupt(payload)Command(resume)

  • interrupt(payload)暂停并把 payload 存进事件/检查点(默认不会自动打印)。

  • 继续执行时,不要 graph.invoke(True, cfg),而要:

    graph.invoke(Command(resume=True), cfg)
    

4、为什么日志里有时会“双打印” review?

# 先回到修改后的 step1(或 review 处的 checkpoint)再给 False
# graph.invoke(None, resume_cfg)  # 再次暂停到 review
final_state2 = graph.invoke(Command(resume=decision), resume_cfg)
print("[final after approval=False] ->", final_state2)

(现已被注释掉)

这是正常现象,原因有两个:

  1. 我们在分支里又手动“再跑了一次到中断点”。在 else 分支里写了这行:
graph.invoke(None, resume_cfg)  # 再次暂停到 review

可是在第 [3] 步里其实已经停在 reviewinterrupt() 处了。这里再调用一次会让 review 再执行一遍,因而打印一次:

[review] input: ...
[review] interrupt payload: ...
  1. 恢复时会“重放”节点函数(不是从函数内的断点继续),随后调用:
final_state2 = graph.invoke(Command(resume=False), resume_cfg)

LangGraph 的恢复策略是重新执行该节点函数,在 interrupt(...) 那一行把我们传入的 resume=False 作为返回值注入进去。因此函数从头再走一遍,所以会再次打印:

[review] input: ...
[review] interrupt payload: ...

然后 interrupt(...) 这次返回 False,继续到 step3

总结原因有二:

  1. 在恢复前又调了一次 invoke(None, resume_cfg) 去“抵达中断点”;
  2. 恢复时节点函数会重放一次(到 interrupt(...) 把决策注入)。

修复:恢复时直接 Command(resume=...),不要再多跑一次 invoke(None, ...)

5、历史里为何一开头会有两个 {} 空值?

LangGraph 会在:

  • 线程初始化时存一次空快照;

  • 第一个节点之前再存一次空快照;

    便于从最早期恢复。

6、常见错误与定位

症状/报错根因修复
AttributeError: 'StateSnapshot' has no attribute 'checkpoint_id'取错地方啦snap.config["configurable"]["checkpoint_id"]
历史被“读空”历史是迭代器,被消费掉了history = list(graph.get_state_history(...))
KeyError: 'checkpoint_ns'更新/继续没带命名空间三件套带全:thread_id/checkpoint_id/checkpoint_ns
“下一步节点不唯一”无法推断历史状态来源节点update_state(..., as_node="该节点名")
InvalidUpdateError: Expected dict, got True把布尔当 state 传给 invokeCommand(resume=True/False)
review 日志打印了两次恢复前多跑了一次 + 恢复时重放删掉那次多余的 invoke(None, ...)

7、读者可进一步扩展

  • 持久化:生产别用内存 checkpointer;换 SQLite / Postgres,历史可查可共享。
  • 幂等/去重:恢复时节点会重放,外部 I/O/副作用要做幂等。
  • 统一封装:把“回溯 →(可选)修补 → 继续”的套路封成 helper,减少重复代码。
  • 可观测性:统一用日志/事件订阅打印 interrupt 的 payload 与分支 lineage。
  • 命名规范:给节点/分支起直观名字,便于后期维护与排障。

8、小结

  • LangGraph 的 checkpoint 是节点级别的,“在节点执行完之后”打点保存,而不是在节点与节点之间的边上打点。恢复时会从该节点之后的边继续执行。
  • update_state 在不改写历史的前提下修补状态并开启新分支
  • interrupt/HITL 让关键环节由人拍板,而 Command(resume) 就是它的答复通道;
  • 记住三件套(thread_id/checkpoint_id/checkpoint_ns)、as_nodevalues 的职责边界,那么我们的 LangGraph 工作流就具备了“可回退、可修补、可人工兜底”的工程弹性。

上述程序是由下面这个小demo进一步演化而来,只不过是在此基础上增加了人机交互的流程。读者朋友们可通过debug下面这个小程序来深刻体会“从历史中的某一点开始改变历史轨迹”的具体细节hh

# -*- coding: utf-8 -*-
"""
LangGraph 历史回溯(time-travel)+ 篡改历史后继续执行 的完整最小示例
- 首次执行:step1 -> step2 -> step3
- 查看历史(最新在最前)
- 从 step1 的历史检查点继续(不修改)
- 篡改 step1 的历史 state(msg="Hi")并从新分支继续
- 再次查看历史,看到分支
"""from typing import Optional
from typing_extensions import TypedDict
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.constants import START, END
from langgraph.graph import StateGraph# ---------- 1) 定义 State 和节点 ----------
class State(TypedDict):step: Optional[int]msg: Optional[str]def step1(state: State):print("[step1] in:", state)return {"step": 1, "msg": "Hello"}def step2(state: State):print("[step2] in:", state)return {"step": 2, "msg": (state.get("msg") or "") + " World"}def step3(state: State):print("[step3] in:", state)return {"step": 3, "msg": (state.get("msg") or "") + "!!!"}# ---------- 2) 构建图 ----------
builder = StateGraph(State)
builder.add_node("step1", step1)
builder.add_node("step2", step2)
builder.add_node("step3", step3)
builder.add_edge(START, "step1")
builder.add_edge("step1", "step2")
builder.add_edge("step2", "step3")
builder.add_edge("step3", END)# ---------- 3) 编译(启用 checkpoint) ----------
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)# 每条“对话/流程”的唯一标识
thread_id = "demo-thread"
base_config = {"configurable": {"thread_id": thread_id}}print("\n=== 首次完整执行 ===")
final_state = graph.invoke({}, base_config)
print("[final] ->", final_state)# ---------- 4) 查看历史(materialize 成 list,避免被消费) ----------
history = list(graph.get_state_history(base_config))
print("\n=== 历史状态(最新在最前) ===")
for i, snap in enumerate(history):ck_id = snap.config["configurable"].get("checkpoint_id")print(f"[{i}] ck_id={ck_id}, values={snap.values}")# ---------- 5) 找到 step==1 的检查点 ----------
resume_from = next((s for s in history if s.values.get("step") == 1), None)
if resume_from is None:raise RuntimeError("没有找到 step==1 的历史检查点,请检查上面的打印。")resume_ck_id = resume_from.config["configurable"]["checkpoint_id"]
resume_ck_ns = resume_from.config["configurable"]["checkpoint_ns"]# ---------- 6) 从该检查点继续(不修改历史) ----------
print("\n=== 从 step1 的历史时刻继续执行(不修改) ===")
resume_config = {"configurable": {"thread_id": thread_id,"checkpoint_id": resume_ck_id,"checkpoint_ns": resume_ck_ns,}
}
continued_state = graph.invoke(None, resume_config)
print("[continued from step1] ->", continued_state)# ---------- 7) 篡改该历史时刻的 state 并继续 ----------
print("\n=== 篡改 step1 的历史状态(msg='Hi')并继续 ===")
graph.update_state({"configurable": {"thread_id": thread_id,"checkpoint_id": resume_ck_id,"checkpoint_ns": resume_ck_ns,}},values={"msg": "Hi"},# 建议:明确来源节点,避免分支歧义as_node="step1",
)# 关键:篡改会生成“新”的检查点,需重新定位到 msg=='Hi' 的 step1
history_after = list(graph.get_state_history(base_config))
new_step1 = next((s for s in history_after if s.values.get("step") == 1 and s.values.get("msg") == "Hi"),None,
)
if new_step1 is None:raise RuntimeError("没有找到被篡改后的 step1(msg == 'Hi')!")new_ck_id = new_step1.config["configurable"]["checkpoint_id"]
new_ck_ns = new_step1.config["configurable"]["checkpoint_ns"]resume_config2 = {"configurable": {"thread_id": thread_id,"checkpoint_id": new_ck_id,"checkpoint_ns": new_ck_ns,}
}
tampered_then_continued = graph.invoke(None, resume_config2)
print("[tampered from step1] ->", tampered_then_continued)# ---------- 8) 再看一次历史(可见分支) ----------
print("\n=== 回看历史(包含分支,最新在最前) ===")
for i, snap in enumerate(graph.get_state_history(base_config)):ck_id = snap.config["configurable"].get("checkpoint_id")print(f"[{i}] ck_id={ck_id}, values={snap.values}")

上述程序运行结果如下:

=== 首次完整执行 ===
[step1] in: {}
[step2] in: {'step': 1, 'msg': 'Hello'}
[step3] in: {'step': 2, 'msg': 'Hello World'}
[final] -> {'step': 3, 'msg': 'Hello World!!!'}=== 历史状态(最新在最前) ===
[0] ck_id=1f076b54-32fc-6dce-8003-82c574150d29, values={'step': 3, 'msg': 'Hello World!!!'}
[1] ck_id=1f076b54-32fc-6dcd-8002-c1b0a96edd96, values={'step': 2, 'msg': 'Hello World'}
[2] ck_id=1f076b54-32f9-6626-8001-05789bc4c5d7, values={'step': 1, 'msg': 'Hello'}
[3] ck_id=1f076b54-32f5-6c76-8000-daca9d59b24b, values={}
[4] ck_id=1f076b54-32ef-6f96-bfff-d18b2b4b9133, values={}=== 从 step1 的历史时刻继续执行(不修改) ===
[step2] in: {'step': 1, 'msg': 'Hello'}
[step3] in: {'step': 2, 'msg': 'Hello World'}
[continued from step1] -> {'step': 3, 'msg': 'Hello World!!!'}=== 篡改 step1 的历史状态(msg='Hi')并继续 ===
[step2] in: {'step': 1, 'msg': 'Hi'}
[step3] in: {'step': 2, 'msg': 'Hi World'}
[tampered from step1] -> {'step': 3, 'msg': 'Hi World!!!'}=== 回看历史(包含分支,最新在最前) ===
[0] ck_id=1f076b54-330b-6d48-8004-4fc8f7df8fe1, values={'step': 3, 'msg': 'Hi World!!!'}
[1] ck_id=1f076b54-3308-6536-8003-921661f23777, values={'step': 2, 'msg': 'Hi World'}
[2] ck_id=1f076b54-3305-6cd9-8002-e29750bc79f5, values={'step': 1, 'msg': 'Hi'}
[3] ck_id=1f076b54-3302-6fd8-8003-6c2df57e5b3c, values={'step': 3, 'msg': 'Hello World!!!'}
[4] ck_id=1f076b54-3302-6fd7-8002-fe49584f39ca, values={'step': 2, 'msg': 'Hello World'}
[5] ck_id=1f076b54-32fc-6dce-8003-82c574150d29, values={'step': 3, 'msg': 'Hello World!!!'}
[6] ck_id=1f076b54-32fc-6dcd-8002-c1b0a96edd96, values={'step': 2, 'msg': 'Hello World'}
[7] ck_id=1f076b54-32f9-6626-8001-05789bc4c5d7, values={'step': 1, 'msg': 'Hello'}
[8] ck_id=1f076b54-32f5-6c76-8000-daca9d59b24b, values={}
[9] ck_id=1f076b54-32ef-6f96-bfff-d18b2b4b9133, values={}
http://www.xdnf.cn/news/17537.html

相关文章:

  • 通用 maven 私服 settings.xml 多源配置文件(多个仓库优先级配置)
  • OpenCV计算机视觉实战(19)——特征描述符详解
  • Python自动化测试实战:reCAPTCHA V3绕过技术深度解析
  • 关于JavaScript 性能优化的实战指南
  • 4-下一代防火墙组网方案
  • 需求列表如何做层级结构
  • Redis类型之Hash
  • vscode的wsl环境,怎么打开linux盘的工程?
  • 【Oracle】如何使用DBCA工具删除数据库?
  • 九,算法-递归
  • ​电风扇离线语音芯片方案设计与应用场景:基于 8 脚 MCU 与 WTK6900P 的创新融合
  • Spark 优化全攻略:从 “卡成 PPT“ 到 “飞一般体验“
  • Empire--安装、使用
  • 布控球:临时布防场景的高清回传利器-伟博
  • 人工智能-python-机器学习-逻辑回归与K-Means算法:理论与应用
  • PYTHON开发的实现运营数据大屏
  • OFD一键转PDF格式,支持批量转换!
  • pip 和 conda,到底用哪个安装?
  • golang开源库之LaPluma
  • 是否有必要使用 Oracle 向量数据库?
  • Oracle 19C 配置TAF
  • CLIP,BLIP,SigLIP技术详解
  • 分治-归并-912.排序数组-力扣(LeetCode)
  • 机器学习——K-means聚类
  • IPCP(IP Control Protocol,IP控制协议)
  • Apache Ignite 生产级的线程池关闭工具方法揭秘
  • 【运维进阶】LAMPLNMP 最佳实践
  • 疯狂星期四文案网第36天运营日记
  • WNZ-20转速扭矩试验台
  • PHP request文件封装