当前位置: 首页 > news >正文

【AI】SpringAI 第二弹:基于多模型实现流式输出

目录

一、基于多模型实现流式输出

1.1 什么是流式输出

1.2 多模型引入

1.3 代码实现

1.3.1 流式输出的API介绍

1.3.2 Flux 源码分析

二、了解 Reactor 模型

三、SSE 协议


一、基于多模型实现流式输出

1.1 什么是流式输出

流式输出(Streaming Output)是指数据在生成过程中就逐步传输给接收方,而不是等待全部处理完成后再一次性输出。这种模式具有以下优势:

  1. 低延迟​:用户可以立即看到部分结果,无需等待全部处理完成
  2. 资源高效​:可以更早释放部分资源,减少内存占用
  3. 用户体验好​:渐进式的反馈让用户感知到系统正在工作

1.2 多模型引入

1. 添加相关依赖

    <dependency><groupId>org.springframework.ai</groupId><artifactId>spring-ai-starter-model-openai</artifactId></dependency><dependency><groupId>org.springframework.ai</groupId><artifactId>spring-ai-starter-model-ollama</artifactId></dependency><dependency><groupId>org.springframework.ai</groupId><artifactId>spring-ai-starter-model-qianfan</artifactId></dependency>

2. 设置配置文件

spring:ai:openai:api-key: ${自定义}base-url: https://dashscope.aliyuncs.com/compatible-mode/chat:options:model: ${自定义}ollama:base-url: http://127.0.0.1:11434chat:options:model: ${自定义}qianfan:api-key: ${自定义}secret-key: ${自定义}chat:options:model: ${自定义}

1.3 代码实现

1. OllamaController

@RestController
@RequestMapping("/ol")
public class OllamaController {@Resourceprivate OllamaChatModel chatModel;@GetMapping(value = "/chat",produces = "text/event-stream")public Flux<String> chat(String prompt) {return chatModel.stream(prompt);}
}

调用结果如下:

2. QianFanController

@RestController
@RequestMapping("/qianfan")
public class QianFanController {@Autowiredprivate QianFanChatModel chatModel;@RequestMapping(value = "/stream",produces = "text/event-stream")public Flux<String> chat(@RequestParam("question") String question) {return chatModel.stream(question);}
}

调用结果如下:

3. BaiLianController:通过对接通用的大模型平台,实现同平台的多种大模型切换

ChatClent是对ChatModel的封装

@RestController
@RequestMapping("/bl")
public class BaiLianController {@Autowiredprivate ChatClient deepseekClient;@Autowiredprivate ChatClient qwenClient;@RequestMapping(value = "/chatDeepSeek",produces = "text/event-stream")public Flux<String> chatDeepSeek(String message) {return deepseekClient.prompt(message).stream().content();}@RequestMapping(value = "/chatQwen",produces = "text/event-stream")public Flux<String> chatQwen(String message) {return qwenClient.prompt(message).stream().content();}
}

Flux 是 Spring WebFlux 框架中的一个核心组件,属于响应式编程模型的一部分。它主要用于处理异步、非阻塞的流式数据,能够高效地处理高并发场景。Flux 可以生成和处理一系列的事件或数据, 如流式输出等。

调用 DeepSeek 结果如下:(基于 ChatGPT4模型)

调用 通问千义 结果如下:

1.3.1 流式输出的API介绍

基于 ChatModel:

基于 ChatClient:

1.3.2 Flux 源码分析

要搞清楚这个问题,我们需要看流式输出对象 Flux 的实现源码:

查看 Flux 源码我们发现它是属于 reactor.core.publisher 包下的抽象类:

Reactor Streams 会订阅数据源,当有数据时,Reactor Streams 以分块流的方式 发送给客户端(用户)。

二、了解 Reactor 模型

Reactor 是一种事件驱动的高性能网络编程模型,主要用于处理高并发的网络 I/O 请求。其核心思想是通过一个或多个线程监听事件,并将事件分发给相应的处理程序,从而实现高效的并发处理。

Reactor 模型的主要特征如下:

  1. 事件驱动:所有 I/O 操作都由事件触发并处理。

  2. 非阻塞:操作不会因为 I/O 而挂起,避免了线程等待的开销。

  3. 高效资源利用:通过少量线程处理大量并发连接,提升性能。

  4. 组件分离:将事件监听(Reactor)、事件分发(Dispatcher)和事件处理(Handler)解耦, 使代码结构更清晰。

