Apache SeaTunnel Spark引擎执行流程源码分析
目录
1. 任务启动入口
2. 任务执行命令类:SparkTaskExecuteCommand
3. SparkExecution的创建与初始化
3.1 核心组件初始化
3.2 关键对象说明
4. 任务执行:SparkExecution.execute()
5. Source处理流程
5.1 插件初始化
5.2 数据流生成
6. Transform处理流程
6.1 插件初始化
6.2 转换执行
7. Sink处理流程
7.1 插件初始化
7.2 数据输出
执行流程全景图
关键设计总结
本文基于SeaTunnel 2.3.x源码分析Spark引擎执行流程,以seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java
为入口,完整解析Spark引擎的执行流程。
1. 任务启动入口
启动类核心代码:
public static void main(String[] args) { // 1. 创建Spark命令参数对象 SparkCommandArgs sparkCommandArgs = new SparkCommandArgs(); // 2. 执行SeaTunnel.run()回调Spark执行命令 SeaTunnel.run(sparkCommandArgs.buildCommand()); }
-
buildCommand()
返回SparkTaskExecuteCommand
实例 -
SeaTunnel.run()
最终调用SparkTaskExecuteCommand.execute()
2. 任务执行命令类:SparkTaskExecuteCommand
核心执行流程:
public void execute() { // 1. 解析配置文件生成Config对象 Config config = ConfigBuilder.of(configFile); // 2. 创建SparkExecution实例 SparkExecution seaTunnelTaskExecution = new SparkExecution(config); // 3. 执行任务 seaTunnelTaskExecution.execute(); }
3. SparkExecution的创建与初始化
3.1 核心组件初始化
public SparkExecution(Config config) { // 创建Spark运行时环境 this.sparkRuntimeEnvironment = SparkRuntimeEnvironment.getInstance(config); JobContext jobContext = new JobContext(); jobContext.setJobMode(RuntimeEnvironment.getJobMode(config)); // 创建三大处理器 this.sourcePluginExecuteProcessor = new SourceExecuteProcessor( sparkRuntimeEnvironment, jobContext, config.getConfigList(Constants.SOURCE)); this.transformPluginExecuteProcessor = new TransformExecuteProcessor( sparkRuntimeEnvironment, jobContext, TypesafeConfigUtils.getConfigList(config, Constants.TRANSFORM, Collections.emptyList())); this.sinkPluginExecuteProcessor = new SinkExecuteProcessor( sparkRuntimeEnvironment, jobContext, config.getConfigList(Constants.SINK)); }
3.2 关键对象说明
组件 | 类型 | 功能 |
---|---|---|
sourcePluginExecuteProcessor | SourceExecuteProcessor | 处理数据源接入 |
transformPluginExecuteProcessor | TransformExecuteProcessor | 处理数据转换逻辑 |
sinkPluginExecuteProcessor | SinkExecuteProcessor | 处理数据输出 |
sparkRuntimeEnvironment | SparkRuntimeEnvironment | 封装SparkSession及运行时环境 |
4. 任务执行:SparkExecution.execute()
DAG构建流程:
public void execute() throws TaskExecuteException { // 初始化数据集集合 List<Dataset<Row>> datasets = new ArrayList<>(); // 按顺序执行三大组件 datasets = sourcePluginExecuteProcessor.execute(datasets); datasets = transformPluginExecuteProcessor.execute(datasets); sinkPluginExecuteProcessor.execute(datasets); log.info(&