当前位置: 首页 > news >正文

【A2A】管中窥豹,google源码python-demo介绍

在这里插入图片描述

前言

A2A(Agent2Agent)是 Google 推出的一项新协议,旨在解决多智能体(Multi-Agent)系统中跨平台、跨组织协作的难题。它为 AI 代理之间的通信、协作和任务分工提供了一个统一的标准,可以类比为网页世界的 HTTP 协议——即 AI 代理之间的“通用语言”

项目地址:https://github.com/google/A2A/tree/main/samples/python

一、项目架构介绍

1. 项目概述

基于 A2A (Agent-to-Agent) 协议的示例项目,展示了如何使用不同的 AI 框架来实现智能代理之间的通信和协作。

2. 项目结构

samples/python/
├── agents/          # 各种AI代理实现
│   ├── ag2/         # AG2框架实现的代理
│   ├── crewai/      # CrewAI框架实现的代理
│   ├── langgraph/   # LangGraph框架实现的代理
│   ├── google_adk/  # Google ADK框架实现的代理
│   ├── llama_index_file_chat/  # LlamaIndex实现的文件聊天代理
│   ├── marvin/      # Marvin框架实现的代理
│   ├── mindsdb/     # MindsDB实现的代理
│   └── semantickernel/  # Semantic Kernel实现的代理
├── hosts/           # 客户端实现
├── common/          # 公共代码
└── .vscode/         # VS Code配置

3. 技术特点

  1. 多框架支持

    • 支持多种流行的AI框架(LangGraph、CrewAI、AG2等)
    • 每个框架都有独立的代理实现
  2. 标准化通信

    • 使用A2A协议进行代理间通信
    • 基于HTTP的通信机制
    • 统一的请求/响应格式
  3. 客户端实现

    • 提供CLI命令行客户端
    • 支持与多个代理的交互
    • 包含任务编排功能

4. 运行环境要求

  • Python 3.13 或更高版本
  • UV 包管理器
  • 各框架所需的API密钥(如Google API Key等)

5. 使用流程

  1. 启动代理服务器:

    cd samples/python/agents/[agent_name]
    uv run .
    
  2. 启动客户端:

    cd samples/python/hosts/cli
    uv run .
    

6. 主要功能模块

6.1 代理实现(agents/)
  • AG2代理:基于AG2框架的代理实现
  • CrewAI代理:使用CrewAI框架的代理
  • LangGraph代理:基于LangGraph的代理
  • Google ADK代理:使用Google Agent Development Kit的代理
  • LlamaIndex代理:文件聊天功能
  • Marvin代理:基于Marvin框架的代理
  • MindsDB代理:数据库交互代理
  • Semantic Kernel代理:基于Microsoft Semantic Kernel的代理
6.2 客户端实现(hosts/)
  • CLI命令行界面
  • 支持多代理交互
  • 任务编排功能
6.3 公共模块(common/)
  • A2A协议实现
  • 共享工具类
  • 通用接口定义

7. 项目特点

  1. 模块化设计

    • 每个代理都是独立的模块
    • 可以单独运行和测试
  2. 标准化接口

    • 统一的A2A协议
    • 一致的通信格式
  3. 可扩展性

    • 易于添加新的代理实现
    • 支持不同的AI框架
  4. 示例性质

    • 主要用于演示A2A功能
    • 非生产级代码

二、场景介绍

1. 核心场景代理实现

1.1 LangGraph 货币转换代理
  • 功能:提供货币汇率转换服务
  • 技术特点
    • 使用 LangGraph 框架
    • 集成 Google Gemini 模型
    • 支持多轮对话
    • 实时流式响应
    • 使用 Frankfurter API 获取实时汇率
  • 通信流程
    Client Server Agent API 发送货币查询 转发查询 获取汇率数据 返回汇率 处理结果 返回结果 Client Server Agent API
