当前位置: 首页 > news >正文

小智AI模型接入MCP

ESP32接入小智AI的MCP架构

  1. 整体架构图
    ESP32设备 ←→ MCP服务器 ←→ 小智AI云端 ←→ 用户语音终端
    ↑ ↑ ↑ ↑
    本地GPIO 工具转换 AI模型理解 语音识别
    音量控制 协议适配 工具调用 语音合成
  2. MCP服务器实现
    基于小智AI的要求,需要创建一个MCP服务器来桥接ESP32和小智AI:

esp32_mcp_server.py

# esp32_mcp_server.py
from mcp.server.fastmcp import FastMCP
import logging
import asyncio
import websockets
import json
from typing import Dict, Anylogger = logging.getLogger('esp32_mcp')# 创建MCP服务器
mcp = FastMCP("ESP32_Device_Controller")# ESP32设备连接管理
class ESP32Manager:def __init__(self):self.devices = {}  # device_id -> websocket连接async def connect_device(self, device_id: str, websocket_url: str):"""连接到ESP32设备"""try:websocket = await websockets.connect(websocket_url)self.devices[device_id] = websocketlogger.info(f"Connected to ESP32 device: {device_id}")return Trueexcept Exception as e:logger.error(f"Failed to connect to device {device_id}: {e}")return Falseasync def send_mcp_request(self, device_id: str, method: str, params: Dict = None) -> Dict:"""向ESP32发送MCP请求"""if device_id not in self.devices:raise Exception(f"Device {device_id} not connected")websocket = self.devices[device_id]request = {"jsonrpc": "2.0","id": 1,"method": method,"params": params or {}}await websocket.send(json.dumps(request))response = await websocket.recv()return json.loads(response)# 全局设备管理器
esp32_manager = ESP32Manager()# 在启动时连接到ESP32设备
async def initialize_devices():# 这里配置您的ESP32设备WebSocket地址await esp32_manager.connect_device("esp32_001", "ws://192.168.1.100:80/mcp")@mcp.tool()
def get_device_status(device_id: str = "esp32_001") -> dict:"""Get the current status of ESP32 device including volume, GPIO states, battery, network info. Use this tool to check device condition before controlling it."""try:loop = asyncio.get_event_loop()response = loop.run_until_complete(esp32_manager.send_mcp_request(device_id, "tools/call", {"name": "self.get_device_status","arguments": {}}))logger.info(f"Device status: {response}")return {"success": True, "status": response.get("result", {})}except Exception as e:logger.error(f"Failed to get device status: {e}")return {"success": False, "error": str(e)}@mcp.tool()
def set_audio_volume(volume: int, device_id: str = "esp32_001") -> dict:"""Set the audio speaker volume of ESP32 device. Volume should be between 0 and 100."""try:loop = asyncio.get_event_loop()response = loop.run_until_complete(esp32_manager.send_mcp_request(device_id, "tools/call", {"name": "self.audio_speaker.set_volume","arguments": {"volume": volume}}))logger.info(f"Set volume to {volume}: {response}")return {"success": True, "volume": volume}except Exception as e:logger.error(f"Failed to set volume: {e}")return {"success": False, "error": str(e)}@mcp.tool()
def control_gpio_output(pin_number: int, state: bool, device_id: str = "esp32_001") -> dict:"""Control GPIO pin output state on ESP32 device. Set pin to high (true) or low (false) level."""try:loop = asyncio.get_event_loop()response = loop.run_until_complete(esp32_manager.send_mcp_request(device_id, "tools/call", {"name": "self.gpio.set_output","arguments": {"pin": pin_number, "state": state}}))logger.info(f"Set GPIO{pin_number} to {'HIGH' if state else 'LOW'}: {response}")return {"success": True, "pin": pin_number, "state": state}except Exception as e:logger.error(f"Failed to control GPIO: {e}")return {"success": False, "error": str(e)}@mcp.tool()
def read_gpio_input(pin_number: int, pull_mode: str = "none", device_id: str = "esp32_001") -> dict:"""Read GPIO pin input state from ESP32 device. Pull mode can be 'none', 'up', or 'down'."""try:loop = asyncio.get_event_loop()response = loop.run_until_complete(esp32_manager.send_mcp_request(device_id, "tools/call", {"name": "self.gpio.get_input","arguments": {"pin": pin_number, "pull_mode": pull_mode}}))logger.info(f"Read GPIO{pin_number}: {response}")return {"success": True, "pin": pin_number, "state": response.get("result", {})}except Exception as e:logger.error(f"Failed to read GPIO: {e}")return {"success": False, "error": str(e)}@mcp.tool()
def control_led(action: str, device_id: str = "esp32_001") -> dict:"""Control the built-in LED on ESP32 device. Action can be 'on', 'off', 'toggle', or 'status'."""try:loop = asyncio.get_event_loop()response = loop.run_until_complete(esp32_manager.send_mcp_request(device_id, "tools/call", {"name": "self.led.control","arguments": {"action": action}}))logger.info(f"LED {action}: {response}")return {"success": True, "action": action, "result": response.get("result", {})}except Exception as e:logger.error(f"Failed to control LED: {e}")return {"success": False, "error": str(e)}@mcp.tool()
def set_pwm_brightness(brightness: int, device_id: str = "esp32_001") -> dict:"""Set PWM brightness for dimmable devices like LEDs. Brightness should be between 0 and 100 percent."""try:loop = asyncio.get_event_loop()response = loop.run_until_complete(esp32_manager.send_mcp_request(device_id, "tools/call", {"name": "self.pwm.set_brightness","arguments": {"brightness": brightness}}))logger.info(f"Set PWM brightness to {brightness}%: {response}")return {"success": True, "brightness": brightness}except Exception as e:logger.error(f"Failed to set PWM brightness: {e}")return {"success": False, "error": str(e)}# 启动时初始化设备连接
if __name__ == "__main__":# 初始化设备连接asyncio.run(initialize_devices())# 启动MCP服务器mcp.run(transport="stdio")
  1. MCP管道脚本
    创建连接小智AI的管道脚本:
# mcp_pipe.py
import asyncio
import websockets
import json
import subprocess
import sys
import os
import logginglogging.basicConfig(level=logging.INFO)
logger = logging.getLogger('MCP_PIPE')class MCPPipe:def __init__(self, mcp_script_path: str, endpoint: str):self.mcp_script_path = mcp_script_pathself.endpoint = endpointself.process = Noneself.websocket = Noneasync def start_mcp_process(self):"""启动MCP服务器进程"""self.process = await asyncio.create_subprocess_exec(sys.executable, self.mcp_script_path,stdin=asyncio.subprocess.PIPE,stdout=asyncio.subprocess.PIPE,stderr=asyncio.subprocess.PIPE)logger.info(f"Started {self.mcp_script_path} process")async def connect_websocket(self):"""连接到小智AI的WebSocket端点"""logger.info("Connecting to WebSocket server...")self.websocket = await websockets.connect(self.endpoint)logger.info("Successfully connected to WebSocket server")async def handle_websocket_message(self, message):"""处理来自小智AI的消息"""try:# 转发消息到MCP进程self.process.stdin.write(message.encode() + b'\n')await self.process.stdin.drain()# 读取MCP进程的响应response = await self.process.stdout.readline()# 转发响应到小智AIawait self.websocket.send(response.decode().strip())except Exception as e:logger.error(f"Error handling message: {e}")async def run(self):"""运行MCP管道"""await self.start_mcp_process()await self.connect_websocket()try:async for message in self.websocket:await self.handle_websocket_message(message)except websockets.exceptions.ConnectionClosed:logger.info("WebSocket connection closed")finally:if self.process:self.process.terminate()await self.process.wait()async def main():endpoint = os.getenv('MCP_ENDPOINT')if not endpoint:print("Please set MCP_ENDPOINT environment variable")sys.exit(1)if len(sys.argv) != 2:print("Usage: python mcp_pipe.py <mcp_script.py>")sys.exit(1)mcp_script = sys.argv[1]pipe = MCPPipe(mcp_script, endpoint)await pipe.run()if __name__ == "__main__":asyncio.run(main())
  1. 部署和使用
    A. 安装依赖
    pip install mcp websockets asyncio
    B. 配置环境变量

