【AI】SpringAI 第二弹:基于多模型实现流式输出
目录
一、基于多模型实现流式输出
1.1 什么是流式输出
1.2 多模型引入
1.3 代码实现
1.3.1 流式输出的API介绍
1.3.2 Flux 源码分析
二、了解 Reactor 模型
三、SSE 协议
一、基于多模型实现流式输出
1.1 什么是流式输出
流式输出(Streaming Output)是指数据在生成过程中就逐步传输给接收方,而不是等待全部处理完成后再一次性输出。这种模式具有以下优势:
- 低延迟:用户可以立即看到部分结果,无需等待全部处理完成
- 资源高效:可以更早释放部分资源,减少内存占用
- 用户体验好:渐进式的反馈让用户感知到系统正在工作
1.2 多模型引入
1. 添加相关依赖
<dependency><groupId>org.springframework.ai</groupId><artifactId>spring-ai-starter-model-openai</artifactId></dependency><dependency><groupId>org.springframework.ai</groupId><artifactId>spring-ai-starter-model-ollama</artifactId></dependency><dependency><groupId>org.springframework.ai</groupId><artifactId>spring-ai-starter-model-qianfan</artifactId></dependency>
2. 设置配置文件
spring:ai:openai:api-key: ${自定义}base-url: https://dashscope.aliyuncs.com/compatible-mode/chat:options:model: ${自定义}ollama:base-url: http://127.0.0.1:11434chat:options:model: ${自定义}qianfan:api-key: ${自定义}secret-key: ${自定义}chat:options:model: ${自定义}
1.3 代码实现
1. OllamaController
@RestController
@RequestMapping("/ol")
public class OllamaController {@Resourceprivate OllamaChatModel chatModel;@GetMapping(value = "/chat",produces = "text/event-stream")public Flux<String> chat(String prompt) {return chatModel.stream(prompt);}
}
调用结果如下:
2. QianFanController
@RestController
@RequestMapping("/qianfan")
public class QianFanController {@Autowiredprivate QianFanChatModel chatModel;@RequestMapping(value = "/stream",produces = "text/event-stream")public Flux<String> chat(@RequestParam("question") String question) {return chatModel.stream(question);}
}
调用结果如下:
3. BaiLianController:通过对接通用的大模型平台,实现同平台的多种大模型切换
ChatClent是对ChatModel的封装
@RestController
@RequestMapping("/bl")
public class BaiLianController {@Autowiredprivate ChatClient deepseekClient;@Autowiredprivate ChatClient qwenClient;@RequestMapping(value = "/chatDeepSeek",produces = "text/event-stream")public Flux<String> chatDeepSeek(String message) {return deepseekClient.prompt(message).stream().content();}@RequestMapping(value = "/chatQwen",produces = "text/event-stream")public Flux<String> chatQwen(String message) {return qwenClient.prompt(message).stream().content();}
}
Flux 是 Spring WebFlux 框架中的一个核心组件,属于响应式编程模型的一部分。它主要用于处理异步、非阻塞的流式数据,能够高效地处理高并发场景。Flux 可以生成和处理一系列的事件或数据, 如流式输出等。
调用 DeepSeek 结果如下:(基于 ChatGPT4模型)
调用 通问千义 结果如下:
1.3.1 流式输出的API介绍
基于 ChatModel:
基于 ChatClient:
1.3.2 Flux 源码分析
要搞清楚这个问题,我们需要看流式输出对象 Flux 的实现源码:
查看 Flux 源码我们发现它是属于 reactor.core.publisher 包下的抽象类:
Reactor Streams 会订阅数据源,当有数据时,Reactor Streams 以分块流的方式 发送给客户端(用户)。
二、了解 Reactor 模型
Reactor 是一种事件驱动的高性能网络编程模型,主要用于处理高并发的网络 I/O 请求。其核心思想是通过一个或多个线程监听事件,并将事件分发给相应的处理程序,从而实现高效的并发处理。
Reactor 模型的主要特征如下:
-
事件驱动:所有 I/O 操作都由事件触发并处理。
-
非阻塞:操作不会因为 I/O 而挂起,避免了线程等待的开销。
-
高效资源利用:通过少量线程处理大量并发连接,提升性能。
-
组件分离:将事件监听(Reactor)、事件分发(Dispatcher)和事件处理(Handler)解耦, 使代码结构更清晰。
Reactor 实现方式有三种:
-
单线程 Reactor 模型:所有操作在一个线程完成,适用于低并发场景。
-
多线程 Reactor 模型:主线程处理连接,子线程池处理 I/O 和业务。
-
主从 Reactor 模型:主线程池处理连接,子线程池处理 I/O(进一步优化资源分配)。
三、SSE 协议
除了 Flux 实现之外,我们还可以通过 Server-Sent Events (SSE) 协议实现流式输出,通过单向 的 HTTP 长连接,服务端可以持续推送数据片段(如逐词或逐句)到前端。与 WebSocket 不同, SSE 是轻量级的单向通信协议,适合 AI 对话这类服务端主导的场景,它的具体实现代码如下:
@GetMapping(value = "/streamChat", produces = MediaType.TEXT_EVENT_STREAM_VALUE + ";charset=UTF-8")public SseEmitter streamChat(@RequestParam String message) {// 创建SSE发射器,设置超时时间(60秒)SseEmitter emitter = new SseEmitter(60_000L);// 创建Prompt对象Prompt prompt = new Prompt(new UserMessage(message));// 使用AtomicReference保存订阅以便后续取消AtomicReference<Disposable> subscriptionRef = new AtomicReference<>();// 订阅流式响应subscriptionRef.set(chatModel.stream(prompt).subscribe(response -> {try {// 防御性检查if (response == null || response.getResult() == null || response.getResult().getOutput() == null) {return;}String content = response.getResult().getOutput().getText();if (content == null || content.isEmpty()) {return;}// 发送SSE事件SseEmitter.SseEventBuilder event = SseEmitter.event().data(content).id(UUID.randomUUID().toString()).comment("Chat response chunk");emitter.send(event);} catch (IOException e) {throw new RuntimeException(e);}},error -> {},() -> {}));// 处理客户端断开连接emitter.onCompletion(() -> {Disposable subscription = subscriptionRef.get();if (subscription != null && !subscription.isDisposed()) {subscription.dispose();}});// 处理超时emitter.onTimeout(() -> {Disposable subscription = subscriptionRef.get();if (subscription != null && !subscription.isDisposed()) {subscription.dispose();}});// 处理错误emitter.onError((throwable) -> {Disposable subscription = subscriptionRef.get();if (subscription != null && !subscription.isDisposed()) {subscription.dispose();}});return emitter;}