当前位置: 首页 > news >正文

MCP基础知识二(实战通信方式之Streamable HTTP)

介绍

MCP 使用 JSON-RPC 2.0 作为其传输格式。传输层负责将 MCP 协议消息转换为 JSON-RPC 格式进行传输,并将接收到的 JSON-RPC 消息转换回 MCP 协议消息。其中SSE被废弃了(Server-Sent Events (SSE) - Deprecated)

SSE as a standalone transport is deprecated as of protocol version 2024-11-05. It has been replaced by Streamable HTTP, which incorporates SSE as an optional streaming mechanism. For backwards compatibility information, see the backwards compatibility section below.

自协议版本 2024-11-05 起,SSE 作为独立传输已被弃用。它已被流式 HTTP 取代,后者将 SSE 作为可选的流式机制。有关向后兼容性信息,请参阅下方的向后兼容性部分。

MCP官网:Transports - Model Context Protocol

实战

1.编写一个py代码

from __future__ import annotations
import argparse
import asyncio
import json
from typing import Any, AsyncIterator
import httpx
from fastapi import FastAPI, Request, Response, status
from fastapi.responses import StreamingResponse
# ---------------------------------------------------------------------------
# Server constants
# ---------------------------------------------------------------------------
SERVER_NAME = "WeatherServer"
SERVER_VERSION = "1.0.0"
PROTOCOL_VERSION = "2024-11-05" # Cherry Studio current
# ---------------------------------------------------------------------------
# Weather helpers
# ---------------------------------------------------------------------------
OPENWEATHER_URL = "https://api.openweathermap.org/data/2.5/weather"
API_KEY: str | None = None
USER_AGENT = "weather-app/1.0"
async def fetch_weather(city: str) -> dict[str, Any]:if not API_KEY:return {"error": "API_KEY 未设置,请提供有效的 OpenWeather API Key。"}params = {"q": city, "appid": API_KEY, "units": "metric", "lang": "zh_cn"}headers = {"User-Agent": USER_AGENT}async with httpx.AsyncClient(timeout=30.0) as client:try:r = await client.get(OPENWEATHER_URL, params=params, headers=headers)r.raise_for_status()return r.json()except httpx.HTTPStatusError as exc:return {"error": f"HTTP 错误: {exc.response.status_code} - {exc.response.text}"}except httpx.RequestError as exc:return {"error": f"网络请求失败: {exc}"}except json.JSONDecodeError:return {"error": "OpenWeather 返回的数据不是有效的 JSON"}def format_weather(data: dict[str, Any]) -> str:if "error" in data:return data["error"]city = data.get("name", "未知")country = data.get("sys", {}).get("country", "未知")temp = data.get("main", {}).get("temp", "N/A")humidity = data.get("main", {}).get("humidity", "N/A")wind = data.get("wind", {}).get("speed", "N/A")desc = data.get("weather", [{}])[0].get("description", "未知")return (f"🌍 {city}, {country}\n"f"🌡 温度: {temp}°C\n"f"💧 湿度: {humidity}%\n"f"🌬 风速: {wind} m/s\n"f"🌤 天气: {desc}")async def stream_weather(city: str, req_id: int | str) -> AsyncIterator[bytes]:# 进度提示(不会出错的部分放在try外面)yield json.dumps({"jsonrpc": "2.0","id": req_id,"stream": f"查询{city}天气中…"}).encode() + b"\n"try:await asyncio.sleep(0.3)data = await fetch_weather(city)  # 主要可能出错点if "error" in data:yield json.dumps({"jsonrpc": "2.0","id": req_id,"error": {"code": -32002,  # 保留业务错误码"message": data["error"]}}).encode() + b"\n"return# 正常结果(json.dumps理论上不会出错)yield json.dumps({"jsonrpc": "2.0","id": req_id,"result": {"content": [{"type": "text", "text": format_weather(data)}],"isError": False}}).encode() + b"\n"except Exception as exc:# 系统级错误单独处理yield json.dumps({"jsonrpc": "2.0","id": req_id,"error": {"code": -32003,  # 保留系统错误码"message": f"服务器内部错误: {str(exc)}"}}).encode() + b"\n"# ---------------------------------------------------------------------------# FastAPI app# ---------------------------------------------------------------------------
app = FastAPI(title="WeatherServer HTTP-Stream v8")
TOOLS_REGISTRY = {"tools": [{"name": "get_weather","description": "用于进行天气信息查询的函数,输入城市英文名称,即可获得当前城市天气信息。","inputSchema": {"type": "object","properties": {"city": {"type": "string","description": "City name, e.g. 'Hangzhou'"}},"required": ["city"]}}],"nextCursor": "qinqing"
}
@app.get("/mcp")
async def mcp_initialize_via_get():
# GET 请求也执行了 initialize 方法return {"jsonrpc": "2.0","id": 0,"result": {"protocolVersion": PROTOCOL_VERSION,"capabilities": {"streaming": True,"tools": {"listChanged": True}},"serverInfo": {"name": SERVER_NAME,"version": SERVER_VERSION},"instructions": "Use the get_weather tool to fetch weather by cityname."}
}
@app.post("/mcp")
async def mcp_endpoint(request: Request):try:body = await request.json()# ✅ 打印客户端请求内容print("💡 收到请求:", json.dumps(body, ensure_ascii=False, indent=2))except Exception:return {"jsonrpc": "2.0", "id": None, "error": {"code": -32700,"message": "Parse error"}}req_id = body.get("id", 1)method = body.get("method")# ✅ 打印当前方法类型print(f"🔧 方法: {method}")# 0) Ignore initialized notification (no response required)if method == "notifications/initialized":return Response(status_code=status.HTTP_204_NO_CONTENT)# 1) Activation probe (no method)if method is None:return {"jsonrpc": "2.0", "id": req_id, "result": {"status": "MCP serveronline."}}# 2) initializeif method == "initialize":return {"jsonrpc": "2.0","id": req_id,"result": {"protocolVersion": PROTOCOL_VERSION,"capabilities": {"streaming": True,"tools": {"listChanged": True}},"serverInfo": {"name": SERVER_NAME, "version": SERVER_VERSION},"instructions": "Use the get_weather tool to fetch weather bycity name."}}# 3) tools/listif method == "tools/list":print(json.dumps(TOOLS_REGISTRY, indent=2, ensure_ascii=False))return {"jsonrpc": "2.0", "id": req_id, "result": TOOLS_REGISTRY}# 4) tools/callif method == "tools/call":params = body.get("params", {})tool_name = params.get("name")args = params.get("arguments", {})if tool_name != "get_weather":return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32602,"message": "Unknown tool"}}city = args.get("city")if not city:return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32602,"message": "Missing city"}}# return StreamingResponse(stream_weather(city, req_id),media_type="application/json")data = await fetch_weather(city)return {"jsonrpc": "2.0","id": req_id,"result": {"content": [{"type": "text", "text": format_weather(data)}],"isError": False}}# 5) unknown methodreturn {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32601, "message":"Method not found"}}
# ---------------------------------------------------------------------------
# Runner
# ---------------------------------------------------------------------------
def main() -> None:parser = argparse.ArgumentParser(description="Weather MCP HTTP-Stream v8")parser.add_argument("--api_key", required=False,default='xxxx')parser.add_argument("--host", default="127.0.0.1")parser.add_argument("--port", type=int, default=8000)args = parser.parse_args()global API_KEYAPI_KEY = args.api_keyimport uvicornuvicorn.run(app, host=args.host, port=args.port, log_level="info")
if __name__ == "__main__":# npx -y @modelcontextprotocol/inspector uv run weather2.pymain()

