当前位置: 首页 > ds >正文

MCP Server StreamableHTTP 开发学习文档

MCP Server StreamableHTTP 开发学习文档

目录

  1. StreamableHTTP 简介
  2. 核心功能与架构
  3. 源码结构与关键实现详解
    • 入口与参数配置
    • 工具注册与调用
    • 事件存储与可恢复性
    • Session 管理与 ASGI 集成
  4. SSE(Server-Sent Events)机制与实现
  5. 本地部署与运行方法
  6. 客户端访问与设置
  7. 总结与扩展建议

1. StreamableHTTP 简介

StreamableHTTP 是 MCP(Model Context Protocol)的一种基于 HTTP 的流式通信传输方式。它支持服务端向客户端持续推送事件(如通知、资源变更等),并支持客户端断线重连后事件流的恢复(Resumability)。

典型应用场景:

  • 实时通知推送
  • 长连接事件流
  • 需要断线重连恢复的流式服务

2. 核心功能与架构

本示例服务端具备以下核心能力:

  • 基于 StreamableHTTP 的流式通信:支持 HTTP POST/GET/DELETE 等操作,事件以流式方式推送给客户端。
  • 工具(Tool)注册与调用:通过 @app.call_tool()@app.list_tools() 装饰器注册和暴露工具。
  • 事件推送与可恢复性:通过 InMemoryEventStore 实现事件的唯一 ID 存储与回放,支持客户端断线重连。
  • 灵活的日志与响应格式:支持日志级别配置、JSON 或 SSE 响应格式切换。
  • ASGI 应用集成:基于 Starlette 框架,兼容 Uvicorn 等 ASGI 服务器。

3. 源码结构与关键实现详解

3.1 入口与参数配置

入口函数为 main,通过 click 命令行参数配置端口、日志级别、响应格式:

@click.command()
@click.option("--port", default=3000, help="Port to listen on for HTTP")
@click.option("--log-level", default="INFO", help="Logging level")
@click.option("--json-response", is_flag=True, default=False, help="Enable JSON responses instead of SSE streams")
def main(port: int, log_level: str, json_response: bool) -> int:# 日志配置logging.basicConfig(level=getattr(logging, log_level.upper()),format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",)...

说明:

  • 通过命令行可灵活指定服务端口、日志级别、响应格式(SSE/JSON)。
  • 便于开发、调试和生产环境切换。

3.2 工具注册与调用

工具注册

通过 @app.list_tools() 注册工具元信息,定义工具名称、描述、输入参数 schema:

@app.list_tools()
async def list_tools() -> list[types.Tool]:return [types.Tool(name="start-notification-stream",description="Sends a stream of notifications with configurable count and interval",inputSchema={"type": "object","required": ["interval", "count", "caller"],"properties": {"interval": {"type": "number", "description": "Interval between notifications in seconds"},"count": {"type": "number", "description": "Number of notifications to send"},"caller": {"type": "string", "description": "Identifier of the caller"},},},)]
工具调用

通过 @app.call_tool() 注册工具调用逻辑,实现通知流推送:

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:ctx = app.request_contextinterval = arguments.get("interval", 1.0)count = arguments.get("count", 5)caller = arguments.get("caller", "unknown")for i in range(count):notification_msg = f"[{i+1}/{count}] Event from '{caller}' - Use Last-Event-ID to resume if disconnected"await ctx.session.send_log_message(level="info",data=notification_msg,logger="notification_stream",related_request_id=ctx.request_id,)if i < count - 1:await anyio.sleep(interval)await ctx.session.send_resource_updated(uri=AnyUrl("http:///test_resource"))return [types.TextContent(type="text",text=f"Sent {count} notifications with {interval}s interval for caller: {caller}",)]

说明:

  • 工具调用时会循环推送多条通知,演示流式推送能力。
  • 每条通知都带有序号和 caller 标识,便于客户端追踪。
  • 支持通过 Last-Event-ID 机制恢复事件流。

3.3 事件存储与可恢复性

InMemoryEventStore
from .event_store import InMemoryEventStore
event_store = InMemoryEventStore()
  • 作用:为每个 SSE 事件分配唯一 ID,并存储事件内容,支持客户端断线后通过 Last-Event-ID 头部恢复未接收的事件。
  • 注意:本实现为内存存储,仅适合演示。生产环境需替换为持久化存储(如 Redis、数据库等)。

3.4 Session 管理与 ASGI 集成

Session 管理
session_manager = StreamableHTTPSessionManager(app=app,event_store=event_store,json_response=json_response,
)
  • StreamableHTTPSessionManager 负责管理 HTTP 流会话、事件推送、事件恢复等。
ASGI 集成
async def handle_streamable_http(scope: Scope, receive: Receive, send: Send) -> None:await session_manager.handle_request(scope, receive, send)starlette_app = Starlette(debug=True,routes=[Mount("/mcp", app=handle_streamable_http)],lifespan=lifespan,
)
  • 通过 Starlette 框架将 /mcp 路由挂载到自定义的流式 HTTP 处理器,实现与 ASGI 服务器(如 Uvicorn)的无缝集成。

