AI开发 | 基于FastAPI+React的流式对话
文章目录
- 应用背景
- 技术选型
- 后端技术栈
- 前端技术栈
- 数据库
- 开发工具
- 工程结构
- 关键技术:SSE流式对话实现
- 后端实现代码
- 前端实现代码
- 快速开始
- 环境准备
- 安装步骤
- 数据库初始化
- 启动服务
- 访问应用
- 总结
应用背景
近期公司持续收到垂直领域的AI对话系统开发需求,典型场景包括法律咨询助手、外贸流程顾问等应用。为验证技术可行性并建立实施框架,本文核心目标在于实现:
-
大模型统一接入
- 标准化接口对接OpenAI、DeepSeek等主流大模型
- 实时模型热切换能力演示(运行时无中断切换)
-
高效流式对话
- 基于SSE(Server-Sent Events)的流式传输协议
- 前端动态渲染优化(逐字输出的打字机效果)
-
全周期对话管理
- 对话历史自动持久化存储(MySQL事务保障)
- 完整上下文追溯能力(历史消息即时调取)
技术选型
后端技术栈
FastAPI:高性能异步Web框架,提供自动API文档生成和高效IO处理能力
LangChain:LLM应用开发框架,提供模型抽象层和对话链管理
SQLAlchemy:Python ORM工具,支持多种数据库后端
PyMySQL:MySQL数据库驱动
Python-Jose:JWT认证实现,保障API安全
Pydantic:数据模型验证和序列化工具
前端技术栈
React:构建用户界面的主流库,支持组件化开发
Redux Toolkit:状态管理工具,简化全局状态管理
Ant Design:企业级UI组件库,提供丰富的预制组件
Axios:HTTP客户端,处理API请求
React Router:前端路由管理
数据库
MySQL 5.7+:关系型数据库,存储用户信息、对话记录等结构化数据
开发工具
Node.js 16+:前端运行环境
Python 3.8+:后端运行环境
nvm:Node.js版本管理工具
uvicorn:ASGI服务器,用于运行FastAPI应用
工程结构
llm-chat/
├── backend/ # 后端服务
│ ├── api/ # API路由
│ │ ├── admin.py # 后台管理相关接口
│ │ ├── auth.py # 认证相关接口
│ │ └── chat.py # 对话相关接口
│ ├── config/ # 配置管理
│ │ ├── config.py # 应用配置
│ │ └── database.py # 数据库配置
│ ├── models/ # 数据库模型定义
│ ├── schemas/ # Pydantic数据模型
│ ├── utils/ # 工具函数
│ │ ├── auth.py # 认证工具
│ │ └── llm.py # LLM管理工具
│ └── main.py # 应用入口
│
├── frontend/ # 前端应用
│ ├── src/
│ │ ├── components/ # 可复用UI组件
│ │ ├── pages/ # 页面组件
│ │ ├── services/ # API服务封装
│ │ ├── store/ # Redux状态管理
│ │ ├── styles/ # 样式文件
│ │ ├── App.js # 应用根组件
│ │ └── index.js # 入口文件
│ └── package.json # 依赖管理
关键技术:SSE流式对话实现
(1) 代码对接的是DeepSeek模型,刚开始按照LangChain的方式走非流式对接没有太大问题,但是直接换成流式输出时问题就比较多了,所以参考DeepSeek官方的示例(采用的是OpenAI的SDK)就调通了。
(2)注意流式SSE走的API请求应该是GET请求
(3) MVVM框架如React或者Vue使用代理时,要注意是否支持SSE。(默认是不支持的)
后端实现代码
@router.get("/stream")
async def generate(conversation_id: str = None,model_name: str=None,message: str=None,current_user: User = Depends(get_current_user),db: Session = Depends(get_db),llm_manager: LLMManager=Depends(get_llm_manager)
) -> StreamingResponse:"""流式生成回复"""try:# 获取对话历史history = []# === 1. 处理对话创建 ===#注意:如果conversation_id不存在或者为空,则创建一个新的对话if not conversation_id:# 创建新对话title = message[:50] + "..." if message else "新对话"new_conversation = Conversation(user_id=current_user.id,title=title,model_name=model_name)db.add(new_conversation)db.flush() # 立即刷新获取IDdb.commit()conversation_id = new_conversation.idelse:# 从数据库获取历史消息history_messages = (db.query(Message).filter(Message.conversation_id == conversation_id).order_by(Message.created_at).all())history = [{"role": msg.role, "content": msg.content}for msg in history_messages]#保存用户消息user_message = Message(conversation_id=conversation_id,role="user",content=message)db.add(user_message)db.commit()# 创建生成器函数async def generate_stream():async for chunk in llm_manager.generate_response(model_name=model_name,prompt=message,history=history):yield f"data: {chunk}\n\n"yield "data: [DONE]\n\n"# === 4. 通过中间件捕获流结束后保存助手消息 ===async def response_generator():"""带后处理功能的响应生成器"""# 创建临时容器存储完整回复full_response = []# 迭代流式输出async for chunk in generate_stream():full_response.append(chunk)yield chunk# 流结束后保存助手消息到数据库assistant_content = "".join(full_response).replace("data: [DONE]\n\n", "").replace("data: ", "").replace("\n\n", "")assistant_content = assistant_content.strip() # 清理多余空格# 排除结束标志后保存有效内容if assistant_content:assistant_msg = Message(conversation_id=conversation_id,role="assistant",content=assistant_content)db.add(assistant_msg)db.commit()print(f"助手消息已保存,长度: {len(assistant_content)}")return StreamingResponse(response_generator(),media_type="text/event-stream",headers={"Cache-Control": "no-cache","X-Conversation-ID": str(conversation_id) # 辅助头部})except Exception as e:raise HTTPException(status_code=500, detail=str(e))# llm_manager工具类实现的大模型对话async def generate_response(self,model_name: str,prompt: str,history: Optional[List[Dict[str, str]]] = None,system_prompt: Optional[str] = None) -> AsyncGenerator[str, None]:"""生成回复"""# 构建消息列表messages = []# 添加系统提示if system_prompt:messages.append({"role": "system", "content": system_prompt})# 添加历史消息if history:messages.extend(history)# 添加当前问题messages.append({"role": "user", "content": prompt})# 根据模型名称获取模型信息model= self.get_model(model_name)if not model:raise ValueError(f"未找到模型: {model_name}")#创建 OpenAI 客户端实例client = AsyncOpenAI(api_key=model.configuration["api_key"], base_url=model.configuration["base_url"])try:# 创建流式响应# 这里假设使用 OpenAI 的流式接口response = await client.chat.completions.create(model=model_name,messages=messages,stream=True)buffer = ""# 使用 OpenAI 的流式接口async for chunk in response:if content := chunk.choices[0].delta.content:yield content # 直接返回整个文本片段except Exception as e:logger.error(f"Error generating response: {e}")yield f"Error: {str(e)}" finally:# 确保资源清理await client.close()
前端实现代码
// 发送消息const handleSendMessage = async () => {if (!messageInput.trim()) return;// 清除之前的响应setCurrentResponse('');try {dispatch(setLoading(true));let fullResponse = '';const messageId = Date.now(); // 使用时间戳作为临时ID// 记录用户消息const userMessage = {id: messageId,role: 'user',content: messageInput,conversation_id: currentConversation?.id};dispatch(addMessage(userMessage));setMessageInput(''); // 立即清空输入框// 创建暂存的响应消息const assistantMessage = {id: messageId + 1,role: 'assistant',content: '',conversation_id: currentConversation?.id};dispatch(addMessage(assistantMessage));// 开始流式请求const stream = await chatAPI.sendMessage({//要防止undefinedconversation_id: currentConversation?.id || '',message: messageInput,model_name: selectedModel},{onStream: (content) => {// 更新助手消息的内容dispatch(addMessage({...assistantMessage,content: content}));},onHeadersReceived: (headers) => {console.log("收到响应头:", headers);const newConversationId = headers.get('X-Conversation-ID');if (newConversationId) {console.log("收到新对话ID:", newConversationId);// 1. 更新当前对话状态dispatch(setCurrentConversation({id: newConversationId,isNew:true }));}}});// 保存stream引用以便清理eventSourceRef.current = stream;} catch (err) {message.error('发送消息失败');// 清理事件源eventSourceRef.current?.close();} finally {dispatch(setLoading(false));}};
快速开始
环境准备
安装Python 3.8+
安装Node.js 16+(推荐使用nvm管理)
安装MySQL 5.7+
获取DeepSeek API密钥
安装步骤
# 克隆项目
git clone https://gitee.com/cenho/icen.git
cd icen# 后端设置
cd backend
python -m venv .venv
source .venv/bin/activate # Linux/Mac
# .\.venv\Scripts\activate # Windows
pip install -r requirements.txt
cp .env.example .env
# 编辑.env文件配置数据库和API密钥# 前端设置
cd ../frontend
npm install
数据库初始化
创建MySQL数据库
配置backend/.env中的数据库连接信息
启动服务
# 启动后端
cd backend
python run.py# 启动前端
cd ../frontend
npm start
访问应用
前端界面:http://localhost:3000
API文档:http://localhost:8000/docs
总结
本文介绍了一个基于大语言模型的通用AI对话系统的设计与实现。该系统采用现代化的技术栈(FastAPI + React),支持用户认证和实时对话功能。通过清晰的工程结构和详细的快速开始指南,开发者可以轻松部署和扩展系统。未来可扩展方向包括知识库集成等功能,进一步提升系统的智能化水平和应用场景广度。