当前位置: 首页 > ds >正文

ResponseBodyEmitter与SseEmitter使用

目录

  • 背景
  • ResponseBodyEmitter
    • 简介
    • 核心代码实现
      • 后端代码
      • 前端代码
    • 运行效果
  • SseEmitter
    • 简介
    • 核心代码实现
      • 后端代码
      • 前端代码
    • 运行效果
  • 小程序
    • 小程序代码
    • 运行效果
  • 本文源码
  • 总结

背景

  • 最近在接入阿里云百炼AI助手时,接触到ResponseBodyEmitter
    在这里插入图片描述在这里插入图片描述
  • 实现了流式返回,比较感兴趣,所以去了解了一下如何实现。

ResponseBodyEmitter

简介

  • Spring 框架提供的通用流式传输接口,支持分块传输编码(Chunked Encoding),允许逐步向客户端发送数据块,异步推送数据,而非一次性响应。返回ResponseBodyEmitter灵活性强,也可以自己构造标准的SSE返回。

核心代码实现

后端代码

private final ExecutorService executorService = Executors.newFixedThreadPool(2000);private final List<String> replyData = Arrays.asList("我是", "您的AI助手", "有什么可以帮您", "我是", "您的AI助手", "有什么可以帮您");/*** 返回ResponseBodyEmitter灵活性强,也可以自己构造标准的SSE返回* @param response* @return*/@RequestMapping(value = "/responseBodyEmitter")@CrossOriginpublic ResponseBodyEmitter responseBodyEmitter(HttpServletResponse response) {ResponseBodyEmitter emitter = new ResponseBodyEmitter(180000L);executorService.execute(() -> {try {for (String value : replyData) {emitter.send(value.getBytes(java.nio.charset.StandardCharsets.UTF_8));Thread.sleep(1000);}emitter.complete();} catch (Exception e) {log.error("其他的请求聊天异常 {}", e);emitter.completeWithError(e);throw new RuntimeException(e);}});return emitter;}

前端代码

  • 先来看百炼助手使用fetch
    在这里插入图片描述

  • 我是使用Nuxt 3 框架,也用fetch API调用