4. SSE机制与实现

SSE(Server-Sent Events)简介

  • SSE 是一种基于 HTTP 的单向服务器推送技术,客户端通过持久连接接收服务端实时推送的事件。
  • 典型应用:实时通知、进度推送、消息流等。

本示例 SSE 实现要点

  • 事件通过 send_log_messagesend_resource_updated 等方法推送。
  • 每个事件分配唯一 ID,便于客户端断线重连后通过 Last-Event-ID 头部恢复。
  • 事件存储于 InMemoryEventStore,支持事件回放。

5. 本地部署与运行方法

环境准备

  1. 安装依赖
    确保已安装 Python 3.8+,并安装相关依赖(如未提供 requirements.txt,可根据源码推断):

    pip install anyio click mcp starlette pydantic uvicorn
    
  2. 运行服务

    # 进入示例目录
    cd python-sdk/examples/servers/simple-streamablehttp# 启动服务(默认端口3000,SSE模式)
    uv run mcp-simple-streamablehttp --port 3000# 或直接用 Python 启动
    python -m mcp_simple_streamablehttp.server --port 3000
    
    • --log-level DEBUG 可开启详细日志
    • --json-response 可切换为 JSON 响应(默认 SSE)

6. 客户端访问与设置

SSE 客户端访问

  • 客户端可通过 HTTP POST/GET 方式访问 /mcp 路由,接收流式事件。
  • 推荐使用支持 SSE 的 HTTP 客户端或浏览器插件进行测试。
示例:使用 curl 订阅 SSE
curl -N -H "Accept: text/event-stream" -X POST http://localhost:3000/mcp \-H "Content-Type: application/json" \-d '{"tool":"start-notification-stream","arguments":{"interval":1,"count":5,"caller":"test-client"}}'
  • -N 保持连接不断开
  • 通过 Last-Event-ID 头部可实现断线重连恢复
断线重连示例
curl -N -H "Accept: text/event-stream" -H "Last-Event-ID: <上次收到的事件ID>" \-X POST http://localhost:3000/mcp \-H "Content-Type: application/json" \-d '{"tool":"start-notification-stream","arguments":{"interval":1,"count":5,"caller":"test-client"}}'

其他客户端

  • 推荐使用 Inspector 或自定义 Typescript/Python 客户端。
  • Typescript SDK 已内置 StreamableHTTP 客户端示例。

7. 总结与扩展建议

  • 本示例演示了 MCP StreamableHTTP 的服务端实现、工具注册、事件推送与恢复机制。
  • 适合用于需要实时推送与断线恢复的场景。
  • 扩展建议
    • 将事件存储替换为持久化方案(如 Redis、数据库等)
    • 增加多工具、多事件类型支持
    • 加强安全认证与权限控制
    • 丰富客户端示例

如需进一步深入学习 MCP 协议、StreamableHTTP 机制或客户端开发,可参考官方文档或相关开源项目。

http://www.xdnf.cn/news/8300.html

相关文章:

  • RT-Thread源码阅读(2)——任务启动与调度
  • ArkTs中的尾随闭包
  • 如何重新设置网络ip地址?全面解析多种方法
  • 第八天 搭建车辆状态监控平台(Docker+Kubernetes) OTA升级服务开发(差分升级、回滚机制)
  • eNSP防火墙实现GRE over IPSec
  • 文件操作和IO-3 文件内容的读写
  • 【Java高阶面经:数据库篇】16、分库分表主键:如何设计一个高性能唯一ID
  • transformer网络
  • 云曦25年春季期中考核复现
  • 【会议推荐|权威出版】2025年电力工程与电气技术国际会议(PEET 2025)
  • Python 训练 day31
  • ssh登录设备总提示密码错误解决方法
  • 使用 Navicat 17 for PostgreSQL 时,请问哪个版本支持 PostgreSQL 的 20150623 版本?还是每个版本都支持?
  • Skia如何在窗口上绘图
  • 突破免疫研究瓶颈!Elabscience IL - 4 抗体 [11B11](APC 偶联)靶向识别小鼠细胞因子
  • 纯JS前端转图片成tiff格式
  • 选择第三方软件检测机构做软件测试的三大原因
  • 从零开始学习QT——第二步
  • Rabbit MQ
  • CSS:vertical-align用法以及布局小案例(较难)
  • Spring AI Alibaba 调用文生语音模型(CosyVoice)
  • 基于labview的声音采集与存储分析系统
  • 深入浅出DDD:从理论到落地的关键
  • 海南藏族自治州政府门户网站集约化建设实践与动易解决方案应用
  • Java集合框架入门指南:从小白到基础掌握
  • 聚水潭ERP(奇门)集成用友ERP(用友U8、U9、NC、BIP、畅捷通T+、好业财)
  • 位图算法——判断唯一字符
  • 百度智能云千帆AppBuilder RAG流程技术文档
  • 佰力博科技与您探讨半导体电阻测试常用的一些方法
  • Qt 布局管理器的层级关系