1.2 CrewAI 图像生成代理
  • 功能:基于文本描述生成图像
  • 技术特点
    • 使用 CrewAI 框架
    • 集成 Google Gemini API
    • 支持图像修改
    • 缓存系统
  • 通信流程
    Client Server Agent Gemini 发送图像生成请求 转发请求 生成图像 返回图像 存储并返回ID 返回图像 Client Server Agent Gemini

2. 通信协议(A2A)

2.1 请求格式
{"jsonrpc": "2.0","id": "unique_id","method": "tasks/send","params": {"id": "task_id","sessionId": "session_id","acceptedOutputModes": ["text"],"message": {"role": "user","parts": [{"type": "text","text": "query"}]}}
}
2.2 响应格式
{"jsonrpc": "2.0","id": "unique_id","result": {"id": "task_id","status": {"state": "completed","timestamp": "timestamp"},"artifacts": [{"parts": [{"type": "text","text": "response"}],"index": 0}]}
}

三、Client代码实现

  1. 核心类:A2AClient

a) 初始化

def __init__(self, agent_card: AgentCard = None, url: str = None, timeout: TimeoutTypes = 60.0):# 支持两种初始化方式:# 1. 通过 AgentCard 初始化# 2. 直接通过 URL 初始化# 默认超时时间 60 秒

b) 主要方法

# 1. 发送任务
async def send_task(self, payload: dict[str, Any]) -> SendTaskResponse:# 同步任务发送# 返回任务响应# 2. 流式任务
async def send_task_streaming(self, payload: dict[str, Any]) -> AsyncIterable[SendTaskStreamingResponse]:# 支持 Server-Sent Events (SSE)# 返回流式响应# 3. 获取任务
async def get_task(self, payload: dict[str, Any]) -> GetTaskResponse:# 获取任务状态和结果# 4. 取消任务
async def cancel_task(self, payload: dict[str, Any]) -> CancelTaskResponse:# 取消正在执行的任务# 5. 设置回调
async def set_task_callback(self, payload: dict[str, Any]) -> SetTaskPushNotificationResponse:# 设置任务完成后的回调通知# 6. 获取回调
async def get_task_callback(self, payload: dict[str, Any]) -> GetTaskPushNotificationResponse:# 获取当前任务的回调配置
  1. 辅助类:A2ACardResolver
class A2ACardResolver:def __init__(self, base_url, agent_card_path='/.well-known/agent.json'):# 初始化解析器# 默认从 /.well-known/agent.json 获取代理卡片def get_agent_card(self) -> AgentCard:# 获取并解析代理卡片# 返回 AgentCard 对象
  1. 使用示例
# 1. 基本使用
client = A2AClient(url="http://agent-server")
response = await client.send_task({"message": "生成一张图片","sessionId": "session-123"
})# 2. 流式响应
async for event in client.send_task_streaming({"message": "生成一张图片","sessionId": "session-123"
}):print(event)# 3. 获取代理卡片
resolver = A2ACardResolver("http://agent-server")
card = resolver.get_agent_card()

客户端实现提供了:

  1. 完整的 A2A 协议支持
  2. 异步操作支持
  3. 类型安全
  4. 错误处理
  5. 流式响应
  6. 代理发现

四、Server实现

  1. 核心类:A2AServer

a) 初始化

def __init__(self, host='0.0.0.0', port=5000, endpoint='/', agent_card: AgentCard = None, task_manager: TaskManager = None):# 配置服务器参数# 初始化路由# 设置代理卡片和任务管理器

b) 主要功能

# 1. 启动服务器
def start(self):# 验证必要组件# 启动 uvicorn 服务器# 2. 处理请求
async def _process_request(self, request: Request):# 解析请求# 路由到对应的处理方法# 返回响应# 3. 获取代理卡片
def _get_agent_card(self, request: Request) -> JSONResponse:# 返回代理卡片信息
  1. 任务管理器:TaskManager

