当前位置: 首页 > news >正文

A2A + MCP 的python实现的最小可运行骨架

A2A + MCP Python Skeleton

目的:演示 A2A(代理↔代理) 作为编排骨干、MCP(代理↔工具/数据) 作为工具接入的最小可运行骨架。包含一个 Router、三个子 Agent(ASR / Legal / TTS),以及一个 Mock MCP Server(JSON‑RPC 风格);A2A 链路(Router ↔ Agent ↔ 调用方)支持 SSE 流式输出,MCP 工具调用目前为一次性请求/响应。


目录结构

.
├─ requirements.txt
├─ .env.sample
├─ src/
│  ├─ common/
│  │  ├─ a2a.py
│  │  └─ mcp.py
│  ├─ router.py
│  ├─ agents/
│  │  ├─ asr_agent.py
│  │  ├─ legal_agent.py
│  │  └─ tts_agent.py
│  └─ tools/
│     └─ mock_mcp_server.py

requirements.txt

fastapi==0.111.0
uvicorn[standard]==0.30.0
httpx==0.27.2
pydantic==2.8.2
python-dotenv==1.0.1

.env.sample

# Router 将任务路由到各个 Agent 的地址
ASR_AGENT_URL=http://127.0.0.1:8011
LEGAL_AGENT_URL=http://127.0.0.1:8012
TTS_AGENT_URL=http://127.0.0.1:8013

# MCP Server 地址(示例使用 mock)
MCP_ENDPOINT=http://127.0.0.1:8099/mcp

运行前复制为 .env 并按需修改。


src/common/a2a.py

from __future__ import annotations
import json
import uuid
from typing import Any, AsyncIterator, Dict

from pydantic import BaseModel, Field

# MIME type of A2A streaming responses (Server-Sent Events).
A2A_MIME_SSE = "text/event-stream"


# === A2A tasks and events ===
class A2ATask(BaseModel):
    """A unit of work sent to an agent's ``/a2a/task`` endpoint."""

    # Task id; a fresh UUID4 string is generated when the caller omits it.
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    # Dotted task type, e.g. "asr.transcribe", "legal.analyze", "tts.speak".
    type: str
    # Task-specific arguments; a new empty dict per instance by default.
    payload: Dict[str, Any] = Field(default_factory=dict)
    # True requests an SSE event stream; False requests a single JSON response.
    stream: bool = True


class A2AEvent(BaseModel):
    """One event emitted while a task is being processed."""

    # Id of the task this event belongs to.
    task_id: str
    # Event kind, e.g. "progress", "delta", "final", "error".
    event: str
    # Event-specific data; a new empty dict per instance by default.
    data: Dict[str, Any] = Field(default_factory=dict)


# === SSE encoding ===
def sse_encode(event: A2AEvent) -> bytes:
    """Encode *event* as one SSE ``data:`` frame in UTF-8.

    ``model_dump_json`` emits compact JSON, in which newlines inside strings
    are escaped, so a single ``data:`` line terminated by a blank line is a
    complete SSE event.
    """
    frame = "data: " + event.model_dump_json() + "\n\n"
    return frame.encode("utf-8")


async def sse_stream_generator(generator: AsyncIterator[A2AEvent]) -> AsyncIterator[bytes]:
    """Yield every event produced by *generator* as an encoded SSE frame."""
    async for ev in generator:
        yield sse_encode(ev)


# === Error types ===
class A2AError(RuntimeError):
    """Error type for A2A-level failures (not raised anywhere in this skeleton yet)."""

src/common/mcp.py

from __future__ import annotations
import asyncio
import json
import time
import uuid
from typing import Any, Dict, List, Optional

import httpx
from pydantic import BaseModel


class MCPError(RuntimeError):
    """Raised when the MCP server answers a JSON-RPC call with an error object."""


