FastAPI系列14:API限流与暴力破解防护
API限流与暴力破解防护
- 1、暴力破解防护策略
- 2、接口限流(Rate Limiting)
- 3、分布式限流
- 4、按用户 Token 限流
- 5、验证码
在 FastAPI系列12:使用JWT 登录认证和RBAC 权限控制 和 FastAPI系列13:API的安全防护 两节中,我们讨论了FastAPI中基本的安全防护技术,本节我们继续讨论API限流与暴力破解的防护。
1、暴力破解防护策略
“暴力破解”是一种常见的攻击手段,在针对web api攻击中,一般会使用这种手段对应用系统的认证信息进行获取。 其过程就是使用大量的认证信息在认证接口进行尝试登录,直到得到正确的结果。
一般情况下,针对“暴力破解”攻击,我们可以采用以下策略:
-
限流(Rate Limiting)
限制每个 IP、用户、接口单位时间内的访问频率,例如:登录接口每分钟最多 5 次。 -
验证码(Captcha)
登录失败次数多时要求验证,防止机器人攻击。
-
登录失败锁定机制
多次失败后暂时冻结账户或 IP。
2、接口限流(Rate Limiting)
在 FastAPI 中实现接口限流(Rate Limiting),常用方法是使用 slowapi 这个库,它基于 Starlette 和 Redis。以下是一个完整的实现过程:
from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded# 创建限流器
limiter = Limiter(key_func=get_remote_address)# 创建应用
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)# 应用限流装饰器(例如每分钟最多访问 5 次)
@app.get("/limited")
@limiter.limit("5/minute")
def limited_endpoint(request: Request):return {"message": "This endpoint is rate-limited"}
限流策略语法
- “1/second”:每秒 1 次
- “5/minute”:每分钟 5 次
- “100/hour”:每小时 100 次
3、分布式限流
在上面示例的基础上,我们可以进一步配合使用Redis 实现更大规模的分布式限流。
from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from redis import Redis# 初始化 Redis 连接
redis_client = Redis(host='localhost', port=6379, db=0)# 初始化限流器,使用 Redis 存储
limiter = Limiter(key_func=get_remote_address, storage_uri="redis://localhost:6379")# 创建 FastAPI 应用
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)# 设置限流:同一 IP 每分钟最多访问 10 次
@app.get("/api")
@limiter.limit("10/minute")
def limited_api(request: Request):return {"message": "You are within the rate limit."}
4、按用户 Token 限流
在 FastAPI 中按用户 Token(如 JWT)限流,可以自定义 key_func,提取请求头中的用户标识作为限流键。以下是实现示例:
from fastapi import FastAPI, Request, Header, HTTPException
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address
from starlette.responses import JSONResponse
import jwt # 使用 PyJWT# 模拟解析 JWT(请替换为你自己的解析逻辑)
SECRET = "your-secret-key"
def get_user_id_from_token(authorization: str):try:token = authorization.split(" ")[1]payload = jwt.decode(token, SECRET, algorithms=["HS256"])return payload["sub"] # 假设用户 ID 存在 sub 字段中except Exception:return "anonymous" # 未登录用户或异常,统一使用默认限流键# 自定义限流 key 函数
def token_key_func(request: Request):auth = request.headers.get("Authorization")return get_user_id_from_token(auth) if auth else get_remote_address(request)# 初始化限流器
limiter = Limiter(key_func=token_key_func, storage_uri="redis://localhost:6379")# 初始化应用
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)# 接口限流(按用户 ID)
@app.get("/user-api")
@limiter.limit("5/minute")
def user_api(request: Request):return {"message": "You are within your user-specific rate limit."}
上面的示例可以实现:
- 每个用户(Token)单独限流;
- 未登录用户使用 IP 地址限流;
- 支持 JWT、Session 或其他认证方式。
5、验证码
在 FastAPI 中可以使用captcha、Pillow实现图形验证码,以结合上面提到的限流技术实现对接口的暴力破解防护。当然要实现更大规模的防护能力时,往往需要借助第三方专业平台。
以下为利用captcha
库实现的图形验证码:
from fastapi import FastAPI, Response
from captcha.image import ImageCaptcha
import random
import string
from io import BytesIOapp = FastAPI()@app.get("/captcha")
def get_captcha():image = ImageCaptcha(width=160, height=60)code = ''.join(random.choices(string.ascii_uppercase + string.digits, k=5))data = image.generate(code)# 存入 Redis 或 Session,供后续验证# redis.setex("captcha:client_ip", 300, code.lower())return Response(content=data.read(), media_type="image/png")
在实际使用中,我们需要把验证码文本(如 ABCD1)保存在服务端(如 Redis),并在登录时比对。