当前位置: 首页 > backend >正文

DIFY教程第四弹:通过工作流来创建一个SQL语句的执行器

我们可以通过工作流来创建一个 SQL 语句的执行器,也就是我们可以输入相关的 SQL 语句然后通过工作流来连接
数据库执行对应的SQL代码,具体的设计如下:
这里的核心是代码执行模块。这块我们是调用了我们自己创建的接口来执行数据库的操作,所以我们需要先创建
这么一个接口,接口我们通过 Flask 这个轻量的 web 框架来实现。需要先安装 Flask 的依赖库
pip install flask
然后创建接口代码
from flask import Flask, request, jsonify
import pymysql
app = Flask(__name__)
def execute_sql(sql,connection_info):
"""
执行传入的 SQL 语句,并返回查询结果。
参数:
sql: 要执行的 SQL 语句(字符串)。
connection_info: 一个字典,包含数据库连接所需的信息:
- host: 数据库地址(如 "localhost")
- user: 数据库用户名
- password: 数据库密码
- database: 数据库名称
- port: 数据库端口(可选,默认为 3306)
- charset: 字符编码(可选,默认为 "utf8mb4")
返回:
如果执行的是 SELECT 查询,则返回查询结果的列表;
如果执行的是 INSERT/UPDATE/DELETE 等非查询语句,则提交事务并返回受影响的行数。
如果执行过程中出错,则返回 None。
"""
connection = None
try:
# 从 connection_info 中获取各项参数,设置默认值
host = connection_info.get("host", "localhost")
user = connection_info.get("user")
password = connection_info.get("password")
database = connection_info.get("database")
port = connection_info.get("port", 3306)
charset = connection_info.get("charset", "utf8mb4")
# 建立数据库连接
connection = pymysql.connect(
host=host,
user=user,
password=password,
database=database,
port=port,
charset=charset,
cursorclass=pymysql.cursors.Cursor # 可改为 DictCursor 返回字典格式结果
)
with connection.cursor() as cursor:
cursor.execute(sql)
# 判断是否为 SELECT 查询语句
if sql.strip().lower().startswith("select"):
result = cursor.fetchall()
else:
connection.commit() # 非查询语句需要提交事务
result = cursor.rowcount # 返回受影响的行数
return result
except Exception as e:
print("执行 SQL 语句时出错:", e)
return None
finally:
if connection:
connection.close()
@app.route('/execute_sql', methods=['POST'])
def execute_sql_api():
"""
接口示例:通过 POST 请求传入 SQL 语句和连接信息,返回执行结果。
请求示例 (JSON):
{
"sql": "SELECT * FROM your_table;",
"connection_info": {
"host": "localhost",
"user": "your_username",
"password": "your_password",
"database": "your_database"
}
}"""
data = request.get_json()
if not data:
return jsonify({"error": "无效的请求数据"}), 400
sql = data.get("sql")
connection_info = data.get("connection_info")
if not sql or not connection_info:
return jsonify({"error": "缺少sql语句或数据库连接信息"}), 400
result = execute_sql(sql, connection_info)
return jsonify({"result": result})
if __name__ == '__main__':
# 开发环境下可以设置 debug=True,默认在本地5000端口启动服务
app.run(debug=True)
这个接口需要接收一个 sql 语句和一个包含数据库连接信息的 json 对象,我们可以编写对应的测试代码来看看
import json
import requests
def call_execute_sql_api(sql, connection_info):
"""
通过 requests 调用执行 SQL 的接口服务
参数:
sql: 要执行的 SQL 语句字符串
connection_info: 数据库连接信息字典,例如:
{
"host": "localhost",
"user": "your_username",
"password": "your_password",
"database": "your_database",
"port": 3306 # 可选
}
返回:
接口返回的结果数据(字典格式),如果请求失败则返回 None
"""
url = "http://127.0.0.1:5000/execute_sql"
# 构造请求体
payload = {
"sql": sql,
"connection_info": connection_info
}
headers = {
"Content-Type": "application/json"
}try:
response = requests.post(url, json=payload, headers=headers)
if response.status_code == 200:
try:
return {"result":str(response.json()["result"])}
except Exception as e:
return {"result": f"解析响应 JSON 失败: {str(e)}"}
else:
return {"result": f"请求失败,状态码: {response.status_code}"}
except Exception as e:
return {"result": str(e)}
# 示例调用
if __name__ == "__main__":
sql_query = "select * from candidates where id = 1" # 替换为你的实际 SQL 语句
conn_info = {
"host": "localhost",
"user": "root",
"password": "123456",
"database": "ibms",
"port": 3306
}
result = call_execute_sql_api(sql_query, conn_info)
print("接口返回结果:", result)
执行后可以看到对应的结果 然后可以在工作流中来设置我们的代码 代码的内容
import json
import requests
def main(sql: str) -> dict:
url = "http://host.docker.internal:5000/execute_sql"
connection_info = {
"host": "localhost",
"user": "root",
"password": "123456",
"database": "ibms"
}
# 构造请求体
payload = {
"sql": sql,
"connection_info": connection_info
}
headers = {
"Content-Type": "application/json"
}
try:
response = requests.post(url, json=payload, headers=headers)
if response.status_code == 200:
try:
return {"result":str(response.json()["result"])}
except Exception as e:
return {"result": f"解析响应 JSON 失败: {str(e)}"}
else:
return {"result": f"请求失败,状态码: {response.status_code}"}except Exception as e:
return {"result": str(e)}
注意上面的 url 中我们需要写
http://host.docker.internal:5000/execute_sql
不然执行的时候会出现 503 的错误。

