当前位置: 首页 > java >正文

LangGraph 及多agent

什么是langgraph ?

是一个由于构建有状态的多参与者应用程序的库,理由LLM构建代理和多代理流程。其核心优势:循环性、可控性和持久性。LangGraph 允许自定义循环流程。

langGraph 的主要功能

循环和分支: 在应用中实现循环和条件语句
持久性:在图中的每个步骤之后自动保持状态。在任何时候暂停和恢复图以支持错误。
人机交互:在中断图执行已批准或编辑代理的计划的下一个动作。
流支持:在每个节点产生输出时流式传输输出(包换令牌流式传输)

LangGraph 的组成部分

LangGraph SDK (API客户端)、LangGraph Cli、LangGraph Studio (用户界面/调试器)。

LangGraph 的核心是将代理工作流建模为图。你可以使用是三个关键的组件来定义代理行为
1:状态:一个共享的数据结构表示程序应用的当前快照,他可以是python的任何类型但一般是dict
2:节点: 代码逻辑的python 函数。接收当前状态作为输入执行一些计算,并返回一个更新的状态(图的状态inactive、active)。
3:遍 python函数,根据当前状态确定要执行的下一个节点。

简单代码示例

from typing import Literal, Annotated
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
#from langchain_core.pydantic_v1 import BaseModel, Field
from pydantic.v1 import BaseModel, Field
import re# ================== 状态类定义 ==================
class AgentState(BaseModel):messages: Annotated[list, Field(default_factory=list)]next_agent: Literal["researcher", "calculator", "writer", "done"] = "researcher"step_count: int = 0def __getitem__(self, key):return getattr(self, key)def get(self, key, default=None):return getattr(self, key, default)# ================== 消息转换函数 ==================
def convert_messages(raw_messages):return [msg if not isinstance(msg, dict) else {"user": HumanMessage,"system": SystemMessage,"assistant": AIMessage}[msg["role"]](content=msg["content"])for msg in raw_messages]# ================== 模型配置 ==================
model = ChatOpenAI(openai_api_base="http://localhost:23333/v1",model_name="qwen_chat",openai_api_key="EMPTY",max_tokens=150,temperature=0.2
)# ================== 核心函数:create_agent ==================
def create_agent(role: str, prompt: str):"""通用Agent创建函数"""def agent(state: AgentState):current_state = AgentState(**state) if isinstance(state, dict) else statemessages = convert_messages(current_state.messages)messages.append(SystemMessage(content=f"{prompt}\n请用'建议下一步:选项'格式结尾(选项只能是researcher/calculator/writer/done)"))try:response = model.invoke(messages)print(f"[{role}输出] {response.content}")# 解析下一步建议next_agent = "done"if match := re.search(r'建议下一步:\s*(\w+)', response.content, re.IGNORECASE):suggestion = match.group(1).lower()if suggestion in {"researcher", "calculator", "writer", "done"}:next_agent = suggestionreturn AgentState(**{"messages": messages + [response],"next_agent": next_agent,"step_count": current_state.step_count + 1})except Exception as e:print(f"[{role}错误] {str(e)}")return AgentState(**{"messages": current_state.messages,"next_agent": "done","step_count": current_state.step_count + 1})return agent# ================== Supervisor节点 ==================
def supervisor(state: AgentState):current_state = AgentState(**state) if isinstance(state, dict) else stateprint(f"\n[Supervisor] 当前步数: {current_state.step_count}")if current_state.step_count >= 10:return AgentState(**{"messages": current_state.messages,"next_agent": "done","step_count": current_state.step_count + 1})messages = convert_messages(current_state.messages)control_prompt = '''请严格选择并直接回复以下选项(仅单词):
researcher - 需要进一步研究
calculator - 需要数学计算
writer - 生成最终报告
done - 结束流程'''messages.append(SystemMessage(content=control_prompt))try:response = model.invoke(messages)raw_output = response.content.strip().lower()print(f"[Supervisor决策] 原始输出: {raw_output}")# 增强解析逻辑if "writer" in raw_output:next_agent = "writer"elif "calculator" in raw_output:next_agent = "calculator"elif "researcher" in raw_output:next_agent = "researcher"else:next_agent = "done"return AgentState(**{"messages": current_state.messages,"next_agent": next_agent,"step_count": current_state.step_count + 1})except Exception as e:print(f"[Supervisor错误] {str(e)}")return AgentState(**{"messages": current_state.messages,"next_agent": "done","step_count": current_state.step_count + 1})# ================== 初始化各Agent ==================
researcher_agent = create_agent("研究员", "分析GDP趋势")
calculator_agent = create_agent("计算器", "执行增长率计算")
writer_agent = create_agent("作家", "生成最终报告(完成后必须添加[END]标识)")# ================== 工作流配置 ==================
builder = StateGraph(AgentState)builder.add_node("supervisor", supervisor)
builder.add_node("researcher", researcher_agent)
builder.add_node("calculator", calculator_agent)
builder.add_node("writer", writer_agent)builder.add_conditional_edges("supervisor",lambda state: state.next_agent,{"researcher": "researcher","calculator": "calculator","writer": "writer","done": END}
)for agent in ["researcher", "calculator", "writer"]:builder.add_edge(agent, "supervisor")builder.set_entry_point("supervisor")
workflow = builder.compile()# ================== 执行测试 ==================
if __name__ == "__main__":test_input = AgentState(messages=[{"role": "user", "content": "成都市GDP增长率分析"}])try:for step in workflow.stream(test_input):node_name = list(step.keys())[0]state = step[node_name]print(f"\n[系统状态] 当前节点: {node_name}")print(f"[状态] next_agent: {state.get('next_agent')}")print(f"[状态] step_count: {state.get('step_count')}")if node_name == "__end__":print("\n====== 流程正常终止 ======")if state.messages:print("最终报告:")print(state.messages[-1].content)breakexcept Exception as e:print(f"\n!!! 流程异常终止: {str(e)}")
http://www.xdnf.cn/news/8609.html

