当前位置: 首页 > news >正文

workflow:高效的流式工作架构

引言

workflow是sougou的一款开源框架 主要是以请求回应的模式解决各自网络/IO任务而发明的

 一.workflow的任务流
1.workflow都封装了哪些任务流

以请求回应的模式来解释

① 网络层

服务端

在服务端的request 相当于发送了一个获取客户端请求的请求,response相当于接收客户端的请求从而处理请求

客户端 

在客户端的request相当于向服务端发送了一个请求,response相当于接收服务端回复的消息

 ② 定时器

发送延时请求---->处理延时任务

③  IO层

发起处理文件IO的请求---->处理文件

④  cpu

发起耗时计算的请求---->处理耗时任务

总结:

耗时等待 :负载均衡 类似于fd通过%4取模的方式负载均衡给四个网络线程 ,当条件满足的时候把任务抛出给工作线程处理

 

耗时计算:当遇到计算任务的时候工作线程会把计算任务抛出给到go线程,go线程池通过任务调度处理计算任务

2.任务是如何来进行组织的呢

在workflow中通常以串联,并联,DAG 三种方式对任务进行组织

①串联

想象成你中学学习物理的时候的串联电路

②并联

想象成你中学学习物理的时候的并联电路

③DAG

在工作流(Workflow)系统中,DAG(有向无环图,Directed Acyclic Graph) 是一个非常核心的概念,它用于表示任务之间的执行依赖关系,画一个简单的图演示一下。

 

任务A 最先执行

任务B 和 任务C 并行执行,但都依赖于 任务A

任务D 要等 任务B 和 任务C 都完成后才执行

学过408中的操作系统信号量的同学应该可以很轻松理解这个模型

 

二.线程模型

main函数入口主线程

4个网络线程

20个工作线程

8个计算线程

 

 三.workflow的三板斧
1.三板斧是哪三板斧

first-->抽象粒度合适的异步任务

second--->通过任务流组织任务:串联,并联,以及DAG

third--->协调任务:counter,conditional,resource pool,message queue

2.message queue 

我们简单介绍一下message queue 消息队列的部分

队列的接口层

 在workflow中消息队列提供了上述这些接口 分别是 队列的创建 ,获取队列 ,往队列里面put 消息  ,设置队列是阻塞还是非阻塞 ,队列的销毁 

队列都有哪些成员

 此队列中有两个队列 分别是put队列 和 get队列 我们了解这两个队列的同时 我们要先理解多消费者和多生成者模型 , 在生产者往队列里面put消息的时候 是通过在put队列里面put操作 ,当消费者消费队列中的消息的时候 通过在get队列里面使用get操作,当get队列里面没有数据的时候,put队列里面的数据会转移到get队列里面供消费者消费,我们这样设计的目的是为了减少生产者和消费者之间的碰撞从而提供效率

 我们从中拿一个消息队列的头插法举个例子

 我们用void** 接收一个偏移后的message 用C语音的void**的好处是可以接收任何类型的数据,更加自由的去操作数据

我们对消息队列进行put操作前 我们要保证原子性 所以我们要通过pthread_mutex_lock 上锁 和pthread_mutex_unloc解锁

 后面我们通过条件变量保证数据的同步性pthread_cond_wait

 3.我们举一个简单的用workflow写的一个http请求回应代码
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/WFFacilities.h>static WFFacilities::WaitGroup waitGroup(1);  // 用于主线程等待任务完成void http_callback(WFHttpTask *task)
{int state = task->get_state();int error = task->get_error();if (state != WFT_STATE_SUCCESS){printf("HTTP request failed. State = %d, Error = %d\n", state, error);waitGroup.done();  // 通知主线程任务已完成return;}const void *body;size_t body_len;task->get_resp()->get_parsed_body(&body, &body_len);printf("HTTP Response:\n%.*s\n", (int)body_len, (const char *)body);waitGroup.done();  // 通知主线程任务已完成
}int main()
{signal(SIGPIPE, SIG_IGN);  // 忽略 SIGPIPE 信号const char *url = "http://www.github.com/";WFHttpTask *task = WFTaskFactory::create_http_task(url, 4, 2, http_callback);task->start();  // 异步启动任务waitGroup.wait();  // 主线程等待任务完成return 0;
}

我们把一个url 放入到一个任务流中 并设置回调函数 

任务流对github发送request github收到请求 并 回复response给我们 触发回调

打印http://www.example.com 的 HTTP 响应内容。

http://www.xdnf.cn/news/548605.html

相关文章:

  • BPMN.js编辑器设计器与属性面板数据交互
  • 【动手学深度学习】系列
  • 【AI News | 20250520】每日AI进展
  • 5.20 note
  • 什么是 AI 人工智能?什么是机器学习?什么是深度学习?三者啥关系
  • 基于AutoDL市场下的Pycharm远程控制
  • Redis从入门到实战 - 高级篇(中)
  • Jedis快速入门【springboot】
  • NMOS和PMOS的区别
  • 大语言模型 14 - Manus 超强智能体 开源版本 OpenManus 上手指南
  • 从混乱到高效:我们是如何重构 iOS 上架流程的(含 Appuploader实践)
  • 南柯电子|储能EMC整改:从单点整改到智能预测的进化路径
  • 瑞萨单片机笔记
  • #渗透测试#批量漏洞挖掘#LiveBos UploadFile(CVE-2021-77663-2336) 任意文件上传漏洞
  • Translational Psychiatry | 注意缺陷多动障碍儿童延迟厌恶的行为与神经功能特征茗创科技茗创科技
  • MySQL与Redis一致性问题分析
  • 数据库与存储安全
  • DeepSeek在政务与公共服务中的智能化实践
  • 中国国际软件发展大会荣誉揭晓,远光九天 AI 应用开发平台获评“软件行业突破性技术成果”
  • 多模态实时交互边界的高效语音语言模型 VITA-Audio 介绍
  • 全球氰化物测定仪市场:现状、趋势与展望
  • PLC系统中开关量与模拟量信号解析
  • 跳空高低开策略思路
  • 优化Hadoop性能:如何修改Block块大小
  • SpringBoot与GeoHash整合,实现骑手就近派单功能
  • Go语言实战:使用 excelize 实现多层复杂Excel表头导出教程
  • Github 2025-05-20Python开源项目日报 Top9
  • 重要通知!!2025年上半年软考考试准考证打印通知(附各地区打印时间表)
  • 【Java】继承和多态在 Java 中是怎样实现的?
  • Token的组成详解:解密数字身份凭证的构造艺术