当前位置: 首页 > ds >正文

FastAPI + SQLAlchemy (异步版)连接数据库时,对数据进行加密

简介:此部分内容为,在FastAPI + SQLAlchemy (异步版)连接数据库时,需要对保存在数据库中的API_KEY 以及用户密码进行加密时所著:

一、AES-GCM对称加密步骤

1.特点 :

  • 单一密钥:加密和解密使用相同的密钥(对称加密),需通过安全方式(如环境变量、密钥管理系统)存储和传输密钥。
  • 密钥长度:支持 128、192、256 位密钥(推荐 256 位以获得最高安全性)。

2.加解密步骤: 

1.生成密钥:

密钥一般存在环境变量中,使用安全的随机数生成器(如 Python 的 os.urandom):

generate_aes_gcm_key.py:

import os
import base64# 1. 生成 AES-256 密钥
def generate_aes_gcm_key(key_size: int = 32) -> bytes:"""生成指定长度的 AES-GCM 密钥(字节串)。:param key_size: 16(AES-128)、24(AES-192)、32(AES-256):return: 密钥(bytes)"""if key_size not in (16, 24, 32):raise ValueError("Key size must be 16, 24, or 32 bytes")return os.urandom(key_size)# 2. 将密钥转换为 Base64 字符串(便于存储到环境变量)
def b64encode_generated_key(generated_key: bytes) ->str:key_base64 = base64.b64encode(generated_key).decode()# print("Base64 编码的密钥:", key_base64)return key_base64# 只运行一次,确保全流程中密钥统一
if __name__ == "__main__":generated_key = generate_aes_gcm_key()key_base64 = b64encode_generated_key(generated_key)print("编码后的密钥:",key_base64) # 需要手动保存到环境变量中(.env)

2.将generate_aes_gcm_key.py中生成的密钥 AES_KEY 手动复制添加到 .env文件中:

3.在 config.py 文件中加载环境变量

from pydantic_settings import BaseSettings
from dotenv import load_dotenv
import os# 加载环境变量(仅本地开发)
load_dotenv()class DifySetting(BaseSettings):MYSQL_HOST: strMYSQL_PORT: int = 3306MYSQL_USER: strMYSQL_PASSWORD: strMYSQL_NAME: strAPP_ENV: str = "dev"AES_KEY: str  # AES 密钥(Base64 编码)AES_GCM_NONCE_SIZE: int = 12  # 注意,环境变量中存在的值,在DifySetting这个类中也必须包含class Config:env_file =".env"env_file_encoding = "utf-8"# 全局 AES-GCM 实例
dify_settings = DifySetting()

4.编写 aes_gcm_security.py 加密、解密函数:

import base64
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from app.core.config import dify_settings  # 假设这是你的配置模块(包含 AES_KEY)
import os
from cryptography.exceptions import InvalidTag# 1. 初始化 AESGCM 对象
def load_aes_key() -> AESGCM:key_bytes = base64.b64decode(dify_settings.AES_KEY)  # Base64 → bytesreturn AESGCM(key_bytes)  # 创建 AESGCM 实例aesgcm = load_aes_key()# 2. 加密函数 - 返回单个组合字符串
def encrypt_aes_gcm_combined(plaintext: str) -> str:"""使用 AES-GCM 加密明文字符串,返回组合字符串(IV+密文的Base64编码):param plaintext: 明文(字符串):return: Base64编码的字符串(前16字符为IV,后面为密文)"""plaintext_bytes = plaintext.encode("utf-8")iv = os.urandom(12)  # 生成12字节随机IVciphertext = aesgcm.encrypt(iv, plaintext_bytes, None)# 拼接IV和密文后整体进行Base64编码combined = iv + ciphertextreturn base64.b64encode(combined).decode("utf-8")# 3. 解密函数 - 从组合字符串解密
def decrypt_aes_gcm_combined(combined_base64: str) -> str:"""从组合字符串解密出原始明文:param combined_base64: Base64编码的组合字符串(IV+密文):return: 明文(字符串)"""try:combined = base64.b64decode(combined_base64)iv = combined[:12]  # 前12字节为IVciphertext = combined[12:]  # 剩余部分为密文plaintext_bytes = aesgcm.decrypt(iv, ciphertext, None)return plaintext_bytes.decode("utf-8")except InvalidTag:raise ValueError("解密失败:认证标签无效(密钥或数据损坏)")except Exception as e:raise ValueError(f"解密失败:{str(e)}")

 5. 在编写 FastAPI 时调用加密、解密函数;