a) 抽象基类

class TaskManager(ABC):# 定义任务管理接口@abstractmethodasync def on_get_task(self, request: GetTaskRequest) -> GetTaskResponse@abstractmethodasync def on_cancel_task(self, request: CancelTaskRequest) -> CancelTaskResponse@abstractmethodasync def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse# ... 其他抽象方法

b) 内存实现

class InMemoryTaskManager(TaskManager):def __init__(self):# 初始化存储self.tasks = {}  # 任务存储self.push_notification_infos = {}  # 推送通知配置self.task_sse_subscribers = {}  # SSE 订阅者
  1. 关键功能实现

a) 任务管理

# 1. 获取任务
async def on_get_task(self, request: GetTaskRequest) -> GetTaskResponse:# 获取任务状态和结果# 2. 取消任务
async def on_cancel_task(self, request: CancelTaskRequest) -> CancelTaskResponse:# 取消正在执行的任务# 3. 更新任务
async def update_store(self, task_id: str, status: TaskStatus, artifacts: list[Artifact]) -> Task:# 更新任务状态和结果

b) 推送通知

# 1. 设置通知
async def set_push_notification_info(self, task_id: str, notification_config: PushNotificationConfig):# 配置任务完成通知# 2. 获取通知
async def get_push_notification_info(self, task_id: str) -> PushNotificationConfig:# 获取任务通知配置

c) SSE 支持

# 1. 设置 SSE 消费者
async def setup_sse_consumer(self, task_id: str, is_resubscribe: bool = False):# 创建 SSE 事件队列# 2. 事件入队
async def enqueue_events_for_sse(self, task_id, task_update_event):# 将事件发送给订阅者# 3. 事件出队
async def dequeue_events_for_sse(self, request_id, task_id, sse_event_queue):# 从队列获取事件并发送
  1. 错误处理
def _handle_exception(self, e: Exception) -> JSONResponse:# 处理不同类型的错误if isinstance(e, json.decoder.JSONDecodeError):json_rpc_error = JSONParseError()elif isinstance(e, ValidationError):json_rpc_error = InvalidRequestError()else:json_rpc_error = InternalError()

这个服务器实现提供了:

  1. 完整的 A2A 协议支持
  2. 异步操作支持
  3. 任务管理
  4. 推送通知
  5. 流式响应
  6. 错误处理

一个功能完整的 A2A 服务器实现,可以作为其他语言实现的参考。

五、LangGraph 货币转换Agent

/samples/python/agents/langgraph

  1. 整体架构
  • 采用三层架构:
    • __main__.py: 服务器入口点
    • agent.py: 核心代理实现
    • task_manager.py: 任务管理实现
  1. 核心组件详解

a) 入口点 (__main__.py)

- 配置服务器参数(host, port)
- 设置代理能力(streaming, pushNotifications)
- 定义代理技能(convert_currency)
- 创建代理卡片(AgentCard)
- 初始化服务器和任务管理器

b) 代理实现 (agent.py)

- 使用 LangGraph 框架实现 ReAct 模式
- 核心组件:1. 工具函数:get_exchange_rate- 调用 Frankfurter API 获取实时汇率- 支持指定日期查询- 错误处理和响应验证2. 响应格式:ResponseFormat- status: input_required/completed/error- message: 响应消息3. CurrencyAgent 类- 使用 Google Gemini 2.0 Flash 模型- 支持同步调用(invoke)和流式响应(stream)- 状态管理和会话追踪

c) 任务管理器 (task_manager.py)

- 继承自 InMemoryTaskManager
- 主要功能:1. 任务验证- 检查输出模式兼容性- 验证推送通知配置2. 任务处理- 同步任务处理(on_send_task)- 流式任务处理(on_send_task_subscribe)- 任务状态更新和通知3. 推送通知- 支持任务状态变更通知- 验证通知 URL 所有权
  1. 关键流程

