NodeRED模拟复杂流程处理
文章目录
- Node-RED介绍
- 特点和设计理念
- Node-RED安装
- 以下实现介绍
- 引入库安装使用
- http in 接口函数异步响应
- 流程图
- http in节点配置
- 流程JSON代码
- 接口验证
- 日志输出
- http in 接口全局函数调用处理
- 流程图
- 流程JSON代码
- 接口验证
- 日志输出
Node-RED介绍
Node-RED是一个基于Node.js的开源流程控制和网络构建工具,它提供了一个基于浏览器的流编辑器,使你可以将各种设备、API和在线服务以节点的形式拖放来构建工作流。Node-RED拥有丰富的节点库,支持多种协议转换,包括HTTP、MQTT等,非常适合用于物联网(IoT)应用的开发。
- Node-Red 是一个开源的可视化编程工具,有丰富的扩展模块可使用
- Node-Red 由IBM开发,主要用于连接连接计算机、传感器和在线服务等协议或组件,以简化它们之间的布线工作
- Node-Red 允许通过组合各部件来编写应用程序,这些部件也可以是硬件设备、Web API 、在线服务
特点和设计理念
特点和设计理念:主要特点是易于使用的可视化界面和低代码编写要求。 适合于快速原型开发和简单的数据流处理。社区活跃,提供了大量的节点(node)和功能。
Node-RED安装
参考文档:点击查看安装文档教程
以下实现介绍
这里主要是实现对于业务流程比较复杂的场景处理,两种方式传递参数:第一种采用全局函数来异步调用的方式,第二种采用函数直接传递值到下一个节点来处理数据。
引入库安装使用
1、npm安装
npm install axios
npm install form-data
或者
npm install axios form-data
2、setting.js配置
http in 接口函数异步响应
流程图
http in节点配置
流程JSON代码
[{"id": "692deca3b7c86bbd","type": "tab","label": "node.send() 使用","disabled": false,"info": "","env": []},{"id": "b5a41e4e1279b0e1","type": "function","z": "692deca3b7c86bbd","name": "function 1","func": "// 处理 HTTP GET 请求\nnode.log(\"function 1 先到这里\")\nif (msg.req.method === \"GET\") {\n try {\n // 构建响应\n const response = {\n status: \"success\",\n message: \"function 1 处理 GET 请求成功\",\n data: {\n page: 1,\n pageSize: 20\n }\n };\n // 设置响应头\n //msg.res.setHeader(\"Content-Type\", \"application/json\");\n\n // 发送响应\n msg.payload = JSON.stringify(response);\n \n node.send(msg);\n } catch (error) {\n node.error(\"function 1 处理 GET 请求失败: \" + error.message, msg);\n }\n}\n\n// 处理 HTTP POST 请求\nif (msg.req.method === \"POST\") {\n try {\n // 构建响应\n const response = {\n status: \"success\",\n message: \"异步返回成功\",\n data: msg.payload\n };\n\n // 设置响应头\n //msg.res.setHeader(\"Content-Type\", \"application/json\");\n\n // 发送响应\n msg.payload = JSON.stringify(response);\n node.send(msg);\n } catch (error) {\n node.error(\"异步返回成功失败: \" + error.message, msg);\n }\n}","outputs": 1,"timeout": 0,"noerr": 0,"initialize": "","finalize": "","libs": [],"x": 600,"y": 300,"wires": [["536ea820e73ec1e3","e508082235f97408"]]},{"id": "536ea820e73ec1e3","type": "debug","z": "692deca3b7c86bbd","name": "function 1","active": true,"tosidebar": true,"console": false,"tostatus": false,"complete": "payload","targetType": "msg","statusVal": "","statusType": "auto","x": 1040,"y": 300,"wires": []},{"id": "4aae1863f6fac686","type": "debug","z": "692deca3b7c86bbd","name": "0.主业务处理","active": true,"tosidebar": true,"console": false,"tostatus": false,"complete": "payload","targetType": "msg","statusVal": "","statusType": "auto","x": 1670,"y": 300,"wires": []},{"id": "f1565e6feb0e4ad4","type": "http response","z": "692deca3b7c86bbd","name": "接口响应","statusCode": "200","headers": {},"x": 1660,"y": 380,"wires": []},{"id": "e508082235f97408","type": "function","z": "692deca3b7c86bbd","name": "function 2","func": "try {\n node.log(\"function 2 \" + JSON.stringify(msg.payload));\n\n // 模拟错误\n if (!msg.payload) {\n throw new Error(\"消息为空\");\n }\n\n // 构建响应\n const response = {\n status: \"success\",\n message: \"function 2 处理请求成功\",\n data: msg.payload\n };\n\n // 设置响应头\n //msg.res.setHeader(\"Content-Type\", \"application/json\");\n\n // 发送响应\n msg.payload = JSON.stringify(response);\n node.send(msg);\n} catch (error) {\n node.error(error.message, msg);\n}","outputs": 1,"timeout": 0,"noerr": 0,"initialize": "","finalize": "","libs": [],"x": 600,"y": 480,"wires": [["30e309f80ceefbca"]]},{"id": "30e309f80ceefbca","type": "debug","z": "692deca3b7c86bbd","name": "function 2","active": true,"tosidebar": true,"console": false,"tostatus": false,"complete": "payload","targetType": "msg","statusVal": "","statusType": "auto","x": 1060,"y": 480,"wires": []},{"id": "2dd22189dc6f48af","type": "http in","z": "692deca3b7c86bbd","name": "Node-RED框架之nodeSendAPI测试","url": "/node-red/customized/nodeSend","method": "get","upload": true,"swaggerDoc": "","x": 260,"y": 220,"wires": [["47ff3a5416742a3f","b5a41e4e1279b0e1"]]},{"id": "557f3ac3154cc1b0","type": "http response","z": "692deca3b7c86bbd","name": "分析结果响应","statusCode": "200","headers": {"content-type": "application/json"},"x": 1060,"y": 120,"wires": []},{"id": "47ff3a5416742a3f","type": "function","z": "692deca3b7c86bbd","name": "异步响应","func": "node.log(\"处理请求并发送异步响应,参数: \" + JSON.stringify(msg.payload));\nconst deviceId = msg.payload.deviceId;\nconst responseMsg = {\n status: \"success\",\n message: \"异步返回成功\",\n data: null\n};\nmsg.payload = JSON.stringify(responseMsg);\nnode.send(msg);","outputs": 1,"timeout": 0,"noerr": 0,"initialize": "","finalize": "","libs": [],"x": 600,"y": 120,"wires": [["557f3ac3154cc1b0"]]}
]
接口验证
curl --location --request GET 'http://127.0.0.1:1880/node-red/customized/nodeSend?deviceId=1234567980' \
--header 'Accept-Language: zh-cn' \
--header 'User-Agent: Apifox/1.0.0 (https://apifox.com)' \
--header 'Content-Type: application/json' \
--header 'Authorization: Bearer 2d852a07-7f6c-46b7-ae52-2284cace2ebb' \
--header 'Accept: */*' \
--header 'Host: 127.0.0.1:1880' \
--header 'Connection: keep-alive'
日志输出
node-red调试窗口
node-red控制台
http in 接口全局函数调用处理
如下图所示
functionExternalModules: true,
functionGlobalContext: {os:require('os'),axios: require('axios'),crypto: require('crypto'),formData: require('form-data')
},
流程图
流程JSON代码
[{"id": "da96e160ae8b4fec","type": "tab","label": "V1上传文件至腾讯8520智能分析-副本","disabled": false,"info": "","env": []},{"id": "0c6e571e50ceb236","type": "http in","z": "da96e160ae8b4fec","name": "设备告警录像文件上传","url": "/node-red/customized/uploadFile1","method": "post","upload": true,"swaggerDoc": "","x": 180,"y": 180,"wires": [["0d5d4331218399fc"]]},{"id": "0d5d4331218399fc","type": "function","z": "da96e160ae8b4fec","name": "1.主业务处理","func": "// 引入库文件\nconst axios = global.get('axios');\n\n// 1. 立即构造响应消息并发送给 HTTP Response 节点\nconst responseMsg = {\n payload: {\n \"code\": 0,\n \"msg\": \"请求已接收,正在处理...\",\n \"data\": null\n }, // 响应内容\n res: msg.res, // 必须保留 HTTP 响应对象\n _async: true // 标记此消息为异步响应\n};\nnode.send([responseMsg, null]); // 第一个端口发给 HTTP Response,第二个端口留空\n\n\n// 第三方库校验是否正确引入并生效\n// if (global.get('axios')) {\n// node.send({ payload: \"axios 模块已正确引入并生效\" });\n// } else {\n// node.error(\"axios 模块未正确引入\");\n// }\n// if (global.get('crypto')) {\n// node.send({ payload: \"crypto 模块已正确引入并生效\" });\n// } else {\n// node.error(\"crypto 模块未正确引入\");\n// }\n// if (global.get('formData')) {\n// node.send({ payload: \"FormData 模块已正确引入并生效\" });\n// } else {\n// node.error(\"FormData 模块未正确引入\");\n// }\n\n/**================接口参数输出=====================**/\nnode.log(\"接口请求参数:\" + JSON.stringify(msg.payload));\n\n// 处理业务逻辑 multipart/form-data 上传\nif (msg.req.files && msg.req.files.length > 0) {\n\n var deviceId = msg.payload.deviceId;\n \n // 获取第一个文件\n const file = msg.req.files[0];\n const fileInfo = {\n filename: file.originalname,\n //buffer: file.buffer,\n size: file.size,\n mimetype: file.mimetype\n };\n node.log(\"上传文件的信息为:\" + JSON.stringify(fileInfo));\n\n /**================运营云平台token校验=====================**/\n var authorizationHeader = \"\"\n if (msg.req.headers) {\n authorizationHeader = msg.req.headers['authorization']; \n } else {\n node.error('Step1、No headers found in the message'); \n msg.payload = {\n \"code\": 1,\n \"msg\": \"No headers found in the message\",\n \"data\": null\n }\n return msg;\n }\n // 获取全局函数\n //const checkAioCloudToken = global.get(\"checkAioCloudToken\");\n // 调用全局函数并等待结果\n const isAvailable = await checkAioCloudToken(authorizationHeader); \n //const isAvailable = true; \n node.log(\"Step1.运营云平台token校验结果:\" + isAvailable);\n if (!isAvailable){\n msg.payload = {\n \"code\": 1,\n \"msg\": \"Token校验失败\",\n \"data\": null\n }\n return msg;\n }\n \n /**================腾讯8520存储存储服务=====================**/\n const fileStorageFun = global.get(\"fileStorageFun\");\n const fileUrl = await fileStorageFun(file); \n node.log(\"Step2.腾讯8520存储服务文件上传结果:\" + fileUrl);\n\n\n\n /**================腾讯8520智能结构化任务=====================**/\n const structAnalysisFun = global.get(\"structAnalysisFun\");\n const fileSize = file.size;\n const analysisResult = await structAnalysisFun(deviceId, fileUrl, fileSize); \n node.log(\"Step3.腾讯8520智能结构化任务分析结果:\" + JSON.stringify(analysisResult));\n\n \n /**================响应数据=====================**/\n msg.payload = {\n \"code\": 0,\n \"msg\": \"Business processing successful\",\n \"data\": analysisResult\n }\n node.log(\"processing results:\" + JSON.stringify(msg.payload));\n //return msg;\n} else {\n msg.payload = {\n \"code\": 400,\n \"msg\": \"No files uploaded\",\n \"data\": null\n }\n node.error(\"processing results:\" + JSON.stringify(msg.payload));\n //return [null, msg];\n}\n\n\n\n// 运营云平台鉴权校验Token值\nasync function checkAioCloudToken(token) {\n //node.log(\"运营云平台鉴权校验Token值为:\" + token)\n const newToken = token.replace(/Bearer /, \"\");\n // token校验地址\n const CHECK_TOKEN_URL = \"http://10.81.116.13:9999/auth/token/check_token\";\n const url = CHECK_TOKEN_URL + \"?token=\" + newToken\n //node.log(\"运营云平台鉴权校验Token请求地址:\" + url)\n return new Promise((resolve, reject) => {\n try {\n axios.get(url, {\n headers: {\n 'Content-Type': 'application/json'\n //'Authorization': token\n }\n }).then(response => {\n //node.send({ payload: response.data });\n node.log(\"Step1.1 运营云平台鉴权校验Token响应结果:\" + JSON.stringify(response.data));\n const result = response.data;\n if (!result.code || result.code == null) {\n //node.log(\"运营云平台鉴权校验Token校验成功\");\n return resolve(true);\n }\n //node.error(\"运营云平台鉴权校验Token校验失败\");\n return resolve(false);\n })\n .catch(error => {\n if (error.response && error.response.status === 401) {\n node.error('运营云平台身份验证失败,请检查令牌');\n } else {\n node.error(error);\n }\n return resolve(false);\n });\n } catch (error) {\n reject(error);\n }\n });\n}","outputs": 1,"timeout": 0,"noerr": 0,"initialize": "","finalize": "","libs": [],"x": 610,"y": 180,"wires": [["1a841b879c47ee44","e3c4458354d6ca01","d6f0f1b2e6562a68"]]},{"id": "1a841b879c47ee44","type": "debug","z": "da96e160ae8b4fec","name": "1.主业务处理","active": true,"tosidebar": true,"console": false,"tostatus": false,"complete": "payload","targetType": "msg","statusVal": "","statusType": "auto","x": 1050,"y": 100,"wires": []},{"id": "e3c4458354d6ca01","type": "http response","z": "da96e160ae8b4fec","name": "接口响应","statusCode": "200","headers": {},"x": 1040,"y": 180,"wires": []},{"id": "d6f0f1b2e6562a68","type": "function","z": "da96e160ae8b4fec","name": "2.全局函数DEMO服务","func": "/**================全局函数DEMO服务=====================**/\n// 引入库文件\n//const crypto = require('crypto');\n//const fetch = require('node-fetch');\n//const FormData = require('form-data');\nconst axios = global.get('axios');\nconst crypto = global.get('crypto');\nconst FormData = global.get('formData');\n\n// 定义全局变量\nconst AK = \"d19b4d58d9133751234567890\";\nconst SK = \"dd5d0ae84a9a1e14e0eff23a\";\nconst FILE_BASE_URL = \"http://192.168.2.222:8804\";\n\n\n// 生成带签名的请求头\nfunction buildHeader(ak, sk, HTTP_VERB, canonicalizedResource) {\n const date = formatBeijingToUTC();\n const header = new Map();\n header.set('Content-Type', 'application/json');\n header.set('X-LC-Date', date);\n header.set('X-LC-Version', '1.0');\n\n const stringToSign = `${HTTP_VERB}\\n\\napplication/json\\n${date}\\nx-lc-date:${date}\\nx-lc-version:1.0\\n${canonicalizedResource}`;\n const signature = sign(stringToSign, sk);\n const authorization = `LC-HMAC-SHA256 ${ak}:${signature}`;\n header.set('Authorization', authorization);\n\n return Object.fromEntries(header);\n}\n\n// 签名生成方法\nfunction sign(stringToSign, key) {\n if (!stringToSign || !key) {\n return null;\n }\n\n const signingKey = Buffer.from(key, 'utf8');\n const hmac = crypto.createHmac('sha256', signingKey);\n hmac.update(stringToSign, 'utf8');\n const base64Result = hmac.digest('base64');\n\n return base64Result;\n}\n\n// 北京时间转 UTC\nfunction formatBeijingToUTC() {\n const beijingTime = new Date();\n const utcTime = new Date(beijingTime.getTime() - beijingTime.getTimezoneOffset() * 60000);\n\n const year = utcTime.getUTCFullYear();\n const month = String(utcTime.getUTCMonth() + 1).padStart(2, '0');\n const day = String(utcTime.getUTCDate()).padStart(2, '0');\n const hours = String(utcTime.getUTCHours()).padStart(2, '0');\n const minutes = String(utcTime.getUTCMinutes()).padStart(2, '0');\n const seconds = String(utcTime.getUTCSeconds()).padStart(2, '0');\n\n return `${year}${month}${day}T${hours}${minutes}${seconds}Z`;\n}\n\n// 计算 HTTP 请求头长度\nfunction calculateContentLength(headers) {\n let headerBuilder = '';\n for (const [key, values] of Object.entries(headers)) {\n for (const value of values) {\n headerBuilder += `${key}: ${value}\\r\\n`;\n }\n }\n return headerBuilder.length;\n}\n\n// 获取文件扩展名\nfunction getFileExtension(filename) {\n if (!filename || filename === '') {\n return '';\n }\n\n const lastDotIndex = filename.lastIndexOf('.');\n if (lastDotIndex === -1 || lastDotIndex === filename.length - 1) {\n return '';\n }\n\n return filename.substring(lastDotIndex + 1);\n}\n\n// 创建小文件资源池\nasync function createSmallFileEfsResourcePool(lifeCycle) {\n node.log(\"Step2.1 创建小文件资源池...参数lifeCycle:\" + lifeCycle)\n // 获取全局函数\n const resUrl = \"/fileStorage/smallFile/efsResourcePool\";\n try {\n // 获取全局函数\n const fetchData = global.get(\"fetchData\");\n const headers = buildHeader(AK, SK, \"POST\", resUrl);\n const params = { lifeCycle: lifeCycle };\n\n // 调用全局函数并等待结果\n const response = await fetchData(\n FILE_BASE_URL + resUrl,\n 'POST',\n JSON.stringify(params),\n headers\n );\n node.log(\"全局函数DEMO服务,创建小文件资源池响应response:\" + JSON.stringify(response));\n if (response.resourceId != \"\") {\n return response.resourceId;\n }\n return \"\";\n } catch (e) {\n node.error(\"腾讯8520创建小文件资源池错误:\" + e.message);\n return \"\";\n }\n}\n\n// 请求小文件数据节点地址\nasync function querySmallFileAddress(resourceId) {\n node.log(\"Step2.2 请求小文件数据节点地址...resourceId:\" + resourceId)\n const resUrl = `/fileStorage/smallFile/address?resourceId=${resourceId}`;\n try {\n // 获取全局函数\n const fetchData = global.get(\"fetchData\");\n const headers = buildHeader(AK, SK, \"GET\", resUrl);\n // 调用全局函数并等待结果\n const response = await fetchData(\n FILE_BASE_URL + resUrl,\n 'GET',\n null,\n headers\n );\n node.log(\"全局函数DEMO服务,请求小文件数据节点地址响应response:\" + JSON.stringify(response));\n const data = response;\n return data;\n } catch (e) {\n node.error(\"腾讯8520请求小文件数据节点地址错误:\" + e.message);\n return null;\n }\n}\n\n// 小文件上传文件\nasync function uploadSmallFileWrite(file, uploadSmallFileRequest) {\n node.log(\"Step2.3 小文件上传文件参数...uploadSmallFileRequest:\" + JSON.stringify(uploadSmallFileRequest));\n const resUrl = \"/fileStorage/smallFile/write\";\n try {\n const headers = buildHeader(AK, SK, \"POST\", resUrl);\n const formData = new FormData();\n formData.append(\"token\", uploadSmallFileRequest.token);\n formData.append(\"resourceId\", uploadSmallFileRequest.resourceId);\n formData.append(\"dataNum\", 3);\n formData.append(\"parityNum\", 1);\n formData.append(\"suffix\", getFileExtension(file.originalFilename));\n formData.append(\"dataLength\", file.size);\n formData.append(\"data\", file.buffer, file.originalFilename);\n\n const fetchData = global.get(\"fetchData\");\n // 调用全局函数并等待结果\n const response = await fetchData(\n FILE_BASE_URL + resUrl,\n 'POST',\n formData,\n headers\n );\n\n node.log(\"全局函数DEMO服务,小文件上传文件接口响应:\" + JSON.stringify(response));\n if (response.fileUrl != \"\") {\n return response.fileUrl;\n }\n return \"\";\n } catch (e) {\n node.error(\"腾讯8520小文件上传文件错误:\" + e.message);\n return { status: false, message: e.message };\n }\n}\n\n// 主调用函数:获取文件后缀名\nasync function fileStorageFun(file) {\n try {\n const fileInfo = {\n filename: file.originalname,\n //buffer: file.buffer,\n size: file.size,\n mimetype: file.mimetype\n };\n //node.log(\"Step2 fileStorageFun 上传文件的信息为:\" + JSON.stringify(fileInfo));\n \n //node.log(\"Step2.1、全局函数DEMO服务,创建小文件资源池开始\");\n //const resourceId = await createSmallFileEfsResourcePool(24);\n const resourceId = 38;\n if (!resourceId || resourceId == null) {\n node.error(\"Step2.1 全局函数DEMO服务,创建小文件资源池错误\");\n return \"\";\n }\n node.log(\"Step2.1 全局函数DEMO服务,创建小文件资源池结果:\" + resourceId);\n\n //node.log(\"Step2.2、全局函数DEMO服务,请求小文件数据节点地址开始\");\n //const addressInfo = await querySmallFileAddress(resourceId);\n const addressInfo = { \"ip\": \"10.192.85.101\", \"port\": 9011, \"httpsPort\": 9012, \"token\": \"1461835071528190010\" }; \n if (!addressInfo || addressInfo == null) {\n node.error(\"Step2.2 全局函数DEMO服务,请求小文件数据节点地址错误\");\n return \"\";\n }\n node.log(\"Step2.2 全局函数DEMO服务,请求小文件数据节点地址结果:\" + JSON.stringify(addressInfo));\n\n //node.log(\"Step2.3、全局函数DEMO服务,小文件上传文件开始\");\n const uploadRequest = {\n token: addressInfo.token,\n resourceId: resourceId\n };\n\n //const fileUrl = await uploadSmallFileWrite(file, uploadRequest);\n const fileUrl = \"/data/VSL/DSSEnterpriseClient/DSS_Enterprise_Client32.dav\";\n if (!fileUrl || fileUrl == null) {\n node.error(\"Step2.3 全局函数DEMO服务,小文件上传文件错误\");\n return \"\";\n }\n node.log(\"Step2.3 全局函数DEMO服务,小文件上传文件结果:\" + fileUrl);\n return fileUrl;\n } catch (error) {\n node.error(\"Step2 全局函数DEMO服务处理文件上传失败: \" + error.message);\n return \"\";\n }\n}\n\n\nglobal.set(\"fileStorageFun\", fileStorageFun);\n\n//return msg;","outputs": 1,"timeout": 0,"noerr": 0,"initialize": "// 部署节点后,此处添加的代码将运行一次。 \n// 在函数节点中定义全局函数\nglobal.set(\"fetchData\", function (url, method, data, headers) {\n return new Promise((resolve, reject) => {\n const axios = global.get('axios');\n\n const config = {\n method: method,\n url: url,\n headers: headers || {},\n data: data\n };\n\n axios(config)\n .then(response => {\n node.log(\"fetchData response:\" + JSON.stringify(response));\n resolve(response.data);\n })\n .catch(error => {\n node.error(\"fetchData response:\" + error);\n reject(error);\n });\n });\n});","finalize": "","libs": [],"x": 620,"y": 400,"wires": [["de0ee00f4f44fd64"]]},{"id": "de0ee00f4f44fd64","type": "debug","z": "da96e160ae8b4fec","name": "2.全局函数DEMO服务","active": true,"tosidebar": true,"console": false,"tostatus": false,"complete": "payload","targetType": "msg","statusVal": "","statusType": "auto","x": 1060,"y": 400,"wires": []},{"id": "8fc47e9434b6b142","type": "inject","z": "da96e160ae8b4fec","name": "腾讯8520 Trigger Initialization","props": [{"p": "payload"},{"p": "topic","vt": "str"}],"repeat": "","crontab": "","once": false,"onceDelay": 0.1,"topic": "","payload": "","payloadType": "date","x": 190,"y": 400,"wires": [["d6f0f1b2e6562a68"]]}
]
接口验证
curl --location --request POST 'http://127.0.0.1:1880/node-red/customized/uploadFile' \
--header 'Accept-Language: zh-cn' \
--header 'User-Agent: Apifox/1.0.0 (https://apifox.com)' \
--header 'Authorization: Bearer 35678c99-9f2b-4c92-8b80-eda33a66c7ee' \
--header 'Accept: */*' \
--header 'Host: 127.0.0.1:1880' \
--header 'Connection: keep-alive' \
--header 'Content-Type: multipart/form-data; boundary=--------------------------299934904553613462394355' \
--form 'deviceId="132456798"' \
--form 'file=@"D:\\upload\\2025-04-11_10-33-08.dav"'
日志输出
node-red调试窗口
略…
node-red控制台
略…