如果调用接口的组件是 urllib3 的话有可能出现上面的问题,这个原因可能是版本兼容的问题,这里推进用的是
requests 组件

下一弹更新科研翻译应用
Dify教程目录
一、Dify的介绍

二、Dify的安装方式

1. 本地部署

2. Docker安装

3.Ollama

4.Dify关联Ollama

三、Dify应用讲解

1. 创建空白应用

2. 创建本地知识库

3.知识库应用

4. AI图片生成工具

5. 旅游助手

6. SQL执行器

7. 科研论文翻译

8. SEO翻译

9. 标题覚文案生成

10.知识库图像检索和展示

11.自然语言生成SQL

12. Echarts可视化助手

13.-如何用DeepSeek+Kimi快速生成PPT来提升你的工作效率

.........
————————————————

                            版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
                        
原文链接:https://blog.csdn.net/hxudhhgwhua/article/details/147592295

http://www.xdnf.cn/news/4447.html

相关文章:

  • 【计算机基础】任意进制转换方法详解
  • 资产管理系统对比评测:从传统模式到 AI 驱动的变革
  • 引用的使用
  • [Es_1] 介绍 | 特点 | 图算法 | Trie | FST
  • 【C/C++】errno/strerror 和 GetLastError()/FormatMessage 的区别
  • 模拟设计中如何减小失配
  • 4.系统定时器基本定时器
  • 操作系统——第四章(文件的物理结构以及与逻辑结构的对比)
  • Redis相关命令详解与原理
  • 【Agent】使用 Python 结合 OpenAI 的 API 实现一个支持 Function Call 的程序,修改本机的 txt 文件
  • 如何检查 Watchtower 是否正常工作及更新未生效的排查方法【日常排错】
  • 探寻程序开发的个人密码
  • excel 批量导出图片并指定命名
  • Excel点击单元格内容消失
  • 龙虎榜——20250507
  • LVGL -meter的应用
  • phpstudy升级新版apache
  • 如何在金仓数据库KingbaseES中新建一个数据库?新建一个表?给表添加一个字段?
  • 【PostgreSQL数据分析实战:从数据清洗到可视化全流程】8.1 基础图表绘制(折线图/柱状图/散点图)
  • 把本地的文件拷贝到wsl的文件夹下或者 wsl读取本地的文件
  • 使用node.js创建一个简单的服务器
  • WD5040L、 7V 至 37V 的宽输入电压,输出电压范围为 3.3V 至 36V,6A输出、保护功能强,电源管理利器!
  • Redis的缓存穿透、缓存击穿和缓存雪崩
  • ai说什么是注解,并以angular ts为例
  • Go——项目实战
  • 【强化学习】强化学习算法 - 多臂老虎机问题
  • 精益数据分析(47/126):深挖UGC商业模式的关键要点与指标
  • 多模态大语言模型arxiv论文略读(六十二)
  • uniapp自定义底部导航栏h5有效果小程序无效的解决方案
  • 鞅与停时 - 一种特别的概率论问题