a) 同步请求流程

1. 客户端发送请求
2. 任务管理器验证请求
3. 代理处理请求
4. 更新任务状态
5. 发送响应

b) 流式请求流程

1. 客户端订阅任务
2. 创建 SSE 事件队列
3. 异步处理代理响应
4. 实时更新任务状态
5. 发送流式事件

六、CrewAI 图像生成Agent

  1. 整体架构
  • 采用三层架构:
    • __main__.py: 服务器入口点
    • agent.py: 核心代理实现
    • task_manager.py: 任务管理实现
  1. 核心组件详解

a) 入口点 (__main__.py)

- 配置服务器参数(host, port)
- 设置代理能力(streaming=False- 定义代理技能(image_generator)
- 创建代理卡片(AgentCard)
- 初始化服务器和任务管理器

b) 代理实现 (agent.py)

- 使用 CrewAI 框架实现图像生成
- 核心组件:1. 数据模型:Imagedata- id: 图像唯一标识- name: 图像名称- mime_type: MIME类型- bytes: Base64编码的图像数据- error: 错误信息2. 工具函数:generate_image_tool- 使用 Google Gemini API 生成图像- 支持图像修改- 缓存管理- 错误处理3. ImageGenerationAgent 类- 支持文本和图像输入- 使用 CrewAI 框架- 图像生成和修改功能- 会话状态管理

c) 任务管理器 (task_manager.py)

- 继承自 InMemoryTaskManager
- 主要功能:1. 任务处理- 验证输出模式- 处理任务请求- 管理任务状态2. 图像处理- 获取图像数据- 处理图像响应- 错误处理
  1. 关键流程

a) 图像生成流程

1. 接收用户提示
2. 创建 CrewAI 任务
3. 调用 Gemini API
4. 处理生成的图像
5. 返回图像数据

b) 图像修改流程

1. 接收修改请求
2. 获取参考图像
3. 生成新图像
4. 更新缓存
5. 返回结果
http://www.xdnf.cn/news/361279.html

相关文章:

  • Go语言中 源文件开头的 // +build 注释的用法
  • 母亲节祝福网页制作
  • 推荐一个很方便的浏览器管理插件Wetab插件
  • 水印云:AI赋能,让图像处理变得简单高效
  • VSCode如何解决打开html页面中文乱码的问题
  • 工业软件自主化突围:RTOS 如何打破 “协议栈 - 控制器” 生态垄断
  • 零件画图实战提升案例(上)
  • 企业高性能WEB服务器—Nginx
  • 【论文阅读】基于客户端数据子空间主角度的聚类联邦学习分布相似性高效识别
  • 深度解析动态IP业务核心场景:从技术演进到行业实践
  • 住宅IP的深度解析与合理运用
  • 探索Stream流:高效数据处理的秘密武器
  • TOGAF 企业架构介绍(4A架构)
  • [javascript]取消异步请求
  • 26考研——中央处理器_指令执行过程(5)
  • qiankun微前端任意位置子应用
  • Kubernetes调度策略深度解析:NodeSelector与NodeAffinity的正确打开方式
  • 网络安全体系架构:核心框架与关键机制解析
  • kubernetes服务自动伸缩-HPA
  • C++ 访问者模式详解
  • Redis面试题
  • 力扣26——删除有序数组中的重复项
  • 【推荐笔记工具】思源笔记 - 隐私优先的个人知识管理系统,支持 Markdown 排版、块级引用和双向链接
  • Qt 的原理及使用(1)——qt的背景及安装
  • 在另一个省发布抖音作品,IP属地会随之变化吗?
  • 【数据结构】1. 时间/空间复杂度
  • 2025数维杯数学建模A题完整论文模型代码:空中芭蕾
  • SpringBoot统一功能处理
  • 13.原生测试框架Unittest解决用例组织问题 与测试套件的使用
  • H5 移动端适配最佳实践落地指南。