ADK 第四篇 Runner 执行器
智能体执行器 Runner,负责完成一次用户需求的响应,是ADK中真正让Agent运行起来的引擎,其核心功能和Agents SDK中的Runner类似,具体作用如下:
-
会话管理:自动读取/写入 SessionService,维护历史信息。
-
Agent调用:调用指定的Agent完成推理和工具调用
-
输入输出流转:把用户输入交给Agent,把Agent输出返回。
-
流程控制:支持多轮对话、子Agent委托、工具调用等。
-
生命周期管理:处理每次对话流程的完整生命周期
Runner 是Agent的管理者,负责组织每次对话交互的完整生命周期。
Runner 类(Class)
class Runner:"""Runner 类用于运行智能体。Runner 类用于在会话中管理智能体的执行,负责处理消息、生成事件,并与各类服务进行交互。"""app_name: str"""The app name of the runner."""agent: BaseAgent"""The root agent to run."""artifact_service: Optional[BaseArtifactService] = None"""The artifact service for the runner."""session_service: BaseSessionService"""The session service for the runner."""memory_service: Optional[BaseMemoryService] = None"""The memory service for the runner."""def __init__(self,*,app_name: str,agent: BaseAgent,artifact_service: Optional[BaseArtifactService] = None,session_service: BaseSessionService,memory_service: Optional[BaseMemoryService] = None,):"""Initializes the Runner.Args:app_name: The application name of the runner.agent: The root agent to run.artifact_service: The artifact service for the runner.session_service: The session service for the runner.memory_service: The memory service for the runner."""self.app_name = app_nameself.agent = agentself.artifact_service = artifact_serviceself.session_service = session_serviceself.memory_service = memory_service
参数解析如下:
参数名 | 参数类型 | 含义 | 说明 |
---|---|---|---|
app_name | str | 应用名称 | 应用的名称,用于标识不同的应用实例。 |
agent | BaseAgent | 要运行的主Agent | 负责执行实际的任务和推理 |
artifact_service | Optional[BaseArtifactService] | 文件存储服务(可选) | 可选的文件存储服务,用于处理生成的工件(如:文件) |
session_service | BaseSessionService | 会话服务 | 用于管理和维护对话历史。 |
memory_service | Optional[BaseMemoryService] | 内存服务(可选) | 用于长期和短期记忆管理 |
创建Runner
# 创建智能体执行器
runner = Runner(agent=agent,app_name="app01",session_service=session_service
)
run_async(...)方法
async def run_async(self,*,user_id: str,session_id: str,new_message: types.Content,run_config: RunConfig = RunConfig(),) -> AsyncGenerator[Event, None]:# ...
runner.run_async(...)方法是Runner类的核心方法之一,用于以异步的方式执行Agent的任务。这个方法接收用户输入的消息,并将其添加到会话中,然后启动Agent来处理该消息,并生成一系列的事件(Event).
具体参数解析如下:
1、user_id (str)
-
该参数表示用户的唯一标识,用于标记当前会话是属于哪个用户的,每个用户可以有多个会话实例。
2、session_id (str)
-
该参数表示会话的唯一标识,用于区分不同用户或同一个用户的不同的会话,每次创建会话时都会为其分配一个唯一的session_id
3、new_message (types.Content)
-
这是ADK中的一个核心对象,表示传递给Agent的新消息。types.Content 是一个包含具体消息内容的对象,通常它包含 role(角色)、text(消息文本) 等字段。
-
例如:content: parts=[Part(video_metadata=None, thought=None, code_execution_result=None, executable_code=None, file_data=None, function_call=None, function_response=None, inline_data=None, text='写一句心灵鸡汤')] role='user'
4、run_config (RunConfig) 可选
-
这是一个配置对象,用于定制执行时的参数设置,可以根据需求设置一些如语音配置、响应方式、模型最大调用次数等参数。其默认值为 RunConfig(),这是一个内置的默认配置类。
RunConfig 核心字段解析:
-
speech_config: 用于配置语音识别和语音输出的相关设置(例如:语音转文本、语音输出等),通常与语音助手相关。
-
response_modalities:配置响应的方式,例如返回文本、语音等。
-
save_input_blobs_as_artifacts: 是否将输入的文件保存为工件(例如:将用户上传的图片保存为文件以供后续处理),默认值为false
-
support_cfc: 是否支持CFC(Custom Function Calling),即是否允许Agent调用自定义函数。
-
streaming_mode:设置流模型(例如:是否开启流式输出),用于处理生成时响应时间较长的情况。
-
output_audio_transcription: 配置输出的音频转录设置(比如:支持语音输出)。
-
max_llm_calls: 配置在一次会话中最大可以调用LLM次数的上限。默认值: 500
运行Agent
# 执行对话
async def call_agent_async(query: str, runner, user_id, session_id):"""向agent发送查询并打印最终响应"""print(f"\n>>> 用户: {query} \n")# 将用户消息转换为ADK格式content = types.Content(role='user', parts=[types.Part(text=query)])print("content:", content)final_response_text = "未生成最终响应。" # Default# Key Concept: run_async 会执行Agent逻辑并持续生成事件流(Events)。# 我们通过遍历事件流来获取最终答案。async for event in runner.run_async(user_id=user_id, session_id=session_id, new_message=content):# 您可以取消下面这行的注释,以查看执行过程中的所有事件print(f" [Event] Author: {event.author}, Type: {type(event).__name__}, Final: {event.is_final_response()}, Content: {event.content}")# Key Concept: is_final_response() 用于标记当前对话轮次(turn)的终结消息if event.is_final_response():if event.content and event.content.parts:# 假设响应文本位于首部分final_response_text = event.content.parts[0].textelif event.actions and event.actions.escalate: # Handle potential errors/escalationsfinal_response_text = f"Agent escalated: {event.error_message or 'No specific message.'}"# Add more checks here if needed (e.g., specific error codes)break # 找到最终响应后停止处理事件print(f"<<< Agent Response: {final_response_text}")if __name__ == '__main__':query = "写一句心灵鸡汤"asyncio.run(call_agent_async(query=query,runner=runner,user_id=USER_ID,session_id=SESSION_ID))
call_agent_async(...) 函数解析
🍂Part 1 导入依赖的模块
from google.genai import types
这个types模块是用于在ADK中创建消息内容的,它允许你定义消息的"角色"(例如:user 或 assistant),以及消息的内容部分。
🍂Part 2 定义 call_agent_async 函数
async def call_agent_async(query: str, runner, user_id, session_id):
-
query: 用户输入的查询问题。
-
runner: 创建的Runner对象,负责启动Agent并管理会话。
-
user_id 和 session_id: 用于标记特定用户的会话,确保模型能够维持会话上下文。
🍂Part 3 将用户消息转换为ADK格式
content = types.Content(role='user', parts=[types.Part(text=query)])
content中内容:
这里将用户的查询(query) 格式化为ADK的消息格式,并通过 types.Content 和 types.Part 封装成可以传递给Agent的内容。 role="user" 表明消息是由用户发送的。
🍂Part 4 准备获取Agent最终回复
final_response_text = "未生成最终响应。" # Default
默认情况下 final_response_text 被初始化为一个错误信息字符串,表明Agent没有最终响应,后续会被更新为最终真正的回答.
🍂Part 5 调用Agent并监听事件
async for event in runner.run_async(user_id=user_id, session_id=session_id, new_message=content):# 您可以取消下面这行的注释,以查看执行过程中的所有事件print(f" [Event] Author: {event.author}, Type: {type(event).__name__}, Final: {event.is_final_response()}, Content: {event.content}")
runner.run_async(...) 是一个异步生成器,负责执行Agent推理并生成一系列事件 (Events),每个事件代表了交互中的一步操作(如用户消息、模型回复、工具调用等)
你可以通过 async for...循环迭代所有的事件,并逐一检查它们的内容。
Event 事件中的内容:
🍂Part 6 检查是否是最终响应
if event.is_final_response():
event.is_final_response() 用来判断当前事件是否是最终响应,这个判断标志着Agent已经完成了对用户问题的处理,可以停止进一步处理。
if event.content and event.content.parts:final_response_text = event.content.parts[0].text
如果事件 content 存在,并且包含了 parts(模型回复的文本内容) ,就将 最终回复 保存到 final_response_text 变量中。
elif event.actions and event.actions.escalate: # Handle potential errors/escalationsfinal_response_text = f"Agent escalated: {event.error_message or 'No specific message.'}"
如果 event.actions.escalate为True , 表示有错误发生或需要升级,比如模型无法处理问题,或者需要人工干预,这时,会将相应的错误信息记录在 final_response_text