当前位置: 首页 > ds >正文

LangGraph(八)——LangGraph运行时

目录

  • 1. 引言
  • 2. Pregel(了解
    • 2.1 简介
      • 2.1.1 Actors
      • 2.1.2 Channels
    • 2.2 Pregel API示例(这里是源代码里的示例)
      • 2.2.1 单个节点
      • 2.2.2 多个节点
      • 2.2.3 Topic Channel
      • 2.2.4 BinaryOperatorAggregate Channel
      • 2.2.5 循环
  • 3. 高级API
    • 3.1 Graph API
    • 3.2 Functional API
  • 参考

1. 引言

  Pregel实现了LangGraph的运行时,管理LangGraph应用程序的执行。编译一个StateGraph或创建entrypoint都会生成一个Pregel实例,该实例可以接受输入后被调用。

2. Pregel(了解

2.1 简介

  在LangGraph中,Pregel将actors和channels合并为一个应用程序。Actors从channels读取数据并将其写入channels。Pregel将应用程序的执行组织成多个步骤,遵循Pregel算法/批量同步并行模型。
  每个步骤包含三个阶段:
  1. 计划:确定本步骤中需要执行的actors。例如,在第一步中选择订阅特定输入channel的actors,在后续步骤中选择订阅上一步更新channels的actors。
  2. 执行:并行执行所有选定的actors,直到所有actors完成、一个actor失败或达到超时为止。在该阶段,channels更新对actors是不可见的,直到下一步。
  3. 更新:使用actors在本步骤写入的值更新channels。
  重复执行,直到没有actors被选中执行,或者达到最大步骤数。

2.1.1 Actors

  一个actor是一个PregelNode。它订阅channels,从channels中读取数据,并将数据写入到channels。可以将其视为Pregel算法中的actor。PregelNode实现了LangChain的Runnable接口。

2.1.2 Channels

  channels用于在actors之间进行通信。每个channel都有一个值类型、一个更新类型和一个更新函数——该函数接收一系列更新并修改存储的值。channels可用于从一个链向另一个链发送数据,或从一个链在未来步骤中向自身发送数据。LangGraph提供了多种内置channels:
  1. LastValue:默认channel,存储发送到channel的最后一个值,适用于输入和输出值,或用于从一步向下一步发送数据。
  2. Topic:一个可配置的PubSub主题,适用于在actors之间发送多个值,或用于累积输出。可以配置为去重值或在多个步骤期间累积值。
  3. BinaryOperatorAggregate:存储一个持久值,通过将二进制运算符应用于当前值和发送到channel的每个更新来更新值,适用于在多个步骤中计算聚合。例如:total = BinaryOperatorAggregate(int, operator.add)

2.2 Pregel API示例(这里是源代码里的示例)

2.2.1 单个节点

from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, Channelnode1 = (Channel.subscribe_to("a")| (lambda x: x + x)| Channel.write_to("b")
)app = Pregel(nodes={"node1": node1},channels={"a": EphemeralValue(str),"b": EphemeralValue(str),},input_channels=["a"],output_channels=["b"],
)print(app.invoke({"a": "foo"}))

  运行结果为:

{'b': 'foofoo'}

2.2.2 多个节点

from langgraph.channels import LastValue, EphemeralValue
from langgraph.pregel import Pregel, Channelnode1 = (Channel.subscribe_to("a")| (lambda x: x + x)| Channel.write_to("b")
)node2 = (Channel.subscribe_to("b")| (lambda x: x + x)| Channel.write_to("c")
)app = Pregel(nodes={"node1": node1, "node2": node2},channels={"a": EphemeralValue(str),"b": LastValue(str),"c": EphemeralValue(str),},input_channels=["a"],output_channels=["b", "c"],
)app.invoke({"a": "foo"})

  运行结果为:

{'b': 'foofoo', 'c': 'foofoofoofoo'}

2.2.3 Topic Channel

from langgraph.channels import EphemeralValue, Topic
from langgraph.pregel import Pregel, Channelnode1 = (Channel.subscribe_to("a")| (lambda x: x + x)| {"b": Channel.write_to("b"),"c": Channel.write_to("c")}
)node2 = (Channel.subscribe_to("b")| (lambda x: x + x)| {"c": Channel.write_to("c"),}
)app = Pregel(nodes={"node1": node1, "node2": node2},channels={"a": EphemeralValue(str),"b": EphemeralValue(str),"c": Topic(str, accumulate=True),},input_channels=["a"],output_channels=["c"],
)app.invoke({"a": "foo"})

  运行结果为:

{'c': ['foofoo', 'foofoofoofoo']}

2.2.4 BinaryOperatorAggregate Channel

from langgraph.channels import EphemeralValue, BinaryOperatorAggregate
from langgraph.pregel import Pregel, Channelnode1 = (Channel.subscribe_to("a")| (lambda x: x + x)| {"b": Channel.write_to("b"),"c": Channel.write_to("c")}
)node2 = (Channel.subscribe_to("b")| (lambda x: x + x)| {"c": Channel.write_to("c"),}
)def reducer(current, update):if current:return current + " | " + updateelse:return updateapp = Pregel(nodes={"node1": node1, "node2": node2},channels={"a": EphemeralValue(str),"b": EphemeralValue(str),"c": BinaryOperatorAggregate(str, operator=reducer),},input_channels=["a"],output_channels=["c"]
)app.invoke({"a": "foo"})

  运行结果为

{'c': 'foofoo | foofoofoofoo'}

2.2.5 循环

from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, Channel, ChannelWrite, ChannelWriteEntryexample_node = (Channel.subscribe_to("value")| (lambda x: x + x if len(x) < 10 else None)| ChannelWrite(writes=[ChannelWriteEntry(channel="value", skip_none=True)])
)app = Pregel(nodes={"example_node": example_node},channels={"value": EphemeralValue(str),},input_channels=["value"],output_channels=["value"]
)app.invoke({"value": "a"})

  运行结果为:

{'value': 'aaaaaaaaaaaaaaaa'}

3. 高级API

3.1 Graph API

  StateGraph是一种高级抽象,简化了Pregel应用程序的创建。它允许你定义一个由节点和边组成的图。当你编译图时,StateGraph API会自动为你创建Pregel应用程序。

from typing import TypedDict, Optionalfrom langgraph.constants import START
from langgraph.graph import StateGraphclass Essay(TypedDict):topic: strcontent: Optional[str]score: Optional[float]def write_essay(essay: Essay):return {"content": f"Essay about {essay['topic']}",}def score_essay(essay: Essay):return {"score": 10}builder = StateGraph(Essay)
builder.add_node(write_essay)
builder.add_node(score_essay)
builder.add_edge(START, "write_essay")graph = builder.compile()
print("Nodes:")
print(graph.nodes)
print("Edges:")
print(graph.channels)

  运行结果:

Nodes:
{'__start__': <langgraph.pregel.read.PregelNode object at 0x000001FCEDFDA000>, 'write_essay': <langgraph.pregel.read.PregelNode object at 0x000001FCEDFDA1E0>, 'score_essay': <langgraph.pregel.read.PregelNode object at 0x000001FCEDFDA360>}
Edges:
{'topic': <langgraph.channels.last_value.LastValue object at 0x000001FCEE3E0D40>, 'content': <langgraph.channels.last_value.LastValue object at 0x000001FCEE3E3EC0>, 'score': <langgraph.channels.last_value.LastValue object at 0x000001FCEE3E3B80>, '__start__': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x000001FCEE4D0D40>, 'branch:to:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x000001FCEE50ECC0>, 'branch:to:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x000001FCEE50C7C0>}

3.2 Functional API

  在Functional API中,你可以使用entrypoint来创建一个Pregel应用程序。entrypoint装饰器允许你定义一个接受输入并返回输出的函数。

from typing import TypedDict, Optionalfrom langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypointclass Essay(TypedDict):topic: strcontent: Optional[str]score: Optional[float]checkpointer = InMemorySaver()@entrypoint(checkpointer=checkpointer)
def write_essay(essay: Essay):return {"content": f"Essay about {essay['topic']}",}print("Nodes: ")
print(write_essay.nodes)
print("Channels: ")
print(write_essay.channels)

  运行结果为:

Nodes: 
{'write_essay': <langgraph.pregel.read.PregelNode object at 0x000001FCEDFDA5A0>}
Channels: 
{'__start__': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x000001FCEE532140>, '__end__': <langgraph.channels.last_value.LastValue object at 0x000001FCEE310680>, '__previous__': <langgraph.channels.last_value.LastValue object at 0x000001FCEE5B31C0>}

参考

https://langchain-ai.github.io/langgraph/concepts/low_level/

http://www.xdnf.cn/news/10627.html

相关文章:

  • K3s简介、实战、问题记录
  • STM32F407寄存器操作(ADC非连续扫描模式)
  • 操作系统学习(九)——存储系统
  • AI 代理框架:使用正确的工具构建更智能的系统
  • 2025.6.1总结
  • 仓颉鸿蒙开发:制作底部标签栏
  • python训练营打卡第41天
  • 启动你的RocketMQ之旅(七)-Store存储原理
  • MySQL优化全链路实践:从慢查询治理到架构升级
  • 邮件验证码存储推荐方式
  • 前端基础学习html+css+js
  • 计算机网络第1章(上):网络组成与三种交换方式全解析
  • 【IC】多角多模式信号完整性优化
  • VBA数据库解决方案二十:Select表达式From区域Where条件Order by
  • 基于React + TypeScript构建高度可定制的QR码生成器
  • 鸿蒙OSUniApp结合机器学习打造智能图像分类应用:HarmonyOS实践指南#三方框架 #Uniapp
  • MCU SoC
  • Shape and boundary-aware
  • Ubuntu配置中文语言
  • GoldenEye
  • 机器学习-ROC曲线​​ 和 ​​AUC指标
  • 内存管理 : 06 内存换出
  • 不使用绑定的方法
  • 剑指offer hot100 第三周
  • 邂逅Webpack和打包过程
  • 基于python大数据的音乐可视化与推荐系统
  • hadoop完整安装教程(附带jdk1.8+vim+ssh安装)
  • [AI算法] LLM中的gradient checkpoint机制
  • Office办公文档软件安装包2024版
  • Ubuntu终端性能监视工具