class MCPClient:
    """Minimal MCP JSON-RPC client over HTTP.

    - ``list_tools()`` calls JSON-RPC ``method="tools/list_tools"``.
    - ``call(tool, args)`` calls JSON-RPC ``method="tools/call"``.

    Real MCP deployments may use stdio or SSE transports; this HTTP JSON-RPC
    skeleton is meant to be quick to stand up and easy to replace.

    The instance owns an ``httpx.AsyncClient``. Release it with
    ``await client.aclose()`` or by using the instance in ``async with``.
    """

    def __init__(self, endpoint: str, timeout: float = 30.0):
        self.endpoint = endpoint.rstrip("/")
        self.timeout = timeout
        self._client = httpx.AsyncClient(timeout=timeout)

    async def __aenter__(self) -> "MCPClient":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await self.aclose()

    async def _rpc(self, method: str, params: Dict[str, Any]) -> Any:
        """Send one JSON-RPC 2.0 request and return its ``result`` member.

        Raises:
            httpx.HTTPStatusError: the HTTP status is 4xx/5xx.
            MCPError: the response carries a non-null ``error`` member.
        """
        req = {
            "jsonrpc": "2.0",
            "id": str(uuid.uuid4()),
            "method": method,
            "params": params,
        }
        r = await self._client.post(self.endpoint, json=req)
        r.raise_for_status()
        data = r.json()
        # Test the value, not just the key: servers that serialize a fixed
        # response model may send `"error": null` alongside a valid result.
        error = data.get("error")
        if error is not None:
            raise MCPError(str(error))
        return data.get("result")

    async def list_tools(self) -> List[Dict[str, Any]]:
        """Return the server's ``tools/list_tools`` result unchanged.

        NOTE(review): a server may wrap the list as ``{"tools": [...]}``;
        the List annotation does not hold in that case.
        """
        return await self._rpc("tools/list_tools", {})

    async def call(self, tool: str, args: Dict[str, Any]) -> Dict[str, Any]:
        """Invoke *tool* with *args* through ``tools/call`` and return its result."""
        return await self._rpc("tools/call", {"tool": tool, "args": args})

    async def aclose(self) -> None:
        """Close the underlying HTTP client and its connection pool."""
        await self._client.aclose()

src/router.py

from __future__ import annotations
import os
from typing import AsyncIterator

import httpx
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse, StreamingResponse

from common.a2a import A2AEvent, A2ATask, sse_encode

load_dotenv()

ASR_URL = os.getenv("ASR_AGENT_URL", "http://127.0.0.1:8011")
LEGAL_URL = os.getenv("LEGAL_AGENT_URL", "http://127.0.0.1:8012")
TTS_URL = os.getenv("TTS_AGENT_URL", "http://127.0.0.1:8013")

# Task-type prefix -> base URL of the agent serving it (first match wins).
ROUTE_TABLE = {
    "asr.": ASR_URL,
    "legal.": LEGAL_URL,
    "tts.": TTS_URL,
}

app = FastAPI(title="A2A Router")


def _pick_agent(task_type: str) -> str:
    """Return the base URL of the agent whose prefix matches *task_type*.

    Raises:
        HTTPException: 400 when no ROUTE_TABLE prefix matches.
    """
    for prefix, url in ROUTE_TABLE.items():
        if task_type.startswith(prefix):
            return url
    raise HTTPException(status_code=400, detail=f"No agent for task type: {task_type}")


@app.post("/a2a/task")
async def route_task(task: A2ATask):
    """Forward *task* to the matching agent's ``/a2a/task`` endpoint.

    Streaming tasks are relayed as a live SSE byte stream. Non-streaming
    tasks return the agent's JSON body with the agent's status code.

    Raises:
        HTTPException: 400 for an unroutable task type; the agent's status
            code when a streaming request is rejected; the agent's status
            code (or 502) when a non-streaming reply is not JSON.
    """
    agent_endpoint = f"{_pick_agent(task.type)}/a2a/task"
    body = task.model_dump()

    if not task.stream:
        async with httpx.AsyncClient(timeout=None) as client:
            resp = await client.post(agent_endpoint, json=body)
        try:
            content = resp.json()
        except ValueError:
            # Non-JSON body (e.g. a plain-text 500): surface it as an HTTP error.
            status = resp.status_code if not resp.is_success else 502
            raise HTTPException(status_code=status, detail=resp.text) from None
        return JSONResponse(status_code=resp.status_code, content=content)

    # Streaming: a plain `client.post` would read the entire SSE body before
    # returning (no real-time relay), and an `async with` client would be
    # closed before StreamingResponse starts iterating. So send in stream
    # mode and let `forward` own the response and client lifetimes.
    client = httpx.AsyncClient(timeout=None)
    try:
        request = client.build_request(
            "POST", agent_endpoint, json=body, headers={"accept": "text/event-stream"}
        )
        resp = await client.send(request, stream=True)
    except Exception:
        await client.aclose()
        raise

    if resp.status_code != 200:
        await resp.aread()
        detail = resp.text
        await resp.aclose()
        await client.aclose()
        raise HTTPException(status_code=resp.status_code, detail=detail)

    async def forward() -> AsyncIterator[bytes]:
        # Relay downstream SSE bytes as they arrive, then release resources.
        try:
            async for chunk in resp.aiter_bytes():
                yield chunk
        finally:
            await resp.aclose()
            await client.aclose()

    return StreamingResponse(forward(), media_type="text/event-stream")

