当前位置: 首页 > ds >正文

AutoCompose - 携程自动编排框架的简单介绍

AutoCompose - 携程自动编排框架的简单介绍

  • 前言
  • ✅ 一、这个框架是做什么的?
  • ⚙️ 二、Demo 示例
    • 2.1 制造车架、采购电池、制造引擎
    • 2.2 安装电池、电子设备、引擎
    • 2.3 汽车生产结束
  • ⚙️ 三、运行机制
    • 1. 构建阶段:
    • 2. 执行阶段:
  • 📐 四、框架原理图和优势介绍
  • 五、和开源框架LiteFlow框架有什么区别?

前言

一句话介绍框架:

AutoCompose 是一个基于 DAG 的轻量级异步自动编排框架,支持组件依赖关系自动识别、异步执行、上下文隔离和可视化展示。

✅ 一、这个框架是做什么的?

这是一个 自动编排执行框架(AutoCompose Framework),它的核心目标是:

在Spring / SpringBoot中 ,根据组件的注入依赖关系,对多个异步任务之间的依赖关系进行建模,并通过有向无环图(DAG)来控制它们的执行顺序。

它适用于以下场景:

  • 多个异步任务之间存在依赖关系
  • 希望以“声明式”方式定义依赖链,而不是硬编码调用顺序
  • 想要支持上下文隔离(如每个用户/订单/车型拥有独立上下文)
  • 想要可视化地查看任务链结构

框架的核心能力:

核心能力描述
✅ 自动识别组件依赖利用 Spring 容器 + 注解扫描,自动生成组件间依赖关系
✅ DAG 编排调度所有组件为节点,依赖关系为边,构建有向无环图(DAG),后序遍历保证正确执行顺序
✅ 异步任务支持支持 Reactor、CompletableFuture、ListenableFuture 等多种异步模型
✅ 上下文传递与隔离使用 TransmittableThreadLocal 实现线程级上下文安全,防止交叉污染
✅ 结果缓存组件执行结果统一缓存,避免重复执行,提高性能
✅ 循环依赖检测在运行前自动检测是否存在循环依赖并抛异常
✅ 缓存一致性验证防止跨组件数据传递时缓存丢失

该框架基于以下开源库设计:

技术用途
Spring Boot / Spring提供容器管理、自动装配
TransmittableThreadLocal实现跨线程上下文传递与隔离
JGraphT / mxGraph构建 DAG 图,提供依赖可视化输出
Reactor Core支持响应式异步编程
Guava工具类封装(如 ListenableFuture、SettableFuture)
TTL Wrappers Ext对异步任务进行 ThreadLocal 封装装饰,保证上下文一致

⚙️ 二、Demo 示例

我们来模拟一下汽车的生产过程,列出第一步骤:

  • 制造车架
  • 采购电池

第二个大步骤:

  • 车架和电池完成后 --> 安装电池。
  • 车架和电池完成后 --> 安装电子设备。
  • 电池完成后 --> 制造引擎。

第三个大步骤:

  • 车架和引擎完成后 --> 安装引擎

第四个大步骤:

  • 安装引擎、安装电池、安装电子设备 完成后 --> 车辆生产结束

用图表示就是(此图可以自动生成):
在这里插入图片描述

那么在进入代码案例之前,我们来看下框架使用的几个基本点:

  1. 需要参与编排的组件,需要实现AutoComposable接口。重写execute函数,将该组件需要输出的内容返回出去。
  2. 自动编排之前,需要往ContextHolder中塞入对应的参数,这样整个编排周期中就可以拿到这个上下文。

核心抽象接口说明