from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel
from datetime import datetime
from sqlalchemy import select, exc
from app.database.database import db_dependency
from app.models.dify_models_ORM import Agent
# 导入优化后的加密函数
from app.core.aes_gcm_security import encrypt_aes_gcm_combined, decrypt_aes_gcm_combineddify_router = APIRouter()# 定义 Pydantic 模型
class AgentResponse(BaseModel):id: intagent_name: stragent_describe: stragent_url: stragent_Content_Type: stragent_api_key: str  # 返回解密后的API Keyuser: strcreated_at: datetimeclass Config:from_attributes = Trueclass CreateAgentRequest(BaseModel):agent_name: stragent_describe: strurl: strapi_key: str  # 接收明文API Keycontent_type: str = "application/json"user: str# 创建 Agent - 使用优化后的加密方法
@dify_router.post("/dify_agents", status_code=status.HTTP_201_CREATED)
async def create_agent(request: CreateAgentRequest, db: db_dependency):try:# 使用新的组合加密方法combined_ciphertext = encrypt_aes_gcm_combined(request.api_key)db_agent = Agent(agent_name=request.agent_name,agent_describe=request.agent_describe,agent_url=request.url,agent_api_key=combined_ciphertext,  # 存储组合密文agent_Content_Type=request.content_type,user=request.user,created_at=datetime.utcnow())db.add(db_agent)await db.commit()await db.refresh(db_agent)return {"agent_id": db_agent.id}except exc.IntegrityError:await db.rollback()raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="API Key 已存在")except Exception as e:await db.rollback()raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))# 查询单个 Agent - 使用优化后的解密方法
@dify_router.get("/dify_agents/{agent_id}", status_code=status.HTTP_200_OK)
async def read_agent(agent_id: int, db: db_dependency):try:result = await db.execute(select(Agent).where(Agent.id == agent_id))agent = result.scalars().first()if not agent:raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Agent 未找到")# 解密逻辑 - 使用新的组合解密方法try:decrypted_key = decrypt_aes_gcm_combined(agent.agent_api_key)# 创建代理对象副本,避免直接修改ORM对象,即返回为此副本中数据agent_data = {"id": agent.id,"agent_name": agent.agent_name,"agent_describe": agent.agent_describe,"agent_url": agent.agent_url,"agent_Content_Type": agent.agent_Content_Type,"agent_api_key": decrypted_key,  # 使用解密后的密钥"user": agent.user,"created_at": agent.created_at}return AgentResponse(**agent_data)except Exception as e:raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,detail=f"解密失败: {str(e)}")except Exception as e:raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))# 查询所有 Agents - 不返回敏感API Key
@dify_router.get("/dify_agents", status_code=status.HTTP_200_OK)
async def read_agents(db: db_dependency):try:result = await db.execute(select(Agent))agents = result.scalars().all()# 返回不包含敏感API Key的数据safe_agents = []for agent in agents:safe_agents.append({"id": agent.id,"agent_name": agent.agent_name,"agent_describe": agent.agent_describe,"agent_url": agent.agent_url,"agent_Content_Type": agent.agent_Content_Type,"user": agent.user,"created_at": agent.created_at})return safe_agentsexcept Exception as e:raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))# 删除 Agent - 保持不变
@dify_router.delete("/dify_agents/{agent_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_agent(agent_id: int, db: db_dependency):try:result = await db.execute(select(Agent).where(Agent.id == agent_id))agent = result.scalar()if not agent:raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Agent 未找到")await db.delete(agent)await db.commit()except Exception as e:await db.rollback()raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))return None

二、密码哈希加密用户密码:

1.特点:

  • 密码哈希算法是​​单向的​​(不可逆),专门为存储密码(等无需还原原来明文信息所设计);
  • 抗暴力破解​​:通过加盐(Salt)和多次迭代(Work Factor)增加计算成本;

2.加密步骤: 

http://www.xdnf.cn/news/15325.html

相关文章:

  • 【字节跳动】数据挖掘面试题0016:解释AUC的定义,它解决了什么问题,优缺点是什么,并说出工业界如何计算AUC。
  • UE5多人MOBA+GAS 18、用对象池来设置小兵的队伍的生成,为小兵设置一个目标从己方出生点攻打对方出生点,优化小兵的血条UI
  • (补充)RS422
  • 【每日刷题】x 的平方根
  • 2D下的几何变换(C#实现,持续更新)
  • Elasticsearch混合搜索深度解析(下):执行机制与完整流程
  • 【AI News | 20250710】每日AI进展
  • 2025年DevSecOps工具全景图:安全左移时代的国产化突围
  • 深入探索Kafka Streams:企业级实时数据处理实践指南
  • 11. TCP 滑动窗口、拥塞控制是什么,有什么区别
  • 8-day06预训练模型
  • 揭示张量分析的强大力量:高级研究的基础-AI云计算拓展核心内容
  • Django老年健康问诊系统 计算机毕业设计源码32407
  • 从就绪到终止:操作系统进程状态转换指南
  • 将手工建模模型(fbx、obj)转换为3dtiles的免费工具!
  • 上半年净利预增66%-97%,高增长的赛力斯该咋看?
  • 聊一聊在 Spring Boot 项目中自定义 Validation 注解
  • 牛客小白月赛119
  • 进程状态 + 进程优先级切换调度-进程概念(5)
  • 【C++篇】二叉树进阶(上篇):二叉搜索树
  • Qt中QGraphicsView类应用解析:构建高效2D图形界面的核心技术
  • 数据结构-顺序表
  • 【C语言网络编程】HTTP 客户端请求(域名解析过程)
  • Oracle字符类型详解:VARCHAR、VARCHAR2与CHAR的区别
  • Qt数据库编程详解:SQLite实战指南
  • 解决Linux绑定失败地址已使用(端口被占用)的问题
  • 设计仿真 | MSC Apex Simufact实现铁路铰链轻量化与高精度增材制造
  • 在 Spring Boot 中优化长轮询(Long Polling)连接频繁建立销毁问题
  • MySQL:分析表锁的常见问题
  • JavaScript加强篇——第四章 日期对象与DOM节点(基础)