Reactor 实现方式有三种:

  1. 单线程 Reactor 模型:所有操作在一个线程完成,适用于低并发场景。

  2. 多线程 Reactor 模型:主线程处理连接,子线程池处理 I/O 和业务。

  3. 主从 Reactor 模型:主线程池处理连接,子线程池处理 I/O(进一步优化资源分配)。

三、SSE 协议

除了 Flux 实现之外,我们还可以通过 Server-Sent Events (SSE) 协议实现流式输出,通过单向 的 HTTP 长连接,服务端可以持续推送数据片段(如逐词或逐句)到前端。与 WebSocket 不同, SSE 是轻量级的单向通信协议,适合 AI 对话这类服务端主导的场景,它的具体实现代码如下:

@GetMapping(value = "/streamChat", produces = MediaType.TEXT_EVENT_STREAM_VALUE + ";charset=UTF-8")public SseEmitter streamChat(@RequestParam String message) {// 创建SSE发射器,设置超时时间(60秒)SseEmitter emitter = new SseEmitter(60_000L);// 创建Prompt对象Prompt prompt = new Prompt(new UserMessage(message));// 使用AtomicReference保存订阅以便后续取消AtomicReference<Disposable> subscriptionRef = new AtomicReference<>();// 订阅流式响应subscriptionRef.set(chatModel.stream(prompt).subscribe(response -> {try {// 防御性检查if (response == null || response.getResult() == null || response.getResult().getOutput() == null) {return;}String content = response.getResult().getOutput().getText();if (content == null || content.isEmpty()) {return;}// 发送SSE事件SseEmitter.SseEventBuilder event = SseEmitter.event().data(content).id(UUID.randomUUID().toString()).comment("Chat response chunk");emitter.send(event);} catch (IOException e) {throw new RuntimeException(e);}},error -> {},() -> {}));// 处理客户端断开连接emitter.onCompletion(() -> {Disposable subscription = subscriptionRef.get();if (subscription != null && !subscription.isDisposed()) {subscription.dispose();}});// 处理超时emitter.onTimeout(() -> {Disposable subscription = subscriptionRef.get();if (subscription != null && !subscription.isDisposed()) {subscription.dispose();}});// 处理错误emitter.onError((throwable) -> {Disposable subscription = subscriptionRef.get();if (subscription != null && !subscription.isDisposed()) {subscription.dispose();}});return emitter;}

http://www.xdnf.cn/news/499465.html

相关文章:

  • 江协科技GPIO输入输出hal库实现
  • QT+Visual Studio 配置开发环境教程
  • Python异常模块和包
  • Oracle 高水位线(High Water Mark, HWM)
  • 自定义库模块增加自定义许可操作详细方法
  • c++动态链接库
  • 04_决策树
  • MySQL只操作同一条记录也会死锁吗?
  • 支持selenium的chrome driver更新到136.0.7103.94
  • 【Java ee初阶】HTTP(2)
  • 【MySQL】第五弹——表的CRUD进阶(三)聚合查询(上)
  • Docker数据卷
  • 深入解析Spring Boot与JUnit 5的集成测试实践
  • FTP服务搭建实战:安全文件共享解决方案
  • 使用Docker部署Nacos
  • 机器学习-人与机器生数据的区分模型测试 -数据筛选
  • 【AI论文】EnerVerse-AC:用行动条件来构想具身环境
  • stm32 DMA
  • 【八股战神篇】Java集合高频面试题
  • Redis Sentinel如何实现高可用?
  • 类加载 与 Spring容器加载
  • STM32 | 软件定时器
  • 【发票提取表格】批量PDF电子发票提取明细保存到Excel表格,批量提取ODF电子发票明细,行程单明细,单据明细保存到表格,使用步骤、详细操作方法和注意事项
  • Java—异常体系
  • 【Linux笔记】——Linux线程封装
  • Ulyssess Ring Attention
  • Python文件与JSON操作全解:从基础到企业级实践
  • A级、B级弱电机房数据中心建设运营汇报方案
  • Ankr:Web3基础设施的革新者
  • Zephyr OS 中的 FIFO 接口应用介绍