当前位置: 首页 > backend >正文

吴恩达MCP课程(4):connect_server_mcp_chatbot

目录

    • 完整代码
    • 代码解释
      • 1. 导入和初始化
      • 2. 类型定义
      • 3. MCP_ChatBot 类初始化
      • 4. 查询处理 (process_query)
      • 5. 服务器连接管理
      • 6. 核心特性总结
    • 示例

完整代码

原课程代码是用Anthropic写的,下面代码是用OpenAI改写的,模型则用阿里巴巴的模型做测试
.env 文件为:

OPENAI_API_KEY=sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
OPENAI_API_BASE=https://dashscope.aliyuncs.com/compatible-mode/v1

另外,课程代码只是单轮对话,下面代码修改为多轮对话,更适合千问模型的调用方式

from dotenv import load_dotenv
import openai
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from typing import List,TypedDict
from contextlib import AsyncExitStack
from typing import Dict
import asyncio
import json
import osload_dotenv()class ToolDefinition(TypedDict):name: strdescription: strinput_schema: dictclass MCP_ChatBot:def __init__(self):# Initialize session and client objectsself.sessions: List[ClientSession] = []  # newself.exit_stack = AsyncExitStack()self.client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"),base_url=os.getenv("OPENAI_API_BASE"))self.available_tools: List[ToolDefinition] = []  # newself.tool_to_session: Dict[str, ClientSession] = {}self.messages = []async def process_query(self, query):self.messages.append({'role':'user', 'content':query})response = self.client.chat.completions.create(model='qwen-turbo',# max_tokens=2024,tools=self.available_tools,messages=self.messages )process_query = Truewhile process_query:# 获取助手的回复message = response.choices[0].message# 检查是否有普通文本内容if message.content:print(message.content)process_query = False# 检查是否有工具调用elif message.tool_calls:# 添加助手消息到历史self.messages.append({"role": "assistant", "content": None,"tool_calls": message.tool_calls})# 处理每个工具调用for tool_call in message.tool_calls:tool_id = tool_call.idtool_name = tool_call.function.nametool_args = json.loads(tool_call.function.arguments)print(f"Calling tool {tool_name} with args {tool_args}")# 执行工具调用session = self.tool_to_session[tool_name]result = await session.call_tool(tool_name, arguments=tool_args)# 添加工具结果到消息历史self.messages.append({"role": "tool","tool_call_id": tool_id,"content": result.content})# 获取下一个回复response = self.client.chat.completions.create(model='qwen-turbo',# max_tokens=2024,tools=self.available_tools,messages=self.messages )self.messages.append({"role": "assistant", "content": response.choices[0].message.content})# 如果只有文本回复,则结束处理if response.choices[0].message.content and not response.choices[0].message.tool_calls:print(response.choices[0].message.content)process_query = Falseasync def chat_loop(self):"""Run an interactive chat loop"""print("\nMCP Chatbot Started!")print("Type your queries or 'quit' to exit.")while True:try:query = input("\nQuery: ").strip()if query.lower() == 'quit':breakawait self.process_query(query)print("\n")except Exception as e:print(f"\nError: {str(e)}")async def connect_to_server(self, server_name: str, server_config: dict) -> None:"""Connect to a single MCP server."""try:server_params = StdioServerParameters(**server_config)stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))read, write = stdio_transportsession = await self.exit_stack.enter_async_context(ClientSession(read, write))await session.initialize()self.sessions.append(session)# List available tools for this sessionresponse = await session.list_tools()tools = response.toolsprint(f"\nConnected to {server_name} with tools:", [t.name for t in tools])for tool in tools:  # newself.tool_to_session[tool.name] = sessionself.available_tools.append({"type": "function","function": {"name": tool.name,"description": tool.description,"parameters": tool.inputSchema}})except Exception as e:print(f"Failed to connect to {server_name}: {e}")async def connect_to_servers(self):  # new"""Connect to all configured MCP servers."""try:with open("server_config.json", "r") as file:data = json.load(file)servers = data.get("mcpServers", {})for server_name, server_config in servers.items():await self.connect_to_server(server_name, server_config)except Exception as e:print(f"Error loading server configuration: {e}")raise           async def clenup(self):await self.exit_stack.aclose()async def main():chatbot = MCP_ChatBot()try:await chatbot.connect_to_servers()await chatbot.chat_loop()finally:await chatbot.clenup()if __name__ == "__main__":asyncio.run(main())"""
1、Fetch the content of this website: https://modelcontextprotocol.io/docs/concepts/architecture. 
2、save the content in the file "mcp_summary.md"
"""

代码解释

1. 导入和初始化

from dotenv import load_dotenv
import openai
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from typing import List,TypedDict
from contextlib import AsyncExitStack
from typing import Dict
import asyncio
import json
import osload_dotenv()
  • 导入必要的库,包括OpenAI客户端、MCP协议相关模块、异步处理模块等
  • load_dotenv() 加载环境变量配置

2. 类型定义

class ToolDefinition(TypedDict):name: strdescription: strinput_schema: dict

定义工具的类型结构,用于类型提示。

3. MCP_ChatBot 类初始化

class MCP_ChatBot:def __init__(self):self.sessions: List[ClientSession] = []  # 存储多个MCP会话self.exit_stack = AsyncExitStack()  # 管理异步资源self.client = openai.OpenAI(  # OpenAI客户端api_key=os.getenv("OPENAI_API_KEY"),base_url=os.getenv("OPENAI_API_BASE"))self.available_tools: List[ToolDefinition] = []  # 可用工具列表self.tool_to_session: Dict[str, ClientSession] = {}  # 工具名到会话的映射self.messages = []  # 对话历史

关键特性:

  • 多会话支持sessions 列表存储多个MCP服务器会话
  • 工具映射tool_to_session 将工具名映射到对应的会话,实现工具路由
  • 资源管理:使用 AsyncExitStack 管理异步资源的生命周期

4. 查询处理 (process_query)

async def process_query(self, query):self.messages.append({'role':'user', 'content':query})response = self.client.chat.completions.create(model='qwen-turbo',tools=self.available_tools,messages=self.messages )

核心处理逻辑:

  1. 消息循环处理:使用 while process_query 循环处理多轮对话
  2. 工具调用处理:检测并执行工具调用,通过 tool_to_session 路由到正确的MCP服务器
  3. 结果整合:将工具执行结果添加到对话历史中

5. 服务器连接管理

async def connect_to_server(self, server_name: str, server_config: dict) -> None:# 建立单个服务器连接server_params = StdioServerParameters(**server_config)stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))# ... 获取工具并建立映射for tool in tools:self.tool_to_session[tool.name] = sessionself.available_tools.append({"type": "function","function": {"name": tool.name,"description": tool.description,"parameters": tool.inputSchema}})
async def connect_to_servers(self):# 连接所有配置的服务器with open("server_config.json", "r") as file:data = json.load(file)servers = data.get("mcpServers", {})for server_name, server_config in servers.items():await self.connect_to_server(server_name, server_config)

6. 核心特性总结

多服务器支持

  • 可以同时连接多个MCP服务器
  • 每个服务器的工具都被统一管理
  • 通过工具名自动路由到正确的服务器

OpenAI格式兼容

  • 工具定义使用OpenAI的函数调用格式
  • 支持完整的工具调用流程

异步处理

  • 全异步设计,支持并发处理
  • 使用 AsyncExitStack 管理资源生命周期

配置化管理

  • 通过 server_config.json 配置多个服务器
  • 支持动态加载服务器配置

这个实现相比单服务器版本的主要优势是可以整合多个不同功能的MCP服务器,为用户提供更丰富的工具集合。

示例

uv run connect_server_map_chatbot.py

请添加图片描述

http://www.xdnf.cn/news/10530.html

相关文章:

  • springboot中@Async做异步操作(Completable异步+ThreadPoolTaskExecutor线程池+@Async注解)
  • shp转3d tiles在cesium渲染楼宇白膜
  • Linux 驱动之设备树
  • Leetcode 2093. 前往目标城市的最小费用
  • SAR ADC 异步逻辑设计
  • Linux系统配置屏幕旋转和触摸旋转
  • 从冷上电到main()函数,Bootloader都做了什么?
  • 数据类型检测有哪些方式?
  • robot_lab学习笔记【MDP综述】
  • QuickJS 如何计算黄金分割率 ?
  • barker-OFDM模糊函数原理及仿真
  • Linux防火墙:全面解析IPTables的表、链、规则!
  • Cypress + TypeScript + Vue3
  • 数据库管理与高可用-MySQL全量,增量备份与恢复
  • 劫持进程注入
  • C语言进阶--程序的编译(预处理动作)+链接
  • 数据结构:递归(Recursion)
  • 基于TMC5160堵转检测技术的夹紧力控制系统设计与实现
  • 输入ifconfig,发现ens33不见了,无法连接至虚拟机
  • Golang——3、流程控制语句
  • C++实现伽罗华域生成及四则运算(三)
  • Python----目标检测(《SSD: Single Shot MultiBox Detector》论文和SSD的原理与网络结构)
  • CppCon 2014 学习:C++ in Huge AAA Games
  • STM32F407寄存器操作(多通道单ADC+DMA)
  • 前端面试准备-5
  • Mask_RCNN 环境配置及训练
  • QT中子线程触发主线程弹窗并阻塞等待用户响应-传统信号槽实现
  • DRW - 加密市场预测
  • 考研系列—操作系统:第四章、文件管理(part.2)
  • 利用DeepSeek编写能在DuckDB中读PostgreSQL表的表函数