当前位置: 首页 > web >正文

SpringAI Alibaba Graph 流式对话

Spring AI Graph 项目技术博客

项目概述

本次Demo演示的是一个基于 Spring AI 和 Alibaba Cloud AI Graph 的智能对话系统,展示了如何使用图计算的方式构建 AI 应用流程。项目采用响应式编程和流式处理,实现了高效的 AI 对话服务。

核心架构

1. 图计算架构

使用 StateGraph 来定义 AI 处理流程,通过节点(Node)和边(Edge)的方式组织业务逻辑:

START → query → result → END
  • query 节点: 负责调用大语言模型处理用户查询
  • result 节点: 处理最终结果并输出

2. 流式处理

采用 Server-Sent Events (SSE) 技术实现实时流式响应,用户可以实时看到 AI 的回复过程。

3. 异步处理

使用 AsyncGeneratorFlux 实现异步非阻塞的数据处理,提高系统性能。

核心组件详解

DemoBoxOneGraph - 图定义配置

/*** DemoBoxOneGraph - AI对话图计算配置类* * 该类负责配置和构建一个基于Spring AI的智能对话流程图。* 使用Alibaba Cloud AI Graph框架实现节点编排和状态管理。* * 流程图结构:* START → query(查询节点) → result(结果节点) → END* * @author xinggui* @version 1.0* @since 2024*/
@Configuration
@Slf4j
public class DemoBoxOneGraph {/*** 注入OpenAI聊天模型,用于AI对话生成*/@Resourceprivate OpenAiChatModel openAiChatModel;/*** 创建并配置AI对话状态图* * 该方法构建一个完整的对话流程图,包含:* 1. 查询节点:调用大语言模型处理用户输入* 2. 结果节点:处理和输出最终结果* 3. 状态管理:使用ReplaceStrategy策略管理状态更新* 4. 流程可视化:生成Mermaid格式的流程图* * @param openAiChatModel OpenAI聊天模型实例* @return 配置完成的状态图* @throws GraphStateException 当图状态配置出现错误时抛出*/@Bean("demoBoxOnesGraph")public StateGraph stateGraph(OpenAiChatModel openAiChatModel) throws GraphStateException {// 构建聊天客户端,添加日志记录器ChatClient chatClient = ChatClient.builder(openAiChatModel).defaultAdvisors(new SimpleLoggerAdvisor()).build();// 创建状态工厂,定义状态管理策略OverAllStateFactory factory = () -> {OverAllState state = new OverAllState();// 注册查询状态,使用替换策略(新值覆盖旧值)state.registerKeyAndStrategy("query", new ReplaceStrategy());// 注册结果状态,使用替换策略state.registerKeyAndStrategy("result", new ReplaceStrategy());return state;};// 构建状态图,定义节点和边的关系StateGraph stateGraph = new StateGraph("demoBoxOne", factory)// 添加查询节点,使用异步执行方式.addNode("query", AsyncNodeAction.node_async(new DemoBoxOneNode(chatClient)))// 添加结果节点,使用异步执行方式.addNode("result", AsyncNodeAction.node_async(new DemoBoxOneNode.DemoBoxResultNode()))// 定义流程边:START → query → result → END.addEdge(StateGraph.START, "query")      // 从开始到查询节点.addEdge("query", "result")              // 从查询节点到结果节点.addEdge("result", StateGraph.END);      // 从结果节点到结束// 生成并打印PlantUML格式的流程图,便于开发和调试GraphRepresentation representation = stateGraph.getGraph(GraphRepresentation.Type.MERMAID,"expander flow");log.info("\n=== expander UML Flow ===");log.info(representation.content());log.info("==================================\n");return stateGraph;}
}

关键特性:

