《FastAPI零基础入门与进阶实战》第10篇:Token验证
系列文章目录
《FastAPI零基础入门与进阶实战》https://blog.csdn.net/sen_shan/category_12950843.html
第09篇:Token获取与登录APIhttps://blog.csdn.net/sen_shan/article/details/147456314?spm=1011.2415.3001.5331
文章目录
前言
API的安全性至关重要,而依赖解析Token验证则是其中的关键环节。通过验证Token,我们可以确保只有经过授权的用户才能访问特定的资源,从而保护系统的安全性和完整性。在本文中,我们将深入探讨如何在API中实现有效的Token验证机制
安全相关模块
请对 src/core/security.py 文件进行相应的修改操作,增加如下代码:
def verify_token(api_key: str, token: str):if token is None:retMes.Raise(message='Token header is missing').mes()app_id = get_app_id(api_key)try:payload = jwt.decode(token, Config.SECRET_KEY, algorithms=["HS256"])payload['app_id'] = app_idpayload['api_key'] = api_key# print(stru.timestamp_to_date(payload['exp']))payload['exp'] = stru.timestamp_to_date(payload['exp'])return payloadexcept jwt.ExpiredSignatureError:return Noneexcept jwt.InvalidTokenError:return None
功能性说明:
函数名称: `verify_token(api_key: str, token: str)`
功能描述
该函数用于验证传入的`token`是否有效,并根据验证结果返回相应的数据或处理结果。
参数说明
• `api_key`(类型:`str`):用于标识应用程序的唯一标识符。
• `token`(类型:`str`):需要验证的令牌,通常用于身份验证或授权。
主要逻辑
• 检查`token`是否为空:
• 如果`token`为`None`,则抛出一个带有消息`Token header is missing`的异常,并通过`retMes.Raise(message='Token header is missing').mes()`进行处理。
• 获取应用 ID:
• 调用`get_app_id(api_key)`函数,根据传入的`api_key`获取对应的应用程序 ID,并将其存储在变量`app_id`中。
• 解码`token`:
• 使用`jwt.decode`方法对`token`进行解码,解码时使用`Config.SECRET_KEY`作为密钥,指定算法为`HS256`。
• 如果解码成功,将解码后的`payload`中添加`app_id`和`api_key`信息。
• 将`payload`中的`exp`(过期时间)字段从时间戳转换为日期格式,调用`stru.timestamp_to_date(payload['exp'])`方法完成转换。
• 异常处理:
• 如果`token`已过期,会抛出`jwt.ExpiredSignatureError`异常,此时函数返回`None`。
• 如果`token`无效(如格式错误、签名不匹配等),会抛出`jwt.InvalidTokenError`异常,此时函数同样返回`None`。
返回值
• 如果`token`验证成功,返回包含验证信息的`payload`,其中包含:
• 原始`payload`数据。
• 新增的`app_id`和`api_key`。
• 转换后的`exp`字段(日期格式)。
• 如果`token`无效或已过期,返回`None`。
总结
该函数的主要功能是验证传入的`token`是否有效,通过解码和验证过程,确保`token`的合法性和时效性。如果验证通过,返回包含验证信息的`payload`;如果验证失败或`token`无效,则返回`None`。
验证APP Key
请对 src/core/dependencies.py 文件进行相应的修改操作,增加如下代码:
def auth_token(api_key: str = Header(None, alias="x-api-key"), token: str = Header(None, alias="Token")):# 为了接口命名规范,把API_KEY改为 x-api-key 2025/04/23payload = verify_token(api_key, token)if not payload:retMes.Raise(message='Invalid or expired JWT Token').mes()# raise HTTPException(status_code=401, detail="Invalid or expired JWT Token")return payload
功能说明:
这段代码的功能说明如下:
函数名称:`auth_token(api_key: str = Header(None, alias="x-api-key"), token: str = Header(None, alias="Token"))`
功能描述
该函数用于验证 HTTP 请求头中的`x-api-key`和`Token`,并根据验证结果返回相应的数据或抛出异常。
参数说明
• `api_key`(类型:`str`,默认值:`Header(None, alias="x-api-key")`):
• 表示请求头中的`x-api-key`字段,用于标识应用程序的唯一标识符。
• 使用`Header`来从请求头中提取该字段,如果不存在则为`None`。
• `token`(类型:`str`,默认值:`Header(None, alias="Token")`):
• 表示请求头中的`Token`字段,用于身份验证或授权。
• 使用`Header`来从请求头中提取该字段,如果不存在则为`None`。
主要逻辑
• 调用`verify_token`函数:
• 使用传入的`api_key`和`token`调用`verify_token`函数,获取验证结果`payload`。
• `verify_token`函数的作用是验证`token`是否有效,返回验证通过的`payload`或`None`。
• 验证结果处理:
• 如果`payload`为`None`,说明`token`无效或已过期。
• 此时调用`retMes.Raise(message='Invalid or expired JWT Token').mes()`抛出异常,提示无效或过期的 JWT Token。
• 注释中提到的`raise HTTPException(status_code=401, detail="Invalid or expired JWT Token")`是另一种异常处理方式,但被注释掉了。
• 返回结果:
• 如果`payload`验证成功,返回该`payload`。
返回值
• 如果`token`验证成功,返回包含验证信息的`payload`。
• 如果`token`无效或已过期,抛出异常。
总结
该函数的主要功能是从 HTTP 请求头中提取`x-api-key`和`Token`,并调用`verify_token`函数进行验证。如果验证通过,返回验证结果;如果验证失败或`token`无效/过期,则抛出异常。
在本次版本更新里, API_KEY 已被重新规范为 x-api-key ,以确保接口命名的一致性。
在第09篇中, dependencies.py 文件中的 auth_api_key 已根据最新规范进行了调整。调整后的完整代码如下:
"""
验证APP Key和Token
"""
# src/core/dependencies.py
from fastapi import Depends, HTTPException, Header
from src.core.security import verify_token, get_app_id
from src.core import retMes
from fastapi import Requestdef auth_api_key(api_key: str = Header(None, alias="x-api-key")):# 为了接口命名规范,把API_KEY改为 x-api-key 2025/04/23# 根据 APP KEY 抓取项目配置APP ID request: Request# api_key = api_key api_key: str = Header(None, alias="API_KEY")# api_key: str = Header(...) 无法获取,通过 request.headers 获取"""headers = request.headersprint(headers)api_key = headers.get('api-key')if api_key is None:retMes.Raise(message='API_KEY header is missing').mes()"""app_id = get_app_id(api_key)return {"app_id": app_id,"api_key": api_key}def auth_token(api_key: str = Header(None, alias="x-api-key"), token: str = Header(None, alias="Token")):# 为了接口命名规范,把API_KEY改为 x-api-key 2025/04/23payload = verify_token(api_key, token)if not payload:retMes.Raise(message='Invalid or expired JWT Token').mes()# raise HTTPException(status_code=401, detail="Invalid or expired JWT Token")return payload
Main.py调整
请对 main.py 文件进行相应的修改操作,增加如下代码:
@app.get("/items")
async def read_items(auth: dict = Depends(dependencies.auth_token)):return auth
验证
获取Token
打开“apipost”新建一个POST接口
地址栏输入:http://127.0.0.1:8080/login
在header中新增 x-api-key类型为string,数值为:Test
在body中选择raw类型为json,输入语句如下:
{"username": "admin","password": "admin"
}
发送结果如下:
{"status_code": 200,"status": "success","message": "Operation successful.","data": {"api_key": "Test","username": "admin","token_key": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoicTEyMzQ1NjdBYSIsInVzZXJuYW1lIjoiYWRtaW4iLCJyb2xlIjoiYWRtaW4iLCJlbWFpbCI6InVzZXJAZXhhbXBsZS5jb20iLCJleHAiOjE3NDYwNjU2MTN9.WdkudBmGqi4v-E46agGXxNV7Zxrc27PCEm-HNEDKYUE","role": "admin","email": "user@example.com"}
}
验证Token
在“apipost”新建一个get接口
地址栏输入:http://127.0.0.1:8080/items
在header中新增:
x-api-key类型为string,数值为:Test
Token类型为string,数值为:为login获取的token_key的值
如上面结果中,具体请根据实际拷贝:
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoicTEyMzQ1NjdBYSIsInVzZXJuYW1lIjoiYWRtaW4iLCJyb2xlIjoiYWRtaW4iLCJlbWFpbCI6InVzZXJAZXhhbXBsZS5jb20iLCJleHAiOjE3NDYwNjU2MTN9.WdkudBmGqi4v-E46agGXxNV7Zxrc27PCEm-HNEDKYUE
点击发送后结果如下:
{"user_id": "q1234567Aa","username": "admin","role": "admin","email": "user@example.com","exp": "2025-04-30 17:27:26","app_id": "2222","api_key": "Test"
}
错误Token验证
在上述验证基础上,将 Token 替换为 “test” 等非正确数据进行测试,验证结果:
{"detail": "Invalid or expired JWT Token"
}