接口描述
AutoComposable<T, R>通用自动编排接口,所有可编排组件必须实现它
T: 执行返回类型(如 Mono<R>
R: 最终执行结果类型
ReactorAutoComposable<R>基于 Reactor 的异步编排接口,适用于非阻塞编程
CfAutoComposable<R>基于 CompletableFuture 的异步编排接口,内部也可以实现Sync接口,你只需要关注回调的业务逻辑处理。
LfAutoComposable<R>基于 ListenableFuture 的异步编排接口(Guava)
SyncAutoComposable<R>同步编程适配接口

接下来我们来看下具体实现。

2.1 制造车架、采购电池、制造引擎

我们创建类PurchaseBattery,代表采购电池

@Component
@AutoComposableBeanDesc("采购电池")
public class PurchaseBattery implements ReactorAutoComposable<String> {@Autowiredprivate RequestCarTypeHolder requestCarTypeHolder;@Overridepublic Mono<String> execute() {String carType = requestCarTypeHolder.get();switch (carType) {case "model3":return Mono.just(carType).delayElement(Duration.ofSeconds(1)).flatMap(s -> {String battery = "(" + s + ")的电池";// logSystem.out.printf("%s %s: 耗时1s,采购%s%n",LocalDateTime.now(),Thread.currentThread().getName(),battery);return Mono.just(battery);});case "modelY":// logSystem.out.printf("%s %s:(%s)的电池缺货%n",LocalDateTime.now(),Thread.currentThread().getName(),carType);return Mono.empty();default:// logSystem.out.printf("%s %s: 采购(%s)的电池失败%n",LocalDateTime.now(),Thread.currentThread().getName(),carType);return Mono.error(new IllegalStateException("PurchaseBattery error"));}}
}

@AutoComposableBeanDesc 注解用于描述组件的用途。这里我们接收外层参数,处理了三种车的电池采购情况:

  • model3:采购成功
  • modelY:采购失败(缺货)
  • modelS:采购失败(error)

我们创建类MakeCarFrame,代表制造车架

@Component
@AutoComposableBeanDesc("制造车架")
public class MakeCarFrame implements ReactorAutoComposable<String> {@Autowiredprivate RequestCarTypeHolder requestCarTypeHolder;@Overridepublic Mono<String> execute() {return Mono.just(requestCarTypeHolder.get()).delayElement(Duration.ofSeconds(1)).map(s -> {String carFrame = "(" + s + ")车架";// logSystem.out.printf("%s %s: 耗时1s,制造%s%n",LocalDateTime.now(),Thread.currentThread().getName(),carFrame);return carFrame;});}
}

我们创建类MakeEngine,代表制造引擎

@Component
@AutoComposableBeanDesc("制造引擎")
public class MakeEngine implements ReactorAutoComposable<String> {@Autowiredprivate PurchaseBattery purchaseBattery;@Overridepublic Mono<String> execute() {return Mono.justOrEmpty(purchaseBattery.getExecuteResult()).delayElement(Duration.ofSeconds(1)).map(battery -> {String engine = "适配(" + battery + ")的引擎";// logSystem.out.printf("%s %s: 耗时1s,采购电池后,制造%s%n",LocalDateTime.now(),Thread.currentThread().getName(),engine);return engine;});}
}

这里就体现了自动编排,制造引擎,需要先采购电池,那么编排上,只需要把PurchaseBattery类引入,就有两个效果:

  • 可以通过getExecuteResult() 函数,拿到前序组件的返回结果。
  • 框架会优先执行前序节点,再执行当前节点。

2.2 安装电池、电子设备、引擎

我们创建AssembleBattery类,代表安装电池:(它依赖于电池采购和车架安装)

@Component
@AutoComposableBeanDesc("安装电池")
public class AssembleBattery implements AssembleCarParts {@Autowiredprivate PurchaseBattery purchaseBattery;@Autowiredprivate MakeCarFrame makeCarFrame;@Overridepublic Boolean syncExecute() {if (purchaseBattery.getExecuteResult() == null || makeCarFrame.getExecuteResult() == null) {return false;}// logSystem.out.printf("%s %s: 安装%s至%s%n",LocalDateTime.now(),Thread.currentThread().getName(),purchaseBattery.getExecuteResult(),makeCarFrame.getExecuteResult());return true;}}

我们创建AssembleEngine类,代表安装电子设备:(它依赖于电池采购和车架安装)

@Component
@AutoComposableBeanDesc("安装电子设备")
public class AssembleAppliances implements AssembleCarParts {@Autowiredprivate PurchaseBattery purchaseBattery;@Autowiredprivate MakeCarFrame makeCarFrame;@Overridepublic Boolean syncExecute() {if (purchaseBattery.getExecuteResult() == null || makeCarFrame.getExecuteResult() == null) {return false;}// logSystem.out.printf("%s %s: 安装%s至%s%n",LocalDateTime.now(),Thread.currentThread().getName(),"适配(" + purchaseBattery.getExecuteResult() + ")的电子设备",makeCarFrame.getExecuteResult());return true;}
}

我们创建AssembleEngine类,代表安装引擎:(它依赖于制造引擎和车架安装)

@Component
@AutoComposableBeanDesc("安装引擎")
public class AssembleEngine implements AssembleCarParts {@Autowiredprivate MakeEngine makeEngine;@Autowiredprivate MakeCarFrame makeCarFrame;@Overridepublic Boolean syncExecute() {if (makeEngine.getExecuteResult() == null || makeCarFrame.getExecuteResult() == null) {return false;}// logSystem.out.printf("%s %s: 安装%s至%s%n",LocalDateTime.now(),Thread.currentThread().getName(),makeEngine.getExecuteResult(),makeCarFrame.getExecuteResult());return true;}}

2.3 汽车生产结束

我们创建MakeCarEnd类,代表汽车生产的结果。

@Component
@AutoComposableBeanDesc("生产汽车end")
public class MakeCarEnd implements ReactorAutoComposable.Sync<String> {@Autowiredprivate List<AssembleCarParts> assembleCarPartsList;@Autowiredprivate RequestCarTypeHolder requestCarTypeHolder;@Overridepublic String syncExecute() {String car = requestCarTypeHolder.get() +(assembleCarPartsList.stream().map(AssembleCarParts::getExecuteResult).allMatch(b -> Optional.ofNullable(b).orElse(false))? "生产成功": "生产失败");// logSystem.out.printf("%s %s: %s%n%n",LocalDateTime.now(),Thread.currentThread().getName(),car);return car;}
}

最后我们外部创建ControllerAutoComposeUtils类是执行编排的入口,入参是编排的最后一个返回节点。

@RestController
public class TestController {@Autowiredprivate AutoComposeUtils autoComposeUtils;@Autowiredprivate MakeCarEnd makeCarEnd;@Autowiredprivate RequestCarTypeHolder requestCarTypeHolder;@GetMapping(value = "", produces = MediaType.ALL_VALUE)public Mono<String> test(@RequestParam(defaultValue = "") String carType) {// 存储上下文数据到AutoCompose框架中requestCarTypeHolder.set(carType);// logSystem.out.printf("%n%s %s: 开始生产%s汽车%n",LocalDateTime.now(),Thread.currentThread().getName(),carType);// 执行自动编排,入参为根结点beanreturn autoComposeUtils.execute(makeCarEnd).onErrorResume(throwable -> {// logSystem.out.printf("%s %s: %s生产失败,原因:%s%n%n",LocalDateTime.now(),Thread.currentThread().getName(),carType,throwable.getMessage());return Mono.just(carType + "生产失败");}).subscribeOn(Schedulers.fromExecutor(ForkJoinPool.commonPool()));}
}

写一个过滤器,清空上下文(本框架基于TTL来实现缓存):底层实现类:TransmittableThreadLocalDataHolder

@Component
public static class AutoComposeFilter implements WebFilter {@Overridepublic Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {// 清除线程中缓存bean对象TransmittableThreadLocalDataHolder.reset();return chain.filter(exchange);}
}

写个启动类:

public static void main(String[] args) throws Exception {SpringApplication.run(AutoComposeApplication.class, args);System.setProperty("java.awt.headless", "false");// 控制台输出依赖关系AutoComposableDependenciesCache.INSTANCE.generateDependenciesString();// 生成依赖关系图片AutoComposableDependenciesCache.INSTANCE.generateDependenciesImg();Desktop.getDesktop().browse(new URI("http://127.0.0.1:8080/?carType=model3"));Desktop.getDesktop().browse(new URI("http://127.0.0.1:8080/?carType=modelY"));Desktop.getDesktop().browse(new URI("http://127.0.0.1:8080/?carType=modelS"));}

可以看到以下结果:

  • 各个组件的编排依赖关系。
  • 生产车辆的过程和最终结果。
--------AutoComposable implementation classes‘s dependencies--------
AssembleEngine(安装引擎) -> [MakeEngine, MakeCarFrame]
MakeCarFrame(制造车架) -> []
MakeEngine(制造引擎) -> [PurchaseBattery]
PurchaseBattery(采购电池) -> []
AssembleBattery(安装电池) -> [MakeCarFrame, PurchaseBattery]
AssembleAppliances(安装电子设备) -> [MakeCarFrame, PurchaseBattery]
MakeCarEnd(生产汽车end) -> [AssembleAppliances, AssembleEngine, AssembleBattery]AutoComposableDependencies.png generate successfully: /Users/jj.lin/Desktop/Project/auto-compose/auto-compose-example/target/classes/AutoComposableDependencies.png2025-05-16T13:39:02.627229 reactor-http-nio-2: 开始生产model3汽车2025-05-16T13:39:02.627229 reactor-http-nio-4: 开始生产modelY汽车2025-05-16T13:39:02.627232 reactor-http-nio-5: 开始生产modelS汽车
2025-05-16T13:39:02.643995 ForkJoinPool.commonPool-worker-2: 采购(modelS)的电池失败
2025-05-16T13:39:02.644095 ForkJoinPool.commonPool-worker-3:(modelY)的电池缺货
2025-05-16T13:39:02.644228 ForkJoinPool.commonPool-worker-2: modelS生产失败,原因:PurchaseBattery error2025-05-16T13:39:03.648769 parallel-1: 耗时1s,制造(model3)车架
2025-05-16T13:39:03.648905 parallel-4: 耗时1s,采购(model3)的电池
2025-05-16T13:39:03.648771 parallel-3: 耗时1s,制造(modelY)车架
2025-05-16T13:39:03.648769 parallel-2: 耗时1s,制造(modelS)车架
2025-05-16T13:39:03.649713 parallel-4: 安装适配((model3)的电池)的电子设备至(model3)车架
2025-05-16T13:39:03.652006 parallel-3: modelY生产失败2025-05-16T13:39:03.652022 parallel-4: 安装(model3)的电池至(model3)车架
2025-05-16T13:39:04.654848 parallel-5: 耗时1s,采购电池后,制造适配((model3)的电池)的引擎
2025-05-16T13:39:04.655460 parallel-5: 安装适配((model3)的电池)的引擎至(model3)车架
2025-05-16T13:39:04.655866 parallel-5: model3生产成功

⚙️ 三、运行机制

可以简单说一下整个AutoCompose的运行原理,它分为两个阶段。

1. 构建阶段:

  • 启动时自动扫描所有 @Component 中的 AutoComposable 组件
  • 分析组件间的依赖关系(通过 @Autowired 或注解标记)
  • 构建 DAG 图,并生成可视化描述

2. 执行阶段:

  • 从根组件开始,递归执行所有前置依赖组件
  • 使用后序遍历算法,保证被依赖组件先执行,依赖组件后执行
  • 每个组件的结果缓存到 ResultHolder

组件执行流程(伪代码)

execute(rootComponent):-> scan all dependent components-> build DAG graph-> check for cycle dependency-> post-order traverse & execute each component-> if cached: return cache result-> else: execute and cache result

📐 四、框架原理图和优势介绍

+---------------------+
|     HTTP 请求       |
+----------+----------+|v
+---------------------+
|   WebFilter 清空缓存 |
+----------+----------+|v
+---------------------+
|   AutoComposeUtils  |
|     .execute(...)   |
+----------+----------+|v
+---------------------+
| 依赖分析与排序       |
| - 构建 DAG 图       |
| - 检测循环依赖      |
+----------+----------+|v
+---------------------+
| 组件执行             |
| - 依赖组件先执行     |
| - 当前组件最后执行   |
| - 上下文隔离         |
| - 结果缓存           |
+----------+----------+|v
+---------------------+
| 返回聚合结果         |
+---------------------+

总而言之,AutoCompose 是一个基于 Spring + DAG 的自动编排执行框架,帮助开发者在微服务中以声明式方式组织异步任务链,实现依赖调度、上下文隔离、结果缓存和可视化展示。它有什么优势?

技术亮点

  • 灵活支持多种编程模型:同步、CompletableFuture、Reactor。ListenableFuture等。也可以自己拓展实现。
  • 自动依赖解析与拓扑排序:无需手动指定执行顺序。
  • 线程安全的上下文管理:使用TransmittableThreadLocal确保线程隔离。
  • 强大的可扩展性:支持自定义DataHolder实现,比如用Redis来作为编排组件的结果存储。
  • 可视化依赖分析:可生成依赖关系图和文本描述。

五、和开源框架LiteFlow框架有什么区别?

两款框架都是解决复杂服务开发的单事件驱动(无状态)的流程引擎。

LiteFlow的作者认为它是规则引擎,虽然有这种说法上的差异,但实际上,两者的功能、解决的问题是一样的。

AutoCompose追求简化开发提升效率,LiteFlow倾向于更灵活的的编排能力。

AutoComposeLiteFlow
学习成本低(熟悉几个接口与API)高(组件接口类型多,还要学习其EL表达式写法)
使用成本低(省去编排代码开发,自动组件数据传递,显著降低开发维护工作量)高(需要额外开发维护编排文件,需要硬编码数据传递)
编排灵活性低(只支持串行编排、并行编排)高(支持非常丰富灵活的编排能力)
异步编程支持(显著简化异步编程,降低门槛,提高开发效率)不支持
代码可读性高(逻辑都在代码里)低(必须结合代码与编排文件,才能确定实际执行逻辑)
逻辑可视化高(可生成业务流程图)中(可阅读编排文件)

AutoCompose原理现实际上并不复杂,代码数量特别少。下一篇文章会主要介绍AutoCompose的实现原理,

http://www.xdnf.cn/news/6872.html

相关文章:

  • 昇腾NPU环境搭建
  • FC7300 IO 无法正常输出高低电平问题排查
  • C++:赋值重载
  • 开放世界地形渲染:以三角洲行动为例(下篇)
  • 集星云推碰一碰:跨平台发布与源码定制全解析OEM源码部署
  • 携程token纯算分析
  • 【程序员AI入门:模型】19.开源模型工程化全攻略:从选型部署到高效集成,LangChain与One-API双剑合璧
  • 【匹配】Gotoh
  • RoboDual-上海交大-2025-2-6-开源
  • PCIe Switch 问题点
  • 【知识产权出版社-注册安全分析报告-无验证方式导致安全隐患】
  • 文章记单词 | 第82篇(六级)
  • 如何与“不安”和平共处?
  • 召回12:曝光过滤 Bloom Filter
  • 03算法学习_977、有序数组的平方
  • 经典案例 | 筑基与跃升:解码制造企业产供销协同难题
  • Go语言之路————并发
  • 【基础】Windows开发设置入门5:WinGet开发者完全指南(AI整理)
  • Spring 框架中适配器模式的五大典型应用场景
  • 轨道炮--范围得遍历,map巧统计
  • 强化学习算法实战:一个例子实现sarsa、dqn、ddqn、qac、a2c、trpo、ppo
  • RAGFlow升级到最新0.18.0新手指南
  • 【全解析】EN18031 标准下的 AUM 身份认证机制[上篇]
  • 国产三维CAD皇冠CAD(CrownCAD)建模教程:插接箱
  • B2C 商城转型指南:传统企业如何用 ZKmall模板商城实现电商化
  • 线上问题排查:JVM OOM问题如何排查和解决
  • Protobuf——Protocol Buffer详解(1)
  • RFID系统集成业务中,通过产业链上下游挖掘客户
  • Kubernetes + GlusterFS + Heketi 动态卷管理实践 !
  • 中大型水闸安全监测系统解决方案