用 fastmcp 2.0 做一个“短期记忆(Redis)”的 MCP 服务器(Server)+ 一个简单的 Client 例子
用 fastmcp 2.0 做一个“短期记忆(Redis)”的 MCP 服务器(Server)+ 一个简单的 Client 例子。
提供常用的 5 个工具:
• mem_put(session_id, key, value, ttl_s=3600):设置/覆盖 KV,并自动续期
• mem_get(session_id, key):读取 KV
• mem_append(session_id, role, text, max_items=200, ttl_s=3600):把对话片段追加进“时间线”,自动裁剪长度并续期
• mem_recent(session_id, n=20):取最近 n 条“时间线”片段
• mem_clear(session_id):清理该会话的短期记忆(KV + 时间线)
设计要点
• 每个会话隔离:Redis key 形如 mcp:mem:{session}:kv、mcp:mem:{session}:tl
• TTL 滚动续期:只要有读写/追加,就刷新过期时间(短期记忆特性)
• 时间线用 LIST + RPUSH/LTRIM;KV 用 HASH;都在写入时设置 EXPIRE
• 所有接口幂等、安全可控,便于接入到你的 Agent(A2A 层)作为“内存工具”
⸻
- MCP Server(fastmcp 2.0 + Redis)
server.py
import os, json, time, asyncio
from typing import Optional, Literal
import redis.asyncio as redis
from fastmcp import FastMCPREDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
TTL_DEFAULT = int(os.getenv("MEM_TTL_S", "3600")) # 1 小时短期记忆mcp = FastMCP("redis-memory")
— Redis 客户端(懒加载即可;也可在进程启动时创建) —
_redis: Optional[redis.Redis] = None
def r() -> redis.Redis:global _redisif _redis is None:_redis = redis.from_url(REDIS_URL, decode_responses=True)return _redis
— Key 约定 —
def k_kv(session_id: str) -> str: return f"mcp:mem:{session_id}:kv"
def k_tl(session_id: str) -> str: return f"mcp:mem:{session_id}:tl"async def _bump_ttl(session_id: str, ttl_s: int):# 对 KV 与时间线都尝试续期;不存在则忽略await r().expire(k_kv(session_id), ttl_s)await r().expire(k_tl(session_id), ttl_s)
========== 工具 1:KV 写入/覆盖 ==========
@mcp.tool
async def mem_put(session_id: str, key: str, value: str, ttl_s: int = TTL_DEFAULT) -> str:"""设置/覆盖短期记忆 KV(会话隔离),并滚动续期 TTL。"""await r().hset(k_kv(session_id), key, value)await _bump_ttl(session_id, ttl_s)return "ok"
========== 工具 2:KV 读取 ==========
@mcp.tool
async def mem_get(session_id: str, key: str, default: Optional[str] = None, ttl_s: int = TTL_DEFAULT) -> str:"""读取短期记忆 KV;命中后会顺带续期 TTL。"""val = await r().hget(k_kv(session_id), key)if val is None:return default if default is not None else ""await _bump_ttl(session_id, ttl_s)return val
========== 工具 3:时间线追加 ==========
@mcp.tool
async def mem_append(session_id: str,role: Literal["user", "agent", "note"],text: str,max_items: int = 200,ttl_s: int = TTL_DEFAULT,
) -> str:"""追加对话片段到“时间线”并按 max_items 裁剪。每次写入滚动续期 TTL。"""item = {"ts": int(time.time()),"role": role,"text": text,}pipe = r().pipeline()pipe.rpush(k_tl(session_id), json.dumps(item, ensure_ascii=False))pipe.ltrim(k_tl(session_id), -max_items, -1) # 只保留尾部 N 条pipe.expire(k_tl(session_id), ttl_s)pipe.expire(k_kv(session_id), ttl_s)await pipe.execute()return "ok"
========== 工具 4:读取最近 n 条 ==========
@mcp.tool
async def mem_recent(session_id: str, n: int = 20, ttl_s: int = TTL_DEFAULT) -> str:"""返回最近 n 条时间线(JSON 数组字符串)。读取也会续期 TTL。"""vals = await r().lrange(k_tl(session_id), -n, -1)await _bump_ttl(session_id, ttl_s)items = [json.loads(v) for v in vals]return json.dumps(items, ensure_ascii=False)
========== 工具 5:清空该会话短期记忆 ==========
@mcp.tool
async def mem_clear(session_id: str) -> str:"""删除该会话的 KV 与时间线(不可恢复)。"""pipe = r().pipeline()pipe.delete(k_kv(session_id))pipe.delete(k_tl(session_id))await pipe.execute()return "ok"if __name__ == "__main__":# 1) 进程内 stdio(适配 Claude Desktop、本地嵌入)# mcp.run()# 2) HTTP/SSE 传输(便于你的 Agent 以 URL 方式连接)mcp.run(transport="http", host="127.0.0.1", port=8010, path="/mcp")
运行:
pip install fastmcp redis
docker run -p 6379:6379 -d redis:7-alpine # 本地起个 Redis
python server.py
现在 MCP Server 在 http://127.0.0.1:8010/mcp (也可改用 stdio 运行)
⸻
- MCP Client 调用示例
client.py
import asyncio, json
from fastmcp import Clientasync def main():# 1) 连接到 HTTP MCP Serverasync with Client("http://127.0.0.1:8010/mcp") as c:sid = "demo-session-1"# KV:写入与读取await c.call_tool("mem_put", {"session_id": sid, "key": "topic", "value": "白露"})topic = await c.call_tool("mem_get", {"session_id": sid, "key": "topic"})print("topic =", topic.text)# 时间线:追加两条,再读取最近 5 条await c.call_tool("mem_append", {"session_id": sid, "role": "user", "text": "什么是白露?"})await c.call_tool("mem_append", {"session_id": sid, "role": "agent", "text": "白露是二十四节气之一..."})recent = await c.call_tool("mem_recent", {"session_id": sid, "n": 5})print("recent =", json.loads(recent.text))# 清理# await c.call_tool("mem_clear", {"session_id": sid})asyncio.run(main())
⸻
- 在你的 Agent(A2A × FastAPI)里怎么用
在 Orchestrator 里作为“工具调用”,比如当每轮问答结束:
伪代码(在生成完 agent_answer 后)
await mcp_client.call_tool("mem_append", {"session_id": context_id, # 用 A2A 的 contextId 做会话隔离"role": "user", "text": user_query
})
await mcp_client.call_tool("mem_append", {"session_id": context_id,"role": "agent", "text": agent_answer
})
当下一轮生成前,把最近若干条短期记忆拉入上下文:
res = await mcp_client.call_tool("mem_recent", {"session_id": context_id, "n": 10})
mem_snippets = json.loads(res.text)
将这些片段拼接到系统提示或检索阶段使用
⸻
- 可选增强(生产建议)
• 会话/租户隔离:把 tenant_id 前缀进 Redis key(如 mcp:mem:{tenant}:{session})。
• 安全:给 HTTP 传输加鉴权(反代层或 fastmcp 的认证钩子);对 session_id 做白名单/格式校验。
• 容量治理:max_items、ttl_s 做上限;可增加 ESTMEM 指标、按租户限流。
• 尺寸裁剪:如果你有 token 预算限制,可把 mem_append 的 text 在入库前做“摘要/截断”(例如按字符或模型摘要)。
• 一致性与原子性:关键路径用 pipeline();需要跨键计数/配额时可用 Lua 脚本或 RedisJSON。
• 观测:为每个工具加时延/命中率指标(Prometheus),并打通 trace(请求→工具→Redis)。