当前位置: 首页 > news >正文

用 fastmcp 2.0 做一个“短期记忆(Redis)”的 MCP 服务器(Server)+ 一个简单的 Client 例子

用 fastmcp 2.0 做一个“短期记忆(Redis)”的 MCP 服务器(Server)+ 一个简单的 Client 例子。

提供常用的 5 个工具:
• mem_put(session_id, key, value, ttl_s=3600):设置/覆盖 KV,并自动续期
• mem_get(session_id, key):读取 KV
• mem_append(session_id, role, text, max_items=200, ttl_s=3600):把对话片段追加进“时间线”,自动裁剪长度并续期
• mem_recent(session_id, n=20):取最近 n 条“时间线”片段
• mem_clear(session_id):清理该会话的短期记忆(KV + 时间线)

设计要点
• 每个会话隔离:Redis key 形如 mcp:mem:{session}:kv、mcp:mem:{session}:tl
• TTL 滚动续期:只要有读写/追加,就刷新过期时间(短期记忆特性)
• 时间线用 LIST + RPUSH/LTRIM;KV 用 HASH;都在写入时设置 EXPIRE
• 所有接口幂等、安全可控,便于接入到你的 Agent(A2A 层)作为“内存工具”

  1. MCP Server(fastmcp 2.0 + Redis)

server.py

import os, json, time, asyncio
from typing import Optional, Literal
import redis.asyncio as redis
from fastmcp import FastMCPREDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
TTL_DEFAULT = int(os.getenv("MEM_TTL_S", "3600"))  # 1 小时短期记忆mcp = FastMCP("redis-memory")

— Redis 客户端(懒加载即可;也可在进程启动时创建) —

_redis: Optional[redis.Redis] = None
def r() -> redis.Redis:global _redisif _redis is None:_redis = redis.from_url(REDIS_URL, decode_responses=True)return _redis

— Key 约定 —

def k_kv(session_id: str) -> str: return f"mcp:mem:{session_id}:kv"
def k_tl(session_id: str) -> str: return f"mcp:mem:{session_id}:tl"async def _bump_ttl(session_id: str, ttl_s: int):# 对 KV 与时间线都尝试续期;不存在则忽略await r().expire(k_kv(session_id), ttl_s)await r().expire(k_tl(session_id), ttl_s)

========== 工具 1:KV 写入/覆盖 ==========

@mcp.tool
async def mem_put(session_id: str, key: str, value: str, ttl_s: int = TTL_DEFAULT) -> str:"""设置/覆盖短期记忆 KV(会话隔离),并滚动续期 TTL。"""await r().hset(k_kv(session_id), key, value)await _bump_ttl(session_id, ttl_s)return "ok"

========== 工具 2:KV 读取 ==========

@mcp.tool
async def mem_get(session_id: str, key: str, default: Optional[str] = None, ttl_s: int = TTL_DEFAULT) -> str:"""读取短期记忆 KV;命中后会顺带续期 TTL。"""val = await r().hget(k_kv(session_id), key)if val is None:return default if default is not None else ""await _bump_ttl(session_id, ttl_s)return val

========== 工具 3:时间线追加 ==========

@mcp.tool
async def mem_append(session_id: str,role: Literal["user", "agent", "note"],text: str,max_items: int = 200,ttl_s: int = TTL_DEFAULT,
) -> str:"""追加对话片段到“时间线”并按 max_items 裁剪。每次写入滚动续期 TTL。"""item = {"ts": int(time.time()),"role": role,"text": text,}pipe = r().pipeline()pipe.rpush(k_tl(session_id), json.dumps(item, ensure_ascii=False))pipe.ltrim(k_tl(session_id), -max_items, -1)  # 只保留尾部 N 条pipe.expire(k_tl(session_id), ttl_s)pipe.expire(k_kv(session_id), ttl_s)await pipe.execute()return "ok"

========== 工具 4:读取最近 n 条 ==========

@mcp.tool
async def mem_recent(session_id: str, n: int = 20, ttl_s: int = TTL_DEFAULT) -> str:"""返回最近 n 条时间线(JSON 数组字符串)。读取也会续期 TTL。"""vals = await r().lrange(k_tl(session_id), -n, -1)await _bump_ttl(session_id, ttl_s)items = [json.loads(v) for v in vals]return json.dumps(items, ensure_ascii=False)

========== 工具 5:清空该会话短期记忆 ==========

@mcp.tool
async def mem_clear(session_id: str) -> str:"""删除该会话的 KV 与时间线(不可恢复)。"""pipe = r().pipeline()pipe.delete(k_kv(session_id))pipe.delete(k_tl(session_id))await pipe.execute()return "ok"if __name__ == "__main__":# 1) 进程内 stdio(适配 Claude Desktop、本地嵌入)# mcp.run()# 2) HTTP/SSE 传输(便于你的 Agent 以 URL 方式连接)mcp.run(transport="http", host="127.0.0.1", port=8010, path="/mcp")

