当前位置: 首页 > news >正文

谷歌ADK接入文件操作MCP

文章目录

  • MCP基础概念
  • 文件操作服务器
  • 文件操作MCP接入谷歌ADK
    • 项目创建
    • 多轮对话代码

MCP基础概念

  • MCP技术体系中,会将外部工具运行脚本称作服务器,而接入这些外部工具的大模型运行环境称作客户端。
    在这里插入图片描述
  • 一个客户端可以接入多个不同类型的服务器,但都要遵循MCP通信协议。
  • MCP服务器的输出内容是一种标准格式的内容,只能被MCP客户端所识别。在客户端和服务器都遵循MCP协议的时候,客户端就能够像Function calling中大模型调用外部工具一样,调用MCP服务器里面的工具。
    在这里插入图片描述
  • ADK作为客户端,接入MCP工具。核心MCPToolset类,该类是ADK连接到MCP服务器的桥梁。
  • 在实际过程中,ADK代理将执行以下操作MCPToolset
    1. 连接:与MCP服务器进程建立连接。该服务器可以是通过标准输入/输出 ( StdioServerParameters ) 进行通信的本地服务器,也可以是使用服务器发送事件 ( SseServerParams ) 的远程服务器。
    2. 发现:查询MCP服务器以获取可用工具(list_tools MCP 方法)。
    3. 适应:将MCP工具模式转换为ADK兼容BaseTool实例。
    4. 公开:将这些适配的工具呈现给ADK Agent
    5. 代理调用:当Agent决定使用其中一个工具时,发到MCP服务器并返回结果。MCPToolset将调用(call_tool MCP方法)转
    6. 管理连接:处理与MCP服务器进程的连接的生命周期,通常需要明确清理的周期(如进程结束后清理)。

文件操作服务器

  • Filesystem服务器是一个最基础同时也是最常用的MCP服务器,同时也是官方推荐的服务器,服务器项目地址。

文件操作MCP接入谷歌ADK

