当前位置: 首页 > news >正文

MCP 传输层代码分析

MCP 传输层代码分析

MCP 整体架构说明

引用官方文档原文:Model Context Protocol (MCP) 构建在一个灵活且可扩展的架构上,使 LLM 应用和集成之间的无缝通信成为可能。具体架构细节可以参考文档(核心架构 - MCP 中文文档)。MCP 采用分层架构,分为协议层与传输层,官方实现的传输层支持两种实现:Stdio 传输通过 HTTP 的 SSE 传输。本文旨在通过官方提供的 Python 库代码,来分析 通过 HTTP 的 SSE 传输 方式的实现。

Python 用到的技术

  • AnyIOanyio 是 Python 的一个异步编程库,它提供了统一的 API 来编写兼容多种异步运行时(如 asynciotrio)的代码。
  • StarletteStarlette 是一个轻量级的 ASGI(异步服务器网关接口) Web 框架,专为 Python 异步编程设计,具有高性能、灵活和极简的特点。

细节说明

在此我会列出以上两个库用到的最主要的几个方法,特别是第一个方法。

  • anyio.create_memory_object_stream:此方法返回一个输入流和一个输出流,可以将其看成是一个水管,返回的参数可以看成是水管的两端。从一个流输入数据,可以从另一个流输出数据。代码中总共用到了三组通道(即输入流和输出流)。传输层与协议层之间是通过流来交流数据的。
  • anyio.create_task_group:用于创建任务组。
  • EventSourceResponse:这是 StarletteFastAPI 中用于实现 Server-Sent Events (SSE) 的响应类。注意代码中用到的参数 content,用于接收输入流,data_sender_callable,用于发送数据的方法(我没有找到相关的文档)。

整体流程

在这里插入图片描述

对于协议层的封装在 lowlevel.server.Server 中,其中 run 方法接收一个输入流和一个输出流。函数签名如下:

async def run(self,read_stream: MemoryObjectReceiveStream[types.JSONRPCMessage | Exception],write_stream: MemoryObjectSendStream[types.JSONRPCMessage],initialization_options: InitializationOptions,# When False, exceptions are returned as messages to the client.# When True, exceptions are raised, which will cause the server to shut down# but also make tracing exceptions much easier during testing and when using# in-process servers.raise_exceptions: bool = False,
):

传输层的主要工作是初始化一个输入流和输出流,用于传输客户端请求命令及服务端的响应数据。

SseServerTransport 类为传输层的实现类,该类对客户端暴露了两个端点:

  • /sse
    • 作用:创建一个 SSE 通道,初始化输入输出流,创建 sessionId,传输服务端响应的数据(从 read_stream 读取数据),启动协议层的服务。
    • 状态:有状态。
  • /message
    • 作用:接收客户端的 POST 请求,将请求传递给 write_stream
    • 状态:无状态。
starlette_app = Starlette(debug=self.settings.debug,routes=[Route("/sse", endpoint=handle_sse),Mount("/messages/", app=sse.handle_post_message),],
)

初始化 SSE 连接的代码

@asynccontextmanager
async def connect_sse(self, scope: Scope, receive: Receive, send: Send):if scope["type"] != "http":logger.error("connect_sse received non-HTTP request")raise ValueError("connect_sse can only handle HTTP requests")logger.debug("Setting up SSE connection")read_stream: MemoryObjectReceiveStream[types.JSONRPCMessage | Exception]read_stream_writer: MemoryObjectSendStream[types.JSONRPCMessage | Exception]write_stream: MemoryObjectSendStream[types.JSONRPCMessage]write_stream_reader: MemoryObjectReceiveStream[types.JSONRPCMessage]read_stream_writer, read_stream = anyio.create_memory_object_stream(0)write_stream, write_stream_reader = anyio.create_memory_object_stream(0)session_id = uuid4()session_uri = f"{quote(self._endpoint)}?session_id={session_id.hex}"self._read_stream_writers[session_id] = read_stream_writerlogger.debug(f"Created new session with ID: {session_id}")sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream[dict[str, Any]](0)async def sse_writer():logger.debug("Starting SSE writer")async with sse_stream_writer, write_stream_reader:await sse_stream_writer.send({"event": "endpoint", "data": session_uri})logger.debug(f"Sent endpoint event: {session_uri}")async for message in write_stream_reader:logger.debug(f"Sending message via SSE: {message}")await sse_stream_writer.send({"event": "message","data": message.model_dump_json(by_alias=True, exclude_none=True),})async with anyio.create_task_group() as tg:response = EventSourceResponse(content=sse_stream_reader, data_sender_callable=sse_writer)logger.debug("Starting SSE response task")tg.start_soon(response, scope, receive, send)logger.debug("Yielding read and write streams")yield (read_stream, write_stream)

以上代码中初始化了三个通道,其中 sse_stream_writersse_stream_reader 没有对外暴露。它们用于将数据传给 EventSourceResponse。三个通道之间的数据流向较为复杂,建议在阅读代码时结合流程图来分析,以便更好地理解数据的走向。注意 read_stream_writer 被添加进了一个字典中,这是因为 read_stream_writer 是在 handle_post_message 中传输用户请求的。由于 POST 请求是无状态的,为了找到要发送给哪个 read_stream_writer,必须通过 session_id 去查找。

对于不习惯协程编程范式的人来说,理解这几段代码可能还有些困难。有时间我会继续修改这篇文章,将整个流程说明白。

http://www.xdnf.cn/news/387163.html

相关文章:

  • 用ffmpeg压缩视频参数建议
  • 销售管理系统使用全攻略:从基础配置到数据分析
  • 嵌入式机器学习平台Edge Impulse图像分类 – 快速入门
  • VSCode连接Overleaf失败解决办法
  • Linux安装python3
  • HTML难点小记:一些简单标签的使用逻辑和实用化
  • Linux基础(查找/打包/压缩文件)
  • 基于 PostgreSQL 的 ABP vNext + ShardingCore 分库分表实战
  • 机器人手臂“听不懂“指令?Ethercat转PROFINET网关妙解通信僵局
  • 大数据时代的安全挑战——数据泄露如何悄然发生?
  • Kubernetes排错(十五):节点NotReady故障排查处理
  • MySQL基础面试题集锦
  • 【第三十五周】Janus-pro 技术报告阅读笔记
  • 实战项目4(05)
  • 《用MATLAB玩转游戏开发》Flappy Bird:小鸟飞行大战MATLAB趣味实现
  • C++内存管理详解
  • 互联网大厂Java求职面试实战:Spring Boot到微服务的技术问答解析
  • 《Redis应用实例》学习笔记,第二章:缓存二进制数据
  • “多端多接口多向传导”空战数据链体系——从异构融合架构到抗毁弹性网络的系统性设计
  • [工具]B站缓存工具箱 (By 郭逍遥)
  • MyBatis源码解读5(3.1、缓存简介)
  • 常见的排序算法(Java版)简单易懂好上手!!
  • path环境变量满了如何处理,分割 PATH 到 Path1 和 Path2
  • Java高频面试之并发编程-15
  • ES常识5:主分词器、子字段分词器
  • 嵌入式硬件篇---CAN
  • 【Mac 从 0 到 1 保姆级配置教程 12】- 安装配置万能的编辑器 VSCode 以及常用插件
  • Spring框架(2)---AOP
  • 鱼眼相机生成-BEV鸟瞰图-入门教程
  • Nginx yum 安装