当前位置: 首页 > ds >正文

【MCP】为什么使用Streamable HTTP: 相比SSE的优势与实践指南

在现代Web开发中,实时通信已经成为许多应用的核心需求。从聊天应用到股票市场更新,从游戏服务器到AI模型通信,各种技术应运而生以满足这些需求。最近,Model Context Protocol (MCP) 引入了一种新的传输机制 —— Streamable HTTP,它为服务器到客户端的实时通信提供了更优雅的解决方案。本文将深入探讨Streamable HTTP相较于Server-Sent Events (SSE)的优势,并通过实际代码示例展示其实现。

实时通信技术的演进

在深入Streamable HTTP之前,我们先简要回顾一下Web实时通信技术的发展历程:

长轮询 (Long Polling)

长轮询是早期使用的一种"黑客"方式,用于在浏览器中通过HTTP实现服务器-客户端消息传递。客户端发送HTTP请求,服务器保持连接开放直到有新数据可用。一旦服务器发送响应,客户端立即发起新的请求。这种方法虽然简单,但效率低下且可能导致消息丢失。

// 长轮询的JavaScript客户端实现
function longPoll() {fetch('http://example.com/poll').then(response => response.json()).then(data => {console.log("接收到数据:", data);longPoll(); // 立即建立新的长轮询请求}).catch(error => {// 10秒后重试setTimeout(longPoll, 10000);});
}
longPoll(); // 启动长轮询

WebSockets

WebSockets提供了一种全双工通信机制,允许在单个长连接上双向传输数据。这项技术克服了HTTP请求-响应周期的开销,非常适合低延迟、高频率更新的场景。

// WebSocket的JavaScript客户端实现
const socket = new WebSocket('ws://example.com');socket.onopen = function(event) {console.log('连接已建立');// 向服务器发送消息socket.send('你好,服务器!');
};socket.onmessage = function(event) {console.log('来自服务器的消息:', event.data);
};

尽管WebSocket API基础使用简单,但在生产环境中处理连接断开、重连和心跳检测等问题是相当复杂的。

Server-Sent Events (SSE)

Server-Sent Events提供了一种标准方式,通过HTTP将服务器更新推送到客户端。与WebSockets不同,SSE专为单向通信(从服务器到客户端)设计,适用于新闻推送、体育比分更新等场景。

// SSE的JavaScript客户端实现
const evtSource = new EventSource("https://example.com/events");// 处理消息事件
evtSource.onmessage = event => {console.log('收到消息: ' + event.data);
};

SSE的优点是自动重连及使用标准HTTP协议,但它存在一些局限性,如:

  • 需要维护长期连接
  • 浏览器限制每个域名最多6个并发连接
  • 在企业环境中可能受到代理和防火墙的限制

Streamable HTTP:新一代实时通信方案

Streamable HTTP是MCP协议在2025年3月引入的一种新传输机制,旨在取代之前的HTTP+SSE传输模式。它的设计理念是在保留SSE优点的同时克服其限制,特别是提供更好的可扩展性和企业环境兼容性。

Streamable HTTP的工作原理

Streamable HTTP的核心思想是提供一个统一的HTTP端点,同时支持POST和GET方法:

  1. POST方法:用于客户端向服务器发送请求和接收响应
  2. GET方法(可选):用于建立SSE流,接收服务器实时推送的消息

与传统HTTP+SSE不同,Streamable HTTP不要求维护单独的初始化连接和消息端点,简化了协议设计并提高了可靠性。

Streamable HTTP相比SSE的五大优势

1. 简化的通信模型

传统的HTTP+SSE方法需要两个不同的端点:一个用于建立连接,另一个用于发送消息。而Streamable HTTP提供了一个统一的端点,简化了客户端和服务器之间的交互。

传统SSE实现(两个端点):

// 服务器端(传统SSE)
router.get("/connect", async (req, res) => {const transport = new SSEServerTransport(POST_ENDPOINT, res);transports[transport.sessionId] = transport;await server.connect(transport);
});router.post(POST_ENDPOINT, async (req, res) => {const sessionId = req.query.sessionId;if (!transports[sessionId]) {res.status(400).send({ message: "无效的会话ID" });return;}await transports[sessionId].handlePostMessage(req, res, req.body);
});

Streamable HTTP实现(单一端点):

// 服务器端(Streamable HTTP)
app.post("/mcp", async (req, res) => {const sessionId = req.headers['mcp-session-id'];if (sessionId && transports[sessionId]) {// 使用现有传输await transports[sessionId].handleRequest(req, res, req.body);return;}if (!sessionId && isInitializeRequest(req.body)) {// 创建新传输const transport = new StreamableHTTPServerTransport({sessionIdGenerator: () => randomUUID(),});await server.connect(transport);await transport.handleRequest(req, res, req.body);const newSessionId = transport.sessionId;if (newSessionId) {transports[newSessionId] = transport;}return;}res.status(400).json({ error: "无效的请求" });
});app.get("/mcp", async (req, res) => {const sessionId = req.headers['mcp-session-id'];if (!sessionId || !transports[sessionId]) {res.status(400).json({ error: "无效的会话ID" });return;}await transports[sessionId].handleRequest(req, res);
});

2. 支持无状态模式

Streamable HTTP的一个重要创新是支持完全无状态操作。通过设置sessionIdGenerator: () => undefined,服务器可以在不维护会话状态的情况下处理请求,非常适合无服务器环境。

// 无状态模式配置
const transport = new StreamableHTTPServerTransport({sessionIdGenerator: () => undefined, // 启用无状态模式
});

这种无状态模式特别适合:

  • 部署在AWS Lambda、Azure Functions等无服务器环境
  • 短暂交互而非长期连接的场景
  • 需要最小化服务器内存使用的应用

3. 更好的可伸缩性

由于Streamable HTTP可以在无状态模式下运行,它非常适合容器化和自动扩展场景。服务器不需要维护长期连接,可以根据请求动态分配资源,显著提高可伸缩性。

这解决了SSE的一个主要问题:当有大量客户端时,每个客户端都需要维持一个长连接,可能导致服务器资源耗尽。使用Streamable HTTP的无状态模式,服务器只在处理请求时分配资源,处理完成后即可释放。

4. 提高的可靠性

Streamable HTTP的简化设计减少了出错机会:

  • 会话管理:在有状态模式下,会话ID通过HTTP头而非查询参数传递,减少安全风险
  • 重连处理:客户端可以在会话有效期内随时重连,无需复杂的重连逻辑
  • 错误恢复:简化的协议使错误处理和恢复更加直观

5. 更好的企业环境兼容性

在企业环境中,代理服务器和防火墙常常会阻止非标准HTTP连接。Streamable HTTP使用标准HTTP通信,大大减少了这类问题:

  • 使用标准HTTP POST和GET,无需特殊配置
  • 不依赖长连接,减少代理超时问题
  • 会话ID通过HTTP头传递,更符合企业安全要求

实现Streamable HTTP的最佳实践

以下是一个实现Streamable HTTP服务器的完整示例,包含了所有最佳实践:

import express from 'express';
import { Request, Response } from 'express';
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
import { randomUUID } from 'crypto';// 创建MCP服务器
const server = new Server({name: "streamable-http-demo",version: "1.0.0"
}, {capabilities: {tools: {},logging: {}}
});// 创建Express应用
const app = express();
app.use(express.json());// 会话存储
const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {};// 单一MCP端点
const MCP_ENDPOINT = "/mcp";// 处理POST请求
app.post(MCP_ENDPOINT, async (req: Request, res: Response) => {const sessionId = req.headers['mcp-session-id'] as string | undefined;try {// 1. 重用现有会话if (sessionId && transports[sessionId]) {await transports[sessionId].handleRequest(req, res, req.body);return;}// 2. 创建新会话(初始化请求)if (!sessionId && isInitializeRequest(req.body)) {const transport = new StreamableHTTPServerTransport({sessionIdGenerator: () => randomUUID(),// 无状态模式: sessionIdGenerator: () => undefined});await server.connect(transport);await transport.handleRequest(req, res, req.body);// 存储新会话const newSessionId = transport.sessionId;if (newSessionId) {transports[newSessionId] = transport;console.log(`新会话创建: ${newSessionId}`);}return;}// 3. 处理错误情况res.status(400).json(createErrorResponse("无效的会话ID或请求方法"));} catch (error) {console.error('处理请求出错:', error);res.status(500).json(createErrorResponse("内部服务器错误"));}
});// 处理GET请求(SSE流)
app.get(MCP_ENDPOINT, async (req: Request, res: Response) => {const sessionId = req.headers['mcp-session-id'] as string | undefined;if (!sessionId || !transports[sessionId]) {res.status(400).json(createErrorResponse("无效的会话ID"));return;}try {const transport = transports[sessionId];await transport.handleRequest(req, res);console.log(`SSE流已为会话 ${sessionId} 建立`);} catch (error) {console.error('建立SSE流出错:', error);if (!res.headersSent) {res.status(500).json(createErrorResponse("建立SSE流出错"));}}
});// 辅助函数
function isInitializeRequest(body: any): boolean {return body && body.method === 'initialize';
}function createErrorResponse(message: string): any {return {jsonrpc: '2.0',error: {code: -32000,message: message,},id: null,};
}// 启动服务器
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {console.log(`服务器运行在 http://localhost:${PORT}`);
});

在客户端,使用Streamable HTTP也非常直观:

import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js';
import { URL } from 'url';async function main() {// 1. 创建客户端const client = new Client({ name: "streamable-http-client", version: "1.0.0" });// 2. 创建传输实例const transport = new StreamableHTTPClientTransport(new URL('http://localhost:3000/mcp'));// 3. 连接到服务器await client.connect(transport);console.log('已连接到服务器');// 4. 设置通知处理transport.onmessage = (message) => {console.log('接收到消息:', message);};// 5. 调用工具示例const result = await client.callTool({name: 'example-tool',arguments: { param: 'value' },});console.log('工具调用结果:', result);
}main().catch(console.error);

处理Streamable HTTP的常见挑战

尽管Streamable HTTP提供了许多优势,但在实际应用中仍需注意一些挑战:

1. 会话管理

在有状态模式下,需要妥善管理会话资源,避免内存泄漏。一个好的实践是设置会话超时机制:

// 会话超时管理
function setupSessionTimeout(sessionId, timeoutMs = 30 * 60 * 1000) {const timeoutId = setTimeout(() => {if (transports[sessionId]) {console.log(`会话 ${sessionId} 已超时,正在清理`);delete transports[sessionId];}}, timeoutMs);// 存储超时ID以便可以取消sessionTimeouts[sessionId] = timeoutId;
}// 在活动时重置超时
function resetSessionTimeout(sessionId) {if (sessionTimeouts[sessionId]) {clearTimeout(sessionTimeouts[sessionId]);setupSessionTimeout(sessionId);}
}

2. 断线重连策略

客户端应实现断线重连策略,特别是在移动网络等不稳定环境中:

class MCPClient {// ...async connectWithRetry(url, maxRetries = 5, delayMs = 1000) {let retries = 0;while (retries < maxRetries) {try {await this.connect(url);console.log('连接成功');return;} catch (error) {retries++;console.log(`连接失败,第 ${retries} 次重试...`);await new Promise(resolve => setTimeout(resolve, delayMs));// 指数退避delayMs *= 1.5;}}throw new Error('连接失败,已达到最大重试次数');}
}

3. 处理未送达事件

当客户端断线重连时,可能会错过服务器发送的事件。一个解决方案是使用事件ID和断点续传:

// 服务器端
app.post("/mcp", async (req, res) => {// ...// 包含最后事件IDconst lastEventId = req.headers['last-event-id'];if (lastEventId && sessionId) {// 发送错过的事件await sendMissedEvents(transport, lastEventId);}
});// 客户端
let lastEventId = null;transport.onmessage = (message) => {if (message.id) {lastEventId = message.id;localStorage.setItem('lastEventId', lastEventId);}
};// 重连时包含最后事件ID
async function reconnect() {lastEventId = localStorage.getItem('lastEventId');const headers = {};if (lastEventId) {headers['last-event-id'] = lastEventId;}transport = new StreamableHTTPClientTransport(url, { headers });await client.connect(transport);
}

结论

Streamable HTTP代表了Web实时通信的一个重要进步,特别适合需要在各种环境中可靠工作的企业应用。与传统SSE相比,它提供了更简化的通信模型、可选的无状态模式、更好的可伸缩性、提高的可靠性以及更好的企业环境兼容性。

随着越来越多的服务采用MCP协议,Streamable HTTP有望成为构建实时通信应用的首选方法,特别是在AI工具集成和企业应用领域。

如果你正在考虑在项目中实现实时通信,Streamable HTTP绝对值得考虑,尤其是当你的应用需要在各种网络环境中可靠运行,或者部署在无服务器环境中时。

参考资料

  1. Model Context Protocol官方规范:https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http
  2. MCP TypeScript SDK:https://github.com/modelcontextprotocol/typescript-sdk
  3. RxDB博客 - WebSockets vs SSE vs 长轮询:https://rxdb.info/articles/websockets-sse-polling-webrtc-webtransport.html
  4. 使用SSE代替WebSockets的场景:https://blog.csdn.net/adcwa/article/details/146942148
  5. MCP服务器实现示例:https://medium.com/@itsuki.enjoy/mcp-server-and-client-with-sse-the-new-streamable-http-d860850d9d9d
http://www.xdnf.cn/news/4833.html

相关文章:

  • 初识Dockerfile之RUN和WORKDIR
  • 【MySQL】第二弹——MySQL表的增删改查(CURD))
  • [ctfshow web入门] web57
  • 2025 后端自学UNIAPP【项目实战:旅游项目】3、API接口请求封装,封装后的简单测试以及实际使用
  • springCloud/Alibaba常用中间件之GateWay网关
  • 大型语言模型在网络安全领域的应用综述
  • 【WEB3】区块链、隐私计算、AI和Web3.0——数据民主化(1)
  • Python爬虫(21)Python爬虫进阶:Selenium自动化处理动态页面实战解析
  • RabbitMQ--基础篇
  • Android Studio 模拟器配置方案
  • 跨平台移动开发框架React Native和Flutter性能对比
  • 每周靶点分享:Angptl3、IgE、ADAM9及文献分享:抗体的多样性和特异性以及结构的新见解
  • 从代码学习深度学习 - 单发多框检测(SSD)PyTorch版
  • Comfyui 与 SDwebui
  • C++内存管理与模板初阶详解:从原理到实践
  • Java详解LeetCode 热题 100(13):LeetCode 53:最大子数组和(Maximum Subarray)详解
  • Android学习总结之算法篇八(二叉树和数组)
  • 10. CSS通配符与选择器权重深度解析:以《悯农》古诗为例
  • 比较Facebook与其他社交平台的隐私保护策略
  • RSS 2025|斯坦福提出「统一视频行动模型UVA」:实现机器人高精度动作推理
  • 机器视觉的手机FPC油墨丝印应用
  • 在k8s中,如何实现服务的访问,k8s的ip是变化的,怎么保证能访问到我的服务
  • 数据结构-非线性结构-二叉树
  • G口大带宽服务器线路怎么选
  • 根据窗口大小自动调整页面缩放比例,并保持居中显示
  • Python程序,输入IP,扫描该IP哪些端口对外是开放的,输出端口列表
  • Vue生命周期脚手架工程Element-UI
  • 经济体制1
  • http重新为https
  • 基于FPGA婴儿安全监护系统(蓝牙小程序监测)