美团搜索推荐统一Agent之交互协议与多Agent协同
🌈 我是“没事学AI”!要是这篇文章让你学 AI 的路上有了点收获:
👁️ 【关注】跟我一起挖 AI 的各种门道,看看它还有多少新奇玩法等着咱们发现
👍 【点赞】为这些有用的 AI 知识鼓鼓掌,让更多人知道学 AI 也能这么轻松
🔖 【收藏】把这些 AI 小技巧存起来,啥时候想练手了,翻出来就能用
💬 【评论】说说你学 AI 时的想法和疑问,让大家的思路碰出更多火花
学 AI 的路还长,咱们结伴同行,在 AI 的世界里找到属于自己的乐趣和成就!
目录
- 一、详细设计
- 1. 多Agent交互协议设计
- 1.1 协议核心模块
- 1.2 消息格式规范
- 2. 多Agent协同机制设计
- 2.1 协同场景核心逻辑
- 二、具体实现
- 任务1:交互协议实现
- 实现步骤:
- 任务2:多Agent协同逻辑开发
- 实现步骤:
- 验证结果
一、详细设计
1. 多Agent交互协议设计
1.1 协议核心模块
模块 | 功能描述 | 技术实现 |
---|---|---|
消息格式定义 | 统一Agent间通信的数据结构(同步/异步) | JSON序列化 + 自定义字段校验 |
通信模式适配 | 同步通信(RPC)与异步通信(Kafka)切换 | 动态代理 + 注解驱动(@Sync/@Async) |
消息可靠性保障 | 异步消息重试、幂等性处理 | Kafka事务消息 + 消息ID去重 |
序列化工具 | 高效数据序列化/反序列化(支持复杂对象) | Protostuff(二进制) + Jackson(JSON) |
1.2 消息格式规范
-
同步通信消息(RPC):轻量结构,侧重实时性
{"msgId": "sync_123456", // 消息ID(用于追踪)"sender": "intent_agent", // 发送方Agent名称"receiver": "recall_agent", // 接收方Agent名称"timestamp": 1718900000000, // 发送时间戳"data": { // 业务数据(与Agent接口输入匹配)"intent": "购买水果","entities": ["苹果", "香蕉"],"confidence": 0.92},"traceId": "trace_789" // 分布式追踪ID }
-
异步通信消息(Kafka):包含状态字段,支持重试
{"msgId": "async_789012","sender": "rank_agent","receiver": "reason_agent","timestamp": 1718900001000,"data": {"sortedGoodsIds": ["g1001", "g1002"],"rankScores": [0.95, 0.88]},"status": "PENDING", // 状态:PENDING/SUCCESS/FAILED"retryCount": 0, // 重试次数"traceId": "trace_789" }
2. 多Agent协同机制设计
2.1 协同场景核心逻辑
- 意图-召回协同:意图Agent将实体信息结构化传递给召回Agent,确保召回商品与意图匹配(如“苹果”既可能是水果也可能是手机,需通过实体类型区分)。
- 动态负载均衡:当某类Agent(如排序Agent)压力过高时,自动将任务分流至备用实例(基于Dubbo负载均衡扩展)。
- 结果对齐机制:推荐理由生成Agent需关联排序Agent的评分结果,确保理由与商品优先级匹配(如“推荐理由1对应评分最高的商品”)。
二、具体实现
任务1:交互协议实现
实现步骤:
-
消息格式与序列化工具开发
- 定义消息基础类(支持同步/异步通用字段):
@Data public class AgentMessage<T> {private String msgId;private String sender;private String receiver;private long timestamp;private T data;private String traceId;// 异步消息特有字段(通过继承扩展)public static class Async<T> extends AgentMessage<T> {private String status;private int retryCount;} }
- 开发序列化工具(支持JSON与二进制切换):
@Service public class MessageSerializer {// JSON序列化(用于调试和简单对象)private final ObjectMapper jsonMapper = new ObjectMapper();// 二进制序列化(用于高性能场景)private final ProtostuffSerializer protoSerializer = new ProtostuffSerializer();public byte[] serialize(Object data, boolean useBinary) {if (useBinary) {return protoSerializer.serialize(data);} else {return jsonMapper.writeValueAsBytes(data);}}public <T> T deserialize(byte[] bytes, Class<T> clazz, boolean useBinary) {if (useBinary) {return protoSerializer.deserialize(bytes, clazz);} else {return jsonMapper.readValue(bytes, clazz);}} }
- 定义消息基础类(支持同步/异步通用字段):
-
通信模式适配(同步/异步切换)
- 基于注解驱动实现通信模式选择:
// 同步通信注解(默认) @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface Sync {int timeout() default 300; // 超时时间(ms) }// 异步通信注解 @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface Async {String topic() default "agent-communication"; // Kafka主题int maxRetry() default 3; // 最大重试次数 }// Agent服务接口示例(自动适配通信模式) public interface RecallAgent {@Sync(timeout = 500) // 同步调用RecallResult recall(IntentData data);@Async(topic = "recall-async") // 异步调用void asyncRecall(IntentData data, Callback callback); }
- 基于注解驱动实现通信模式选择:
-
消息可靠性保障
- 异步消息重试与幂等处理:
@Component public class KafkaMessageListener {@Autowiredprivate AgentMessageHandler handler;@Autowiredprivate RedisTemplate<String, String> redisTemplate;@KafkaListener(topics = "agent-communication")public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {String msgId = record.headers().lastHeader("msgId").value().toString();// 1. 幂等性校验(防止重复消费)if (Boolean.TRUE.equals(redisTemplate.hasKey("msg:processed:" + msgId))) {ack.acknowledge();return;}// 2. 解析消息并处理AgentMessage.Async<?> message = parseMessage(record.value());try {handler.process(message);// 3. 标记为已处理redisTemplate.opsForValue().set("msg:processed:" + msgId, "1", 24, TimeUnit.HOURS);ack.acknowledge();} catch (Exception e) {// 4. 重试判断(未达最大重试次数则转发至重试队列)if (message.getRetryCount() < message.getMaxRetry()) {message.setRetryCount(message.getRetryCount() + 1);sendToRetryQueue(message);}ack.acknowledge(); // 避免重复拉取}} }
- 异步消息重试与幂等处理:
任务2:多Agent协同逻辑开发
实现步骤:
-
意图-召回协同逻辑
- 意图Agent输出结构化实体信息,召回Agent基于实体类型精准召回:
// 意图Agent输出增强(包含实体类型) public class IntentData {private String intent;private List<Entity> entities; // 实体包含类型信息@Datapublic static class Entity {private String name; // 如"苹果"private String type; // 如"fruit"或"electronic"private float score; // 实体置信度} }// 召回Agent处理逻辑 @Service public class RecallAgentImpl implements RecallAgent {@Overridepublic RecallResult recall(IntentData data) {// 根据实体类型过滤召回池(如type=fruit则从生鲜库召回)List<String> goodsIds = goodsRepository.recallByEntityTypes(data.getEntities().stream().map(Entity::getType).collect(Collectors.toList()));return new RecallResult(goodsIds);} }
- 意图Agent输出结构化实体信息,召回Agent基于实体类型精准召回:
-
动态负载均衡扩展
- 基于Dubbo自定义负载均衡策略(优先选择负载低的Agent实例):
public class AgentLoadBalance extends AbstractLoadBalance {@Overrideprotected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {// 1. 获取各实例的当前负载(通过监控指标)List<Invoker<T>> availableInvokers = new ArrayList<>();for (Invoker<T> invoker : invokers) {// 从监控中心获取该实例的CPU使用率double cpuUsage = monitorService.getCpuUsage(invoker.getUrl().getHost());if (cpuUsage < 70) { // CPU使用率<70%视为可用availableInvokers.add(invoker);}}// 2. 从可用实例中随机选择(简单策略,可优化为加权)if (availableInvokers.isEmpty()) {return invokers.get(ThreadLocalRandom.current().nextInt(invokers.size()));} else {return availableInvokers.get(ThreadLocalRandom.current().nextInt(availableInvokers.size()));}} }
- 基于Dubbo自定义负载均衡策略(优先选择负载低的Agent实例):
验证结果
-
交互协议功能验证
- 同步通信:意图Agent调用召回Agent,传递“购买水果(实体类型=fruit)”数据,召回Agent返回10个生鲜类商品ID,响应时间45ms(≤500ms超时阈值)。
- 异步通信:排序Agent发送商品排序结果至推荐理由生成Agent,Kafka消息延迟≤100ms,模拟消息丢失场景时,重试机制触发并成功重新投递。
- 幂等性验证:重复发送3条相同msgId的消息,仅首次被处理,后续均被过滤(Redis去重生效)。
-
多Agent协同验证
- 意图-召回协同:当用户查询“苹果”且实体类型识别为“fruit”时,召回结果中90%为水果类商品(对比无类型区分时的60%,精准度提升30%)。
- 负载均衡:模拟排序Agent集群中1台实例CPU达80%,新请求自动分流至其他CPU<50%的实例,分流后各实例负载差≤15%。
🌟 大家好,我是“没事学AI”!
🤖 在AI的星辰大海里,我是那个执着的航海者,带着对智能的好奇不断探索。
📚 每一篇技术解析都是我打磨的罗盘,每一次模型实操都是我扬起的风帆。
💻 每一行代码演示都是我的航线记录,每一个案例拆解都是我的藏宝图绘制。
🚀 在人工智能的浪潮中,我既是领航员也是同行者。让我们一起,在AI学习的航程里,解锁更多AI的奥秘与可能——别忘了点赞、关注、收藏,跟上我的脚步,让“没事学AI”陪你从入门到精通!