相关文章:

  • SpringBoot的pom.xml文件中设置多环境配置信息
  • 黑马k8s(十四)
  • 性能测试工具JMeter
  • 机器学习第二十七讲:Kaggle → 参加机器学习界的奥林匹克
  • 大数据治理:理论、实践与未来展望(一)
  • Next.js项目创建(chapter 1)
  • 关于vector、queue、list哪边是front、哪边是back,增加、删除元素操作
  • 黑马Java基础笔记-15
  • C++ 实现二叉树的后序遍历与中序遍历构建及层次遍历输出
  • 吃透C++ for循环:框架+例题
  • 理解 Redis 事务-20 (MULTI、EXEC、DISCARD)
  • c/c++的opencv像素级操作二值化
  • 开发者工具箱-鸿蒙IPv6子网计算器开发笔记
  • .NET外挂系列:8. harmony 的IL编织 Transpiler
  • 如何通过EventChannel实现Flutter与原生平台的双向通信?
  • C++ 输入输出流示例代码剖析
  • 每日c/c++题 备战蓝桥杯(洛谷P1873 EKO砍树问题详解)
  • 几个MySQL系统调优工具
  • 黑马点评--基于Redis实现共享session登录
  • 《关于浔川社团退出DevPress社区及内容撤回的声明》
  • [C++面试] 基础题 11~20
  • 怎样改变中断优先级?
  • 酷柚易汛ERP仓储物流解决方案
  • CodeBuddy实现pdf批量加密
  • SQL注入基础
  • vue+threeJs 创造镂空管状
  • 配置tomcat时,无法部署工件该怎么办?
  • 深度学习损失“三位一体”——从 Fisher 的最大似然到 Shannon 的交叉熵再到 KL 散度,并走进 PET·P-Tuning微调·知识蒸馏的实战
  • Selenium自动化测试网页加载太慢如何解决?
  • 基于netty实现视频流式传输和多线程传输