当前位置: 首页 > news >正文

基于MCP架构的OpenWeather API服务端设计与实现

随着微服务和模块化架构的发展,越来越多的系统倾向于采用可插拔、高内聚的设计模式。MCP(Modular, Collaborative,Pluggable)架构正是这样一种强调模块化、协作性和扩展性的设计思想。它允许开发者以“组件”方式组合功能,提升系统的灵活性与可维护性。

本项目的目标是:
✅ 使用 OpenWeather API 实现天气数据获取
✅ 搭建一个基于 MCP 架构的天气查询服务器
✅ 通过stdio 方式实现客户端与服务器的交互
✅ 展示 MCP 协议下服务端与客户端的标准通信流程

1. Server搭建流程

这里尝试一个入门级的示例,那就是创建一个天气查询的服务器。通过使用OpenWeather API,创建一个能够实时查询天气的服务器(server),并使用stdio方式进行通信。​测试查询效果:

curl -s "https://api.openweathermap.org/data/2.5/weather?q=Beijing&appid='YOUR_API_KEY'&units=metric&lang=zh_cn"

在这里插入图片描述
成功通过测试后,即可开始创建server

2. 天气查询Server创建流程

2.1 Server依赖安装

为了通过 HTTP 请求查询天气数据,请在当前虚拟环境中添加以下依赖项:

uv add mcp httpx

代码编写 : MCP基本执行流程如下

在这里插入图片描述

# -*- coding: utf-8 -*-
"""
天气查询服务端(基于 MCP 架构)
功能:通过 OpenWeather API 获取指定城市的实时天气,并格式化返回给人类可读文本。
通信方式:使用 MCP 协议,通过标准输入输出(stdio)与客户端交互。
"""import json
import logging
import os
import httpx
from typing import Any, Dict, Optional
from mcp.server.fastmcp import FastMCP  # 导入 MCP 框架中的服务器核心类# ===========================================
# 1. 日志配置
# ===========================================# 配置基础日志系统,便于调试和运行时监控
logging.basicConfig(level=logging.INFO,  # 日志级别为 INFO,显示信息、警告和错误format="%(asctime)s [%(levelname)s] %(message)s",handlers=[logging.StreamHandler()  # 输出到控制台]
)
logger = logging.getLogger(__name__)  # 创建一个独立的日志记录器# ===========================================
# 2. 初始化 MCP 服务器
# ===========================================# 创建一个名为 "WeatherServer" 的 MCP 服务实例
# 该实例将注册工具函数(tool),供外部客户端调用
mcp = FastMCP("WeatherServer")# ===========================================
# 3. OpenWeather API 配置
# ===========================================# OpenWeather API 的基础 URL(获取当前天气数据)
OPENWEATHER_API_BASE = "https://api.openweathermap.org/data/2.5/weather"# 从环境变量中读取 API Key,避免硬编码敏感信息
API_KEY = os.getenv("OPENWEATHER_API_KEY")
if not API_KEY:raise ValueError("OpenWeather API Key 未设置!\n""请运行以下命令设置环境变量:\n""export OPENWEATHER_API_KEY=your_actual_api_key")# 设置请求头中的 User-Agent,部分 API 会检查此字段
USER_AGENT = "weather-client/1.0"# ===========================================
# 4. 异步函数:从 OpenWeather API 获取天气数据
# ===========================================async def fetch_weather(city: str) -> Dict[str, Any]:"""向 OpenWeather API 发起异步 HTTP 请求,获取指定城市的天气信息。参数:city (str): 城市名称(英文,如 Beijing、Shanghai)返回:dict: 成功时返回天气数据字典;失败时返回包含 'error' 键的错误信息字典。"""# 构造请求参数params = {"q": city,           # 查询城市名"appid": API_KEY,    # 认证密钥"units": "metric",   # 使用摄氏度(公制单位)"lang": "zh_cn"      # 返回中文(简体)描述}# 设置请求头headers = {"User-Agent": USER_AGENT}# 使用 httpx 的异步客户端发起请求async with httpx.AsyncClient() as client:try:logger.info(f"正在请求 OpenWeather API,城市: {city}")response = await client.get(url=OPENWEATHER_API_BASE,params=params,headers=headers,timeout=30.0  # 设置 30 秒超时,防止请求挂起)response.raise_for_status()  # 如果状态码不是 2xx,抛出异常data = response.json()       # 解析 JSON 响应logger.info(f"成功获取天气数据: {data.get('name')}, {data.get('sys', {}).get('country')}")return data  # 返回原始天气数据except httpx.HTTPStatusError as e:# HTTP 状态码错误(如 404 城市不存在,401 密钥无效)status_code = e.response.status_codeerror_msg = f"HTTP 错误: {status_code}"logger.error(f"请求失败 [{status_code}] 查询城市: {city}")return {"error": error_msg}except httpx.RequestError as e:# 网络连接错误(如 DNS 失败、连接超时)logger.error(f"网络请求失败: {str(e)}")return {"error": f"网络错误: {str(e)}"}except Exception as e:# 其他未预期的异常(如 JSON 解析失败等)logger.error(f"未知异常: {str(e)}")return {"error": f"系统错误: {str(e)}"}# ===========================================
# 5. 函数:将天气数据格式化为易读的文本
# ===========================================def format_weather(data: Dict[str, Any]) -> str:"""将从 API 获取的天气数据字典转换为人类可读的格式化字符串。参数:data (dict): 包含天气信息的字典(来自 fetch_weather 的返回值)返回:str: 格式化后的天气信息,包含城市、温度、湿度、风速和天气状况。若输入包含 'error' 键,则返回错误提示。"""# 检查是否为错误响应if "error" in return f"⚠️ {data['error']}"# 从字典中安全提取各项数据,使用 .get() 提供默认值以防 KeyErrorcity = data.get("name", "未知城市")country = data.get("sys", {}).get("country", "未知国家")temp = data.get("main", {}).get("temp", "N/A")          # 温度humidity = data.get("main", {}).get("humidity", "N/A")  # 湿度wind_speed = data.get("wind", {}).get("speed", "N/A")   # 风速# 天气描述可能在 'weather' 列表的第一个元素中weather_list = data.get("weather", [{}])  # 默认为空列表,提供一个空字典description = weather_list[0].get("description", "未知天气")# 使用 emoji 增强可读性,组织成多行文本formatted = (f"🌍 {city}, {country}\n"f"🌡 温度: {temp}°C\n"f"💧 湿度: {humidity}%\n"f"🌬 风速: {wind_speed} m/s\n"f"🌤 天气: {description}\n")return formatted# ===========================================
# 6. MCP 工具函数:对外暴露的天气查询接口
# ===========================================@mcp.tool()
async def query_weather(city: str) -> str:"""MCP 工具函数:供客户端调用,查询指定城市的天气。此函数会被 MCP 框架自动注册,并通过 stdio 与客户端通信。参数:city (str): 要查询的城市名称(必须为英文,如 'Beijing')返回:str: 格式化后的天气信息文本,或错误提示。示例调用(由 MCP 客户端发起):{"tool": "query_weather", "arguments": {"city": "Beijing"}}"""logger.info(f"收到 MCP 客户端请求:查询城市天气 -> {city}")# 第一步:获取原始天气数据(异步)raw_data = await fetch_weather(city)# 第二步:格式化为人类可读文本result = format_weather(raw_data)# 记录结果(仅记录第一行,避免日志过长)logger.info(f"返回天气信息 -> {result.splitlines()[0]}")# 返回结果给客户端return result# ===========================================
# 7. 主程序入口:启动 MCP 服务器
# ===========================================if __name__ == "__main__":"""当前脚本作为主程序运行时,启动 MCP 服务器。服务器将通过标准输入/输出(stdio)与客户端通信。"""logger.info("🟩 MCP 天气查询服务器已启动...")logger.info("💡 等待客户端通过 stdio 发送请求...")# 启动服务器,监听标准输入输出# transport='stdio' 表示使用标准流进行通信(适用于 MCP 客户端集成)mcp.run(transport="stdio")logger.info("🛑 MCP 服务器已关闭。")