其中api_key 需要自己申请即可,可以免费调用大概1000次(测试和学习基本够用了) 

2. 使用mcp的检查器进行测试

npx -y @modelcontextprotocol/inspector uv run xxxx.py

参考官网:Inspector - Model Context Protocol

3. 将py进行启动运行

4. 检查器启动成功之后进行测试

简单的例子Streamable HTTP就成功了 ~

http://www.xdnf.cn/news/1122769.html

相关文章:

  • 微信131~140
  • 属性绑定
  • 零基础 “入坑” Java--- 十一、多态
  • IDEA中使用Servlet,tomcat输出中文乱码
  • 《星盘接口2:NVMe风暴》
  • [spring6: Resource ResourceLoader ResourceEditor]-加载资源
  • 【Java笔记】七大排序
  • 现有医疗AI记忆、规划与工具使用的创新路径分析
  • 融合竞争学习与高斯扰动的多目标加权平均算法(MOWAA)求解多无人机协同路径规划(多起点多终点,起始点、无人机数、障碍物可自定义),提供完整MATLAB代码
  • 嵌入式硬件篇---晶体管的分类
  • Transformer江湖录 第五章:江湖争锋 - BERT vs GPT
  • ZYNQ双核通信终极指南:FreeRTOS移植+OpenAMP双核通信+固化实战
  • CSS面试题
  • C++卸载了会影响电脑正常使用吗?解析C++运行库的作用与卸载后果
  • 后端接口通用返回格式与异常处理实现
  • UI前端大数据处理新挑战:如何高效处理实时数据流?
  • JavaScript学习第九章-第三部分(内建对象)
  • 内测分发平台应用的异地容灾和负载均衡处理和实现思路
  • 8.服务通信:Feign深度优化 - 解密声明式调用与现代负载均衡内核
  • 【微信小程序】
  • SQL ORM映射框架深度剖析:从原理到实战优化
  • springboot 好处
  • 【日常技能】excel的vlookup 匹配#N/A
  • 如何将 iPhone 备份到云端:完整指南
  • Mysql数据库学习--多表查询
  • spring-ai-alibaba官方 Playground 示例之联网搜索代码解析
  • 力扣 hot100 Day44
  • 判断端口处于监听状态的方法
  • day40 训练和测试的规范写法
  • 手滑误操作? vue + Element UI 封装二次确认框 | 附源码