src/agents/asr_agent.py

from __future__ import annotations
import os
import asyncio
from typing import AsyncIterator

from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse, StreamingResponse

from common.a2a import A2AEvent, A2ATask, sse_stream_generator
from common.mcp import MCPClient

load_dotenv()

MCP_ENDPOINT = os.getenv("MCP_ENDPOINT", "http://127.0.0.1:8099/mcp")

app = FastAPI(title="ASR Agent")


async def _asr_stream(mcp: MCPClient, audio_url: str, task_id: str) -> AsyncIterator[A2AEvent]:
    """Transcribe *audio_url* through MCP and emit progress/delta/final events.

    The tool returns the whole transcript at once; it is re-emitted word by
    word to demonstrate incremental output (a real service could stream
    chunked inference instead).
    """
    yield A2AEvent(task_id=task_id, event="progress", data={"msg": "asr_started"})
    result = await mcp.call("asr.transcribe", {"audio_url": audio_url})
    text = result.get("text", "")
    for token in text.split():
        await asyncio.sleep(0.05)  # simulated per-token latency
        yield A2AEvent(task_id=task_id, event="delta", data={"token": token})
    yield A2AEvent(task_id=task_id, event="final", data={"text": text})


async def _relay_events(
    mcp: MCPClient, task_id: str, events: AsyncIterator[A2AEvent]
) -> AsyncIterator[A2AEvent]:
    """Relay *events*, report a failure as an "error" event, and always close *mcp*."""
    try:
        async for ev in events:
            yield ev
    except Exception as exc:  # stream boundary: once SSE has started, raising only drops the connection
        yield A2AEvent(task_id=task_id, event="error", data={"message": str(exc)})
    finally:
        await mcp.aclose()


@app.post("/a2a/task")
async def handle(task: A2ATask):
    """Handle an ``asr.transcribe`` task as an SSE stream or a single JSON reply.

    Raises:
        HTTPException: 400 for an unsupported task type or a missing
            ``payload.audio_url``.
    """
    if task.type != "asr.transcribe":
        raise HTTPException(status_code=400, detail=f"Unsupported task type: {task.type}")
    audio_url = task.payload.get("audio_url")
    if not audio_url:
        raise HTTPException(status_code=400, detail="payload.audio_url required")

    mcp = MCPClient(MCP_ENDPOINT)
    if task.stream:
        events = _relay_events(mcp, task.id, _asr_stream(mcp, audio_url, task.id))
        return StreamingResponse(sse_stream_generator(events), media_type="text/event-stream")

    # Non-streaming: one MCP call, one JSON response; close the client either way.
    try:
        result = await mcp.call("asr.transcribe", {"audio_url": audio_url})
    finally:
        await mcp.aclose()
    return JSONResponse(result)

src/agents/legal_agent.py