运行:

pip install fastmcp redis
docker run -p 6379:6379 -d redis:7-alpine   # 本地起个 Redis
python server.py

现在 MCP Server 在 http://127.0.0.1:8010/mcp (也可改用 stdio 运行)

  1. MCP Client 调用示例

client.py

import asyncio, json
from fastmcp import Clientasync def main():# 1) 连接到 HTTP MCP Serverasync with Client("http://127.0.0.1:8010/mcp") as c:sid = "demo-session-1"# KV:写入与读取await c.call_tool("mem_put", {"session_id": sid, "key": "topic", "value": "白露"})topic = await c.call_tool("mem_get", {"session_id": sid, "key": "topic"})print("topic =", topic.text)# 时间线:追加两条,再读取最近 5 条await c.call_tool("mem_append", {"session_id": sid, "role": "user", "text": "什么是白露?"})await c.call_tool("mem_append", {"session_id": sid, "role": "agent", "text": "白露是二十四节气之一..."})recent = await c.call_tool("mem_recent", {"session_id": sid, "n": 5})print("recent =", json.loads(recent.text))# 清理# await c.call_tool("mem_clear", {"session_id": sid})asyncio.run(main())

  1. 在你的 Agent(A2A × FastAPI)里怎么用

在 Orchestrator 里作为“工具调用”,比如当每轮问答结束:

伪代码(在生成完 agent_answer 后)

await mcp_client.call_tool("mem_append", {"session_id": context_id,    # 用 A2A 的 contextId 做会话隔离"role": "user", "text": user_query
})
await mcp_client.call_tool("mem_append", {"session_id": context_id,"role": "agent", "text": agent_answer
})

当下一轮生成前,把最近若干条短期记忆拉入上下文:

res = await mcp_client.call_tool("mem_recent", {"session_id": context_id, "n": 10})
mem_snippets = json.loads(res.text)

将这些片段拼接到系统提示或检索阶段使用

  1. 可选增强(生产建议)
    • 会话/租户隔离:把 tenant_id 前缀进 Redis key(如 mcp:mem:{tenant}:{session})。
    • 安全:给 HTTP 传输加鉴权(反代层或 fastmcp 的认证钩子);对 session_id 做白名单/格式校验。
    • 容量治理:max_items、ttl_s 做上限;可增加 ESTMEM 指标、按租户限流。
    • 尺寸裁剪:如果你有 token 预算限制,可把 mem_append 的 text 在入库前做“摘要/截断”(例如按字符或模型摘要)。
    • 一致性与原子性:关键路径用 pipeline();需要跨键计数/配额时可用 Lua 脚本或 RedisJSON。
    • 观测:为每个工具加时延/命中率指标(Prometheus),并打通 trace(请求→工具→Redis)。
http://www.xdnf.cn/news/1361719.html

相关文章:

  • Java项目-苍穹外卖_Day2
  • Ubuntu24.04配置yolov5
  • 使用 Gemini CLI作为 Claude Code的 subagent
  • 分布式锁设计实战:多级缓存防御设计优化同步性能
  • 《眼科学》10月版面征稿论文速递
  • Nestjs生命周期中全局方法执行顺序
  • 嵌入式开发学习———Linux环境下网络编程学习(六)
  • MySQL 行转列与列转行的实现方式
  • 在新塘SDK下面,有四中文件夹,GCC、IAR、KEIL、和Keil_AC6.这4个工程有什么区别。各自是怎样配置寄存器并实现SPI功能的
  • Aligning Effective Tokens with Video Anomaly in Large Language Models
  • Node.js面试题及详细答案120题(43-55) -- 性能优化与内存管理篇
  • 《飞算Java开发实战:从入门安装到项目部署》
  • 【GEE+Python 实战】用 Sentinel-2 监测 2024 年研究区 NDVI 变化(附完整源码与避坑指南)
  • Codejock Suite ProActiveX COM Crack
  • 一文掌握 Java 键盘输入:从入门到高阶(含完整示例与避坑指南)
  • LIANA | part1 intro部分
  • VMware Workstation 不可恢复错误:(vcpu-0)
  • 详细的周任务清单(Week1-Week24,每周具体目标+任务)
  • Socket some functions
  • 基于PHP服装租赁管理系统/基于php的服装管理系统的设计与实现
  • C#_gRPC
  • 【图像处理基石】基于 Python 的图像行人删除技术:实现街景无干扰化处理
  • 6.1Element UI布局容器
  • leetcode 162 寻找峰值
  • Polkadot - JAM
  • 13种常见机器学习算法总结
  • 青少年软件编程(python六级)等级考试试卷-客观题(2023年3月)
  • 学习制作记录(选项UI以及存档系统)8.24
  • 基于RISC-V架构的国产MCU在eVTOL领域的应用研究与挑战分析
  • 【Ollama】本地OCR