  • 状态管理策略:使用 ReplaceStrategy 确保状态正确更新
  • 异步节点:所有节点都配置为异步执行
  • 流程可视化:自动生成 Mermaid 格式的流程图

DemoBoxOneNode - 业务逻辑节点

/*** DemoBoxOneNode - AI对话处理节点* * 该类实现了图计算中的核心业务逻辑节点,负责:* 1. 调用大语言模型处理用户查询* 2. 生成流式响应* 3. 管理节点间的状态传递* * 包含两个内部类:* - DemoBoxOneNode: 主要的查询处理节点* - DemoBoxResultNode: 结果输出节点* * @author xinggui* @version 1.0* @since 2024*/
@Slf4j
public class DemoBoxOneNode implements NodeAction {/*** 聊天客户端,用于与大语言模型进行交互*/private final ChatClient chatClient;/*** 构造函数,注入聊天客户端* * @param chatClient 配置好的聊天客户端实例*/public DemoBoxOneNode(ChatClient chatClient) {this.chatClient = chatClient;}/*** 执行节点的主要业务逻辑* * 该方法实现了以下功能:* 1. 从状态中获取用户查询内容* 2. 构建系统提示词和用户查询* 3. 调用大语言模型生成流式响应* 4. 将响应转换为异步生成器* 5. 返回包含生成器的状态映射* * @param state 当前图的状态对象,包含所有节点的共享状态* @return 包含异步生成器的状态映射* @throws Exception 当处理过程中出现错误时抛出*/@Overridepublic Map<String, Object> apply(OverAllState state) throws Exception {// 从状态中获取用户查询,如果没有则使用空字符串String query = (String) state.value("query").orElse("");// 构建流式聊天请求// 系统提示词:定义AI助手的角色和行为// 用户查询:将用户问题传递给AI模型Flux<ChatResponse> chatResponseFlux = chatClient.prompt().system("你是一个Java架构师,主要回答一些Java架构设计选型技术方面的事情。请接下来保持这样方式来回答客户").user((user) -> user.text("请回答用户的问题:").param("query", query)).stream().chatResponse();// 构建流式聊天生成器// 该生成器负责将聊天响应转换为节点输出AsyncGenerator<? extends NodeOutput> generator = StreamingChatGenerator.builder().startingNode("data")                    // 设置起始节点名称.startingState(state)                     // 设置起始状态.mapResult(chatResponse -> {              // 映射聊天响应到节点输出// 提取AI回复的文本内容String text = chatResponse.getResult().getOutput().getText();// 将文本内容包装在context字段中return Map.of("context", text);}).buildWithChatResponse(chatResponseFlux); // 使用聊天响应流构建生成器// 返回包含生成器的状态映射// 其他节点可以通过"data"键访问这个生成器return Map.of("data", generator);}/*** DemoBoxResultNode - 结果输出节点* * 该内部类负责处理最终的结果输出,主要功能:* 1. 从状态中提取处理结果* 2. 记录日志信息* 3. 返回最终结果状态*/static class DemoBoxResultNode implements NodeAction {/*** 执行结果节点的业务逻辑* * @param state 当前图的状态对象* @return 包含最终结果的状态映射* @throws Exception 当处理过程中出现错误时抛出*/@Overridepublic Map<String, Object> apply(OverAllState state) throws Exception {// 从状态中获取结果内容String result = (String) state.value("result").orElse("");// 记录最终结果到日志log.info("最终的结果result是: {}", result);// 返回包含结果的状态映射return Map.of("result", result);}}
}

核心功能:

  • 大模型调用:集成 OpenAI 模型进行智能对话
  • 流式生成:支持流式响应,实时返回结果
  • 状态传递:在节点间传递处理状态和数据

StreamUtils - 流式处理工具

/*** StreamUtils - 流式处理工具类* * 该类负责将异步生成器的流式输出转换为Server-Sent Events (SSE)格式,* 实现实时流式响应,支持客户端实时接收AI对话的生成过程。* * 主要功能:* 1. 异步处理流式输出* 2. 转换为SSE事件流* 3. 过滤和格式化输出内容* 4. 错误处理和资源管理* * @author xinggui* @version 1.0* @since 2024*/
@Slf4j
public class StreamUtils {/*** 单线程执行器,用于处理异步流式输出* 使用单线程确保事件处理的顺序性和一致性*/private static final ExecutorService executor = Executors.newSingleThreadExecutor();/*** 处理异步生成器的流式输出,转换为SSE事件流* * 该方法实现了以下核心功能:* 1. 异步处理AsyncGenerator的输出* 2. 区分不同类型的节点输出(流式输出和普通输出)* 3. 过滤掉不需要的中间节点事件* 4. 将输出转换为JSON格式的SSE事件* 5. 优雅处理完成和异常情况* * @param resultFuture 异步生成器,包含图计算的流式输出* @param sink SSE事件接收器,用于向客户端发送事件流*/public static void processStream(AsyncGenerator<NodeOutput> resultFuture, Sinks.Many<ServerSentEvent<String>> sink) {// 提交任务到执行器,异步处理流式输出executor.submit(() -> {// 遍历异步生成器的所有输出resultFuture.forEachAsync(output -> {try {// 获取当前输出节点的名称String nodeName = output.node();String content;// 判断输出类型,处理流式输出和普通输出if (output instanceof StreamingOutput streamingOutput) {// 处理流式输出(如聊天响应)// 提取聊天响应的元数据信息ChatResponse chatResponse = streamingOutput.chatResponse();// 将元数据转换为JSON格式content = JSON.toJSONString(Map.of(nodeName, chatResponse.getMetadata()));} else {// 处理普通节点输出JSONObject nodeOutput = new JSONObject();// 提取节点状态数据nodeOutput.put("data", output.state().data());// 记录节点名称nodeOutput.put("node", nodeName);// 转换为JSON字符串content = JSON.toJSONString(nodeOutput);}// 过滤掉"query"节点的事件,避免发送不必要的中间状态// 只发送有意义的输出事件给客户端if (!nodeName.equalsIgnoreCase("query")) {// 创建SSE事件并发送给客户端sink.tryEmitNext(ServerSentEvent.builder(content).build());}// 当节点名是"query"时,直接跳过,不发送任何SSE事件} catch (Exception e) {// 记录处理过程中的错误log.error("error", e);}}).thenAccept(v -> {// 所有输出处理完成后,发送完成事件sink.tryEmitComplete();}).exceptionally(e -> {// 处理过程中出现异常时的错误处理log.error("error", e);return null;});});}
}

主要职责:

