DIFY教程第四弹:通过工作流来创建一个SQL语句的执行器
我们可以通过工作流来创建一个 SQL 语句的执行器,也就是我们可以输入相关的 SQL 语句然后通过工作流来连接
数据库执行对应的SQL代码,具体的设计如下:

这里的核心是代码执行模块。这块我们是调用了我们自己创建的接口来执行数据库的操作,所以我们需要先创建
这么一个接口,接口我们通过 Flask 这个轻量的 web 框架来实现。需要先安装 Flask 的依赖库
pip install flask
然后创建接口代码
from flask import Flask, request, jsonify
import pymysql
app = Flask(__name__)
def execute_sql(sql,connection_info):
"""
执行传入的 SQL 语句,并返回查询结果。
参数:
sql: 要执行的 SQL 语句(字符串)。
connection_info: 一个字典,包含数据库连接所需的信息:
- host: 数据库地址(如 "localhost")
- user: 数据库用户名
- password: 数据库密码
- database: 数据库名称
- port: 数据库端口(可选,默认为 3306)
- charset: 字符编码(可选,默认为 "utf8mb4")
返回:
如果执行的是 SELECT 查询,则返回查询结果的列表;
如果执行的是 INSERT/UPDATE/DELETE 等非查询语句,则提交事务并返回受影响的行数。
如果执行过程中出错,则返回 None。
"""
connection = None
try:
# 从 connection_info 中获取各项参数,设置默认值
host = connection_info.get("host", "localhost")
user = connection_info.get("user")
password = connection_info.get("password")
database = connection_info.get("database")
port = connection_info.get("port", 3306)
charset = connection_info.get("charset", "utf8mb4")
# 建立数据库连接
connection = pymysql.connect(
host=host,
user=user,
password=password,
database=database,
port=port,
charset=charset,
cursorclass=pymysql.cursors.Cursor # 可改为 DictCursor 返回字典格式结果
)
with connection.cursor() as cursor:
cursor.execute(sql)
# 判断是否为 SELECT 查询语句
if sql.strip().lower().startswith("select"):
result = cursor.fetchall()
else:
connection.commit() # 非查询语句需要提交事务
result = cursor.rowcount # 返回受影响的行数
return result
except Exception as e:
print("执行 SQL 语句时出错:", e)
return None
finally:
if connection:
connection.close()
@app.route('/execute_sql', methods=['POST'])
def execute_sql_api():
"""
接口示例:通过 POST 请求传入 SQL 语句和连接信息,返回执行结果。
请求示例 (JSON):
{
"sql": "SELECT * FROM your_table;",
"connection_info": {
"host": "localhost",
"user": "your_username",
"password": "your_password",
"database": "your_database"
}
}"""
data = request.get_json()
if not data:
return jsonify({"error": "无效的请求数据"}), 400
sql = data.get("sql")
connection_info = data.get("connection_info")
if not sql or not connection_info:
return jsonify({"error": "缺少sql语句或数据库连接信息"}), 400
result = execute_sql(sql, connection_info)
return jsonify({"result": result})
if __name__ == '__main__':
# 开发环境下可以设置 debug=True,默认在本地5000端口启动服务
app.run(debug=True)
这个接口需要接收一个 sql 语句和一个包含数据库连接信息的 json 对象,我们可以编写对应的测试代码来看看
import json
import requests
def call_execute_sql_api(sql, connection_info):
"""
通过 requests 调用执行 SQL 的接口服务
参数:
sql: 要执行的 SQL 语句字符串
connection_info: 数据库连接信息字典,例如:
{
"host": "localhost",
"user": "your_username",
"password": "your_password",
"database": "your_database",
"port": 3306 # 可选
}
返回:
接口返回的结果数据(字典格式),如果请求失败则返回 None
"""
url = "http://127.0.0.1:5000/execute_sql"
# 构造请求体
payload = {
"sql": sql,
"connection_info": connection_info
}
headers = {
"Content-Type": "application/json"
}try:
response = requests.post(url, json=payload, headers=headers)
if response.status_code == 200:
try:
return {"result":str(response.json()["result"])}
except Exception as e:
return {"result": f"解析响应 JSON 失败: {str(e)}"}
else:
return {"result": f"请求失败,状态码: {response.status_code}"}
except Exception as e:
return {"result": str(e)}
# 示例调用
if __name__ == "__main__":
sql_query = "select * from candidates where id = 1" # 替换为你的实际 SQL 语句
conn_info = {
"host": "localhost",
"user": "root",
"password": "123456",
"database": "ibms",
"port": 3306
}
result = call_execute_sql_api(sql_query, conn_info)
print("接口返回结果:", result)
执行后可以看到对应的结果
然后可以在工作流中来设置我们的代码
代码的内容


import json
import requests
def main(sql: str) -> dict:
url = "http://host.docker.internal:5000/execute_sql"
connection_info = {
"host": "localhost",
"user": "root",
"password": "123456",
"database": "ibms"
}
# 构造请求体
payload = {
"sql": sql,
"connection_info": connection_info
}
headers = {
"Content-Type": "application/json"
}
try:
response = requests.post(url, json=payload, headers=headers)
if response.status_code == 200:
try:
return {"result":str(response.json()["result"])}
except Exception as e:
return {"result": f"解析响应 JSON 失败: {str(e)}"}
else:
return {"result": f"请求失败,状态码: {response.status_code}"}except Exception as e:
return {"result": str(e)}
注意上面的 url 中我们需要写 下一弹更新科研翻译应用
http://host.docker.internal:5000/execute_sql
不然执行的时候会出现 503 的错误。
如果调用接口的组件是 urllib3 的话有可能出现上面的问题,这个原因可能是版本兼容的问题,这里推进用的是
requests 组件
下一弹更新科研翻译应用
Dify教程目录
一、Dify的介绍
二、Dify的安装方式
1. 本地部署
2. Docker安装
3.Ollama
4.Dify关联Ollama
三、Dify应用讲解
1. 创建空白应用
2. 创建本地知识库
3.知识库应用
4. AI图片生成工具
5. 旅游助手
6. SQL执行器
7. 科研论文翻译
8. SEO翻译
9. 标题覚文案生成
10.知识库图像检索和展示
11.自然语言生成SQL
12. Echarts可视化助手
13.-如何用DeepSeek+Kimi快速生成PPT来提升你的工作效率
.........
————————————————
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/hxudhhgwhua/article/details/147592295