MCP案例—客户端和服务端
MCP简介
Model Context Protocol (模型上下文协议),简称MCP,MCP是一种协议,用于LLM与外部拓展资源交互的协议。
想了解具体细节可参考作者本篇文章MCP理论指南
准备
本篇文章将带你通过python创建MCP客户端及服务端,并连接到本地模型
- 本地模型,我这里使用ollama来管理本地模型,模型应该是有function calling功能的模型,比如千问、deepseek-v3等,本篇教程使用的是符合openai风格的模型
- 查询天气的网站,这里我们使用OpenWeather,这个需要注册,注册完成后申请一个api-key,测试是免费使用,有次数限制
- MCP官方推荐使用uv进行包管理,感兴趣可以使用,本篇使用的pip包管理工具
- .env文件,用于配置一些基础信息
- 创建client.py、server.py文件,分别当作客户端和服务端,注意.env、client.py、server.py这三个文件应该在同一目录下
- 启动命令我就先写在这个地方
# 这个启动命令,也就是启动client的时候把server.py当作参数传入,从而连接到服务端 python client.py server.py
依赖包安装
本篇使用的是pip来管理包,所以安装工具也是pip
- 安装mcp环境
pip install mcp
- 安装openai
pip install openai
- 安装anthropic
pip install openai
- 安装dotenv
pip install dotenv
- 安装httpx
pip install httpx
.env文件
env文件主要是配置模型的路径,使用的模型及api-key
BASE_URL=http://127.0.0.1:11434/v1 #模型调用路径
MODEL=qwen2.5:72b #模型名称,这个用于表示你要使用哪个模型
API_KEY=ollama #这里api-key是模型要使用的,如果你使用模型厂商的开放接口,就需要填入相应的api-key,如果是本地模型,就可以随意填写
MCP客户端示例
MCP客户端主要是负责与服务端和模型进行通信的中间层,是比较重要的一层,下面这个示例展示了构建一个聊天,连接MCP服务端以及与模型进行交互的过程
import asyncio
import json
import os
from typing import Optional
from contextlib import AsyncExitStackfrom mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from openai import OpenAIfrom anthropic import Anthropic
from dotenv import load_dotenvload_dotenv()class MCPClient:def __init__(self):self.session: Optional[ClientSession] = Noneself.exit_stack = AsyncExitStack()self.anthropic = Anthropic()self.api_key = os.getenv("API_KEY") # 读取 OpenAI API Keyself.base_url = os.getenv("BASE_URL") # 读取 BASE YRLself.model = os.getenv("MODEL") # 读取 modelself.client = OpenAI(api_key=self.api_key, base_url=self.base_url)async def connect_to_server(self, server_script_path: str):"""连接MCP服务端Args:server_script_path: Path to the server script (.py or .js)"""is_python = server_script_path.endswith('.py')is_js = server_script_path.endswith('.js')if not (is_python or is_js):raise ValueError("Server script must be a .py or .js file")command = "python" if is_python else "node"server_params = StdioServerParameters(command=command,args=[server_script_path],env=None)stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))self.stdio, self.write = stdio_transportself.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))await self.session.initialize()# List available toolsresponse = await self.session.list_tools()tools = response.toolsprint("\nConnected to server with tools:", [tool.name for tool in tools])async def process_query(self, query: str) -> str:"""Process a query using Claude and available tools"""messages = [{"role": "user","content": query}]# 获取可用工具response = await self.session.list_tools()available_tools = [{"type": "function","function": {"name": tool.name,"description": tool.description,"input_schema": tool.inputSchema}} for tool in response.tools]print("正在等待模型回答...")response = self.client.chat.completions.create(model=self.model,messages=messages,tools=available_tools,temperature=0.7,max_tokens=1000,stream=False)print(response.choices[0])# 处理返回的内容content = response.choices[0]if content.finish_reason == "tool_calls":# 如何是需要使用工具,就解析工具tool_call = content.message.tool_calls[0]tool_name = tool_call.function.nametool_args = json.loads(tool_call.function.arguments)# 执行工具result = await self.session.call_tool(tool_name, tool_args)print(f"\n\n[Calling tool {tool_name} with args {tool_args}]\n\n")print("工具执行结果:",result)# 将模型返回的调用哪个工具数据和工具执行完成后的数据都存入messages中messages.append(content.message.model_dump())messages.append({"role": "tool","content": result.content[0].text,"tool_call_id": tool_call.id,})# 将上面的结果再返回给大模型用于生产最终的结果response = self.client.chat.completions.create(model=self.model,messages=messages,)return response.choices[0].message.contentreturn content.message.contentasync def chat_loop(self):print("\nMCP Client Started!")print("Type your queries or 'quit' to exit.")while True:try:query = input("\n我: ").strip()if query.lower() == 'quit':breakresponse = await self.process_query(query)print("\n🤖 qwen" + response)except Exception as e:print(f"\nError: {str(e)}")async def cleanup(self):"""Clean up resources"""await self.exit_stack.aclose()async def main():if len(sys.argv) < 2:print("Usage: python client.py <path_to_server_script>")sys.exit(1)print(sys.argv)client = MCPClient()try:await client.connect_to_server(sys.argv[1])await client.chat_loop()finally:await client.cleanup()if __name__ == "__main__":import sysasyncio.run(main())
MCP服务端
MCP服务端,主要是用于定义你的工具、资源等
import json
import httpx
from typing import Any
from mcp.server.fastmcp import FastMCP# 初始化 MCP 服务器
mcp = FastMCP("WeatherServer")# OpenWeather API 配置
OPENWEATHER_API_BASE = "https://api.openweathermap.org/data/2.5/weather"
API_KEY = "替换为你自己的apikey"# 请替换为你自己的 OpenWeather API Key
USER_AGENT = "weather-app/1.0"async def fetch_weather(city: str) -> dict[str, Any] | None:"""从 OpenWeather API 获取天气信息。:param city: 城市名称(需使用英文,如 Beijing):return: 天气数据字典;若出错返回包含 error 信息的字典"""params = {"q": city,"appid": API_KEY,"units": "metric","lang": "zh_cn"}headers = {"User-Agent": USER_AGENT}async with httpx.AsyncClient() as client:try:response = await client.get(OPENWEATHER_API_BASE, params=params, headers=headers, timeout=30.0)response.raise_for_status()return response.json() # 返回字典类型except httpx.HTTPStatusError as e:return {"error": f"HTTP 错误: {e.response.status_code}"}except Exception as e:return {"error": f"请求失败: {str(e)}"}def format_weather(data: dict[str, Any] | str) -> str:"""将天气数据格式化为易读文本。:param data: 天气数据(可以是字典或 JSON 字符串):return: 格式化后的天气信息字符串"""# 如果传入的是字符串,则先转换为字典if isinstance(data, str):try:data = json.loads(data)except Exception as e:return f"无法解析天气数据: {e}"# 如果数据中包含错误信息,直接返回错误提示if "error" in data:return f"⚠️ {data['error']}"# 提取数据时做容错处理city = data.get("name", "未知")country = data.get("sys", {}).get("country", "未知")temp = data.get("main", {}).get("temp", "N/A")humidity = data.get("main", {}).get("humidity", "N/A")wind_speed = data.get("wind", {}).get("speed", "N/A")# weather 可能为空列表,因此用 [0] 前先提供默认字典weather_list = data.get("weather", [{}])description = weather_list[0].get("description", "未知")return (f"🌍 {city}, {country}\n"f"🌡 温度: {temp}°C\n"f"💧 湿度: {humidity}%\n"f"🌬 风速: {wind_speed} m/s\n"f"🌤 天气: {description}\n")@mcp.tool()
async def query_weather(city: str) -> str:"""输入指定城市的英文名称,返回今日天气查询结果。:param city: 城市名称(需使用英文):return: 格式化后的天气信息"""data = await fetch_weather(city)return format_weather(data)if __name__ == "__main__":# 以标准 I/O 方式运行 MCP 服务器mcp.run(transport='stdio')
启动服务
启动命令
python client.py server.py
执行后应该会出现下面页面表示成功
我们下面问一个问题,测试一下
在这张图中,问题为北京今天天气怎么样,可以看到,我们把工具及问题传给模型后,模型给我们返回的是要使用工具,并把参数传递给了我们,我们只需要把参数以及工具通过客户端调用服务端,就可以拿到天气api给我们的天气情况,然后把结果及问题一并给模型处理生成最终的回答,这就是MCP调用的一次流程。