利用 FastAPI 实现三种推送方式
利用 FastAPI 实现三种推送方式
在上一篇博客《通过命令行实现日志监听、抽取并发送到企业微信机器人》 里,我用纯 Shell 命令实现了日志监听并推送到企业微信的功能。尽管方法简单,但我还是颇为得意,毕竟没写一行代码,仅依靠命令和管道就实现了一个小机器人功能。后续我可能还会发表一两篇用类似方式实现特定功能的文章,不过今天要聊的是另一件事。起初,我没打算把日志监听结果推送到企业微信,而是想单独创建一个页面,实现简单的发布 - 订阅功能。目前,我已经完成了以下三种推送方式:
- HTTP 轮询
- WebSocket 通知
- SSE 服务端推送
尽管在编程过程中我借助了 AI 工具,但整体思路(例如引入 channel 概念、创建推送队列)是我自己构思的,同时还对一些细节进行了修改,并考虑了技术实现上的限制(比如不引入更多中间件,仅用简单的 HTML 实现页面功能,用
defaultdict
替代原生的dict
)。
1. HTTP 轮询
from fastapi import Body, FastAPI
from collections import defaultdict
from fastapi.responses import StreamingResponse, HTMLResponse
import asyncio
import jsonapp = FastAPI()channels = defaultdict(list)@app.post("/pub/{channel}")
async def push(channel: str, req: str = Body(...)):channels[channel].append(req)return req@app.get("/sub/{channel}", response_class=HTMLResponse)
async def pub(channel: str):messages = channels.get(channel, [])html_content = f"""<html><head><title>Channel: {channel}</title><meta http-equiv="refresh" content="10"><style>body {{ font-family: Arial, sans-serif; margin: 20px; }}h1 {{ color: #333; }}ul {{ list-style-type: none; padding: 0; }}li {{ padding: 10px; margin: 5px 0; background-color: #f0f0f0; border-radius: 5px; }}.empty {{ color: #666; font-style: italic; }}</style></head><body><h1>通道: {channel}</h1>{'<ul>' + ''.join([f'<li>{msg}</li>' for msg in messages]) + '</ul>' if messages else '<p class="empty">No messages in this channel</p>'}</body></html>"""return html_content
在这个小功能里,我既没有也不想创建新页面,更不想用 Ajax 操作实现花哨的效果。对我而言,这并非一个正式产品,只是一个临时的、让自己省事的过渡方案。要是以后有需要实时变动并发送通知的场景,这个小文件应该就足够了。当然,利用浏览器自带的刷新策略,估计也只有像我这样上了年纪的人才会第一时间想到。
测试命令
curl http://127.0.0.1:8000/pub/ch1 -d "hello"
2. WebSocket
from fastapi import FastAPI, HTTPException, WebSocket, Body
from fastapi.responses import JSONResponse
import json
import ssl
import uvicorn
import asyncio
from datetime import datetime
from collections import defaultdict
from typing import Dict, Set, AsyncIterator
import weakrefapp = FastAPI()class MessageBroker:def __init__(self):# 使用字典存储每个 channel 的订阅者self.subscribers: Dict[str, Set[asyncio.Queue]] = defaultdict(set)async def publish(self, channel: str, message: str):if channel in self.subscribers:# 为每个订阅者的队列添加消息for queue in self.subscribers[channel].copy():await queue.put(message)def subscribe(self, channel: str) -> asyncio.Queue:# 为新订阅者创建一个消息队列queue = asyncio.Queue()self.subscribers[channel].add(queue)return queuedef unsubscribe(self, channel: str, queue: asyncio.Queue):# 移除订阅者if channel in self.subscribers:self.subscribers[channel].discard(queue)if not self.subscribers[channel]:del self.subscribers[channel]# 创建全局消息代理实例
broker = MessageBroker()@app.post("/pub/{channel}")
async def publish_message(channel: str, message: str = Body(media_type="text/plain")):try:# 直接发布原始文本消息await broker.publish(channel, message)return JSONResponse(status_code=200,content={"status": "success", "message": "Message published successfully"})except Exception as e:raise HTTPException(status_code=500, detail=str(e))@app.websocket("/sub/{channel}")
async def subscribe_message(websocket: WebSocket, channel: str):await websocket.accept()# 订阅指定 channelqueue = broker.subscribe(channel)try:while True:# 等待并获取消息message = await queue.get()await websocket.send_text(message)except Exception as e:print(f"Error: {e}")finally:# 清理订阅broker.unsubscribe(channel, queue)await websocket.close()
WebSocket 这个实现是个意外之喜。原本我想让代码辅助工具帮我实现一个 SSE 版本,结果它给出了一个 WebSocket 版本。
3. SSE
from fastapi import FastAPI, Body
from sse_starlette.sse import EventSourceResponse
import asyncio
from collections import defaultdictapp = FastAPI()# 使用 defaultdict 自动创建新的 channel 队列
channels = defaultdict(asyncio.Queue)@app.post("/pub/{channel}")
async def publish_message(channel: str, message: str = Body(...)):# 直接发布消息到队列,如果 channel 不存在会自动创建await channels[channel].put(message)return {"status": f"Message published to channel {channel}"}@app.get("/sub/{channel}")
async def subscribe(channel: str):async def event_generator():while True:# 直接从队列获取消息,如果 channel 不存在会自动创建message = await channels[channel].get()yield {"event": "message","data": message}return EventSourceResponse(event_generator())
虽然 SSE 方式会出现消息遗漏问题,但鉴于代码如此简洁,我也就不做太高要求了。