2.2 Client 创建流程

创建 MCP 客户端项目

# 创建项目目录
cd /root/autodl-tmp/MCP
uv init mcp-chatbot
cd mcp-chatbot

在这里插入图片描述
在这里插入图片描述
创建MCP客户端虚拟环境

# 创建虚拟环境
uv venv# 激活虚拟环境
source .venv/bin/activate

在这里插入图片描述
uv 会自动识别当前项目主目录并创建虚拟环境。​然后即可通过 add 方法在虚拟环境中安装相关的库。

# 安装 MCP SDK
uv add mcp openai python-dotenv httpx

在这里插入图片描述
接下来创建.env文件,并写入OpenAIAPI-Key,以及反向代理地址。借助反向代理,国内可以无门槛直连OpenAI官方服务器,并调用官方API
在这里插入图片描述
写入如下内容:

LLM_API_KEY="your-openai-api-key"
BASE_URL="https://your-reverse-proxy-url.com/v1"
MODEL=gpt-4o

在这里插入图片描述
创建servers_config.json:

在这里插入图片描述

创建weather_server.py:

在这里插入图片描述

import json
import logging
import os
import httpx
from typing import Any, Dict
from mcp.server.fastmcp import FastMCP# ===========================================
# 日志配置
# ===========================================
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)# ===========================================
# 初始化 MCP 服务器
# ===========================================
mcp = FastMCP("WeatherServer")# ===========================================
# OpenWeather API 配置
# ===========================================
OPENWEATHER_API_BASE = "https://api.openweathermap.org/data/2.5/weather"
API_KEY = os.getenv("OPENWEATHER_API_KEY")
if not API_KEY:raise ValueError("请设置环境变量 OPENWEATHER_API_KEY")USER_AGENT = "weather-app/1.0"# ===========================================
# 异步函数:从 OpenWeather API 获取天气数据
# ===========================================
async def fetch_weather(city: str) -> Dict[str, Any]:"""从 OpenWeather API 获取指定城市的天气信息。:param city: 城市名称(英文):return: 包含天气数据或错误信息的字典"""params = {"q": city,"appid": API_KEY,"units": "metric","lang": "zh_cn",}headers = {"User-Agent": USER_AGENT}async with httpx.AsyncClient() as client:try:response = await client.get(OPENWEATHER_API_BASE,params=params,headers=headers,timeout=30.0)response.raise_for_status()return response.json()except httpx.HTTPStatusError as e:status = e.response.status_codelogger.error(f"HTTP 错误 [{status}] 查询城市: {city}")return {"error": f"HTTP 错误: {status}"}except Exception as e:logger.error(f"请求失败: {str(e)}")return {"error": f"请求失败: {str(e)}"}# ===========================================
# 函数:格式化天气数据为人类可读文本
# 支持传入 dict 或 JSON 字符串
# ===========================================
def format_weather(data: Dict[str, Any] | str) -> str:"""将天气数据格式化为易读的文本。支持输入为字典或 JSON 字符串。:param data: 天气数据(dict 或 JSON 字符串):return: 格式化后的天气信息字符串"""# 如果传入的是字符串,尝试解析为字典if isinstance(data, str):try:data = json.loads(data)except Exception as e:return f"无法解析天气数据: {e}"# 如果数据中包含错误信息,直接返回错误提示if "error" inreturn f"⚠️ {data['error']}"# 提取数据时做容错处理,防止 KeyErrorcity = data.get("name", "未知")country = data.get("sys", {}).get("country", "未知")temp = data.get("main", {}).get("temp", "N/A")humidity = data.get("main", {}).get("humidity", "N/A")wind_speed = data.get("wind", {}).get("speed", "N/A")# weather 字段可能为空列表,提供默认值避免索引错误weather_list = data.get("weather", [{}])description = weather_list[0].get("description", "未知")# 使用 emoji 增强可读性,组织成多行输出return (f"🌍 {city}, {country}\n"f"🌡 温度: {temp}°C\n"f"💧 湿度: {humidity}%\n"f"🌬 风速: {wind_speed} m/s\n"f"🌤 天气: {description}\n")# ===========================================
# MCP 工具函数:查询天气
# ===========================================
@mcp.tool()
async def query_weather(city: str) -> str:"""输入指定城市的英文名称,返回今日天气查询结果。:param city: 城市名称(需使用英文,如 Beijing):return: 格式化后的天气信息"""logger.info(f"正在查询城市天气: {city}")data = await fetch_weather(city)result = format_weather(data)return result# ===========================================
# 主程序入口
# ===========================================
if __name__ == "__main__":"""启动 MCP 服务器,通过标准输入输出(stdio)与客户端通信。"""logger.info("🟩 天气查询服务器已启动,等待客户端请求...")mcp.run(transport="stdio")logger.info("🛑 服务器已关闭。")

