[大模型问数]实现大模型调用MYSQL(03)【MCP笔记】
文章结尾部分有CSDN官方提供的学长 联系方式名片
文章结尾部分有CSDN官方提供的学长 联系方式名片
关注B站,有好处!
上一次我们已经完成了模拟数据的开发,本篇笔记继续完成MYSQL MCP服务端的开发。
创建server.py
import asyncio
import logging
import os
import sys
from mysql.connector import connect, Error
from mcp.server import Server
# 从 mcp.types 模块导入 Resource, Tool, TextContent 类,这些类用于定义资源、工具和文本内容的类型
from mcp.types import Resource, Tool, TextContent
# 从 pydantic 模块导入 AnyUrl 类,用于验证和处理 URL 类型的数据
from pydantic import AnyUrl# 配置日志记录,设置日志的基本参数
# Configure logging
logging.basicConfig(# 设置日志级别为 INFO,这意味着日志系统会记录 INFO 及以上级别的日志信息level=logging.INFO,# 定义日志的输出格式,包含时间、日志器名称、日志级别和日志消息format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# 创建一个名为 "mysql_mcp_server" 的日志记录器,后续代码将使用该记录器输出日志
logger = logging.getLogger("mysql_mcp_server")def get_db_config():"""从环境变量中获取数据库配置信息。如果缺少必要的配置信息(用户名、密码、数据库名),会记录错误日志。Returns:dict: 包含数据库配置信息的字典。Raises:ValueError: 当缺少必要的数据库配置信息时抛出。"""# 初始化一个字典,用于存储从环境变量中获取的数据库配置信息config = {# 从环境变量 MYSQL_HOST 中获取数据库主机地址,若未设置则使用默认值 "localhost""host": os.getenv("MYSQL_HOST", "localhost"),# 从环境变量 MYSQL_PORT 中获取数据库端口号,若未设置则使用默认值 "3306",并转换为整数类型"port": int(os.getenv("MYSQL_PORT", "3306")),# 从环境变量 MYSQL_USER 中获取数据库用户名,若未设置则使用默认值 "root""user": os.getenv("MYSQL_USER", "root"),# 从环境变量 MYSQL_PASSWORD 中获取数据库密码,若未设置则为 None"password": os.getenv("MYSQL_PASSWORD", "XXXX"),# 从环境变量 MYSQL_DATABASE 中获取要连接的数据库名,若未设置则使用默认值 "MYSQL""database": os.getenv("MYSQL_DATABASE", "MYSQL")}# 检查是否缺少必要的数据库配置信息(用户名、密码、数据库名)if not all([config["user"], config["password"], config["database"]]):# 若缺少必要配置,记录错误日志,提示用户检查环境变量logger.error("Missing required database configuration. Please check environment variables:")logger.error("MYSQL_USER, MYSQL_PASSWORD, and MYSQL_DATABASE are required")raise ValueError("Missing required database configuration")return config# Initialize server
app = Server("mysql_mcp_server")@app.list_resources()
async def list_resources() -> list[Resource]:"""List MySQL tables as resources."""config = get_db_config()try:with connect(**config) as conn:with conn.cursor() as cursor:cursor.execute("SHOW TABLES")tables = cursor.fetchall()logger.info(f"Found tables: {tables}")resources = []for table in tables:resources.append(Resource(uri=f"mysql://{table[0]}/data",name=f"Table: {table[0]}",mimeType="text/plain",description=f"Data in table: {table[0]}"))return resourcesexcept Error as e:logger.error(f"Failed to list resources: {str(e)}")return []@app.read_resource()
async def read_resource(uri: AnyUrl) -> str:"""Read table contents."""config = get_db_config()uri_str = str(uri)logger.info(f"Reading resource: {uri_str}")if not uri_str.startswith("mysql://"):raise ValueError(f"Invalid URI scheme: {uri_str}")parts = uri_str[8:].split('/')table = parts[0]try:with connect(**config) as conn:with conn.cursor() as cursor:cursor.execute(f"SELECT * FROM {table} LIMIT 100")columns = [desc[0] for desc in cursor.description]rows = cursor.fetchall()result = [",".join(map(str, row)) for row in rows]return "\n".join([",".join(columns)] + result)except Error as e:logger.error(f"Database error reading resource {uri}: {str(e)}")raise RuntimeError(f"Database error: {str(e)}")@app.list_tools()
async def list_tools() -> list[Tool]:"""List available MySQL tools."""logger.info("Listing tools...")return [Tool(name="execute_sql",description="Execute an SQL query on the MySQL server",inputSchema={"type": "object","properties": {"query": {"type": "string","description": "The SQL query to execute"}},"required": ["query"]})]@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:"""执行 SQL 命令。Args:name (str): 要调用的工具名称。arguments (dict): 传递给工具的参数,应包含 SQL 查询语句。Returns:list[TextContent]: 包含执行结果的文本内容列表。"""# 获取数据库配置信息config = get_db_config()# 记录调用工具的日志,包含工具名称和传入的参数logger.info(f"Calling tool: {name} with arguments: {arguments}")# 检查要调用的工具是否为 execute_sql,若不是则抛出异常if name != "execute_sql":raise ValueError(f"Unknown tool: {name}")# 从参数中获取 SQL 查询语句query = arguments.get("query")# 若未提供查询语句,则抛出异常if not query:raise ValueError("Query is required")try:# 建立数据库连接with connect(**config) as conn:# 创建数据库游标with conn.cursor() as cursor:# 执行 SQL 查询cursor.execute(query)# 特殊处理 SHOW TABLES 语句if query.strip().upper().startswith("SHOW TABLES"):# 获取查询结果tables = cursor.fetchall()# 定义结果表头result = ["Tables_in_" + config["database"]]# 将查询到的表名添加到结果列表中result.extend([table[0] for table in tables])# 返回包含结果文本的 TextContent 对象列表return [TextContent(type="text", text="\n".join(result))]# 处理所有返回结果集的查询(如 SELECT, SHOW, DESCRIBE 等)elif cursor.description is not None:# 获取查询结果的列名columns = [desc[0] for desc in cursor.description]try:# 获取查询结果的所有行rows = cursor.fetchall()# 将每行结果转换为逗号分隔的字符串result = [",".join(map(str, row)) for row in rows]# 返回包含结果文本的 TextContent 对象列表return [TextContent(type="text", text="\n".join([",".join(columns)] + result))]except Error as e:# 记录获取结果时的错误日志logger.warning(f"Error fetching results: {str(e)}")# 返回包含错误信息的 TextContent 对象列表return [TextContent(type="text", text=f"Query executed but error fetching results: {str(e)}")]# 处理非 SELECT 查询else:# 提交数据库事务conn.commit()# 返回包含执行成功信息和受影响行数的 TextContent 对象列表return [TextContent(type="text", text=f"Query executed successfully. Rows affected: {cursor.rowcount}")]except Error as e:# 记录执行 SQL 查询时的错误日志logger.error(f"Error executing SQL '{query}': {e}")# 返回包含错误信息的 TextContent 对象列表return [TextContent(type="text", text=f"Error executing query: {str(e)}")]async def main():"""主入口函数,用于启动 MCP 服务器。该函数会输出数据库配置信息,记录启动日志,并启动 MCP 服务器。"""# 从 mcp.server.stdio 模块导入 stdio_server 函数,用于创建标准输入输出服务器from mcp.server.stdio import stdio_server# 输出调试信息到标准错误输出,提示即将启动 MySQL MCP 服务器print("Starting MySQL MCP server with config:", file=sys.stderr)# 调用 get_db_config 函数获取数据库配置信息config = get_db_config()# 输出数据库主机配置信息到标准错误输出print(f"Host: {config['host']}", file=sys.stderr)# 输出数据库端口配置信息到标准错误输出print(f"Port: {config['port']}", file=sys.stderr)# 输出数据库用户配置信息到标准错误输出print(f"User: {config['user']}", file=sys.stderr)# 输出数据库名称配置信息到标准错误输出print(f"Database: {config['database']}", file=sys.stderr)# 使用日志记录器记录启动 MySQL MCP 服务器的信息logger.info("Starting MySQL MCP server...")# 使用日志记录器记录数据库配置信息logger.info(f"Database config: {config['host']}/{config['database']} as {config['user']}")# 异步上下文管理器,创建标准输入输出服务器,获取读取和写入流# 重新抛出异常# 若服务器运行过程中出现异常,使用日志记录器记录错误信息,并记录异常堆栈# 调用 app 的 run 方法启动服务器,传入读取流、写入流和初始化选项async with stdio_server() as (read_stream, write_stream):try:await app.run(read_stream,write_stream,app.create_initialization_options())except Exception as e:logger.error(f"Server error: {str(e)}", exc_info=True)raiseif __name__ == "__main__":asyncio.run(main())
测试
启动server.py后就可以测试了,测试我们使用cherry studio软件。
这里我们先配置了一个硅基流动的大模型
然后配置系统的MCP,如果右上角不是绿色的,则需要安装UV和BUN
配置MCP注意启动MCP 服务的路径,这里是项目工程的路径:
在聊天界面开启MCP
此外,在提示词中添加表结构,方便大模型可以理解
成功!
点击execute_sql 可以查看工具调用SQL以及结果: