当前位置: 首页 > news >正文

SpringBoot手动实现流式输出方案整理以及SSE规范输出详解

背景:

最近做流式输出时,一直使用python实现的,应需求方的要求,需要通过java应用做一次封装并在java侧完成系统鉴权、模型鉴权等功能后才能真正去调用智能体应用,基于此调研java实现流式输出的几种方式,并完成与python服务对接的方案。

方案:

  • 使用Servlet原生API实现流式输出
  • 使用ResponseBodyEmitter实现异步流式输出
  • 使用SseEmitter实现服务器发送事件(SSE)
  • 使用WebFlux实现响应式流式输出
  • 使用Spring MVC的StreamingResponseBody
  • websockt

说一下我的业务场景,我原本的前后端适配已经按照SSE规范完成了功能,因此新写接口时也采用SSE规范,避免同一个系统中前端出现多种方式的调用,而且我的python微服务采用SSE规范,当时第一反应采用Feign去调用接口返回即可,但是使用后发现Openfeign支持这种调用不友好,因此接口对接这里采用的是WebClient。因此本文着重说一下SSE规范调用

一、SSE是什么

SSE (Server-Sent Events) 是一种基于HTTP的服务器向客户端推送数据的Web技术规范,它允许服务器单向地向客户端发送事件流。以下是SSE规范的全面解析:

1.基本概念

SSE是HTML5标准的一部分,主要特点包括:

  • 单向通信:仅服务器→客户端方向

  • 基于HTTP:使用普通HTTP连接

  • 文本协议:事件以纯文本格式传输

  • 自动重连:内置连接恢复机制

  • 简单易用:比WebSocket更轻量级

2. 协议格式

SSE事件流是一个UTF-8编码的文本流,包含以下字段(每个字段以\n结尾):

event: message\n
id: 123\n
retry: 5000\n
data: {\n
data: "name": "John",\n
data: "age": 30\n
data: }\n\n

  • data: 有效载荷内容(可多行,每行需加"data: "前缀)

  • event: 自定义事件类型(默认"message")

  • id: 事件ID(用于断线重连时定位)

  • retry: 重连时间(毫秒)

服务器响为:

Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive 

3.客户端API

浏览器端JavaScript使用EventSource接口:

const eventSource = new EventSource('/sse-endpoint');// 监听默认事件
eventSource.onmessage = (e) => {console.log('Message:', e.data);
};// 监听自定义事件
eventSource.addEventListener('customEvent', (e) => {console.log('Custom event:', e.data);
});// 错误处理
eventSource.onerror = (e) => {console.error('SSE error:', e);
};

4.与相关技术的对比

特性SSEWebSocketLong Polling
方向单向(服务器→客户端)双向单向(轮询)
协议HTTPWS/WSSHTTP
连接管理自动重连需手动处理每次请求新建连接
数据格式文本二进制/文本文本
复杂度

5. 适用场景

SSE特别适合:

  • 实时通知(新闻、股价、天气)

  • 日志流监控

  • 进度报告(文件处理、任务执行)

  • 社交媒体动态更新

  • 需要简单实时功能但不需要双向通信的场景

虽然WebSocket更强大,但SSE仍有很多优势:

  • 更简单的实现

  • 自动利用HTTP/2的多路复用

  • 不需要额外的协议升级

  • 被所有现代浏览器支持(IE除外)

二、WebClient ‌

1.概念

WebClient ‌是 Spring Framework 5中引入的一个基于响应式编程模型的 HTTP客户端 ,主要用于执行HTTP请求。相比传统的 RestTemplate ,WebClient采用了 Reactor库 ,支持非阻塞式(异步)调用,能够充分利用多核CPU资源,特别适合高并发场景。

2.与OpenFeign比较

推荐方案:优先使用WebClient + Service分层架构
原因:WebClient原生支持响应式流处理,更适合SSE场景,而OpenFeign更适合普通REST调用

备选方案:使用OpenFeign(需要特殊配置)
注意:需要Spring Cloud 2020.0.3+版本和响应式Feign支持

特性WebClient方案OpenFeign方案
响应式支持✅ 原生支持⚠️ 需要特殊配置
代码复杂度简单较复杂
维护性
性能高(非阻塞IO)中等
连接池管理自动需要手动配置
适合场景高并发流式处理简单接口调用