然后在config.json中写入如下内容:

{"mcpServers": {"weather": {"command": "python","args": ["weather_server.py"]}}
}

创建main.py:

# -*- coding: utf-8 -*-
"""
main.py - 基于 MCP 协议的多服务器 AI 客户端主程序
实现功能:加载配置、连接多个 MCP 服务器、集成 OpenAI Function Calling,支持大模型动态调用外部工具(如天气查询、数据库操作等)。
"""# ===========================================
# 一、导入所需库
# ===========================================import asyncio
import json
import logging
import os
import shutil
from contextlib import AsyncExitStack
from typing import Any, Dict, List, Optionalimport httpx
from dotenv import load_dotenv
from openai import OpenAI  # OpenAI Python SDK,用于与 OpenAI 兼容 API 交互
from mcp import ClientSession, StdioServerParameters  # MCP 协议核心库
from mcp.client.stdio import stdio_client  # 通过 stdio 与 MCP 服务器通信"""
导入的库说明:- asyncio: Python 中的异步编程库,用于处理异步任务(如并发连接多个服务器)。
- json: 用于序列化和反序列化 JSON 格式数据,读取配置文件。
- logging: 配置日志输出,便于调试和运行时监控。
- os: 与操作系统交互,读取环境变量(如 API Key)。
- shutil: 提供文件和目录的高层操作接口(当前未使用,但可用于未来扩展)。
- contextlib: 提供异步上下文管理器 AsyncExitStack,用于资源自动清理。
- httpx: 异步 HTTP 客户端库,可用于未来扩展(如健康检查、Web 请求)。
- dotenv: 从 .env 文件加载环境变量,避免硬编码敏感信息。
- openai: OpenAI Python SDK,用于调用大模型 API(支持 Function Calling)。
- mcp: MCP(Model Context Protocol)协议客户端库,用于与本地工具服务器通信。
"""# ===========================================
# 二、配置加载类 (Configuration)
# ===========================================class Configuration:"""功能: 管理 MCP 客户端的环境变量和配置文件。方法:__init__: 从 .env 文件加载环境变量,获取 LLM_API_KEY、BASE_URL 和 MODEL。load_config: 从指定路径加载 JSON 配置文件,返回配置字典。"""def __init__(self) -> None:"""初始化配置类,加载 .env 文件并读取关键环境变量。"""load_dotenv()  # 加载 .env 文件中的环境变量# 从环境变量中读取 LLM 配置self.api_key = os.getenv("LLM_API_KEY")self.base_url = os.getenv("BASE_URL")self.model = os.getenv("MODEL")# 若未设置 API Key,抛出异常if not self.api_key:raise ValueError("❌ 未找到 LLM_API_KEY,请在 .env 文件中配置")@staticmethoddef load_config(file_path: str) -> Dict[str, Any]:"""从 JSON 文件加载服务器配置。Args:file_path (str): JSON 配置文件路径(如 "servers_config.json")Returns:Dict[str, Any]: 解析后的服务器配置字典Raises:FileNotFoundError: 若文件不存在json.JSONDecodeError: 若 JSON 格式错误"""with open(file_path, "r", encoding="utf-8") as f:return json.load(f)# ===========================================
# 三、MCP 服务器客户端类 (Server)
# ===========================================class Server:"""功能: 管理单个 MCP 服务器的连接、工具调用与资源清理。方法:initialize: 初始化与 MCP 服务器的连接,使用 stdio_client 建立通信。list_tools: 获取该服务器支持的所有工具列表。execute_tool: 执行指定工具,支持重试机制。cleanup: 清理资源,关闭与服务器的连接。"""def __init__(self, name: str, config: Dict[str, Any]) -> None:self.name: str = name  # 服务器名称(如 "weather")self.config: Dict[str, Any] = config  # 服务器配置(command, args 等)self.session: Optional[ClientSession] = None  # MCP 会话对象self.exit_stack: AsyncExitStack = AsyncExitStack()  # 资源管理器self._cleanup_lock = asyncio.Lock()  # 防止并发清理async def initialize(self) -> None:"""初始化与 MCP 服务器的连接。使用 stdio_client 启动服务器进程并通过标准输入输出通信。"""command = self.config["command"]if command is None:raise ValueError("command 不能为空")# 构建服务器启动参数server_params = StdioServerParameters(command=command,args=self.config["args"],env={**os.environ, **self.config["env"]} if self.config.get("env") else None,)try:# 建立 stdio 通信通道stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))read_stream, write_stream = stdio_transport# 创建 MCP 会话session = await self.exit_stack.enter_async_context(ClientSession(read_stream, write_stream))await session.initialize()  # 完成握手协议self.session = sessionlogging.info(f"✅ 成功连接到 MCP 服务器: {self.name}")except Exception as e:logging.error(f"❌ 初始化服务器失败 {self.name}: {e}")await self.cleanup()raiseasync def list_tools(self) -> List[Any]:"""获取该 MCP 服务器支持的工具列表。Returns:List[Any]: 工具对象列表,包含名称、描述和输入 schema。"""if not self.session:raise RuntimeError(f"Server {self.name} not initialized")tools_response = await self.session.list_tools()tools = []for item in tools_response:if isinstance(item, tuple) and item[0] == "tools":for tool in item[1]:tools.append(Tool(tool.name, tool.description, tool.inputSchema))return toolsasync def execute_tool(self,tool_name: str,arguments: Dict[str, Any],retries: int = 2,delay: float = 1.0) -> Any:"""执行指定工具,并支持重试机制。Args:tool_name (str): 工具名称arguments (Dict[str, Any]): 工具调用参数retries (int): 最大重试次数delay (float): 每次重试之间的延迟(秒)Returns:Any: 工具调用结果"""if not self.session:raise RuntimeError(f"Server {self.name} not initialized")attempt = 0while attempt < retries:try:logging.info(f"🔁 执行工具: {tool_name} (服务器: {self.name})")result = await self.session.call_tool(tool_name, arguments)return resultexcept Exception as e:attempt += 1logging.warning(f"⚠️ 工具执行失败: {e} (第 {attempt} 次尝试)")if attempt < retries:await asyncio.sleep(delay)else:logging.error("❌ 已达到最大重试次数,调用失败。")raiseasync def cleanup(self) -> None:"""清理服务器资源,关闭通信通道。"""async with self._cleanup_lock:try:await self.exit_stack.aclose()self.session = Nonelogging.info(f"🔌 已断开服务器: {self.name}")except Exception as e:logging.error(f"❌ 清理服务器 {self.name} 时出错: {e}")# ===========================================
# 四、工具封装类 (Tool)
# ===========================================class Tool:"""功能: 封装从 MCP 服务器获取的工具信息,便于传递给 LLM。方法:format_for_llm: 将工具信息格式化为适合 LLM 理解的文本描述。"""def __init__(self, name: str, description: str, input_schema: Dict[str, Any]) -> None:self.name: str = nameself.description: str = descriptionself.input_schema: Dict[str, Any] = input_schemadef format_for_llm(self) -> str:"""生成用于 LLM 提示的工具描述文本。Returns:str: 包含工具名、描述、参数及是否必填的格式化字符串。"""args_desc = []if "properties" in self.input_schema:for param_name, param_info in self.input_schema["properties"].items():arg_desc = f"- {param_name}: {param_info.get('description', 'No description')}"if param_name in self.input_schema.get("required", []):arg_desc += " (required)"args_desc.append(arg_desc)return (f"Tool: {self.name}\n"f"Description: {self.description}\n"f"Arguments:\n"f"{chr(10).join(args_desc)}")# ===========================================
# 五、LLM 客户端封装类 (LLMClient)
# ===========================================class LLMClient:"""功能: 使用 OpenAI SDK 与大语言模型进行交互,支持 Function Calling。方法:get_response: 向大模型发送消息,并可传入工具定义(Function Calling 格式)。"""def __init__(self, api_key: str, base_url: Optional[str], model: str) -> None:self.client = OpenAI(api_key=api_key, base_url=base_url)self.model = modeldef get_response(self,messages: List[Dict[str, Any]],tools: Optional[List[Dict[str, Any]]] = None) -> Any:"""发送消息给大模型 API,支持传入工具参数(即 Function Calling 格式)。Args:messages (List[Dict]): 对话消息列表tools (List[Dict]): 工具定义列表(可选)Returns:Any: API 返回的响应对象"""payload = {"model": self.model,"messages": messages,"tools": tools,}try:response = self.client.chat.completions.create(**payload)return responseexcept Exception as e:logging.error(f"❌ 调用 LLM 失败: {e}")raise# ===========================================
# 六、多服务器 MCP 客户端类 (MultiServerMCPClient)
# ===========================================class MultiServerMCPClient:"""功能: 管理多个 MCP 服务器,并使用 OpenAI Function Calling 机制与大模型交互。方法:connect_to_servers: 根据配置文件启动多个服务器,获取并注册工具。transform_json: 将 MCP 工具 schema 转换为 OpenAI 所需格式。chat_base: 核心对话逻辑,支持多轮工具调用。create_function_response_messages: 执行工具调用并将结果注入对话。process_query: 处理用户查询,返回最终 AI 回答。_call_mcp_tool: 根据 server_tool 格式调用对应 MCP 工具。chat_loop: 主交互循环,接收用户输入并输出 AI 回答。cleanup: 关闭所有服务器连接和资源。"""def __init__(self) -> None:self.exit_stack = AsyncExitStack()config = Configuration()self.openai_api_key = config.api_keyself.base_url = config.base_urlself.model = config.modelself.client = LLMClient(self.openai_api_key, self.base_url, self.model)self.servers: Dict[str, Server] = {}  # server_name -> Server 实例self.tools_by_server: Dict[str, List[Any]] = {}  # server -> toolsself.all_tools: List[Dict[str, Any]] = []  # 所有工具(OpenAI 格式)async def connect_to_servers(self, servers_config: Dict[str, Any]) -> None:"""根据配置文件同时启动多个 MCP 服务器并获取其工具列表。Args:servers_config (Dict): 包含 mcpServers 的配置字典"""mcp_servers = servers_config.get("mcpServers", {})for server_name, srv_config in mcp_servers.items():server = Server(server_name, srv_config)await server.initialize()self.servers[server_name] = servertools = await server.list_tools()self.tools_by_server[server_name] = toolsfor tool in tools:function_name = f"{server_name}_{tool.name}"self.all_tools.append({"type": "function","function": {"name": function_name,"description": tool.description,"input_schema": tool.input_schema}})# 转换为 OpenAI Function Calling 所需格式self.all_tools = await self.transform_json(self.all_tools)logging.info("\n✅ 已连接到下列服务器:")for name in self.servers:srv_cfg = mcp_servers[name]logging.info(f"  - {name}: command={srv_cfg['command']}, args={srv_cfg['args']}")logging.info("\n汇总的工具:")for t in self.all_tools:logging.info(f"  - {t['function']['name']}")async def transform_json(self, json_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:"""将工具的 input_schema 转换为 OpenAI 所需的 parameters 格式。Args:json_data (List[Dict]): 原始工具列表Returns:List[Dict]: 转换后的 OpenAI 兼容格式"""result = []for item in json_data:if not isinstance(item, dict) or "type" not in item or "function" not in item:continueold_func = item["function"]if not isinstance(old_func, dict) or "name" not in old_func or "description" not in old_func:continuenew_func = {"name": old_func["name"],"description": old_func["description"],"parameters": {}}if "input_schema" in old_func and isinstance(old_func["input_schema"], dict):old_schema = old_func["input_schema"]new_func["parameters"]["type"] = old_schema.get("type", "object")new_func["parameters"]["properties"] = old_schema.get("properties", {})new_func["parameters"]["required"] = old_schema.get("required", [])new_item = {"type": item["type"],"function": new_func}result.append(new_item)return resultasync def chat_base(self, messages: List[Dict[str, Any]]) -> Any:"""使用 OpenAI 接口进行对话,支持多次工具调用。Args:messages (List[Dict]): 当前对话上下文Returns:Any: 模型最终响应"""response = self.client.get_response(messages, tools=self.all_tools)if response.choices[0].finish_reason == "tool_calls":while True:messages = await self.create_function_response_messages(messages, response)response = self.client.get_response(messages, tools=self.all_tools)if response.choices[0].finish_reason != "tool_calls":breakreturn responseasync def create_function_response_messages(self,messages: List[Dict[str, Any]],response: Any) -> List[Dict[str, Any]]:"""解析模型返回的工具调用,执行并返回结果。Args:messages: 原始消息列表response: 模型返回的包含 tool_calls 的响应Returns:更新后的消息列表,包含工具调用结果"""function_call_messages = response.choices[0].message.tool_callsmessages.append(response.choices[0].message.model_dump())for function_call_message in function_call_messages:tool_name = function_call_message.function.nametool_args = json.loads(function_call_message.function.arguments)function_response = await self._call_mcp_tool(tool_name, tool_args)messages.append({"role": "tool","content": function_response,"tool_call_id": function_call_message.id,})return messagesasync def process_query(self, user_query: str) -> str:"""处理用户查询,支持模型调用工具并返回最终回答。Args:user_query (str): 用户输入的问题Returns:str: AI 的最终回答"""messages = [{"role": "user", "content": user_query}]response = self.client.get_response(messages, tools=self.all_tools)content = response.choices[0]if content.finish_reason == "tool_calls":tool_call = content.message.tool_calls[0]tool_name = tool_call.function.nametool_args = json.loads(tool_call.function.arguments)logging.info(f"\n[ 调用工具: {tool_name}, 参数: {tool_args} ]\n")result = await self._call_mcp_tool(tool_name, tool_args)messages.append(content.message.model_dump())messages.append({"role": "tool","content": result,"tool_call_id": tool_call.id,})response = self.client.get_response(messages, tools=self.all_tools)return response.choices[0].message.contentreturn content.message.contentasync def _call_mcp_tool(self, tool_full_name: str, tool_args: Dict[str, Any]) -> str:"""调用 MCP 工具,支持 server_tool 格式解析。Args:tool_full_name: 如 "weather_query_weather"tool_args: 工具参数Returns:str: 工具执行结果"""parts = tool_full_name.split("_", 1)if len(parts) != 2:return f"❌ 无效的工具名称: {tool_full_name}"server_name, tool_name = partsserver = self.servers.get(server_name)if not server:return f"❌ 找不到服务器: {server_name}"resp = await server.execute_tool(tool_name, tool_args)return resp.content if resp.content else "工具执行无输出"async def chat_loop(self) -> None:"""主聊天循环,接收用户输入并输出 AI 回答。输入 'quit' 可退出。"""logging.info("\n🤖 多服务器 MCP + Function Calling 客户端已启动!输入 'quit' 退出。")messages: List[Dict[str, Any]] = []while True:query = input("\n你: ").strip()if query.lower() == "quit":breaktry:messages.append({"role": "user", "content": query})messages = messages[-20:]  # 保留最近 20 条response = await self.chat_base(messages)messages.append(response.choices[0].message.model_dump())result = response.choices[0].message.contentprint(f"\nAI: {result}")except Exception as e:print(f"\n⚠️ 调用过程出错: {e}")async def cleanup(self) -> None:"""关闭所有资源。"""await self.exit_stack.aclose()# ===========================================
# 七、主函数 (main)
# ===========================================async def main() -> None:"""主程序入口点。流程:1. 加载 .env 环境变量和 JSON 配置文件。2. 连接所有 MCP 服务器并获取工具。3. 启动交互式聊天循环。4. 程序结束时清理资源。异常处理:- 捕获配置文件错误、连接失败、调用异常等。- 输出详细日志,确保程序优雅退出。"""config = Configuration()servers_config = config.load_config("servers_config.json")client = MultiServerMCPClient()try:await client.connect_to_servers(servers_config)await client.chat_loop()finally:try:await asyncio.sleep(0.1)await client.cleanup()except RuntimeError as e:if "Attempted to exit cancel scope" in str(e):logging.info("退出时检测到 cancel scope 异常,已忽略。")else:raise# ===========================================
# 八、程序启动入口
# ===========================================if __name__ == "__main__":"""程序启动入口,运行主异步函数。"""asyncio.run(main())

3. 运行测试

# 当前项目的主目录下输入uv run进行运行
uv run main.py

在这里插入图片描述
可以进行多轮对话并进行天气查询:

在这里插入图片描述
并支持多工具并行调用:

在这里插入图片描述
在这里插入图片描述

4. 拓展工具集成

在当前主目录下创建 write_server.py 服务器:

在这里插入图片描述

# write_server.py
from typing import Any
from mcp.server.fastmcp import FastMCP
import os# 初始化 MCP 服务器
mcp = FastMCP("WriteServer")@mcp.tool()
async def write_file(content: str) -> str:"""将内容写入 output.txt 文件。"""try:# 确保目录存在directory = os.path.dirname("output.txt")if directory:os.makedirs(directory, exist_ok=True)# 写入文件with open("output.txt", "w", encoding="utf-8") as f:f.write(content)return "✅ 已成功写入本地文件。"except PermissionError:return "❌ 无写入权限。"except Exception as e:return f"❌ 写入失败: {str(e)}"if __name__ == "__main__":mcp.run(transport="stdio")

同时写入配置文件:

在这里插入图片描述

{"mcpServers": {"weather": {"command": "python","args": ["weather_server.py"]},"write": {"command": "python","args": ["write_server.py"]}}
}

重启对话:

在这里插入图片描述

5. MPC服务器在线管理与实时下载

5.1 npm registry 简介介绍

npm registry(Node Package Manager Registry)是一个 开源的 JavaScript 包管理平台,它存储着成千上万的 JavaScriptNode.js 库、工具和框架。开发者可以将自己的代码库作为包发布到 npm registry,供其他开发者使用。它是 npm (Node Package Manager)工具的核心组件,npm 是当前最流行的 JavaScript 包管理工具,广泛应用于前端和后端开发中。

npm registry 的作用是为 JavaScript/Node.js 开发者 提供一个集中的资源库,用户可以通过 npmnpx 等工具来安装、更新和使用这些包。除此之外,npm registry 还支持其他语言的工具和脚本,比如通过 uvx,Python 工具也能方便地通过 npm registry 进行下载和管理。

优点:

  1. 无需手动下载和安装依赖

    通过 npmnpx,开发者可以轻松地 实时下载并运行 所需的包,无需手动下载、解压和安装依赖项。npx 甚至支持临时下载并执行工具,而不必安装到本地环境中,减少了不必要的手动操作。

  2. 集中管理和共享

    npm registry 提供了一个集中管理和分发代码的场所,开发者可以方便地发布自己的工具、库,并与全球其他开发者共享。这促进了 开源生态系统 的发展,并且让其他开发者能够轻松使用这些工具。

  3. 跨语言支持(通过 uvx)

    npm registry 是以 JavaScript/Node.js 为主,但通过 uvx 等工具,它也可以方便地管理 Python 包 和其他语言的工具,这使得 跨语言开发 更加简洁和高效。

  4. 简化依赖管理和版本控制

    在开发过程中,npm registry 不仅能帮助开发者快速获取第三方库,还能自动处理依赖版本的管理。通过 npm 配置文件(如 package.json),开发者可以清晰地查看和管理项目所依赖的所有库,并且可以随时更新、安装或回滚特定版本。

  5. 跨平台支持

    npm registry 支持的工具和包广泛适用于不同操作系统(如 Windows、macOS、Linux 等)。npm registry 提供了一个集中、开放、实时更新的生态系统,极大地简化了开发者在项目中使用外部工具和库的过程。开发者只需要通过简单的命令(如 npm installnpx),就能实时下载最新版本的库、工具和框架,而无需处理繁琐的版本管理和依赖配置。实时下载和运行工具包的便捷性,使得开发工作更加高效,能够快速迭代和创新,同时促进了开源社区的蓬勃发展。

5.2 将开发好的库上传至npm registry

接下来我们尝试将一个 Python 编写的 MCP 服务器 发布为一个 npm 包,并能够通过 npxuvx 快速运行该服务器。这种方法使得您可以跨平台发布和使用 Python 脚本,而不需要其他开发者手动安装和配置 Python 环境。

(1) 准备 Python 代码

编写一个Python脚本,也就是一个MCP服务器。以查询天气为例!

(2) 创建一个 Node.js 项目

初始化 Node.js 项目:首先,我们需要一个 package.json 文件,这是 npm 包的核心配置文件。我们可以通过 npm init 命令来初始化一个新的 Node.js 项目。

# 打开终端,进入到项目文件夹,然后运行以下命令
npm init

创建一个新的 package.json 文件。在提问时,可以按默认值按下 Enter,或者输入自定义内容。

在这里插入图片描述
安装 uvx 工具

npm install uvx --save

(3) 配置 package.json 来运行 Python 脚本

  • 在 package.json 文件中,添加一个 bin 字段,告诉 npm 包如何启动我们的 Python 脚本。

  • 打开 package.json 文件,并将其修改为类似下面的样子:

    {"name": "weather-server","version": "1.0.0","description": "A weather server that fetches weather data from OpenWeather API","main": "index.js","bin": {"mcp-server-git": "./weather_server.py"},"dependencies": {"uvx": "^latest"},"scripts": {"start": "uvx weather-server"},"author": "","license": "ISC"
    }
    
  • bin 字段:将我们的 Python 脚本路径指定为命令。这里,"mcp-server-git" 将成为用户运行命令时执行的脚本名称,"./weather_server.py" 指定 Python 脚本路径。

  • scripts 字段:指定使用 uvx 启动 Python 脚本。

    在这里插入图片描述

    在项目根目录下创建一个简单的 index.js 文件来调用 Python 脚本

    # index.js 文件
    const { exec } = require('child_process');exec('python weather_server.py --api_key YOUR_API_KEY', (error, stdout, stderr) => {if (error) {console.error(`exec error: ${error}`);return;}console.log(`stdout: ${stdout}`);console.error(`stderr: ${stderr}`);
    });
    

    这个脚本将运行我们的 Python 脚本并传递 API Key

    在这里插入图片描述

(4) 创建一个 .npmignore 文件

如果项目包含不需要发布到 npm 的文件(如 Python 环境相关的文件、缓存文件等),可以在项目根目录创建一个 .npmignore 文件,并列出这些文件。

*.pyc
__pycache__
*.env

(5) 发布包到 npm

  • 登录您的 npm账号

    在这里插入图片描述

    npm login
    

    在这里插入图片描述
    注意这里需要访问 npm 官方网站:https://www.npmjs.com/signup进行注册,并且设置npm为官方镜像源:

    npm config set registry https://registry.npmjs.org/
    

    然后才能顺利的登录和发布,发布到 npm:使用以下命令将您的包发布到 npm registry

    npm publish
    

    这将把包上传到 npm registry,其他用户就可以通过 npxuvx 下载并运行您的 Python 服务器了。

    在这里插入图片描述

(6) 使用 npx 或 uvx 来运行 MCP 服务器

发布成功后,尝试在Cherry Studio中运行这个天气查询服务器。

6. 总结

  1. MCP 服务端构建
  • 使用 mcp.server.fastmcp 搭建了基于 FastMCP 的天气查询服务器;
  • 通过 httpx 异步调用 OpenWeather API,获取实时天气数据;
  • 实现了结构化错误处理与中文格式化输出,提升可读性。
  1. 客户端集成与交互
  • 基于 MultiServerMCPClient 构建了支持多服务器管理的客户端;
  • 实现了 OpenAI Function Calling 风格的工具调用机制;
  • 支持多轮对话、上下文维护与异常重试,具备生产级稳定性。
  1. 多工具扩展能力
  • 成功集成 write_file 工具,实现“查询天气 + 写入文件”复合任务;
  • config.json 中统一管理多个 MCP 服务器,体现模块化设计优势;
  • 展示了 AI Agent 调用多个外部工具完成复杂指令的能力。
  1. 工具标准化与分发
  • 将 Python 编写的 MCP 服务器封装为 npm 包;
  • 利用 uvxnpx 实现跨语言运行,无需手动安装依赖;
  • 成功发布至 npm registry,实现“一键下载、即用即走”的工具分发模式。
  1. 平台集成验证
  • Cherry Studio 中成功加载并运行 MCP 服务器;
  • 验证了 AI 可自动识别工具、发起调用并整合结果;
  • 构建了“用户提问 → AI 决策 → 工具执行 → 返回结果”的完整闭环。

7. 项目价值与展望

方向当前成果未来展望
模块化架构MCP 实现功能解耦支持热插拔、动态加载
AI 工具生态支持自定义工具建立开源 MCP 工具市场
跨平台分发npm + uvx 实现跨语言运行推动 MCP 成为标准协议
本地化部署完全本地运行,保护隐私支持边缘计算与离线模式

8. 结语

本项目不仅验证了 MCP 架构在 AI Agent 中的强大扩展性,更探索了一条 “本地服务 → 标准协议 → 全球共享” 的工具开发与分发路径。未来,随着更多开发者加入 MCP 生态,我们将迎来一个真正开放、协作、智能化的 AI 工具时代。

http://www.xdnf.cn/news/1420327.html

相关文章:

  • C#在物联网GPS经纬度转换为百度地图地址
  • 亚马逊云代理商:如何选择适合的AWS EC2实例类型?
  • CVE Push Service | 高危漏洞实时情报自动化推送工具
  • Vue基础知识-使用监视属性watch和计算属性computed实现列表过滤+排序
  • 【golang长途旅行第35站】Redis
  • docker中的命令(六)
  • 针对redis中的热数据该怎么处理
  • ✝常用表格✝
  • Simulink库文件-一种低通滤波模块搭建方法
  • 【stm32】定时器(超详细)
  • 重构导航之核:高德地图的深度学习架构解析 导论:从数字化世界到可计算世界
  • 手搓3D轮播图组件以及倒影效果
  • Shell 编程 —— 正则表达式与文本处理实战
  • 如何用 Kotlin 在 Android 手机开发一个文字游戏,并加入付费机制?
  • 基于运营商投诉工单的分析系统设计与实现
  • Kotlin
  • 秋招笔记-8.29
  • 哈希表-1.两数之和-力扣(LeetCode)
  • 电路学习(四)半导体
  • LeetCode 165. 比较版本号 - 优雅Java解决方案
  • LangChain开源LLM集成:从本地部署到自定义生成的低成本落地方案
  • 人工智能——课程考核
  • 移动开发如何给不同手机屏幕做适配
  • Shell脚本编程:函数、数组与正则表达式详解
  • [SWPUCTF 2018]SimplePHP
  • 如何用AI视频增强清晰度软件解决画质模糊问题
  • 【音视频】WebRTC QoS 概述
  • 子串:滑动窗口最大值
  • Flutter 完全组件化的项目结构设计实践
  • 王丹妮《营救飞虎》首映礼获赞 三家姐展现坚毅与温柔并存