from __future__ annotations
import os
import asyncio
from typing import AsyncIteratorfrom dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse, JSONResponsefrom common.a2a import A2ATask, A2AEvent, sse_stream_generator
from common.mcp import MCPClientload_dotenv()
MCP_ENDPOINT = os.getenv("MCP_ENDPOINT", "http://127.0.0.1:8099/mcp")app = FastAPI(title="Legal Expert Agent")async def _legal_stream(mcp: MCPClient, query: str, task_id: str) -> AsyncIterator[A2AEvent]:yield A2AEvent(task_id=task_id, event="progress", data={"msg": "legal_started"})# 1) RAG 检索(示例:调用 MCP 的 kb.search)kb = await mcp.call("kb.search", {"query": query, "top_k": 3})yield A2AEvent(task_id=task_id, event="progress", data={"kb_hits": kb.get("hits", [])})# 2) LLM 归纳(示例:调用 MCP 的 llm.complete)prompt = f"根据以下材料做初步法律分析:\n{kb.get('context', '')}\n---\n问题:{query}\n"llm = await mcp.call("llm.complete", {"prompt": prompt, "temperature": 0.2})# 模拟分段流出text = llm.get("text", "")for chunk in [text[i:i+60] for i in range(0, len(text), 60)]:await asyncio.sleep(0.05)yield A2AEvent(task_id=task_id, event="delta", data={"text": chunk})yield A2AEvent(task_id=task_id, event="final", data={"text": text})@app.post("/a2a/task")
async def handle(task: A2ATask):if task.type != "legal.analyze":raise HTTPException(status_code=400, detail=f"Unsupported task type: {task.type}")query = task.payload.get("query")if not query:raise HTTPException(status_code=400, detail="payload.query required")mcp = MCPClient(MCP_ENDPOINT)if task.stream:gen = _legal_stream(mcp, query, task.id)return StreamingResponse(sse_stream_generator(gen), media_type="text/event-stream")else:kb = await mcp.call("kb.search", {"query": query, "top_k": 3})llm = await mcp.call("llm.complete", {"prompt": str(kb), "temperature": 0.2})return JSONResponse(llm)

注意:`from __future__ annotations` 不是合法语法,在任何 Python 版本中都会报 SyntaxError,必须写作 `from __future__ import annotations`。


src/agents/tts_agent.py

from __future__ import annotations
import os
import asyncio
from typing import AsyncIterator

from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse, StreamingResponse

from common.a2a import A2AEvent, A2ATask, sse_stream_generator
from common.mcp import MCPClient

load_dotenv()

MCP_ENDPOINT = os.getenv("MCP_ENDPOINT", "http://127.0.0.1:8099/mcp")

app = FastAPI(title="TTS Agent")


async def _tts_stream(mcp: MCPClient, text: str, task_id: str) -> AsyncIterator[A2AEvent]:
    """Synthesize *text* through MCP and emit one delta event per returned chunk.

    The mock ``tts.speak`` tool returns a list of placeholder audio chunks.
    """
    yield A2AEvent(task_id=task_id, event="progress", data={"msg": "tts_started"})
    result = await mcp.call("tts.speak", {"text": text})
    for seq, chunk in enumerate(result.get("chunks", [])):
        await asyncio.sleep(0.05)  # simulated per-chunk latency
        yield A2AEvent(task_id=task_id, event="delta", data={"seq": seq, "audio_chunk": chunk})
    yield A2AEvent(task_id=task_id, event="final", data={"ok": True})


async def _relay_events(
    mcp: MCPClient, task_id: str, events: AsyncIterator[A2AEvent]
) -> AsyncIterator[A2AEvent]:
    """Relay *events*, report a failure as an "error" event, and always close *mcp*."""
    try:
        async for ev in events:
            yield ev
    except Exception as exc:  # stream boundary: once SSE has started, raising only drops the connection
        yield A2AEvent(task_id=task_id, event="error", data={"message": str(exc)})
    finally:
        await mcp.aclose()


@app.post("/a2a/task")
async def handle(task: A2ATask):
    """Handle a ``tts.speak`` task as an SSE stream or a single JSON reply.

    Raises:
        HTTPException: 400 for an unsupported task type or a missing
            ``payload.text``.
    """
    if task.type != "tts.speak":
        raise HTTPException(status_code=400, detail=f"Unsupported task type: {task.type}")
    text = task.payload.get("text")
    if not text:
        raise HTTPException(status_code=400, detail="payload.text required")

    mcp = MCPClient(MCP_ENDPOINT)
    if task.stream:
        events = _relay_events(mcp, task.id, _tts_stream(mcp, text, task.id))
        return StreamingResponse(sse_stream_generator(events), media_type="text/event-stream")

    # Non-streaming: one MCP call, one JSON response; close the client either way.
    try:
        result = await mcp.call("tts.speak", {"text": text})
    finally:
        await mcp.aclose()
    return JSONResponse(result)