  • 流式转换:将 AsyncGenerator 转换为 SSE 事件流
  • 事件过滤:过滤掉不需要的中间节点事件
  • 错误处理:优雅处理流式处理中的异常情况

技术亮点

1. 响应式编程

  • 使用 Spring WebFlux 和 Project Reactor
  • 非阻塞 I/O 处理
  • 背压控制机制

2. 流式 AI 响应

  • 实时流式输出
  • 支持长对话场景
  • 客户端实时接收响应

3. 图计算模式

  • 清晰的数据流向
  • 易于扩展和维护
  • 支持复杂的业务逻辑编排

使用方法

1. 调用接口

  • 流式调用: /code6 (返回 SSE 流)

总结

Spring AI Graph 这个流式响应的Demo展示了AI 应用开发的最佳实践,通过图计算、流式处理和响应式编程的结合,构建了高性能、可扩展的智能对话系统。这种架构模式特别适合需要复杂业务逻辑编排的 AI 应用场景。

http://www.xdnf.cn/news/20041.html

相关文章:

  • Nginx简介
  • SPI通讯协议
  • LightDock:高效蛋白质-DNA对接框架
  • 从数据孤岛到智能中枢:RAG与智能体协同架构如何重塑企业知识库
  • 代码随想录算法训练营第一天 | (二分查找类型)704.二分查找 35.探索插入位置 34.在排序数组中查找元素的第一个和最后一个位置
  • 涨粉14万,100个Coze智能体工作流模版案例:3分钟生成韩非子权谋爆款视频
  • 【C++】在 Windows 系统调用第三方程序(创建进程)
  • 专项智能练习(Photoshop软件基础)
  • mysql高级进阶(存储过程)
  • H3C UIS Cell 3020 G3服务器更换raid卡安装ONEStor记录
  • windows系统服务器测试部署springboot+vue+mysql项目
  • 企业网络安全建设三阶段实战指南
  • 商家自动运营(四)足浴店管理—东方仙盟
  • 一文掌握Redisson分布式锁
  • 【Rhino】【Python】将开放曲面转换为边界线和填充
  • [特殊字符] DA1-13 复习学习笔记
  • 极空间打造 “超级中枢”,从书签笔记到聊天分享,一键全搞定!
  • 非力扣100原题
  • FTL文件格式的原理与应用(AI)
  • AI歌手功能终于上线!Suno AI 带你保存歌曲的灵魂
  • 【教程】2025 IDEA 快速创建springboot(maven)项目
  • spring boot autoconfigure 自动配置的类,和手工 @configuration + @bean 本质区别
  • 硬件开发1-51单片机2-按键、中断
  • 域名不做网站使用,还需要备案吗
  • 这才是真正懂C/C++的人,写代码时怎么区分函数指针和指针函数?
  • Qt + windows + Linux+QtInstallerFramework打包教程
  • RabbitMQ相关知识
  • 基于 STM32N6-AI Image Classification 使用 git bash 命令行示例 LAT1552
  • 单片机点灯
  • 【C++上岸】C++常见面试题目--算法篇(第十八期)