
24. JobGraph Generation and Submission Explained


What Is a JobGraph

The JobGraph is the logical execution plan that Flink produces by further transforming the StreamGraph. Its main job is to merge and optimize the operators in the StreamGraph, but it is not yet the final physical execution plan (the JobMaster is responsible for generating that). This article walks through the StreamGraph-to-JobGraph conversion and the mechanism by which the JobGraph is submitted.

StreamingJobGraphGenerator: Converting a StreamGraph into a JobGraph

StreamingJobGraphGenerator is the core class dedicated to converting a StreamGraph into a JobGraph. Its entry point is:

public static JobGraph createJobGraph(
        ClassLoader userClassLoader, StreamGraph streamGraph, @Nullable JobID jobID) {
    // Create a per-job thread pool for serialization work
    final ExecutorService serializationExecutor =
            Executors.newFixedThreadPool(
                    Math.max(
                            1,
                            Math.min(
                                    Hardware.getNumberCPUCores(),
                                    streamGraph.getExecutionConfig().getParallelism())),
                    new ExecutorThreadFactory("flink-operator-serialization-io"));
    try {
        return new StreamingJobGraphGenerator(
                        userClassLoader, streamGraph, jobID, serializationExecutor)
                .createJobGraph();
    } finally {
        serializationExecutor.shutdown();
    }
}
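The serialization pool above is sized as max(1, min(numberOfCores, jobParallelism)). A minimal sketch of that clamping rule (the class and method names here are illustrative, not Flink API):

```java
// Illustrative sketch of the serialization pool sizing rule:
// poolSize = max(1, min(numberOfCores, jobParallelism)).
// A negative parallelism (e.g. a "not set" default of -1) falls back to one thread.
public class SerializationPoolSizing {
    public static int poolSize(int numCores, int parallelism) {
        return Math.max(1, Math.min(numCores, parallelism));
    }

    public static void main(String[] args) {
        System.out.println(poolSize(8, 4));   // job parallelism caps the pool at 4
        System.out.println(poolSize(2, 100)); // core count caps the pool at 2
        System.out.println(poolSize(8, -1));  // floor of 1 thread
    }
}
```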

A Closer Look at createJobGraph()

The createJobGraph() method implements the core of the conversion and consists of the following key steps:

private JobGraph createJobGraph() {
    preValidate();
    jobGraph.setJobType(streamGraph.getJobType());
    jobGraph.setDynamic(streamGraph.isDynamic());
    jobGraph.enableApproximateLocalRecovery(
            streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());

    // Generate deterministic hashes for the nodes in order to identify them across
    // submission iff they didn't change.
    Map<Integer, byte[]> hashes =
            defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

    // Generate legacy version hashes for backwards compatibility
    List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
    for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
        legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
    }

    setChaining(hashes, legacyHashes);

    if (jobGraph.isDynamic()) {
        setVertexParallelismsForDynamicGraphIfNecessary();
    }

    // Note that we set all the non-chainable outputs configuration here because the
    // "setVertexParallelismsForDynamicGraphIfNecessary" may affect the parallelism of job
    // vertices and partition-reuse
    final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs =
            new HashMap<>();
    setAllOperatorNonChainedOutputsConfigs(opIntermediateOutputs);
    setAllVertexNonChainedOutputsConfigs(opIntermediateOutputs);

    setPhysicalEdges();

    markSupportingConcurrentExecutionAttempts();

    validateHybridShuffleExecuteInBatchMode();

    setSlotSharingAndCoLocation();

    setManagedMemoryFraction(
            Collections.unmodifiableMap(jobVertices),
            Collections.unmodifiableMap(vertexConfigs),
            Collections.unmodifiableMap(chainedConfigs),
            id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),
            id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());

    configureCheckpointing();

    jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());

    final Map<String, DistributedCache.DistributedCacheEntry> distributedCacheEntries =
            JobGraphUtils.prepareUserArtifactEntries(
                    streamGraph.getUserArtifacts().stream()
                            .collect(Collectors.toMap(e -> e.f0, e -> e.f1)),
                    jobGraph.getJobID());

    for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
            distributedCacheEntries.entrySet()) {
        jobGraph.addUserArtifact(entry.getKey(), entry.getValue());
    }

    // set the ExecutionConfig last when it has been finalized
    try {
        jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
    } catch (IOException e) {
        throw new IllegalConfigurationException(
                "Could not serialize the ExecutionConfig."
                        + "This indicates that non-serializable types (like custom serializers) were registered");
    }
    jobGraph.setJobConfiguration(streamGraph.getJobConfiguration());

    addVertexIndexPrefixInVertexName();

    setVertexDescription();

    // Wait for the serialization of operator coordinators and stream config.
    try {
        FutureUtils.combineAll(
                        vertexConfigs.values().stream()
                                .map(
                                        config ->
                                                config.triggerSerializationAndReturnFuture(
                                                        serializationExecutor))
                                .collect(Collectors.toList()))
                .get();

        waitForSerializationFuturesAndUpdateJobVertices();
    } catch (Exception e) {
        throw new FlinkRuntimeException("Error in serialization.", e);
    }

    if (!streamGraph.getJobStatusHooks().isEmpty()) {
        jobGraph.setJobStatusHooks(streamGraph.getJobStatusHooks());
    }

    return jobGraph;
}
  1. Generate operator hash IDs
    Generate deterministic hashes for the operators in the StreamGraph so that unchanged operators can be identified across submissions.

    Map<Integer, byte[]> hashes =
            defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
    // Generate legacy-version hashes for backwards compatibility
    List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
    for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
        legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
    }
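The idea behind deterministic hashing can be sketched as follows: a node's hash is derived only from stable topological properties (here just its id and its inputs' hashes, fed through MD5). This is a heavy simplification of what StreamGraphHasher actually does, but it shows why an unchanged operator keeps the same ID across submissions:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Arrays;

// Simplified sketch of deterministic node hashing: the hash depends only on the
// node's id and its inputs' hashes, so rebuilding the same topology reproduces
// the same hashes. (Illustrative only; not StreamGraphHasher's real algorithm.)
public class NodeHashSketch {
    public static byte[] hashNode(int nodeId, byte[]... inputHashes) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            md5.update(Integer.toString(nodeId).getBytes(StandardCharsets.UTF_8));
            for (byte[] inputHash : inputHashes) {
                md5.update(inputHash); // chain in the upstream hashes
            }
            return md5.digest();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        byte[] source = hashNode(1);
        byte[] map = hashNode(2, source);
        // Rebuilding the same source -> map topology yields identical hashes
        System.out.println(Arrays.equals(map, hashNode(2, hashNode(1)))); // true
    }
}
```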
    
  2. Operator chaining
    Fuse connected operators in the StreamGraph into operator chains to improve execution efficiency.

    setChaining(hashes, legacyHashes);
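Conceptually, setChaining walks the graph and fuses adjacent operators whenever a set of conditions holds. A simplified, hypothetical predicate (the real chainability check in StreamingJobGraphGenerator examines more conditions, such as chaining strategy and exchange mode):

```java
// Hypothetical sketch of an operator-chaining check. This only illustrates the
// core idea: two operators can share a chain when chaining is enabled, data
// flows forward without redistribution, and parallelisms match.
public class ChainingSketch {
    public static boolean isChainable(
            int upstreamParallelism,
            int downstreamParallelism,
            boolean forwardPartitioner,
            boolean chainingEnabled) {
        return chainingEnabled
                && forwardPartitioner // no shuffle between the two operators
                && upstreamParallelism == downstreamParallelism;
    }

    public static void main(String[] args) {
        // source -> map with a forward edge and equal parallelism: chainable
        System.out.println(isChainable(4, 4, true, true));  // true
        // keyBy inserts a hash partitioner, which breaks the chain
        System.out.println(isChainable(4, 4, false, true)); // false
    }
}
```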
    
  3. Set vertex parallelisms for dynamic graphs (if necessary)

    if (jobGraph.isDynamic()) {
        setVertexParallelismsForDynamicGraphIfNecessary();
    }
    
  4. Configure outputs and physical edges

    final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs =
            new HashMap<>();
    // Configure non-chained outputs
    setAllOperatorNonChainedOutputsConfigs(opIntermediateOutputs);
    setAllVertexNonChainedOutputsConfigs(opIntermediateOutputs);
    // Set up the physical edges
    setPhysicalEdges();
    
  5. Other key configuration

    markSupportingConcurrentExecutionAttempts();
    validateHybridShuffleExecuteInBatchMode();
    setSlotSharingAndCoLocation();
    setManagedMemoryFraction(...);
    configureCheckpointing();
    jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());
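Among the calls above, setManagedMemoryFraction splits a slot's managed memory among the operators in that slot according to per-use-case weights. A hedged sketch of the proportional split (names and the weighting scheme are simplifications, not Flink's actual logic):

```java
// Illustrative sketch of proportional managed-memory fractions: each operator in
// a slot receives weight / totalWeight of the slot's managed memory.
public class ManagedMemorySketch {
    public static double fraction(int operatorWeight, int[] allWeightsInSlot) {
        int total = 0;
        for (int w : allWeightsInSlot) {
            total += w;
        }
        return total == 0 ? 0.0 : (double) operatorWeight / total;
    }

    public static void main(String[] args) {
        int[] weights = {70, 30}; // e.g. a stateful operator and a batch sort
        System.out.println(fraction(70, weights)); // 0.7
        System.out.println(fraction(30, weights)); // 0.3
    }
}
```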
    
  6. Set up the distributed cache

    final Map<String, DistributedCache.DistributedCacheEntry> distributedCacheEntries =
            JobGraphUtils.prepareUserArtifactEntries(
                    streamGraph.getUserArtifacts().stream()
                            .collect(Collectors.toMap(e -> e.f0, e -> e.f1)),
                    jobGraph.getJobID());
    for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
            distributedCacheEntries.entrySet()) {
        jobGraph.addUserArtifact(entry.getKey(), entry.getValue());
    }
    
  7. Execution config and serialization

    jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
    jobGraph.setJobConfiguration(streamGraph.getJobConfiguration());
    // Wait for the serialization of operator coordinators and stream configs to finish
    FutureUtils.combineAll(
                    vertexConfigs.values().stream()
                            .map(config -> config.triggerSerializationAndReturnFuture(serializationExecutor))
                            .collect(Collectors.toList()))
            .get();
    waitForSerializationFuturesAndUpdateJobVertices();
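FutureUtils.combineAll is essentially a "trigger everything, then block until all of it has finished" combinator. The same pattern can be reproduced with the JDK alone (this sketch squares numbers in place of serializing vertex configs):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// JDK-only sketch of the combineAll pattern: submit all tasks to a pool, wait
// for every one of them via allOf, then collect the completed results.
public class CombineAllSketch {
    public static List<Integer> squareAll(List<Integer> inputs) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<CompletableFuture<Integer>> futures =
                    inputs.stream()
                            .map(i -> CompletableFuture.supplyAsync(() -> i * i, pool))
                            .collect(Collectors.toList());
            // allOf completes only once every future has completed
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
            return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(squareAll(List.of(1, 2, 3))); // [1, 4, 9]
    }
}
```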
    

How the JobGraph Is Submitted

AbstractSessionClusterExecutor: the session-cluster executor

The generated JobGraph is submitted from the client via AbstractSessionClusterExecutor:

public CompletableFuture<JobClient> execute(
        @Nonnull final Pipeline pipeline,
        @Nonnull final Configuration configuration,
        @Nonnull final ClassLoader userCodeClassloader)
        throws Exception {
    // Convert the Pipeline into a JobGraph
    final JobGraph jobGraph =
            PipelineExecutorUtils.getJobGraph(pipeline, configuration, userCodeClassloader);

    try (final ClusterDescriptor<ClusterID> clusterDescriptor =
            clusterClientFactory.createClusterDescriptor(configuration)) {
        final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
        checkState(clusterID != null);

        final ClusterClientProvider<ClusterID> clusterClientProvider =
                clusterDescriptor.retrieve(clusterID);
        ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();
        return clusterClient
                .submitJob(jobGraph)
                .thenApplyAsync(
                        FunctionUtils.uncheckedFunction(
                                jobId -> {
                                    ClientUtils.waitUntilJobInitializationFinished(
                                            () -> clusterClient.getJobStatus(jobId).get(),
                                            () -> clusterClient.requestJobResult(jobId).get(),
                                            userCodeClassloader);
                                    return jobId;
                                }))
                .thenApplyAsync(
                        jobID ->
                                (JobClient)
                                        new ClusterClientJobClientAdapter<>(
                                                clusterClientProvider, jobID, userCodeClassloader))
                .whenCompleteAsync((ignored1, ignored2) -> clusterClient.close());
    }
}
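The execute() method is one long CompletableFuture pipeline: submit, wait for initialization, wrap the result in a JobClient, then close the client. The shape of that chain, stripped of Flink's types (every name and stage below is a placeholder, not Flink API):

```java
import java.util.concurrent.CompletableFuture;

// Skeleton of the async submission pipeline in execute(), with Flink's types
// replaced by plain strings so the stage ordering is visible.
public class SubmitPipelineSketch {
    public static CompletableFuture<String> submit(String jobGraph) {
        return CompletableFuture
                .supplyAsync(() -> jobGraph + ":submitted")      // stands in for submitJob
                .thenApplyAsync(jobId -> jobId + ":initialized") // waitUntilJobInitializationFinished
                .thenApplyAsync(jobId -> jobId + ":client")      // wrap into a JobClient adapter
                .whenCompleteAsync((r, t) -> { /* close the cluster client here */ });
    }

    public static void main(String[] args) {
        System.out.println(submit("myJob").join()); // myJob:submitted:initialized:client
    }
}
```

Note that whenCompleteAsync runs on success and failure alike, which is why the real code can use it to close the ClusterClient unconditionally.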

RestClusterClient: the REST submission client

RestClusterClient uploads the client-side JobGraph and the associated JAR files to the cluster, where the JobSubmitHandler class receives them:

@Override
public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
    // Serialize the JobGraph to a temporary file
    CompletableFuture<java.nio.file.Path> jobGraphFileFuture =
            CompletableFuture.supplyAsync(
                    () -> {
                        try {
                            final java.nio.file.Path jobGraphFile =
                                    Files.createTempFile(
                                            "flink-jobgraph-" + jobGraph.getJobID(), ".bin");
                            try (ObjectOutputStream objectOut =
                                    new ObjectOutputStream(
                                            Files.newOutputStream(jobGraphFile))) {
                                objectOut.writeObject(jobGraph);
                            }
                            return jobGraphFile;
                        } catch (IOException e) {
                            throw new CompletionException(
                                    new FlinkException("Failed to serialize JobGraph.", e));
                        }
                    },
                    executorService);

    // Prepare the submission request
    CompletableFuture<Tuple2<JobSubmitRequestBody, Collection<FileUpload>>> requestFuture =
            jobGraphFileFuture.thenApply(
                    jobGraphFile -> {
                        List<String> jarFileNames = new ArrayList<>(8);
                        List<JobSubmitRequestBody.DistributedCacheFile> artifactFileNames =
                                new ArrayList<>(8);
                        Collection<FileUpload> filesToUpload = new ArrayList<>(8);

                        filesToUpload.add(
                                new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));

                        // Add the user JARs
                        for (Path jar : jobGraph.getUserJars()) {
                            jarFileNames.add(jar.getName());
                            filesToUpload.add(
                                    new FileUpload(
                                            Paths.get(jar.toUri()),
                                            RestConstants.CONTENT_TYPE_JAR));
                        }

                        // Add distributed cache files
                        for (Map.Entry<String, DistributedCache.DistributedCacheEntry> artifacts :
                                jobGraph.getUserArtifacts().entrySet()) {
                            final Path artifactFilePath =
                                    new Path(artifacts.getValue().filePath);
                            try {
                                // Only upload local artifacts
                                if (!artifactFilePath.getFileSystem().isDistributedFS()) {
                                    artifactFileNames.add(
                                            new JobSubmitRequestBody.DistributedCacheFile(
                                                    artifacts.getKey(),
                                                    artifactFilePath.getName()));
                                    filesToUpload.add(
                                            new FileUpload(
                                                    Paths.get(artifactFilePath.getPath()),
                                                    RestConstants.CONTENT_TYPE_BINARY));
                                }
                            } catch (IOException e) {
                                throw new CompletionException(
                                        new FlinkException(
                                                "Failed to get the FileSystem of artifact "
                                                        + artifactFilePath
                                                        + ".",
                                                e));
                            }
                        }

                        final JobSubmitRequestBody requestBody =
                                new JobSubmitRequestBody(
                                        jobGraphFile.getFileName().toString(),
                                        jarFileNames,
                                        artifactFileNames);

                        return Tuple2.of(
                                requestBody,
                                Collections.unmodifiableCollection(filesToUpload));
                    });

    // Send the submission request
    final CompletableFuture<JobSubmitResponseBody> submissionFuture =
            requestFuture.thenCompose(
                    requestAndFileUploads -> {
                        LOG.info(
                                "Submitting job '{}' ({}).",
                                jobGraph.getName(),
                                jobGraph.getJobID());
                        return sendRetriableRequest(
                                JobSubmitHeaders.getInstance(),
                                EmptyMessageParameters.getInstance(),
                                requestAndFileUploads.f0,
                                requestAndFileUploads.f1,
                                isConnectionProblemOrServiceUnavailable(),
                                (receiver, error) -> {
                                    if (error != null) {
                                        LOG.warn(
                                                "Attempt to submit job '{}' ({}) to '{}' has failed.",
                                                jobGraph.getName(),
                                                jobGraph.getJobID(),
                                                receiver,
                                                error);
                                    } else {
                                        LOG.info(
                                                "Successfully submitted job '{}' ({}) to '{}'.",
                                                jobGraph.getName(),
                                                jobGraph.getJobID(),
                                                receiver);
                                    }
                                });
                    });

    // Clean up the temporary file
    submissionFuture
            .exceptionally(ignored -> null) // ignore errors
            .thenCompose(ignored -> jobGraphFileFuture)
            .thenAccept(
                    jobGraphFile -> {
                        try {
                            Files.delete(jobGraphFile);
                        } catch (IOException e) {
                            LOG.warn("Could not delete temporary file {}.", jobGraphFile, e);
                        }
                    });

    return submissionFuture
            .thenApply(ignore -> jobGraph.getJobID())
            .exceptionally(
                    (Throwable throwable) -> {
                        throw new CompletionException(
                                new JobSubmissionException(
                                        jobGraph.getJobID(),
                                        "Failed to submit JobGraph.",
                                        ExceptionUtils.stripCompletionException(throwable)));
                    });
}
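The first and last stages of submitJob — serialize an object to a temporary ".bin" file, then delete it once it has been used — can be reproduced with the JDK alone (the class and method names here are illustrative, not Flink API):

```java
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

// JDK-only sketch of the temp-file serialization stage: write a serializable
// object to a temporary ".bin" file (as RestClusterClient does with the
// JobGraph), read it back, and delete the file when done.
public class TempFileSerialization {
    public static String roundTrip(String payload) throws Exception {
        Path tempFile = Files.createTempFile("flink-jobgraph-demo", ".bin");
        try {
            try (ObjectOutputStream out =
                    new ObjectOutputStream(Files.newOutputStream(tempFile))) {
                out.writeObject(payload);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(Files.newInputStream(tempFile))) {
                return (String) in.readObject();
            }
        } finally {
            Files.deleteIfExists(tempFile); // mirrors the cleanup stage
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("serialized-job-graph")); // serialized-job-graph
    }
}
```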

Summary

As the intermediate representation between the StreamGraph and the physical execution plan, the JobGraph plays an important role in Flink's job execution pipeline. Its core function is operator chaining, which lays the groundwork for generating the physical execution plan.

The JobGraph is generated by StreamingJobGraphGenerator, while submission goes through components such as AbstractSessionClusterExecutor and RestClusterClient, with the cluster-side JobSubmitHandler receiving and processing the upload.

The setChaining method and the intermediate-output data structures are central to the JobGraph's internal implementation; the JobMaster then builds the actual physical execution plan from the JobGraph, where the intermediate-output handling connects to NIO and the data-partitioning mechanism. Understanding how the JobGraph is generated and submitted helps build a deeper grasp of how Flink jobs execute.