src/tools/mock_mcp_server.py

from __future__ import annotations
import random
from typing import Any, Dict

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI(title="Mock MCP Server (JSON-RPC over HTTP)")

# Tool catalogue returned by tools/list_tools; argument types are informational only.
TOOLS = [
    {"name": "asr.transcribe", "args": {"audio_url": "str"}},
    {"name": "kb.search", "args": {"query": "str", "top_k": "int"}},
    {"name": "llm.complete", "args": {"prompt": "str", "temperature": "float"}},
    {"name": "tts.speak", "args": {"text": "str"}},
]

# Standard JSON-RPC 2.0 error codes.
METHOD_NOT_FOUND = -32601
INVALID_PARAMS = -32602
INTERNAL_ERROR = -32603


class JSONRPCRequest(BaseModel):
    jsonrpc: str
    # JSON-RPC allows string or number ids.
    id: str | int
    method: str
    params: Dict[str, Any] | None = None


class JSONRPCResponse(BaseModel):
    jsonrpc: str = "2.0"
    id: str | int
    # Exactly one of result / error is set; None members are dropped on output.
    result: Dict[str, Any] | None = None
    error: Dict[str, Any] | None = None


def _call_tool(tool: str | None, args: Dict[str, Any]) -> Dict[str, Any] | None:
    """Run one mock tool and return its result, or None when *tool* is unknown."""
    if tool == "asr.transcribe":
        return {"text": f"(mock asr) transcribed from {args.get('audio_url','?')}"}
    if tool == "kb.search":
        top_k = int(args.get("top_k", 3))
        hits = [
            {"id": f"doc{i}", "score": round(random.random(), 3), "snippet": f"snippet {i}"}
            for i in range(1, top_k + 1)
        ]
        return {"hits": hits, "context": "\n".join(h["snippet"] for h in hits)}
    if tool == "llm.complete":
        prompt = args.get("prompt", "")
        return {"text": f"(mock llm) summary for: {prompt[:60]}..."}
    if tool == "tts.speak":
        text = args.get("text", "")
        chunks = [f"audio-bytes-chunk-{i}" for i in range(5)]
        return {"chunks": chunks, "desc": f"(mock) tts of {text[:20]}..."}
    return None


# exclude_none: JSON-RPC 2.0 forbids sending both `result` and `error`, and
# clients that test for the presence of "error" would otherwise always fail.
@app.post("/mcp", response_model_exclude_none=True)
async def mcp(req: JSONRPCRequest) -> JSONRPCResponse:
    """Dispatch one JSON-RPC request to ``tools/list_tools`` or ``tools/call``.

    Unknown methods and unknown tools become JSON-RPC error responses. Any
    exception raised while running a tool becomes an internal-error response
    rather than an HTTP 500.
    """
    try:
        if req.method == "tools/list_tools":
            return JSONRPCResponse(id=req.id, result={"tools": TOOLS})
        if req.method == "tools/call":
            params = req.params or {}
            tool = params.get("tool")
            result = _call_tool(tool, params.get("args") or {})
            if result is None:
                return JSONRPCResponse(
                    id=req.id, error={"code": INVALID_PARAMS, "message": f"unknown tool: {tool}"}
                )
            return JSONRPCResponse(id=req.id, result=result)
        return JSONRPCResponse(
            id=req.id, error={"code": METHOD_NOT_FOUND, "message": f"unknown method: {req.method}"}
        )
    except Exception as e:  # request boundary: report as a JSON-RPC error
        return JSONRPCResponse(id=req.id, error={"code": INTERNAL_ERROR, "message": str(e)})

启动方式

1) 安装依赖

python -m venv .venv && source .venv/bin/activate
pip install -r requirements.txt
cp .env.sample .env

2) 分别启动 Mock MCP 与各 Agent、Router

# 在项目根目录执行;--app-dir src 把 src 加入导入路径,使代码中的 `from common.xxx import ...` 能被解析

# 终端 1:Mock MCP Server
uvicorn tools.mock_mcp_server:app --app-dir src --host 0.0.0.0 --port 8099 --reload

# 终端 2:ASR Agent
uvicorn agents.asr_agent:app --app-dir src --host 0.0.0.0 --port 8011 --reload