三、代码实现

1.基础实现

@RestController
public class SseController {@GetMapping("/sse-stream")public SseEmitter streamSse() {SseEmitter emitter = new SseEmitter(30_000L); // 30秒超时CompletableFuture.runAsync(() -> {try {for (int i = 0; i < 100; i++) {SseEmitter.SseEventBuilder event = SseEmitter.event().data("SSE Event " + i).id(String.valueOf(i)).name("sse-event");emitter.send(event);Thread.sleep(100);}emitter.complete();} catch (Exception e) {emitter.completeWithError(e);}});return emitter;}
}

2.业务进阶

2.1 依赖配置

在pom.xml中添加必要依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>

2.2 WebClient配置

  • 使用WebClient创建HTTP客户端,支持响应式流处理

  • 配置第三方SSE接口地址和必要的请求头(如认证信息)

    WebClient配置类

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;@Configuration
public class WebClientConfig {@Beanpublic WebClient webClient() {return WebClient.builder().baseUrl("https://api.example.com").build();}
}

2.3Service层实现

@Service
public class WebClientSseService {@Autowiredprivate WebClient webClient;public Flux<String> streamEvents() {System.out.println("前置校验。。。。");Flux<String> resFlux = null;try{resFlux = webClient.get().uri("/stream").accept(MediaType.TEXT_EVENT_STREAM).retrieve().bodyToFlux(String.class).map(data -> {// 处理原始SSE数据#if (data.startsWith("data:")) {#return data.substring(5).trim();#}return data;});}catch (Exception exception){resFlux = Flux.just("{'status': 'Error', 'message': '"+exception.getMessage()+"'}");}return resFlux;}
}

2.4 Controller

// application-web模块
@RestController
public class DataStreamController {@PostMapping(value = "/stream",consumes = MediaType.MULTIPART_FORM_DATA_VALUE,produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> provideStream(@RequestBody StreamRequest request) {return dataProcessor.streamEvents(request);}@PostMapping(value = "/stream",produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> provideStream(@RequestParam(name = "file", required = false) MultipartFile file, @RequestParam Map<String, Object> jsonObject) {return dataProcessor.streamEvents(file, jsonObject);}
}

解释一下这两个参数:

consumes = MULTIPART_FORM_DATA_VALUE,

produces = TEXT_EVENT_STREAM_VALUE

consumes、produces 两个参数的作用与区别
参数作用示例值
consumes = MULTIPART_FORM_DATA_VALUE声明接口接收的请求内容类型(客户端→服务端)multipart/form-data
produces = TEXT_EVENT_STREAM_VALUE声明接口返回的响应内容类型(服务端→客户端)text/event-stream
为什么需要同时声明?
  1. 输入输出分离原则

    • 输入(consumes):处理文件上传需要 multipart/form-data

    • 输出(produces):SSE流式响应需要 text/event-stream

  2. HTTP协议规范

POST /upload HTTP/1.1
Content-Type: multipart/form-data  ← 对应consumes
Accept: text/event-stream         ← 对应produces
内容类型对照速查表
场景客户端设置服务端声明
文件上传+JSON响应Content-Type: multipart/form-dataconsumes = MULTIPART_FORM_DATA_VALUE
文件上传+SSE流响应Accept: text/event-streamproduces = TEXT_EVENT_STREAM_VALUE
JSON上传+SSE流响应Content-Type: application/jsonconsumes = APPLICATION_JSON_VALUE

根据需要自由选择。

2.5 这里对webclient做个扩展

如果上传的是文件可以用这个方式写body的内容

.contentType(MediaType.MULTIPART_FORM_DATA)

.body(BodyInserters.fromMultipartData(formData))

如果不同的json类型的body请求体可以这么写

.body(BodyInserters.fromValue(res)) 

注意这块的细节,我就是在这里写绕了很多

四、其他方案实现

1. 使用Servlet原生API实现流式输出

@RestController
public class StreamingController {@GetMapping("/stream1")public void stream1(HttpServletResponse response) throws IOException {response.setContentType("text/plain;charset=UTF-8");try (PrintWriter writer = response.getWriter()) {for (int i = 0; i < 100; i++) {writer.write("Data line " + i + "\n");writer.flush(); // 手动刷新缓冲区Thread.sleep(100); // 模拟延迟}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}

2. 使用ResponseBodyEmitter实现异步流式输出

@RestController
public class StreamingController {@GetMapping("/stream2")public ResponseBodyEmitter stream2() {ResponseBodyEmitter emitter = new ResponseBodyEmitter();CompletableFuture.runAsync(() -> {try {for (int i = 0; i < 100; i++) {emitter.send("Data line " + i + "\n");Thread.sleep(100);}emitter.complete();} catch (Exception e) {emitter.completeWithError(e);}});return emitter;}
}

3. 使用SseEmitter实现服务器发送事件(SSE)

@RestController
public class SseController {@GetMapping("/sse-stream")public SseEmitter streamSse() {SseEmitter emitter = new SseEmitter(30_000L); // 30秒超时CompletableFuture.runAsync(() -> {try {for (int i = 0; i < 100; i++) {SseEmitter.SseEventBuilder event = SseEmitter.event().data("SSE Event " + i).id(String.valueOf(i)).name("sse-event");emitter.send(event);Thread.sleep(100);}emitter.complete();} catch (Exception e) {emitter.completeWithError(e);}});return emitter;}
}

4. 使用WebFlux实现响应式流式输出

@RestController
@RequestMapping("/reactive")
public class ReactiveStreamingController {@GetMapping("/stream")public Flux<String> streamData() {return Flux.interval(Duration.ofMillis(100)).map(sequence -> "Reactive data " + sequence + "\n").take(100); // 限制输出数量}@GetMapping(value = "/stream-file", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> streamLargeFile() {return Flux.using(() -> Files.lines(Paths.get("large-file.txt")),Flux::fromStream,Stream::close);}
}

5. 使用Spring MVC的StreamingResponseBody

@RestController
public class StreamingResponseBodyController {@GetMapping("/stream3")public StreamingResponseBody stream3() {return outputStream -> {Writer writer = new BufferedWriter(new OutputStreamWriter(outputStream));for (int i = 0; i < 100; i++) {writer.write("Streaming line " + i + "\n");writer.flush();Thread.sleep(100);}};}
}

http://www.xdnf.cn/news/769789.html

相关文章:

  • 【速通RAG实战:进阶】23、RAG应用规范化全流程标准框架:开发、部署、监控企业级最佳实践
  • imx6ull(0):烧录、启动
  • 设计模式(行为型)-中介者模式
  • 【技术支持】安卓开发中queryUsageStats不准确的问题
  • 【linux 入门】第六章 磁盘分区+网络配置
  • NodeJS全栈WEB3面试题——P7工具链 测试
  • 自定义Shell命令行解释器
  • FreeBSD 14.3 候选版本附带 Docker 镜像和关键修复
  • SpringBoot项目搭建指南
  • 【笔记】如何卸载 MSYS2 中不同工具链的 numpy 包
  • 【Java基础】Java中的HashSet详解
  • 【接口测试】基础知识
  • 源码解析(三):Stable Diffusion
  • MyBatis04——SpringBoot整合MyBatis
  • 大模型前处理-CPU
  • 如何使用flask做任务调度
  • 【LeetCode 热题100】BFS/DFS 实战:岛屿数量 腐烂的橘子(力扣200 / 994 )(Go语言版)
  • 力扣题解654:最大二叉树
  • 算法-集合的使用
  • 代码随想录算法训练营第四天| 242.有效的字母异位词 、 349. 两个数组的交集 、 202. 快乐数 、1. 两数之和
  • 力扣热题100之对称二叉树
  • flutter开发安卓APP适配不同尺寸的手机屏幕
  • 题目 3225: 蓝桥杯2024年第十五届省赛真题-回文字符串
  • windows11安装编译QtMvvm
  • github 2FA双重认证丢失解决
  • 《操作系统真相还原》——中断
  • AIOps智能运维体系中Python故障预测与根因分析的应用实践
  • EXSI通过笔记本wifi上外网配置
  • Python编程基础(三) | 操作列表
  • 家政维修平台实战12搭建服务详情功能