文章目录 说明和参考资料 MCP 异地部署 补充:@click.option核心参数 流式HTTP MCP服务器开发和测试 项目开发 server文件关键代码解释 MCP服务器开启与测试 MCP客户端连接 流式HTTP MCP服务器发布(未实践)
说明和参考资料
说明:本文学习自赋范开源社区公开资料,结合本身实践总结而来,仅供学习和交流使用,最终著作权归九天老师及其团队所有! 参考文档:流式HTTP MCP服务器开发指南
MCP 异地部署
MCP采用客户端服务器架构,支持异地部署,SSE通信方式只支持SSE单向通信,无多通道并发,稳定新不足,不适用于企业场景。HTTP流式传输拥有更高的并发、更稳定的通信、更低难度的集成和部署成本。 目前SDK中已加入HTTP流式MCP服务器的相关功能支持(自SDK1.8.0版本开始支持streamable http)。开发者可以通过MCP SDK,高效快速开发HTTP SDK MCP服务器,并通过多通多并发的企业级MCP工具部署。 MCP python SDK streamable http guide docs
补充:@click.option核心参数
核心参数速查表
分类 参数 类型 说明 示例 基础控制 default
Any 未提供参数时的默认值 default=3000
type
Click Type 参数类型(自动转换) type=click.INT
/ type=click.Path(exists=True)
required
bool 是否必须提供参数(默认False
) required=True
help
str 帮助文本(支持中文) help="监听端口号"
交互增强 prompt
bool/str 未提供时交互式提示(可自定义提示文本) prompt="请输入API密钥"
confirmation_prompt
bool 需要二次确认(如密码) confirmation_prompt=True
hide_input
bool 隐藏输入内容(用于密码等敏感信息) hide_input=True
输入验证 callback
Callable 自定义验证函数 callback=validate_api_key
metavar
str 帮助信息中的参数占位符 metavar="PORT"
→ --port PORT
nargs
int 指定参数值的个数(如2
表示接收两个值) nargs=2
→ --file a.txt b.txt
特殊类型 is_flag
bool 作为布尔标志(无需值,存在即为True
) is_flag=True
→ --enable
multiple
bool 允许重复参数(收集为列表) multiple=True
→ --tag python --tag cli
count
bool 统计参数出现次数(如-vvv
) count=True
→ -v
=1, -vv
=2环境集成 envvar
str/list 从环境变量读取值(支持多个变量名) envvar="API_KEY"
或 envvar=["API_KEY", "TOKEN"]
show_envvar
bool 在帮助信息中显示支持的环境变量 show_envvar=True
→ [env var: API_KEY]
显示控制 show_default
bool/str 显示默认值(可自定义文本) show_default="默认3000"
hidden
bool 隐藏该选项(不在帮助信息显示) hidden=True
选择限制 choice
list/click.Choice
限定参数可选值 choice=["DEBUG", "INFO"]
或 type=click.Choice(["A", "B"])
常用组合示例
必填参数+环境变量
@click. option ( "--api-key" , envvar= "API_KEY" , required= True , help = "API密钥" )
布尔标志+默认值
@click. option ( "--verbose" , is_flag= True , default= False , help = "启用详细输出" )
多值参数+验证
@click. option ( "--files" , multiple= True , type = click. Path( exists= True ) )
交互式密码输入
@click. option ( "--password" , prompt= True , hide_input= True , confirmation_prompt= True )
表格中的参数均为常用配置,实际使用时可根据需求组合。
流式HTTP MCP服务器开发和测试
项目开发
安装uv工具conda install uv
创建项目,并创建、激活虚拟环境。cd / xxx/ code
uv init mcp- weather- http
cd mcp- weather- http
uv venv
source . venv/ bin / activate
在虚拟环境中,安装所需依赖。uv add mcp httpx
采用src_layer
的风格进行项目文件编排,因此需要删除main.py
,并创建mcp_weather_http
目录。rm main. py
mkdir - p . / src/ mcp_weather_http
cd . / src/ mcp_weather_http
在src/mcp_weather_http
文件中创建__init__.py
、__main__.py
、server.py
,具体文件内容如下:( mcp- weather- http) ( base) [ root@yang mcp_weather_http]
( mcp- weather- http) ( base) [ root@yang mcp_weather_http]
( mcp- weather- http) ( base) [ root@yang mcp_weather_http]
__init__.py
from . server import main
__main__.py
from mcp_weather_http import mainmain( )
server.py
import contextlib
import logging
from collections. abc import AsyncIteratorimport anyio
import click
import httpx
import mcp. types as types
from mcp. server. lowlevel import Server
from mcp. server. streamable_http_manager import StreamableHTTPSessionManager
from starlette. applications import Starlette
from starlette. routing import Mount
from starlette. types import Receive, Scope, Send
OPENWEATHER_URL = "https://api.openweathermap.org/data/2.5/weather"
DEFAULT_UNITS = "metric"
DEFAULT_LANG = "zh_cn" async def fetch_weather ( city: str , api_key: str ) - > dict [ str , str ] : """调用OpenWeather API并返回一个简化的天气字典。Raises:httpx.HTTPStatusError: if the response has a non-2xx status.""" params = { "q" : city, "appid" : api_key, "units" : DEFAULT_UNITS, "lang" : DEFAULT_LANG, } async with httpx. AsyncClient( timeout= 10 ) as client: r = await client. get( OPENWEATHER_URL, params= params) r. raise_for_status( ) data = r. json( ) weather_main = data[ "weather" ] [ 0 ] [ "main" ] description = data[ "weather" ] [ 0 ] [ "description" ] temp = data[ "main" ] [ "temp" ] feels_like = data[ "main" ] [ "feels_like" ] humidity = data[ "main" ] [ "humidity" ] return { "city" : city, "weather" : weather_main, "description" : description, "temp" : f" { temp} °C" , "feels_like" : f" { feels_like} °C" , "humidity" : f" { humidity} %" , } @click. command ( )
@click. option ( "--port" , default= 3000 , help = "HTTP服务监听的端口号" , type = int , show_default= True
)
@click. option ( "--api-key" , envvar= "OPENWEATHER_API_KEY" , required= True , help = "OpenWeather API密钥(也可以通过设置OPENWEATHER_API_KEY环境变量提供)" , metavar= "KEY"
)
@click. option ( "--log-level" , default= "INFO" , help = "日志级别 (DEBUG, INFO, WARNING, ERROR, CRITICAL)" , type = click. Choice( [ "DEBUG" , "INFO" , "WARNING" , "ERROR" , "CRITICAL" ] , case_sensitive= False )
)
@click. option ( "--json-response" , is_flag= True , default= False , help = "启用JSON响应格式(默认使用SSE流式传输)" , show_default= True
) def main ( port: int , api_key: str , log_level: str , json_response: bool ) - > int : """使用流式传输协议运行一个天气查询MCP服务""" logging. basicConfig( level= getattr ( logging, log_level. upper( ) ) , format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" , ) logger = logging. getLogger( "weather-server" ) app = Server( "mcp-streamable-http-weather" ) @app. call_tool ( ) async def call_tool ( name: str , arguments: dict ) - > list [ types. TextContent] : """Handle the 'get-weather' tool call.""" ctx = app. request_contextcity = arguments. get( "location" ) if not city: raise ValueError( "'location' is required in arguments" ) await ctx. session. send_log_message( level= "info" , data= f"Fetching weather for { city} …" , logger= "weather" , related_request_id= ctx. request_id, ) try : weather = await fetch_weather( city, api_key) except Exception as err: '''raise 重新抛出当前捕获的异常双重处理:实现「日志记录 + 协议层错误处理」的分离:本地处理:先通过send_log_message将错误详情流式传输给客户端全局处理:再通过raise让MCP协议层捕获并返回标准化错误响应''' await ctx. session. send_log_message( level= "error" , data= str ( err) , logger= "weather" , related_request_id= ctx. request_id, ) raise await ctx. session. send_log_message( level= "info" , data= "Weather data fetched successfully!" , logger= "weather" , related_request_id= ctx. request_id, ) summary = ( f" { weather[ 'city' ] } : { weather[ 'description' ] } ,温度 { weather[ 'temp' ] } ," f"体感 { weather[ 'feels_like' ] } ,湿度 { weather[ 'humidity' ] } 。" ) return [ types. TextContent( type = "text" , text= summary) , ] @app. list_tools ( ) async def list_tools ( ) - > list [ types. Tool] : """保留可用的工具给大模型""" return [ types. Tool( name= "get-weather" , description= "查询指定城市的实时天气(OpenWeather 数据)" , inputSchema= { "type" : "object" , "required" : [ "location" ] , "properties" : { "location" : { "type" : "string" , "description" : "城市的英文名称,如 'Beijing'" , } } , } , ) ] session_manager = StreamableHTTPSessionManager( app= app, event_store= None , json_response= json_response, stateless= True , ) async def handle_streamable_http ( scope: Scope, receive: Receive, send: Send) - > None : await session_manager. handle_request( scope, receive, send) @contextlib. asynccontextmanager async def lifespan ( app: Starlette) - > AsyncIterator[ None ] : async with session_manager. run( ) : logger. info( "Weather MCP server started! 🚀" ) try : yield finally : logger. info( "Weather MCP server shutting down…" ) starlette_app = Starlette( debug= False , routes= [ Mount( "/mcp" , app= handle_streamable_http) ] , lifespan= lifespan, ) import uvicornuvicorn. run( starlette_app, host= "0.0.0.0" , port= port ) return 0 if __name__ == "__main__" : main( )
查询当前环境中的setuotools
版本( mcp-weather-http) ( base) [ root@yang mcp_weather_http]
Name: setuptools
Version: 78.1 .1
回到项目主目录/mnt/code/mcp-weather-http
,修改 project.toml
[ build- system]
requires = [ "setuptools>=78.1.1" , "wheel" ]
build- backend = "setuptools.build_meta"
[ project]
name = "mcp-weather-http"
version = "0.1.0"
description = "输入OpenWeather-API-KEY,获取天气信息。"
readme = "README.md"
requires- python = ">=3.13"
dependencies = [ "httpx>=0.28.1" , "mcp>=1.13.1" ,
]
[ project. scripts]
mcp- weather- http = "mcp_weather_http:main"
[ tool. setuptools]
package- dir = { "" = "src" }
[ tool. setuptools. packages. find]
where = [ "src" ]
server文件关键代码解释
def main ( port: int , api_key: str , log_level: str , json_response: bool ) - > int : """使用流式传输协议运行一个天气查询MCP服务""" logging. basicConfig( level= getattr ( logging, log_level. upper( ) ) , format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" , ) logger = logging. getLogger( "weather-server" ) app = Server( "mcp-streamable-http-weather" ) @app. call_tool ( )
async def call_tool ( name: str , arguments: dict ) - > list [ types. TextContent] : """Handle the 'get-weather' tool call.""" ctx = app. request_contextcity = arguments. get( "location" ) if not city: raise ValueError( "'location' is required in arguments" ) await ctx. session. send_log_message( level= "info" , data= f"Fetching weather for { city} …" , logger= "weather" , related_request_id= ctx. request_id, ) try : weather = await fetch_weather( city, api_key) except Exception as err: '''raise 重新抛出当前捕获的异常双重处理:实现「日志记录 + 协议层错误处理」的分离:本地处理:先通过send_log_message将错误详情流式传输给客户端全局处理:再通过raise让MCP协议层捕获并返回标准化错误响应''' await ctx. session. send_log_message( level= "error" , data= str ( err) , logger= "weather" , related_request_id= ctx. request_id, ) raise await ctx. session. send_log_message( level= "info" , data= "Weather data fetched successfully!" , logger= "weather" , related_request_id= ctx. request_id, ) summary = ( f" { weather[ 'city' ] } : { weather[ 'description' ] } ,温度 { weather[ 'temp' ] } ," f"体感 { weather[ 'feels_like' ] } ,湿度 { weather[ 'humidity' ] } 。" ) return [ types. TextContent( type = "text" , text= summary) , ]
功能定位:这是一个工具调用(Tool Call)的异步处理函数 ,核心功能是: 作为大模型(如ChatGPT)的扩展工具,处理get-weather
天气查询请求 通过OpenWeather API获取指定城市的天气数据 实现实时日志流式传输 (Streaming Logs)和结构化返回结果
函数定义@app. call_tool ( )
async def call_tool ( name: str , arguments: dict ) - > list [ types. TextContent] :
@app.call_tool()
:装饰器声明这是一个工具调用端点异步设计 :async def
支持非阻塞IO(如网络请求)输入参数 : name
:工具名称(如"get-weather"
)arguments
:大模型传入的参数字典(如{"location": "北京"}
) 返回值 :标准化返回TextContent
类型列表(兼容多模态扩展)
参数验证city = arguments. get( "location" )
if not city: raise ValueError( "'location' is required in arguments" )
强制检查location
参数是否存在 缺失时抛出明确错误(大模型会捕获并提示用户)
实时日志流await ctx. session. send_log_message( level= "info" , data= f"Fetching weather for { city} …" , logger= "weather" , related_request_id= ctx. request_id,
)
实时反馈机制 :在API请求完成前,先发送"进行中"日志关键字段 : level
:日志级别(info/error等)related_request_id
:关联请求ID(用于链路追踪)
核心业务逻辑weather = await fetch_weather( city, api_key)
调用异步函数fetch_weather
(实际发起OpenWeather API请求)
异常处理await ctx. session. send_log_message( level= "error" , data= str ( err) , logger= "weather" , related_request_id= ctx. request_id,
)
raise
错误流式传输 :将异常信息实时发送给客户端。raise
的作用是重新抛出当前捕获的异常。双重处理:实现「日志记录 + 协议层错误处理」的分离: 本地处理:先通过send_log_message将错误详情流式传输给客户端。 全局处理:再通过raise让MCP协议层捕获并返回标准化错误响应。
失败
成功
调用 fetch_weather
是否成功?
捕获异常 Exception
发送错误日志到客户端
重新抛出异常 raise
MCP协议层处理
继续后续逻辑
场景 客户端表现 服务端行为 有raise 1. 实时看到错误日志2. 最终收到MCP错误响应 中断当前请求,MCP返回500状态码 无raise 仅看到错误日志,但状态码为200 继续执行后续代码(可能逻辑异常)
结果格式化summary = ( f" { weather[ 'city' ] } : { weather[ 'description' ] } ,温度 { weather[ 'temp' ] } ," f"体感 { weather[ 'feels_like' ] } ,湿度 { weather[ 'humidity' ] } 。" )
return [ types. TextContent( type = "text" , text= summary) ]
自然语言摘要 :将API返回的JSON转换为人类可读文本标准化返回 :包装为TextContent
类型(未来可扩展图片等内容)
用户 大模型 工具服务 客户端 OpenWeather 查询"北京天气" call_tool("get-weather", {"location":"北京"}) [日志流] "Fetching weather for 北京…" 异步API请求 返回JSON数据 [日志流] "Success!" ["北京:晴,温度 25℃,体感 26℃,湿度 60%"] 返回错误 [错误日志] "API请求失败" 抛出异常 alt [成功] [失败] 格式化响应 用户 大模型 工具服务 客户端 OpenWeather
@app. list_tools ( ) async def list_tools ( ) - > list [ types. Tool] : """保留可用的工具给大模型""" return [ types. Tool( name= "get-weather" , description= "查询指定城市的实时天气(OpenWeather 数据)" , inputSchema= { "type" : "object" , "required" : [ "location" ] , "properties" : { "location" : { "type" : "string" , "description" : "城市的英文名称,如 'Beijing'" , } } , } , ) ]
session_manager = StreamableHTTPSessionManager( app= app, event_store= None , json_response= json_response, stateless= True , ) async def handle_streamable_http ( scope: Scope, receive: Receive, send: Send) - > None : await session_manager. handle_request( scope, receive, send) @contextlib. asynccontextmanager async def lifespan ( app: Starlette) - > AsyncIterator[ None ] : async with session_manager. run( ) : logger. info( "Weather MCP server started! 🚀" ) try : yield finally : logger. info( "Weather MCP server shutting down…" )
这段代码实现了一个 ASGI(Asynchronous Server Gateway Interface)服务 的核心生命周期管理,分为两部分:HTTP请求处理和服务器启停管理。以下是详细解析:
HTTP请求处理 (handle_streamable_http
)async def handle_streamable_http ( scope: Scope, receive: Receive, send: Send) - > None : await session_manager. handle_request( scope, receive, send)
功能 : 作为ASGI协议的入口点 ,处理所有HTTP请求 将请求委托给session_manager
进行实际处理 参数说明 :
参数 类型 作用 scope
Scope
包含请求的元数据(如HTTP方法、路径、headers等)的字典 receive
Receive
异步函数,用于接收请求体(如POST数据) send
Send
异步函数,用于发送响应(如status/headers/body)
关键设计 : 委托模式 :将具体逻辑交给session_manager
实现解耦流式支持 :函数名streamable
暗示支持流式传输(如SSE/WebSocket)
生命周期管理 (lifespan
)
@contextlib. asynccontextmanager
async def lifespan ( app: Starlette) - > AsyncIterator[ None ] : async with session_manager. run( ) : logger. info( "Weather MCP server started! 🚀" ) try : yield finally : logger. info( "Weather MCP server shutting down…" )
功能 : 使用异步上下文管理器 管理服务启停 控制session_manager
的启动/清理逻辑
启动阶段 (async with session_manager.run()
) 初始化连接池、加载配置等 打印启动日志(含火箭emoji增强可读性 🚀) 运行阶段 (yield
) 保持服务运行状态 在此处可插入健康检查、指标上报等逻辑 关闭阶段 (finally
) 无论服务是否异常都会执行 释放资源(如关闭数据库连接)
机制 作用 @contextlib.asynccontextmanager
将普通函数转为异步上下文管理器 async with
确保session_manager.run()
的__aexit__
一定会被调用(类似Java的try-with-resources) yield
分隔启动和关闭逻辑(yield前为启动,后为关闭)
完整生命周期流程
ASGI_Server Lifespan SessionManager 服务启动 run() 初始化完成 进入运行状态(yield) handle_request() 返回响应 loop [处理请求] 服务终止 清理资源 关闭确认 ASGI_Server Lifespan SessionManager
这种模式是ASGI服务的标准实践,FastAPI/Starlette等框架均采用类似结构。
starlette_app = Starlette( debug= False , routes= [ Mount( "/mcp" , app= handle_streamable_http) ] , lifespan= lifespan, ) import uvicornuvicorn. run( starlette_app, host= "0.0.0.0" , port= port ) return 0
这段代码完成了 ASGI 应用的最终组装和服务器启动 ,是服务端程序的入口点。以下是解析:
Starlette 应用构造
starlette_app = Starlette( debug= False , routes= [ Mount( "/mcp" , app= handle_streamable_http) ] , lifespan= lifespan,
)
参数 作用 debug=False
关闭调试模式,避免敏感信息泄漏(如堆栈跟踪) routes
定义路由映射: - Mount("/mcp", ...)
将路径前缀/mcp
下的所有请求路由到handle_streamable_http
lifespan
绑定之前定义的异步生命周期管理器
路由设计意图: 通过/mcp
路径前缀实现API版本隔离 (如未来可扩展/mcp/v2
) 所有匹配/mcp/*
的请求都会交由handle_streamable_http
处理
Uvicorn 服务器启动uvicorn. run( starlette_app, host= "0.0.0.0" , port= port,
)
参数 典型值 作用 host
"0.0.0.0"
监听所有可用网络接口(如需限制只允许本地访问则设为"127.0.0.1"
) port
如3000
服务暴露的端口,通常通过外部参数传入 隐含配置 workers=1
默认单进程运行(适合配合K8s/Docker的横向扩展)
uvicorn. run( starlette_app, host= "0.0.0.0" , port= port, workers= 4 , timeout_keep_alive= 60 , access_log= False
)
返回状态码return 0
当服务器被手动停止(如Ctrl+C)时,返回0
表示正常退出 非零返回值通常表示错误(如端口冲突返回98
)
完整启动流程
User Uvicorn Starlette SessionManager OS 启动命令(含port参数) 初始化应用 调用lifespan启动 初始化完成 应用就绪 绑定端口 0.0.0.0:{port} 端口监听成功 服务运行中(阻塞主线程) HTTP请求 GET /mcp/weather 传递ASGI事件 handle_streamable_http() 生成响应 ASGI响应 返回HTTP响应 loop [请求处理] SIGTERM终止信号 触发lifespan关闭 清理资源 清理完成 释放端口 退出码 0 User Uvicorn Starlette SessionManager OS
MCP服务器开启与测试
开启流式HTTP MCP服务器
uv run . / src/ mcp_weather_http/ server. py - - api- key xxxx
开启Inspector:可以在本地主机上运行,连接公网或局域网内的MCP服务器。npx - y @modelcontextprotocol/ inspector
打开Inspector,网址在终端中显示 选择HTTP流式模式,选择运行地址:http://192.168.1.21:3000/mcp
,然后点击connect、点击List Tools、点击get-weather、输入地名进行测试
MCP客户端连接
安装cherry studio,配置模型和MCP服务。
服务器终端中可以查看到连接日志。 创建对话,选择模型和MCP服务器进行对话测试。
流式HTTP MCP服务器发布(未实践)
一站式解决 python打包代码,发布到pypi 测试完成后,即可上线发布。可以考虑发布到pypi平台。
uv pip install build twine
python -m build
python -m twine upload dist/*
本地安装pip install mcp-weather-http
开启服务uv run mcp- weather- http - - api- key YOUR_API_KEY
然后,即可使用Cherry studio
连接流式HTTP模式下的MCP服务器,还是和此前一样的连接流程,输入服务器名称mcp-weather-http,并选择流式传输类型,并选择服务器地址:http://localhost:3000/mcp
,然后点击保存。 若显示服务器更新成功,则表示已经连接上MCP服务器。