设置小智AI提供的MCP接入点

export MCP_ENDPOINT=“wss://your-xiaozhi-mcp-endpoint”
C. 启动服务

启动ESP32 MCP服务器

python mcp_pipe.py esp32_mcp_server.py
5. ESP32端配置
确保ESP32设备:

启用MCP协议: CONFIG_IOT_PROTOCOL_MCP=y
配置WebSocket服务器: 提供MCP接口
网络连接: 能够被MCP服务器访问
6. 语音控制示例
用户通过小智AI语音终端说话:

用户语音 小智AI理解 MCP工具调用 ESP32执行
“调大音量” 音量控制 set_audio_volume(volume=80) 设置音量到80
“打开LED” LED控制 control_led(action=“on”) GPIO设置LED为高电平
“GPIO2设为高电平” GPIO控制 control_gpio_output(pin_number=2, state=true) 设置GPIO2输出高电平
“检查设备状态” 状态查询 get_device_status() 返回设备完整状态
7. 注意事项
网络配置: 确保MCP服务器能访问ESP32设备
错误处理: 添加设备离线、网络中断的处理逻辑
安全性: 考虑添加设备认证和访问控制
性能优化: 对于频繁操作,考虑连接池和缓存
日志监控: 完善日志记录,便于调试和监控
这样就实现了ESP32设备通过MCP协议接入小智AI的完整方案!

http://www.xdnf.cn/news/1103491.html

相关文章:

  • 后台管理系统-权限管理
  • 深度体验飞算JavaAI:一场Java开发效率的革命
  • 【数据结构】8. 二叉树
  • React中Redux基础和路由介绍
  • React 的常用钩子函数在Vue中是如何设计体现出来的。
  • Qt 实现新手引导
  • QT控件 使用QtServer系统服务实现搭建Aria2下载后台服务,并使用Http请求访问Json-RPC接口调用下载退出
  • Grok-4 发布会图文总结
  • docker宿主机修改ip后起不来问题解决
  • 前端面试专栏-算法篇:22.树结构(二叉树、B树、红黑树)
  • 游戏开发日记
  • MyBatis02-mybatis-config.xml配置文件讲解
  • 【深度探究系列(5)】:前端开发打怪升级指南:从踩坑到封神的解决方案手册
  • 基于kafka的分布式日志收集与实时监控平台(原理,框架)
  • 黑马点评系列问题之P55优惠券秒杀 快捷键问题 Ctrl+D显示不出来老师给的界面
  • 液冷智算数据中心崛起,AI算力联动PC Farm与云智算开拓新蓝海(二)
  • 前端Vue.js面试题(2)
  • 【三维重建工具】NeRFStudio、3D GaussianSplatting、Colmap安装与使用指南
  • 飞书CEO谢欣:挑战巨头,打造AI新时代的Office
  • 20250710-2-Kubernetes 集群部署、配置和验证-网络组件存在的意义?_笔记
  • 用Netplan配置网桥bridge笔记250711
  • lodash不支持 Tree Shaking 而 lodash-es可以
  • STM32F407ZGT6天气时钟+实时温湿度显示(附源码)
  • Java结构型模式---组合模式
  • 瀚高数据库提交数据后,是否需要COMMIT(APP)
  • MyBatis 进阶:连接池、动态 SQL 与多表关联查询
  • SpringBoot 使用注解获取配置文件中的值
  • 机器学习-06(Optimization-自动调整学习率)
  • FS820R08A6P2LB——英飞凌高性能IGBT模块,驱动高效能源未来!
  • 线程通信与进程通信的区别笔记