使用 A2A Python SDK 实现 CurrencyAgent
谷歌官方的a2a-python SDK最近频繁的更新,我们的教程也需要跟着更新,这篇文章,我们通过 a2a-python sdk的 0.2.3
版本,实现一个简单的CurrencyAgent。
https://a2aprotocol.ai/blog/a2a-sdk-currency-agent-tutorial-zh
目录
- 源码
- 准备
- 详细过程
- 创建项目
- 创建虚拟环境
- 添加依赖
- 配置环境变量
- 创建 Agent
- 核心功能
- 系统架构
- 系统提示词
- 主要方法
- 工作流程
- 响应格式
- 错误处理
- 测试 Agent
- 实现 AgentExecutor
- 实现 AgentServer
- AgentSkill
- AgentCard
- AgentServer
- 运行
- 运行 Server
- 运行 Client
源码
项目的源码在a2a-python-currency,欢迎 star 。
准备
- uv 0.7.2,用来进行项目管理
- Python 3.13+,一定要这个版本以上,a2a-python 的要求
- openai/openrouter 的 apiKey,baseURL,我使用的是 OpenRouter,有更多的模型可以选择。
详细过程
创建项目:
uv init a2a-python-currency
cd a2a-python-currency
创建虚拟环境
uv venv
source .venv/bin/activate
添加依赖
uv add a2a-sdk uvicorn dotenv click
配置环境变量
echo OPENROUTER_API_KEY=your_api_key >> .env
echo OPENROUTER_BASE_URL=your_base_url >> .env# example
OPENROUTER_API_KEY=你的OpenRouter API密钥
OPENROUTER_BASE_URL="https://openrouter.ai/api/v1"
创建 Agent
完整的代码如下:
import logging
import json
from typing import Any, Dict, List, Optional
import httpx
from os import getenv
from dotenv import load_dotenv
from collections.abc import AsyncIterableload_dotenv()logger = logging.getLogger(__name__)class CurrencyAgent:"""Currency Conversion Agent using OpenAI API."""SYSTEM_PROMPT = """You are a specialized assistant for currency conversions.
Your sole purpose is to use the 'get_exchange_rate' tool to answer questions about currency exchange rates.
If the user asks about anything other than currency conversion or exchange rates,
politely state that you cannot help with that topic and can only assist with currency-related queries.
Do not attempt to answer unrelated questions or use tools for other purposes.You have access to the following tool:
- get_exchange_rate: Get current exchange rate between two currenciesWhen using the tool, respond in the following JSON format:
{"status": "completed" | "input_required" | "error","message": "your response message"
}If you need to use the tool, respond with:
{"status": "tool_use","tool": "get_exchange_rate","parameters": {"currency_from": "USD","currency_to": "EUR","currency_date": "latest"}
}
Note: Return the response in the JSON format, only json is allowed.
"""def __init__(self):self.api_key = getenv("OPENROUTER_API_KEY")self.api_base = getenv("OPENROUTER_BASE_URL")self.model = "anthropic/claude-3.7-sonnet"self.conversation_history: List[Dict[str, str]] = []async def get_exchange_rate(self,currency_from: str = 'USD',currency_to: str = 'EUR',currency_date: str = 'latest',) -> Dict[str, Any]:"""Get current exchange rate between currencies."""try:response = httpx.get(f'https://api.frankfurter.app/{currency_date}',params={'from': currency_from, 'to': currency_to},)response.raise_for_status()data = response.json()if 'rates' not in data:logger.error(f'rates not found in response: {data}')return {'error': 'Invalid API response format.'}logger.info(f'API response: {data}')return dataexcept httpx.HTTPError as e:logger.error(f'API request failed: {e}')return {'error': f'API request failed: {e}'}except ValueError:logger.error('Invalid JSON response from API')return {'error': 'Invalid JSON response from API.'}async def _call_openai(self, messages: List[Dict[str, str]]) -> Dict[str, Any]:"""Call OpenAI API through OpenRouter."""async with httpx.AsyncClient() as client:response = await client.post(f"{self.api_base}/chat/completions",headers={"Authorization": f"Bearer {self.api_key}","Content-Type": "application/json",},json={"model": self.model,"messages": messages,"temperature": 0.7,"stream": False,},)response.raise_for_status()return response.json()async def stream(self, query: str, session_id: str) -> AsyncIterable[Dict[str, Any]]:"""Stream the response for a given query."""# Add user message to conversation historyself.conversation_history.append({"role": "user", "content": query})# Prepare messages for API callmessages = [{"role": "system", "content": self.SYSTEM_PROMPT}] + self.conversation_history# Get response from OpenAIresponse = await self._call_openai(messages)assistant_message = response["choices"][0]["message"]["content"]print(assistant_message)try:# Try to parse the response as JSONparsed_response = json.loads(assistant_message)# If it's a tool use requestif parsed_response.get("status") == "tool_use":tool_name = parsed_response["tool"]parameters = parsed_response["parameters"]# Yield tool usage statusyield {"is_task_complete": False,"require_user_input": False,"content": "Looking up the exchange rates..."}if tool_name == "get_exchange_rate":# Yield processing statusyield {"is_task_complete": False,"require_user_input": False,"content": "Processing the exchange rates..."}tool_result = await self.get_exchange_rate(**parameters)# Add tool result to conversation historyself.conversation_history.append({"role": "assistant","content": json.dumps({"tool_result": tool_result})})# Get final response after tool usefinal_response = await self._call_openai(messages)final_message = final_response["choices"][0]["message"]["content"]parsed_response = json.loads(final_message)# Add assistant response to conversation historyself.conversation_history.append({"role": "assistant",