SpringBoot中使用Flux实现流式返回的技术总结
背景
近期在使用deepseek/openai等网页和APP时,发现大模型在思考和回复时,内容是一点点的显示出来的,于是好奇他们的实现方式。经调研和使用开发者工具抓取请求,每次聊天会向后台发送一个http请求,而这个接口跟普通接口一次性返回不一样,而是以流式的返回。
流式返回的核心概念与优势
在传统的 Web 开发中,接口通常以「一次性返回完整响应体」的形式工作。而 ** 流式返回(Streaming Response)** 指的是服务器在处理请求时,将响应结果分段逐步返回给客户端,而非等待所有数据生成完成后再一次性返回。这种模式具有以下核心优势:
1. 提升用户体验
- 对于大数据量响应(如文件下载、长文本流)或实时交互场景(如聊天机器人对话),客户端可边接收数据边处理,减少「空白等待时间」,提升实时性感知。
2. 降低内存消耗
- 服务器无需在内存中缓存完整响应数据,尤其适合处理高并发、大流量场景,降低 OOM(内存溢出)风险。
3. 支持长连接与实时通信
- 天然适配实时数据推送场景(如日志监控、股票行情更新),可与 SSE(Server-Sent Events)、WebSocket 等技术结合使用。
大模型的接口,尤其是那些带推理的模型接口返回,数据就是一点点的返回的,因此如果要提升用户体验,最好的方式就是采用流式接口返回。
在SpringBoot中基于Flux的流式接口实现
1. 依赖配置
在 pom.xml
中引入 WebFlux 依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
2. 流式接口实现(以模拟大模型对话为例)
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;@RestController
@RequestMapping("/api/chat")
public class ChatController {@PostMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> streamChat(@RequestBody ChatRequest request) {// 调用大模型 API 并返回 Flux 流return callLargeModelApi(request.message()).doOnNext(chunk -> log.info("发送响应片段: {}", chunk)).doOnError(error -> log.error("流式处理出错", error));}// 模拟调用大模型 API,返回 Flux 流private Flux<String> callLargeModelApi(String prompt) {// 实际项目中需替换为真实的大模型调用逻辑return Flux.just("您好!", "我是您的AI助手。", "您的问题是:" + prompt, "我将为您提供详细解答...").delayElements(Duration.ofMillis(300)); // 模拟实时响应延迟}
}
3. 关键配置说明
- 响应格式:设置
produces = MediaType.TEXT_EVENT_STREAM_VALUE
,符合 SSE 协议。 - 异步处理:Flux 流中的元素会被自动转换为 SSE 格式(
data: <内容>\n\n
)并推送至客户端。 - 背压控制:通过
onBackpressureBuffer()
或onBackpressureDrop()
处理客户端消费速率问题。
浏览器端 JS 调用方案
1. 使用 EventSource(简化版)
function connectWithEventSource() {const source = new EventSource("/api/chat");const chatWindow = document.getElementById("chat-window");source.onmessage = (event) => {chatWindow.innerHTML += `<div>${event.data}</div>`;chatWindow.scrollTop = chatWindow.scrollHeight;};source.onerror = (error) => {console.error("EventSource failed:", error);source.close();};
}
2. 使用 Fetch API(支持 POST 请求)
async function connectWithFetch() {const response = await fetch("/api/chat", {method: "POST",headers: { "Content-Type": "application/json" },body: JSON.stringify({ message: "你好" })});const reader = response.body.getReader();const decoder = new TextDecoder();const chatWindow = document.getElementById("chat-window");while (true) {const { done, value } = await reader.read();if (done) break;// 解码并处理数据块const chunk = decoder.decode(value, { stream: true });const messages = chunk.split('\n\n').filter(line => line.trim().startsWith('data:')).map(line => line.replace('data:', '').trim());messages.forEach(msg => {chatWindow.innerHTML += `<div>${msg}</div>`;chatWindow.scrollTop = chatWindow.scrollHeight;});}
}
//TODO
尝试写一个网页,调用流失接口,实现与大模型的交流。