当前位置: 首页 > news >正文

Go语言——goflow工作流使用

一、引入依赖

这个很坑,他不允许连接带密码的redis,只能使用不带密码的redis,要带密码的话得自己改一下源代码,无语

go get github.com/s8sg/goflow

二、画出我们的工作流程

在这里插入图片描述

三、编写代码

package mainimport ("encoding/json""fmt"flow "github.com/s8sg/goflow/flow/v1"goflow "github.com/s8sg/goflow/v1""log""math/rand""strconv"
)// Input 输入一个数字
func Input(data []byte, option map[string][]string) ([]byte, error) {var input map[string]int// 获取输入的数if err := json.Unmarshal(data, &input); err != nil {return nil, err}outputInt := input["input"]// 将数据交给工作流处理return []byte(strconv.Itoa(outputInt)), nil
}// AddOne 加上10以内的一个随机整数
func AddOne(data []byte, option map[string][]string) ([]byte, error) {// 获取上一个工作流的数据num, _ := strconv.Atoi(string(data))outputInt := num + rand.Intn(10) + 1fmt.Println("AddOne = ", outputInt)// 交给下一个工作流处理return []byte(strconv.Itoa(outputInt)), nil
}// AddTwo 加上10以内的一个随机整数
func AddTwo(data []byte, option map[string][]string) ([]byte, error) {num, _ := strconv.Atoi(string(data))outputInt := num + rand.Intn(10) + 1fmt.Println("AddTwo = ", outputInt)return []byte(strconv.Itoa(outputInt)), nil
}// Aggregator 聚合节点
func Aggregator(data []byte, option map[string][]string) ([]byte, error) {fmt.Println("Aggregator = ", string(data))return data, nil
}// Expand10 扩大10倍
func Expand10(data []byte, option map[string][]string) ([]byte, error) {num, _ := strconv.Atoi(string(data))outputInt := num * 10fmt.Println("Expand10 = ", outputInt)return []byte(strconv.Itoa(outputInt)), nil
}// Expand100 扩大100倍
func Expand100(data []byte, option map[string][]string) ([]byte, error) {num, _ := strconv.Atoi(string(data))outputInt := num * 100fmt.Println("Expand100 = ", outputInt)return []byte(strconv.Itoa(outputInt)), nil
}// Output 输出节点
func Output(data []byte, option map[string][]string) ([]byte, error) {fmt.Println("Output = ", string(data))return data, nil
}// 定义我们自己的一个流程
func MyFlow(workflow *flow.Workflow, context *flow.Context) error {// 创建DAGdag := workflow.Dag()// 创建节点dag.Node("input", Input)dag.Node("add-one", AddOne)dag.Node("add-two", AddTwo)// 这个聚合节点,就需要拿到add-one和add-two的结果dag.Node("aggregator", Aggregator, flow.Aggregator(func(m map[string][]byte) ([]byte, error) {addOneResult, _ := strconv.Atoi(string(m["add-one"]))addTwoResult, _ := strconv.Atoi(string(m["add-two"]))num := addOneResult + addTwoResultfmt.Println("aggregator = ", num)return []byte(strconv.Itoa(num)), nil}))// 这个方式是获取到节点的数据进行判断,然后返回一个字符串数组f1 := func(bytes []byte) []string {num, _ := strconv.Atoi(string(bytes))fmt.Println("ConditionalBranch = ", num)if num > 10 {return []string{"moreThan"}}return []string{"lessThan"}}// 这个方法就是将分支的数据返回给outputf2 := func(m map[string][]byte) ([]byte, error) {if v, ok := m["moreThan"]; ok {i, _ := strconv.Atoi(string(v))fmt.Println("f2 moreThan = ", i)return v, nil}if v, ok := m["lessThan"]; ok {i, _ := strconv.Atoi(string(v))fmt.Println("f2 lessThan = ", i)return v, nil}return nil, nil}// 创建一个条件分支节点branches := dag.ConditionalBranch("judge", []string{"moreThan", "lessThan"}, f1, flow.Aggregator(f2))branches["moreThan"].Node("expand-10", Expand10)branches["lessThan"].Node("expand-100", Expand100)dag.Node("output", Output)// 构建关系dag.Edge("input", "add-one")dag.Edge("input", "add-two")dag.Edge("add-one", "aggregator")dag.Edge("add-two", "aggregator")dag.Edge("aggregator", "judge")dag.Edge("judge", "output")return nil
}func main() {fs := goflow.FlowService{Port:              10001,RedisURL:          "127.0.0.1:6379",RedisPwd:          "p@ssw0rd",WorkerConcurrency: 5,RetryCount:        0,}if err := fs.Register("myFlow", MyFlow); err != nil {log.Printf("goflow register err: %v\n", err)return}if err := fs.Start(); err != nil {panic(err)}
}

四、Postman测试

在这里插入图片描述

在这里插入图片描述

http://www.xdnf.cn/news/372295.html

相关文章:

  • WPF之集合绑定深入
  • 计算机网络:什么是Mesh组网以及都有哪些设备支持Mesh组网?
  • drf 使用jwt
  • cv_connection (像halcon一样对区域进行打散)
  • .Net Mqtt协议-MQTTNet(一)简介
  • 养生:为健康生活筑牢根基
  • 路由重发布
  • 软件测试——用例篇(3)
  • 嵌入式与物联网:C 语言在边缘计算时代的破局之道
  • OSPF不规则区域划分
  • Win10无法上网:Windows 无法访问指定设备、路径或文件。你可能没有适当的权限访问该项目找不到域 TEST 的域控制器DNS 解析存在问题
  • 大节点是选择自建机房还是托管机房
  • 数据结构与算法分析实验12 实现二叉查找树
  • 深入理解 TCP:重传机制、滑动窗口、流量控制与拥塞控制
  • 考研408《计算机组成原理》复习笔记,第三章数值数据的表示和运算(定点数篇)
  • Ping 不通外网,Ping 得通主机问题解决小记
  • BUUCTF——Cookie is so stable
  • 《C++探幽:模板从初阶到进阶》
  • Docker Desktop安装在其他盘
  • [面试]SoC验证工程师面试常见问题(七)低速接口篇
  • rust-candle学习笔记13-实现多头注意力
  • Skyvern:用 AI+视觉驱动浏览器自动化
  • FreeTex v0.2.0:功能升级/支持Mac
  • Ubuntu 22.04(WSL2)使用 Docker 安装 Zipkin 和 Skywalking
  • 【含文档+PPT+源码】基于微信小程序的社区便民防诈宣传系统设计与实现
  • 基本句子结构
  • 前端取经路——现代API探索:沙僧的通灵法术
  • 每天五分钟机器学习:KTT条件
  • 在 Excel 中有效筛选重复元素
  • Stable Diffusion XL 文生图