项目创建

  1. 项目框架搭建:
    # 初始化项目
    uv init adk_mcp
    # 进入项目目录
    cd adk_mcp
    # 创建虚拟环境
    uv venv
    # 激活虚拟环境
    .venv\Scripts\activate
    # 安装依赖
    uv add google-adk litellm
    
  2. 创建环境配置文件.env
    OPENAI_API_KEY=xxx
    OPENAI_API_BASE=https://api.openai-hk.com/v1
    MODEL=openai/gpt-4o-mini
    
  3. 创建main.py,代码内容如下:
    # 导入基础库
    import os
    import asyncio
    from dotenv import load_dotenv
    from google.genai import types
    from google.adk.agents import Agent
    from google.adk.models.lite_llm import LiteLlm
    from google.adk.agents.llm_agent import LlmAgent
    from google.adk.runners import Runner
    from google.adk.sessions import InMemorySessionService
    from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset, StdioServerParameters,StdioConnectionParamsDS_API_KEY = os.getenv("OPENAI_API_KEY")
    DS_BASE_URL = os.getenv("OPENAI_API_BASE")
    DS_MODEL = os.getenv("MODEL")model = LiteLlm(model=DS_MODEL,api_base=DS_BASE_URL,api_key=DS_API_KEY
    )# --- Step 1: 创建导入MCP工具函数 ---
    async def get_tools_async():"""Gets tools from the File System MCP Server."""print("Attempting to connect to MCP Filesystem server...")# 确保目录存在test_dir = "D:\\Code\\adk_mcp\\test"if not os.path.exists(test_dir):os.makedirs(test_dir)print(f"Created directory: {test_dir}")try:# 创建MCP工具集实例mcp_toolset = MCPToolset(# 这里采用Studio通信方式进行调用connection_params=StdioConnectionParams(server_params=StdioServerParameters(command='npx',args=["-y", "@modelcontextprotocol/server-filesystem", test_dir],)))# 等待服务器启动await asyncio.sleep(2)# 获取工具列表tools = await mcp_toolset.get_tools()print("MCP Toolset created successfully.")print(f"Fetched {len(tools)} tools from MCP server.")# 返回工具和工具集对象用于清理return tools, mcp_toolsetexcept Exception as e:print(f"Error creating MCP tools: {e}")raise# --- Step 2: 创建ADK Agent --
    async def get_agent_async():"""Creates an ADK Agent equipped with tools from the MCP Server."""try:tools, mcp_toolset = await get_tools_async()root_agent = Agent(model=model,name='filesystem_assistant',instruction='Help user interact with the local filesystem using available tools.',tools=tools,  # 将MCP工具加载到Agent中)return root_agent, mcp_toolsetexcept Exception as e:print(f"Error creating agent: {e}")raise# --- Step 3: 执行主逻辑 ---
    async def async_main():mcp_toolset = Nonetry:# 创建会话管理器session_service = InMemorySessionService()session = await session_service.create_session(state={}, app_name='mcp_filesystem_app', user_id='user_fs')# 用户输入query = "请帮我查找目前文件夹里都有哪些文件?"print(f"User Query: '{query}'")content = types.Content(role='user', parts=[types.Part(text=query)])root_agent, mcp_toolset = await get_agent_async()runner = Runner(app_name='mcp_filesystem_app',agent=root_agent,session_service=session_service,)print("Running agent...")events_async = runner.run_async(session_id=session.id, user_id=session.user_id, new_message=content)async for event in events_async:print(f"Event received: {event}")except Exception as e:print(f"Error during main execution: {e}")raisefinally:# 运行完成后,关闭MCP服务器连接if mcp_toolset:print("Closing MCP server connection...")await mcp_toolset.close()print("Cleanup complete.")if __name__ == '__main__':try:asyncio.run(async_main())except Exception as e:print(f"An error occurred: {e}")
    
  • 执行结果如下:
    (adk_mcp) D:\Code\adk_mcp>uv run main.py
    D:\Code\adk_mcp\.venv\Lib\site-packages\pydantic\_internal\_fields.py:198: UserWarning: Field name "config_type" in "SequentialAgent" shadows an attribute in parent "BaseAgent"warnings.warn(
    User Query: '请帮我查找目前文件夹里都有哪些文件?'
    Attempting to connect to MCP Filesystem server...
    D:\Code\adk_mcp\.venv\Lib\site-packages\google\adk\tools\mcp_tool\mcp_tool.py:87: UserWarning: [EXPERIMENTAL] BaseAuthenticatedTool: This feature is experimental and may change or be removed in future versions without notice. It may introduce breaking changes at any time.super().__init__(
    auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
    MCP Toolset created successfully.
    Fetched 14 tools from MCP server.
    Running agent...
    Event received: content=Content(parts=[Part(text="""首先,我需要确定您当前的工作目录位置。由于您没有指定具体路径,我将先获取允许访问的目录列表:"""),Part(function_call=FunctionCall(args={},id='call_5201be00eeb24a989be18f',name='list_allowed_directories')),],role='model'
    ) grounding_metadata=None partial=False turn_complete=None error_code=None error_message=None interrupted=None custom_metadata=None usage_metadata=GenerateContentResponseUsageMetadata(candidates_token_count=627,prompt_token_count=1806,total_token_count=2433
    ) live_session_resumption_update=None invocation_id='e-1710d865-f562-4895-ae06-4a3f0155c138' author='filesystem_assistant' actions=EventActions(skip_summarization=None, state_delta={}, artifact_delta={}, transfer_to_agent=None, escalate=None, requested_auth_configs={}) long_running_tool_ids=set() branch=None id='71634100-b10a-4a81-b779-f34f52df12eb' timestamp=1754979735.932518
    Event received: content=Content(parts=[Part(function_response=FunctionResponse(id='call_5201be00eeb24a989be18f',name='list_allowed_directories',response={'result': CallToolResult(content=[<... Max depth ...>,],isError=False)})),],role='user'
    ) grounding_metadata=None partial=None turn_complete=None error_code=None error_message=None interrupted=None custom_metadata=None usage_metadata=None live_session_resumption_update=None invocation_id='e-1710d865-f562-4895-ae06-4a3f0155c138' author='filesystem_assistant' actions=EventActions(skip_summarization=None, state_delta={}, artifact_delta={}, transfer_to_agent=None, escalate=None, requested_auth_configs={}) long_running_tool_ids=None branch=None id='00b7f823-5a66-493d-95bd-55ee93466fc2' timestamp=1754979761.37467
    Event received: content=Content(parts=[Part(text="""根据系统配置,允许访问的目录是 `D:\Code\adk_mcp\test`。我将列出该目录下的文件:"""),Part(function_call=FunctionCall(args={'path': 'D:\\Code\\adk_mcp\\test'},id='call_2f66909ac2054847b6f9d3',name='list_directory')),],role='model'
    ) grounding_metadata=None partial=False turn_complete=None error_code=None error_message=None interrupted=None custom_metadata=None usage_metadata=GenerateContentResponseUsageMetadata(candidates_token_count=1105,prompt_token_count=1906,total_token_count=3011
    ) live_session_resumption_update=None invocation_id='e-1710d865-f562-4895-ae06-4a3f0155c138' author='filesystem_assistant' actions=EventActions(skip_summarization=None, state_delta={}, artifact_delta={}, transfer_to_agent=None, escalate=None, requested_auth_configs={}) long_running_tool_ids=set() branch=None id='39358200-064d-478e-90f6-ab0855f13ec4' timestamp=1754979761.380695
    Event received: content=Content(parts=[Part(function_response=FunctionResponse(id='call_2f66909ac2054847b6f9d3',name='list_directory',response={'result': CallToolResult(content=[<... Max depth ...>,],isError=False)})),],role='user'
    ) grounding_metadata=None partial=None turn_complete=None error_code=None error_message=None interrupted=None custom_metadata=None usage_metadata=None live_session_resumption_update=None invocation_id='e-1710d865-f562-4895-ae06-4a3f0155c138' author='filesystem_assistant' actions=EventActions(skip_summarization=None, state_delta={}, artifact_delta={}, transfer_to_agent=None, escalate=None, requested_auth_configs={}) long_running_tool_ids=None branch=None id='2d1197f7-c81d-41a5-9fc5-15652c50fa7e' timestamp=1754979803.754635
    Event received: content=Content(parts=[Part(text="""当前目录 `D:\Code\adk_mcp\test` 中的文件如下:- 📄 **系统架构设计师教程_带目录高清版.pdf**"""),],role='model'
    ) grounding_metadata=None partial=False turn_complete=None error_code=None error_message=None interrupted=None custom_metadata=None usage_metadata=GenerateContentResponseUsageMetadata(candidates_token_count=128,prompt_token_count=2021,total_token_count=2149
    ) live_session_resumption_update=None invocation_id='e-1710d865-f562-4895-ae06-4a3f0155c138' author='filesystem_assistant' actions=EventActions(skip_summarization=None, state_delta={}, artifact_delta={}, transfer_to_agent=None, escalate=None, requested_auth_configs={}) long_running_tool_ids=None branch=None id='15fdd91c-e645-46fe-a613-76e1f28323c5' timestamp=1754979803.761915
    Closing MCP server connection...
    Cleanup complete.
    

多轮对话代码

  • 以下代码是main.py的多轮对话的版本:
    # 导入基础库
    import os
    import asyncio
    from dotenv import load_dotenv
    from google.genai import types
    from google.adk.agents import Agent
    from google.adk.models.lite_llm import LiteLlm
    from google.adk.agents.llm_agent import LlmAgent
    from google.adk.runners import Runner
    from google.adk.sessions import InMemorySessionService
    from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset, StdioServerParameters,StdioConnectionParamsDS_API_KEY = os.getenv("OPENAI_API_KEY")
    DS_BASE_URL = os.getenv("OPENAI_API_BASE")
    DS_MODEL = os.getenv("MODEL")model = LiteLlm(model=DS_MODEL,api_base=DS_BASE_URL,api_key=DS_API_KEY
    )# --- Step 1: 创建导入MCP工具函数 ---
    async def get_tools_async():"""Gets tools from the File System MCP Server."""print("Attempting to connect to MCP Filesystem server...")# 确保目录存在test_dir = "D:\\Code\\adk_mcp\\test"if not os.path.exists(test_dir):os.makedirs(test_dir)print(f"Created directory: {test_dir}")try:# 创建MCP工具集实例mcp_toolset = MCPToolset(# 这里采用Studio通信方式进行调用connection_params=StdioConnectionParams(server_params=StdioServerParameters(command='npx',  # Command to run the serverargs=["-y",  # Arguments for the command"@modelcontextprotocol/server-filesystem",test_dir],)))# 等待服务器启动await asyncio.sleep(2)# 获取工具列表tools = await mcp_toolset.get_tools()print("MCP Toolset created successfully.")print(f"Fetched {len(tools)} tools from MCP server.")# 返回工具和工具集对象用于清理return tools, mcp_toolsetexcept Exception as e:print(f"Error creating MCP tools: {e}")raise# --- Step 2: 创建ADK Agent --
    async def get_agent_async():"""Creates an ADK Agent equipped with tools from the MCP Server."""try:tools, mcp_toolset = await get_tools_async()root_agent = Agent(model=model,name='filesystem_assistant',instruction='Help user interact with the local filesystem using available tools.',tools=tools,  # 将MCP工具加载到Agent中)return root_agent, mcp_toolsetexcept Exception as e:print(f"Error creating agent: {e}")raise# 多轮对话函数
    async def chat_loop(runner, user_id, session_id) -> None:print("\n🤖ADK + MCP对话已启动!输入'quit'退出。")while True:query = input("\n你: ").strip()if query.lower() == "quit":breaktry:print(f"\n>>> User Query: {query}")# Prepare the user's message in ADK formatcontent = types.Content(role='user', parts=[types.Part(text=query)])final_response_text = "Agent did not produce a final response."  # Default# Key Concept: run_async executes the agent logic and yields Events.# We iterate through events to find the final answer.async for event in runner.run_async(user_id=user_id,session_id=session_id, new_message=content):# You can uncomment the line below to see *all* events during execution# print(f"  [Event] Author: {event.author}, Type:{type(event).__name__}, Final: {event.is_final_response()}, Content:{event.content}")# Key Concept: is_final_response() marks the concluding message for the turn.if event.is_final_response():if event.content and event.content.parts:# Assuming text response in the first partfinal_response_text = event.content.parts[0].textelif event.actions and event.actions.escalate:  # Handle potential errors / escalationsfinal_response_text = f"Agent escalated:{event.error_message or 'No specific message.'}"# Add more checks here if needed (e.g., specific error codes)breakprint(f"<<< Agent Response: {final_response_text}")except Exception as e:print(f"\n⚠调用过程出错: {e}")# --- Step 3: 执行主逻辑 ---
    async def async_main():mcp_toolset = Nonetry:# 创建会话管理器session_service = InMemorySessionService()session = await session_service.create_session(state={}, app_name='mcp_filesystem_app', user_id='user_fs')# 用户输入query = "请帮我查找目前文件夹里都有哪些文件?"print(f"User Query: '{query}'")content = types.Content(role='user', parts=[types.Part(text=query)])root_agent, mcp_toolset = await get_agent_async()runner = Runner(app_name='mcp_filesystem_app',agent=root_agent,session_service=session_service,)print("Running agent...")events_async = runner.run_async(session_id=session.id, user_id=session.user_id, new_message=content)async for event in events_async:print(f"Event received: {event}")# 启动多轮对话await chat_loop(runner, session.user_id, session.id)except Exception as e:print(f"Error during main execution: {e}")raisefinally:# 运行完成后,关闭MCP服务器连接if mcp_toolset:print("Closing MCP server connection...")await mcp_toolset.close()print("Cleanup complete.")if __name__ == '__main__':try:asyncio.run(async_main())except Exception as e:print(f"An error occurred: {e}")
    
http://www.xdnf.cn/news/1287361.html

相关文章:

  • 力扣47:全排列Ⅱ
  • 基于Python的《红楼梦》文本分析与机器学习应用
  • 力扣 hot100 Day71
  • vivo Pulsar 万亿级消息处理实践(2)-从0到1建设 Pulsar 指标监控链路
  • [激光原理与应用-254]:理论 - 几何光学 - 自动对焦的原理
  • 数据结构:中缀到后缀的转换(Infix to Postfix Conversion)
  • Flutter GridView的基本使用
  • Java 工厂方法模式
  • 【项目设计】高并发内存池
  • 北京-4年功能测试2年空窗-报培训班学测开-第七十四天-线下面试-聊的很满意但可能有风险-等信吧
  • cuda排序算法--双调排序(Bitonic_Sort)
  • web前端第二次作业
  • 开发避坑指南(23):Tomcat高版本URL特殊字符限制问题解决方案(RFC 7230 RFC 3986)
  • TF-IDF:信息检索与文本挖掘的统计权重基石
  • 多奥电梯智能化解决方案的深度解读与结构化总结,内容涵盖系统架构、功能模块、应用场景与社会价值四大维度,力求全面展示该方案的技术先进性与应用前景。
  • Agent智能体基础
  • vue3大事件
  • Linux随记(二十二)
  • 本地(macOS)和服务器时间不同步导致的 Bug排查及解决
  • 从裸机到云原生:Linux 操作系统实战进阶的“四维跃迁”
  • 【Linux】程序地址空间
  • CTO如何通过录音转写和音频降噪,提升企业远程协作效率?
  • 定制客车系统线上购票系统功能设计
  • springboot+JPA
  • 机械臂的智能升维:当传统机械臂遇见Deepoc具身智能大模型从自动化工具到具身智能体的范式革命
  • 【KO】android 音视频
  • Elasticsearch JavaScript 客户端「基础配置」全指南(Node/TS)
  • AWT与Swing深度对比:架构差异、迁移实战与性能优化
  • Git 常用命令速查表
  • java面试题储备4: 谈谈对es的理解