const fetchStream = async () => {const response = await fetch('http://127.0.0.1:8080/api/index/responseBodyEmitter'); // 替换为你的接口路径const reader = response.body.getReader();const decoder = new TextDecoder('utf-8');// 持续读取数据流while (true) {const { done, value } = await reader.read();if (done) {console.log('Stream completed');break;}const textChunk = decoder.decode(value, { stream: true });console.log('Received chunk:', textChunk);text.value += textChunk}
}onMounted(()=>{console.log("onMounted")// 返回ResponseBodyEmitter fetchStream()})

运行效果

在这里插入图片描述

SseEmitter

  • 在了解ResponseBodyEmitter时又发现了SseEmitter。

简介

  • ResponseBodyEmitter的子类,为​​Server-Sent Events(SSE)​​协议设计,基于text/event-stream格式实现服务器到客户端的单向推送。自带重连机制。

核心代码实现

后端代码

   private final List<String> replyData = Arrays.asList("我是", "您的AI助手", "有什么可以帮您", "我是", "您的AI助手", "有什么可以帮您");private final ExecutorService executorService = Executors.newFixedThreadPool(2000);@RequestMapping("/chat")@CrossOriginpublic SseEmitter chat(String query) {SseEmitter emitter = new SseEmitter(180000L);executorService.execute(() -> {try {for (int i = 0; i < replyData.size(); i++) {String value = replyData.get(i);emitter.send(value.getBytes(java.nio.charset.StandardCharsets.UTF_8));Thread.sleep(1000);}emitter.send(SseEmitter.event().name("end").data("[DONE]"));Thread.sleep(1000);emitter.complete();} catch (Exception e) {log.error("其他的请求聊天异常 {}", e);emitter.completeWithError(e);throw new RuntimeException(e);}});log.info("返回emitter");return emitter;}

前端代码


//接收后台消息
const  receiveMessage = () =>{let eventSource  = new EventSource('http://127.0.0.1:8080/api/index/chat');eventSource.onopen = (event) =>{console.log("onopen ",event); }//接收成功eventSource.onmessage = (event) => {console.log("onmessage ",event);text.value =  text.value + event.data;};eventSource.addEventListener('end', (event) => {console.log("服务器主动关闭连接");eventSource.close(); // 主动关闭连接});//接收失败eventSource.onerror = (error) => {console.error('SSE error:',eventSource.readyState, error);if (eventSource.readyState === EventSource.CLOSED) { console.log("正常关闭"); // 应在此过滤已关闭状态} else {console.error("真实错误:");}// 如果不close,会自动重连eventSource.close()};}onMounted(()=>{console.log("onMounted")// SseEmitter 的方式receiveMessage()})

运行效果

在这里插入图片描述

小程序

  • 小程序也支持SSE,核心代码如下,我使用的是uni-app-x

小程序代码


let title = ref<string>("hello")onMounted(() => {console.log("onMounted")startStream()})const startStream = () => {const requestTask = wx.request({url: 'http://127.0.0.1:8080/api/index/chat', // 后端接口地址method: 'GET',// dataType: 'text',//responseType: 'stream', // 关键:设置响应类型为 streamenableChunked: true,success: (res) => {},fail: (err) => {console.error("请求失败:", err);}});const decoder = new TextDecoder('utf-8');requestTask.onChunkReceived(function (resp) {let data = decoder.decode(resp.data)console.log(data)title.value += data});}

运行效果

在这里插入图片描述

本文源码

  • https://github.com/1030907690/emitter-test
  • https://github.com/1030907690/emitter-test-mobile
  • https://github.com/1030907690/response-body-emitter-web

总结

  • ​​ResponseBodyEmitter​​适用于更灵活的流式传输​​场景,如大文件下载或兼容性要求高的实时日志。SseEmitter则是基于标准的SSE协议。

  • ResponseBodyEmitter、SseEmitterDeferredResult、Callable同为异步处理,不同的是 DeferredResult、Callable只能发送一次数据。ResponseBodyEmitter、SseEmitter可以多次调用send发送多次。

  • 想要了解DeferredResult和Callable,可以参考本人拙作 Spring MVC(Boot) Servlet 3.0异步处理,DeferredResult和Callable、Spring MVC(Boot) Servlet 3.0异步处理,DeferredResult和Callable(续篇)

http://www.xdnf.cn/news/8839.html

相关文章:

  • MyBatis实战指南(二)如何实现小鸟图标与导入Teacher数据库表实战
  • 《深入剖析:Python自动化测试框架之unittest与pytest》
  • 微服务——网关
  • TypeScript
  • OpenCV 第7课 图像处理之平滑(一)
  • Flink流水线集成Gravitino
  • 微软Build 2025五大AI发布
  • 人工智能数学基础实验(五):牛顿优化法-电动汽车充电站选址优化
  • 基于微信小程序的漫展系统的设计与实现
  • 研报精读:数据要素市场培育及企业数据资源会计处理实证研究【附全文阅读】
  • 基于opencv的全景图像拼接
  • 【ExcelVBA 】类模块学习从入门到放弃
  • 数据仓库中的业务域与数据域
  • 关于PHP的详细介绍,结合其核心特点、应用场景及2025年的技术发展趋势,以清晰的结构呈现:
  • 用HTML5实现实时ASCII艺术摄像头
  • git子模块--常见操作
  • HarmonyOS NEXT 技术特性:分布式软总线技术架构
  • OpenLayers 加载全屏显示控件
  • 【Fargo】razor框架调用mediasoup的发送和接收能力
  • FFT Shift
  • 双目视野高精度拼接
  • PCB设计教程【入门篇】——PCB设计基础-PCB构成与组成
  • DeepONet深度解析:原理、架构与实现
  • python+vlisp实现对多段线范围内土方体积的计算
  • 连接表、视图和存储过程
  • JDK21深度解密 Day 3:模式匹配全解析
  • pvlib(太阳轨迹)
  • nginx的一些配置的意思
  • 攻防世界RE-666
  • 发电厂进阶,modbus TCP转ethernet ip网关如何赋能能源行业