SSE流式输出使用POST 请求
原生的EventSource只支持get请求,如果用post需要使用fetch接收,或者使用fetchEventSource插件。
node服务端代码
const express = require('express');
const cors = require('cors');
const app = express();
const port = 3000;// 设置CORS策略,允许所有来源的请求
app.use(cors());// SSE 路由
app.get('/events', (req, res) => {
let counter = 0;// 设置响应头,告诉浏览器这是一个 SSE 流res.setHeader('Content-Type', 'text/event-stream');res.setHeader('Cache-Control', 'no-cache');res.setHeader('Connection', 'keep-alive');// 每秒推送一次数据const intervalId = setInterval(() => {counter++;res.write(`data: ${JSON.stringify({ counter })}\n\n`);// 模拟关闭连接if (counter === 10) {clearInterval(intervalId);res.write('data: {"message": "Stream ended"}\n\n');res.end();}}, 2000);// 当客户端断开连接时,清理定时器req.on('close', () => {clearInterval(intervalId);});
});app.post('/events2', (req, res) => {// 设置响应头,告诉浏览器这是一个 SSE 流res.setHeader('Content-Type', 'text/event-stream');res.setHeader('Cache-Control', 'no-cache');res.setHeader('Connection', 'keep-alive');res.write(`data: hearbeat \n\n`);let tottal = 0;// 每秒推送一次数据const intervalId = setInterval(() => {tottal++;res.write(`data: ${JSON.stringify({ message: tottal })}\n\n`);// 模拟关闭连接if (tottal === 10) {clearInterval(intervalId);res.write('data: {"message": "Stream ended"}\n\n');res.end();}}, 2000);// 当客户端断开连接时,清理定时器req.on('close', () => {clearInterval(intervalId);});
});app.listen(port, () => {console.log(`Server is running on http://localhost:${port}`);
});
前端代码
一 使用Fetch
fetch('http://99.12.39.214:3000/events2', {method: 'POST',headers: {Authorization: sessionStorage.getItem('token') || '',},body: JSON.stringify({ intention: 'OTHER' }),}).then(async (response) => {console.log(response)const reader = response.body?.getReader()const decoder = new TextDecoder()let buffer = ''while (true) {const { done, value } = await reader.read()if (done) breakconst chunk = decoder.decode(value)buffer += chunk // 将上次尾部数据和本次新数据合并// 检查buffer中是否包含完整的事件流数据(以`data: `开头,以`\n\n`结尾)while (true) {// 使用\n\ndata:分割分割而不是 \n\n 是因为流数据中可能包含\n\n,所以需要取最后一个\n\n作为事件的边界const eventEnd = buffer.indexOf('\n\ndata:')if (eventEnd === -1) break // 如果没有完整的事件流数据,则继续等待下一次数据const eventBlock = buffer.slice(0, eventEnd + 2) // 获取data:之前的那部分console.log(`eventBlock:${eventBlock}`)buffer = buffer.slice(eventEnd + 2) // 移除\n\nconsole.log(`buffer:${buffer}`)// 处理一个data:事件块 将换行符换成空格let eventData = eventBlock.replaceAll('\n', ' ')if (eventData.startsWith('data: ')) {eventData = eventData.slice(6)}}}}).catch((error) => {console.error('Error:', error)})
二 使用fetchEventSource
import { fetchEventSource } from '@microsoft/fetch-event-source';const controller = new AbortController();
const { signal } = controller;fetchEventSource('http://99.12.204.199:3000/events2', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Accept': 'text/event-stream'
},
body: JSON.stringify({ key: 'value' }),
signal,
onmessage(event) {
console.log('Message:', event.data);
},
onclose() {
console.log('Connection closed');
},
onerror(error) {
console.error('Error:', error);
}
});// 手动中断请求
controller.abort();
三 问题记录
1、本地项目post在浏览器network上看是一起返回的
问题: 在浏览器network面板上查看,心跳帧返回了,但后面内容没有流式输出。
原因: umi有压缩设置
解决: 去掉它压缩,重新启动。
配置:cross-env UMI_ENV=dev UMI_DEV_SERVER_COMPRESS=none max dev
rimraf ./src/.umi
是删除.umi文件夹,主要解决每次启动都说文件存在
"scripts": {"dev": "rimraf ./src/.umi & cross-env UMI_ENV=dev UMI_DEV_SERVER_COMPRESS=none max dev","start": "npm run dev","build": "max build",}
参考文档:SSE 开发实践
2、get请求可以流式输出,但post不行
原因: 有日志接口阻塞了post返回。
查找问题过程
- 使用
postman
或者直接在浏览器查看,都可以看到是逐帧输出的,可以排除接口问题 - 是否是
umi
框架的问题,发现有人提了,本地调试需要进行配置,但我碰到的是非本地也不行 - 新建一个
umi
项目,都是4.4.6版本,发现使用fetchEventSource
代码可以流式输出。 - 对比两个项目,发现只要加了自定义的ErrorBoundary,post就失败!!!
查看自定义的ErrorBoundary,里面有一个日志上报逻辑,去掉这个,发现post正常了!!
最后解决方案为: 在日志上报设置中移除掉所有跟ai相关的接口即可。