# 终端 3:Legal Agent
uvicorn agents.legal_agent:app --app-dir src --host 0.0.0.0 --port 8012 --reload

# 终端 4:TTS Agent
uvicorn agents.tts_agent:app --app-dir src --host 0.0.0.0 --port 8013 --reload

# 终端 5:Router
uvicorn router:app --app-dir src --host 0.0.0.0 --port 8000 --reload

测试(SSE 流式)

例如让 Router 把任务派给 Legal Agent:

curl -N -H 'Accept: text/event-stream' \
  -H 'Content-Type: application/json' \
  -d '{"type": "legal.analyze","payload": {"query": "公司违法辞退赔偿怎么计算?"},"stream": true}' \
  http://127.0.0.1:8000/a2a/task

你会看到 `data: {"task_id": ..., "event": "progress", ...}` 以及 `delta`、`final` 等事件逐步返回。


架构要点回顾

  • A2A:`/a2a/task` 即代理之间的统一入口;Router 依据 `task.type` 做分发,并透传 SSE 流式结果。
  • MCP:各代理内部通过 MCPClient 调用工具(这里用 mock_mcp_server 演示),未来可无缝替换为真实 MCP 工具(向量库、Redis 记忆、企业 API、LLM、TTS/ASR 服务等)。
  • 可扩展:新增代理=加一个服务并在 Router 的 ROUTE_TABLE 加前缀映射;新增工具=在 MCP 端注册新 tool 名并在代理里调用。

下一步可升级点

  1. 鉴权/租户隔离:在 Router 与 Agent 层统一加 Bearer/JWT;任务元数据里携带 tenant_id / session_id。
  2. 重试与超时:Router 对下游 Agent 实现熔断/超时、错误回传;Agent 内部对 MCP 工具加重试策略。
  3. 真正的流式工具:将 MCP 端改为 SSE/分块输出,MCPClient 增加 acall_stream,实现端到端全链路流式。
  4. 记忆:通过 MCP 工具挂 Redis:memory.write/read/append,在 Router/Agent 层封装对话级短期记忆。
  5. 发现与注册:为 A2A 增加 /.well-known/agents,支持 Agent 动态注册/心跳;或引入服务发现组件。
  6. 观测:统一埋点(OpenTelemetry)、任务状态表(Postgres/ClickHouse)、SSE 日志镜像通道用于回放与质检。
http://www.xdnf.cn/news/1425223.html

相关文章:

  • Jmeter实现参数化的4种方式
  • 构建AI智能体:二十、妙笔生花:Gradio集成DashScope Qwen-Image模型实现文生图
  • 人脸识别备案的重要意义
  • ES6新特性:JavaScript的进化装备箱[特殊字符]
  • 记一次使用函数式接口
  • A股大盘数据-20250901 分析
  • GD32入门到实战25--独立看门狗
  • JAVA后端开发——MyBatis 结合 MySQL JSON 类型查询详解
  • 【STM32】贪吃蛇 [阶段 3] 增强模块结构(架构优化)
  • curl 介绍及使用教程
  • python爬虫之selenium库进阶(小白五分钟从入门到精通)
  • 基本渗透概念
  • Raft 协议在 Nacos 中的实现
  • 从零开始实现Shell | Linux进程调度实战
  • Product Hunt 每日热榜 | 2025-09-01
  • 基于YOLOv11的脑卒中目标检测及其完整数据集——推动智能医疗发展的新机遇!
  • 齿轮里的 “双胞胎”:分度圆与节圆
  • [React]监听Form中某个字段的变化
  • 微算法科技(NASDAQ:MLGO)张量网络与机器学习融合,MPS分类器助力顶夸克信号识别
  • deepseek doubao chatgpt 优缺点分析
  • 并发--并发中的线程状态及不同状态下线程所在队列
  • React学习教程,从入门到精通, React 入门指南:创建 React 应用程序的语法知识点(7)
  • OpenCV-CUDA 图像处理
  • 数据库常见故障类型
  • 知识产品和标准化
  • 在 Qt 中加载 .qm 翻译文件
  • 跳跃游戏(二):DFS 求解最少跳跃次数与最优路径
  • 专项智能练习(Word)
  • JavaSE:抽象类和接口
  • 计算机视觉(五):blur