深度剖析:Dify+Sanic+Vue+ECharts 搭建 Text2SQL 项目 sanic-web 的 Debug 实战
目录
- 项目背景介绍
- sanic-web
- Dify\_service handle\_think\_tag报错NoneType
- 问题描述
- debug
- Dify调用不成功,一直转圈圈
- 问题描述
- debug
- 前端markdown格式只显示前5页
- 问题描述
- debug
- 1. 修改代码
- 2.重新构建1.1.3镜像
- 3.更新sanic-web/docker/docker-compose.yaml
- 4. 重新部署
- Dify超时60秒,服务器报错
- 问题描述
- debug
项目背景介绍
sanic-web
项目地址:https://github.com/apconw/sanic-web
一个轻量级、支持全链路且易于二次开发的大模型应用项目(Large Model Data Assistant) 支持DeepSeek/Qwen2.5等大模型 基于 Dify 、Ollama&Vllm、Sanic 和 Text2SQL 📊 等技术构建的一站式大模型应用开发项目,采用 Vue3、TypeScript 和 Vite 5 打造现代UI。它支持通过 ECharts 📈 实现基于大模型的数据图形化问答,具备处理 CSV 文件 📂 表格问答的能力。同时,能方便对接第三方开源 RAG 系统 检索系统 🌐等,以支持广泛的通用知识问答。
这个项目可以作为text2sql的经典案例,通过自然语言来访问业务数据库,最终使用echarts图表可视化展示分析数据。使用了独立开发的web页面,对于前端小伙伴来说也比较友好,这完全可以作为一个AI智能助手的Demo实现。
Dify_service handle_think_tag报错NoneType
问题描述
debug
修改services/dify_service.py/handle_think_tag代码,如下:
@staticmethodasync def handle_think_tag(answer):"""处理<think>标签内的内容:param answer""""""处理<think>标签内的内容,或JSON格式的thoughts字段:param answer"""think_content = ""remaining_content = answer# 会遇到answer可能不能解析到,先尝试解析为JSONtry:data = json.loads(answer)if isinstance(data, dict) and "thoughts" in data:think_content = data["thoughts"]remaining_content = answerreturn think_content, remaining_contentexcept Exception:pass# 再尝试正则提取<think>标签match = re.search(r"<think>(.*?)</think>", answer, re.DOTALL)if match:think_content = match.group(1)remaining_content = re.sub(r"<think>.*?</think>", "", answer, flags=re.DOTALL).strip()return think_content, remaining_content# 如果都没有,返回空return "", answer
Dify调用不成功,一直转圈圈
问题描述
debug
检查本地dify的端口号,修改sanic-web/docker/docker-compose.yaml中dify端口号到本地端口号,比如原端口号是18000,修改成80。
chat-service:image: apconw/sanic-web:1.1.2container_name: sanic-webenvironment:- ENV=test- DIFY_SERVER_URL=http://host.docker.internal:80- DIFY_DATABASE_QA_API_KEY=app-AXDUw8TtcY7N6TMGHkPaC4VF- MINIO_ENDPOINT=host.docker.internal:19000- MINIO_ACCESS_KEY=sIR5eeDkiwoo779yNJbw- MiNIO_SECRET_KEY=MreuQ3aC1ymHJeo3QfzSg7aPz7PqlxeOw39nZUdEports:- "8088:8088"extra_hosts:- "host.docker.internal:host-gateway"
前端markdown格式只显示前5页
问题描述
debug
1. 修改代码
修改web/src/components/MarkdownPreview/MarkdownTable.vue第39行-40行代码,将:data="pagedTableData"改为:data=“tableData”,并移除:pagination="pagination"属性:
<template><div style="background-color: #ffffff"><n-cardtitle="表格"embeddedbordered:content-style="{ 'background-color': '#ffffff' }":header-style="{color: '#26244c',height: '10px','background-color': '#f0effe','text-align': 'left','font-size': '14px','font-family': 'PMingLiU'}":footer-style="{color: '#666','background-color': '#ffffff','text-align': 'left','font-size': '14px','font-family': 'PMingLiU'}"><divstyle="display: flex;justify-content: space-between;margin-bottom: 10px;"></div><n-data-tablestyle="height: 550px;width: 850px;margin: 0px 10px;background-color: #ffffff;":columns="columns":data="tableData":max-height="550"virtual-scrollvirtual-scroll-x:scroll-x="scrollX":min-row-height="minRowHeight":height-for-row="heightForRow"virtual-scroll-header:header-height="48"/><template #footer>数据来源: 大模型生成的数据, 以上信息仅供参考</template></n-card></div>
</template>
2.重新构建1.1.3镜像
# 进入web的docker目录
cd docker
# 查看原始Dockerfile ,这步也可以省略
cat Dockerfile
# 使用原始Dockerfile构建新镜像
docker build -t apconw/chat-vue3-mvp:1.1.3 -f Dockerfile ..
3.更新sanic-web/docker/docker-compose.yaml
services:chat-web:image: apconw/chat-vue3-mvp:1.1.3 # 更新为新版本container_name: chat-vue3-mvpports:- "8081:80"extra_hosts:- "host.docker.internal:host-gateway"depends_on:- chat-service
4. 重新部署
docker-compose down
docker-compose up -d
Dify超时60秒,服务器报错
问题描述
2025/05/09 08:16:19 [error] 20#20: *1 upstream timed out (110: Operation timed out) while reading response header from upstream, client: 192.168.65.1, server: localhost, request: “POST /sanic/dify/get_answer HTTP/1.1”, upstream: “http://192.168.65.254:8088/dify/get_answer”, host: “localhost:8081”, referrer: “http://localhost:8081/chat”
192.168.65.1 - - [09/May/2025:08:16:19 +0000] “POST /sanic/dify/get_answer HTTP/1.1” 504 497 “http://localhost:8081/chat” “Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36” “-”
192.168.65.1 - - [09/May/2025:08:16:19 +0000] “POST /sanic/dify/get_dify_suggested HTTP/1.1” 200 63 “http://localhost:8081/chat” “Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36” “-”
debug
修改dify_service.py中的DiFyRequest,添加心跳机制:
- 添加心跳机制:
- 新增send_heartbeat方法发送SSE心跳
- 在处理请求过程中每10秒发送一次心跳
- 在开始、结束和错误处理时也发送心跳
- 增强错误处理:
- 添加send_error_message方法,统一错误信息发送
- 完善异常处理,确保错误信息能发送到客户端
- 增加超时时间:
- 将aiohttp的超时设置从2分钟增加到5分钟
- 确保连接正确关闭:
- 在finally块中确保发送最后的心跳消息
- 确保调用res_end方法正确关闭连接
class DiFyRequest:"""DiFy操作服务类"""def __init__(self):passasync def exec_query(self, res):"""执行查询并处理流式响应"""try:# 获取请求体内容 从res流对象获取request-bodyreq_body_content = res.request.body# 将字节流解码为字符串body_str = req_body_content.decode("utf-8")req_obj = json.loads(body_str)logging.info(f"query param: {body_str}")# str(uuid.uuid4())chat_id = req_obj.get("chat_id")qa_type = req_obj.get("qa_type")# 使用正则表达式移除所有空白字符(包括空格、制表符、换行符等)query = req_obj.get("query")cleaned_query = re.sub(r"\s+", "", query)# 获取登录用户信息token = res.request.headers.get("Authorization")if not token:raise MyException(SysCodeEnum.c_401)if token.startswith("Bearer "):token = token.split(" ")[1]# 封装问答上下文信息qa_context = QaContext(token, cleaned_query, chat_id)# 判断请求类别app_key = self._get_authorization_token(qa_type)# 构建请求参数dify_service_url, body_params, headers = self._build_request(chat_id, cleaned_query, app_key, qa_type)# 收集流式输出结果t02_answer_data = []# 收集业务数据流式输出结果t04_answer_data = {}# 发送初始连接消息await self.send_heartbeat(res, "开始处理请求")# 心跳计时器last_heartbeat = time.time()async with aiohttp.ClientSession(read_bufsize=1024 * 16) as session:async with session.post(dify_service_url,headers=headers,json=body_params,timeout=aiohttp.ClientTimeout(total=60 * 5), # 增加到5分钟超时) as response:logging.info(f"dify response status: {response.status}")if response.status == 200:data_type = ""bus_data = ""while True:# 发送心跳保持连接current_time = time.time()if current_time - last_heartbeat > 10: # 每10秒发送一次心跳await self.send_heartbeat(res, "处理中...")last_heartbeat = current_timereader = response.contentreader._high_water = 10 * 1024 * 1024 # 设置为10MBchunk = await reader.readline()if not chunk:# 发送最后的心跳await self.send_heartbeat(res, "读取数据完成")breakstr_chunk = chunk.decode("utf-8")# 处理数据块if str_chunk.startswith("data"):# 更新最后心跳时间last_heartbeat = time.time()str_data = str_chunk[5:]data_json = json.loads(str_data)event_name = data_json.get("event")conversation_id = data_json.get("conversation_id")message_id = data_json.get("message_id")task_id = data_json.get("task_id")# 处理消息事件...# 这里保留原有的事件处理逻辑if DiFyCodeEnum.MESSAGE.value[0] == event_name:answer = data_json.get("answer")if answer and answer.startswith("dify_"):event_list = answer.split("_")if event_list[1] == "0":# 输出开始data_type = event_list[2]if data_type == DataTypeEnum.ANSWER.value[0]:await self.send_message(res,answer,{"data": {"messageType": "begin"}, "dataType": data_type},)elif event_list[1] == "1":# 输出结束data_type = event_list[2]if data_type == DataTypeEnum.ANSWER.value[0]:await self.send_message(res,answer,{"data": {"messageType": "end"}, "dataType": data_type},)# 输出业务数据elif bus_data and data_type == DataTypeEnum.BUS_DATA.value[0]:res_data = process(json.loads(bus_data)["data"])await self.send_message(res,answer,{"data": res_data, "dataType": data_type},)t04_answer_data = {"data": res_data, "dataType": data_type}data_type = ""elif len(data_type) > 0:# 这里输出 t02之间的内容if data_type == DataTypeEnum.ANSWER.value[0]:await self.send_message(res,answer,{"data": {"messageType": "continue", "content": answer}, "dataType": data_type},)t02_answer_data.append(answer)# 这里设置业务数据if data_type == DataTypeEnum.BUS_DATA.value[0]:bus_data = answerelif DiFyCodeEnum.MESSAGE_ERROR.value[0] == event_name:# 输出异常情况日志error_msg = data_json.get("message")logging.error(f"Error 调用dify失败错误信息: {data_json}")await res.write("data:"+ json.dumps({"data": {"messageType": "error", "content": "调用失败请查看dify日志,错误信息: " + error_msg},"dataType": DataTypeEnum.ANSWER.value[0],},ensure_ascii=False,)+ "\n\n")elif DiFyCodeEnum.MESSAGE_END.value[0] == event_name:t02_message_json = {"data": {"messageType": "continue", "content": "".join(t02_answer_data)},"dataType": DataTypeEnum.ANSWER.value[0],}print(t02_message_json)if t02_message_json:await self._save_message(t02_message_json, qa_context, conversation_id, message_id, task_id, qa_type)if t04_answer_data:await self._save_message(t04_answer_data, qa_context, conversation_id, message_id, task_id, qa_type)t02_answer_data = []t04_answer_data = {}except Exception as e:logging.error(f"Error during get_answer: {e}")traceback.print_exception(e)# 发送错误信息await self.send_error_message(res, f"处理请求出错: {str(e)}")return {"error": str(e)} # 返回错误信息作为字典finally:# 确保连接正确关闭await self.send_heartbeat(res, "请求处理完成")await self.res_end(res)async def send_heartbeat(self, res, message="心跳"):"""发送心跳信息保持连接活跃"""try:await res.write(f"data:{json.dumps({'heartbeat': True, 'message': message}, ensure_ascii=False)}\n\n")except Exception as e:logging.error(f"发送心跳失败: {e}")async def send_error_message(self, res, error_message):"""发送错误信息"""try:await res.write("data:"+ json.dumps({"data": {"messageType": "error", "content": error_message},"dataType": DataTypeEnum.ANSWER.value[0],},ensure_ascii=False,)+ "\n\n")except Exception as e:logging.error(f"发送错误信息失败: {e}")@staticmethodasync def handle_think_tag(answer):"""处理<think>标签内的内容:param answer""""""处理<think>标签内的内容,或JSON格式的thoughts字段:param answer"""think_content = ""remaining_content = answer# 会遇到answer可能不能解析到,先尝试解析为JSONtry:data = json.loads(answer)if isinstance(data, dict) and "thoughts" in data:think_content = data["thoughts"]remaining_content = answerreturn think_content, remaining_contentexcept Exception:pass# 再尝试正则提取<think>标签match = re.search(r"<think>(.*?)</think>", answer, re.DOTALL)if match:think_content = match.group(1)remaining_content = re.sub(r"<think>.*?</think>", "", answer, flags=re.DOTALL).strip()return think_content, remaining_content# 如果都没有,返回空return "", answer@staticmethodasync def _save_message(message, qa_context, conversation_id, message_id, task_id, qa_type):"""保存消息记录并发送SSE数据:param message::param qa_context::param conversation_id::param message_id::param task_id::param qa_type::return:"""# 保存用户问答记录 1.保存用户问题 2.保存用户答案 t02 和 t04if "content" in message["data"]:await add_question_record(qa_context.token, conversation_id, message_id, task_id, qa_context.chat_id, qa_context.question, message, "", qa_type)elif message["dataType"] == DataTypeEnum.BUS_DATA.value[0]:await add_question_record(qa_context.token, conversation_id, message_id, task_id, qa_context.chat_id, qa_context.question, "", message, qa_type)async def send_message(self, response, answer, message):"""SSE 格式发送数据,每一行以 data: 开头"""if answer.lstrip().startswith("<think>"):# 处理deepseek模型思考过程样式think_content, remaining_content = await self.handle_think_tag(answer)# 发送<think>标签内的内容message = {"data": {"messageType": "continue", "content": "> " + think_content.replace("\n", "") + "\n\n" + remaining_content},"dataType": "t02",}await response.write("data:" + json.dumps(message, ensure_ascii=False) + "\n\n")else:await response.write("data:" + json.dumps(message, ensure_ascii=False) + "\n\n")@staticmethodasync def res_begin(res, chat_id):""":param res::param chat_id::return:"""await res.write("data:"+ json.dumps({"data": {"id": chat_id},"dataType": DataTypeEnum.TASK_ID.value[0],})+ "\n\n")@staticmethodasync def res_end(res):""":param res::return:"""await res.write("data:"+ json.dumps({"data": "DONE","dataType": DataTypeEnum.STREAM_END.value[0],})+ "\n\n")@staticmethoddef _build_request(chat_id, query, app_key, qa_type):"""构建请求参数:param chat_id: 对话id:param app_key: api key:param query: 用户问题:param qa_type: 问答类型:return:"""# 通用问答时,使用上次会话id 实现多轮对话效果conversation_id = ""if qa_type == DiFyAppEnum.COMMON_QA.value[0]:qa_record = query_user_qa_record(chat_id)if qa_record and len(qa_record) > 0:conversation_id = qa_record[0]["conversation_id"]body_params = {"query": query,"inputs": {"qa_type": qa_type},"response_mode": "streaming","conversation_id": conversation_id,"user": "abc-123",}headers = {"Content-Type": "application/json","Authorization": f"Bearer {app_key}",}dify_service_url = DiFyRestApi.build_url(DiFyRestApi.DIFY_REST_CHAT)return dify_service_url, body_params, headers@staticmethoddef _get_authorization_token(qa_type: str):"""根据请求类别获取api/token固定走一个dify流app-IzudxfuN8uO2bvuCpUHpWhvH master分支默认的数据问答key:param qa_type:return:"""# 遍历枚举成员并检查第一个元素是否与测试字符串匹配for member in DiFyAppEnum:if member.value[0] == qa_type:return os.getenv("DIFY_DATABASE_QA_API_KEY")else:raise ValueError(f"问答类型 '{qa_type}' 不支持")