当前位置: 首页 > ops >正文

23TaskExecutor初始化

TaskExecutor初始化流程梳理

1. 核心入口:onStart方法

TaskExecutor 继承自 RpcEndpoint,其初始化的核心逻辑始于 onStart 方法,主要负责启动 TaskExecutor 的核心服务并处理启动异常:

@Override
public void onStart() throws Exception {try {startTaskExecutorServices(); // 启动核心服务} catch (Throwable t) {final TaskManagerException exception =new TaskManagerException(String.format("Could not start the TaskExecutor %s", getAddress()), t);onFatalError(exception);throw exception;}startRegistrationTimeout(); // 启动注册超时机制
}

2. 核心服务启动:startTaskExecutorServices

private void startTaskExecutorServices() throws Exception {try {// 1. 连接ResourceManager并注册(通过ResourceManagerLeaderListener)resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());// 2. 初始化任务槽表(TaskSlotTable),指定槽操作的实现类taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());// 3. 启动JobLeader服务,用于跟踪JobManager的leaderjobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());// 4. 初始化文件缓存(FileCache)fileCache =new FileCache(taskManagerConfiguration.getTmpDirectories(),taskExecutorBlobService.getPermanentBlobService());// 5. 加载本地分配快照tryLoadLocalAllocationSnapshots();} catch (Exception e) {handleStartTaskExecutorServicesException(e);}
}

3. TaskManagerRunner:初始化启动器

TaskManagerRunner 是 TaskExecutor 的启动入口,通过匿名函数 TaskManagerRunner::createTaskExecutorService 创建并启动 TaskExecutor 服务:

public static int runTaskManager(Configuration configuration, PluginManager pluginManager) throws Exception {final TaskManagerRunner taskManagerRunner;try {taskManagerRunner =new TaskManagerRunner(configuration,pluginManager,TaskManagerRunner::createTaskExecutorService); // 匿名函数指定服务创建逻辑taskManagerRunner.start();} catch (Exception exception) {throw new FlinkException("Failed to start the TaskManagerRunner.", exception);}// 等待终止并返回退出码try {return taskManagerRunner.getTerminationFuture().get().getExitCode();} catch (Throwable t) {throw new FlinkException("Unexpected failure during runtime of TaskManagerRunner.",ExceptionUtils.stripExecutionException(t));}
}
  • TaskManagerRunnerstartTaskManagerRunnerServices 方法会初始化 RPC 服务、高可用服务、 metrics 等基础组件,并最终调用匿名函数创建 taskExecutorService
private void startTaskManagerRunnerServices() throws Exception {synchronized (lock) {// 初始化RPC系统、线程池、高可用服务等rpcSystem = RpcSystem.load(configuration);executor = Executors.newScheduledThreadPool(...);highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(...);// ... 省略其他基础组件初始化(metrics、Blob服务等)// 调用匿名函数创建TaskExecutor服务taskExecutorService =taskExecutorServiceFactory.createTaskExecutor(this.configuration,this.resourceId.unwrap(),rpcService,highAvailabilityServices,heartbeatServices,metricRegistry,blobCacheService,false,externalResourceInfoProvider,workingDirectory.unwrap(),this,delegationTokenReceiverRepository);}
}

4. TaskExecutor 实例化过程

createTaskExecutorService 方法通过 startTaskManager 实例化 TaskExecutor,并封装为 TaskExecutorService

public static TaskExecutorService createTaskExecutorService(...) throws Exception {final TaskExecutor taskExecutor =startTaskManager(...); // 实例化TaskExecutorreturn TaskExecutorToServiceAdapter.createFor(taskExecutor); // 适配为服务
}

startTaskManager 是实例化 TaskExecutor 的核心方法,主要完成:

  • 初始化资源配置(TaskExecutorResourceSpec
  • 构建任务管理器服务配置(TaskManagerServicesConfiguration
  • 初始化任务管理器服务(TaskManagerServices,包含 IO、Shuffle、任务槽等组件)
  • 最终创建 TaskExecutor 实例
public static TaskExecutor startTaskManager(...) throws Exception {// 1. 从配置中解析资源规格final TaskExecutorResourceSpec taskExecutorResourceSpec =TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);// 2. 构建服务配置TaskManagerServicesConfiguration taskManagerServicesConfiguration =TaskManagerServicesConfiguration.fromConfiguration(...);// 3. 初始化核心服务(包含TaskSlotTable、IO管理器、Shuffle环境等)TaskManagerServices taskManagerServices =TaskManagerServices.fromConfiguration(...);// 4. 构建TaskExecutor配置TaskManagerConfiguration taskManagerConfiguration =TaskManagerConfiguration.fromConfiguration(...);// 5. 实例化TaskExecutorreturn new TaskExecutor(rpcService,taskManagerConfiguration,highAvailabilityServices,taskManagerServices,// ... 其他参数);
}

5. 与 ResourceManager 的交互

TaskExecutor 启动后需与 ResourceManager 建立连接并注册,核心逻辑在 connectToResourceManager 方法:

private void connectToResourceManager() {// 1. 封装TaskExecutor注册信息(地址、资源ID、端口、资源规格等)final TaskExecutorRegistration taskExecutorRegistration =new TaskExecutorRegistration(getAddress(),getResourceID(),unresolvedTaskManagerLocation.getDataPort(),JMXService.getPort().orElse(-1),hardwareDescription,memoryConfiguration,taskManagerConfiguration.getDefaultSlotResourceProfile(),taskManagerConfiguration.getTotalResourceProfile(),unresolvedTaskManagerLocation.getNodeId());// 2. 创建与ResourceManager的连接并启动注册resourceManagerConnection =new TaskExecutorToResourceManagerConnection(log,getRpcService(),taskManagerConfiguration.getRetryingRegistrationConfiguration(),resourceManagerAddress.getAddress(),resourceManagerAddress.getResourceManagerId(),getMainThreadExecutor(),new ResourceManagerRegistrationListener(),taskExecutorRegistration);resourceManagerConnection.start();
}
  • 注册过程通过 TaskExecutorToResourceManagerConnectioninvokeRegistration 调用 ResourceManager 的 registerTaskExecutor 方法完成:
@Override
protected CompletableFuture<RegistrationResponse> invokeRegistration(ResourceManagerGateway resourceManager,ResourceManagerId fencingToken,long timeoutMillis) throws Exception {Time timeout = Time.milliseconds(timeoutMillis);return resourceManager.registerTaskExecutor(taskExecutorRegistration, timeout); // 调用RM注册接口
}
  • 注册成功后,会通过 createNewRegistrationonRegistrationSuccess 方法调用 establishResourceManagerConnection
@Override
public void onRegistrationSuccess(TaskExecutorToResourceManagerConnection connection,TaskExecutorRegistrationSuccess success) {final ResourceID resourceManagerId = success.getResourceManagerId();final InstanceID taskExecutorRegistrationId = success.getRegistrationId();final ClusterInformation clusterInformation = success.getClusterInformation();final ResourceManagerGateway resourceManagerGateway = connection.getTargetGateway();byte[] tokens = success.getInitialTokens();if (tokens != null) {try {log.info("Receive initial delegation tokens from resource manager");delegationTokenReceiverRepository.onNewTokensObtained(tokens);} catch (Throwable t) {log.error("Could not update delegation tokens.", t);ExceptionUtils.rethrowIfFatalError(t);}}runAsync(() -> {// 过滤掉过时的连接//noinspection ObjectEqualityif (resourceManagerConnection == connection) {try {establishResourceManagerConnection(resourceManagerGateway,resourceManagerId,taskExecutorRegistrationId,clusterInformation);} catch (Throwable t) {log.error("Establishing Resource Manager connection in Task Executor failed",t);}}});
}private void establishResourceManagerConnection(ResourceManagerGateway resourceManagerGateway,ResourceID resourceManagerResourceId,InstanceID taskExecutorRegistrationId,ClusterInformation clusterInformation) {// 这一步非常重要,向ResourceManager报告当前TaskExecutor拥有的slot数量final CompletableFuture<Acknowledge> slotReportResponseFuture =resourceManagerGateway.sendSlotReport(getResourceID(),taskExecutorRegistrationId,taskSlotTable.createSlotReport(getResourceID()),Time.fromDuration(taskManagerConfiguration.getRpcTimeout()));slotReportResponseFuture.whenCompleteAsync((acknowledge, throwable) -> {if (throwable != null) {reconnectToResourceManager(new TaskManagerException("Failed to send initial slot report to ResourceManager.",throwable));}},getMainThreadExecutor());// 监控ResourceManager作为心跳目标resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId,new ResourceManagerHeartbeatReceiver(resourceManagerGateway));// 设置传播的blob服务器地址final InetSocketAddress blobServerAddress =new InetSocketAddress(clusterInformation.getBlobServerHostname(),clusterInformation.getBlobServerPort());taskExecutorBlobService.setBlobServerAddress(blobServerAddress);establishedResourceManagerConnection =new EstablishedResourceManagerConnection(resourceManagerGateway,resourceManagerResourceId,taskExecutorRegistrationId);stopRegistrationTimeout();
}

establishResourceManagerConnection 方法主要完成:

  1. 向 ResourceManager 发送初始 slot 报告,告知自身可用的资源情况
  2. 建立与 ResourceManager 的心跳监控机制
  3. 配置 Blob 服务地址,用于后续的文件传输
  4. 保存已建立的连接信息并停止注册超时计时器
  • 注册成功后,会通过 createNewRegistrationonRegistrationSuccess 方法调用 establishResourceManagerConnection

6. 任务槽表(TaskSlotTable)初始化

TaskSlotTable 负责管理 TaskExecutor 的任务槽,其初始化在 TaskManagerServices 中完成:

public static TaskManagerServices fromConfiguration(...) throws Exception {// ... 其他服务初始化final TaskSlotTable<Task> taskSlotTable =createTaskSlotTable(taskManagerServicesConfiguration.getNumberOfSlots(), // 槽数量taskManagerServicesConfiguration.getTaskExecutorResourceSpec(), // 资源规格taskManagerServicesConfiguration.getTimerServiceShutdownTimeout(),taskManagerServicesConfiguration.getPageSize(),ioExecutor);// ... 封装到TaskManagerServices并返回
}

TaskSlotTablestartTaskExecutorServices 中被启动,并关联槽操作的实现类 SlotActionsImpl,用于处理槽的分配、释放等操作。

总结

TaskExecutor 的初始化流程可概括为:

  1. TaskManagerRunner 启动,通过匿名函数指定服务创建逻辑;
  2. 初始化基础组件(RPC、高可用、metrics 等);
  3. 调用 onStart 方法启动核心服务,包括连接 ResourceManager、初始化任务槽表等;
  4. 与 ResourceManager 建立连接并注册自身信息(资源、端口等);
  5. 完成初始化后,通过 TaskSlotTable 管理任务槽,准备接收任务分配。
http://www.xdnf.cn/news/18217.html

相关文章:

  • 【ansible】4.实施任务控制
  • AI 伦理的 “灰色地带”:当算法拥有决策权,公平与隐私该如何平衡?
  • 工地智能安全带让高空作业更安全
  • Kafka如何保证消费确认与顺序消费?
  • gcc 与 g++ 的区别:本身不是编译器而是编译器驱动
  • 数据库优化提速(一)之进销存库存管理—仙盟创梦IDE
  • 【Tech Arch】Apache Pig大数据处理的高效利器
  • 【JavaEE】多线程 -- 线程池
  • 基于单片机太阳能充电器/太阳能转换电能
  • 30. 技术专题-锁
  • HTTP的协议
  • .gitignore 文件 记录
  • Linux服务器性能优化总结
  • 【Tech Arch】Apache HBase分布式 NoSQL 数据库
  • redis---常用数据类型及内部编码
  • 如何低比特量化算法的工程实战与落地优化
  • 【考研408数据结构-08】 图论基础:存储结构与遍历算法
  • 让Chrome信任自签名证书
  • AI时代下阿里云基础设施的稳定性架构揭秘
  • C#海康SDK—热成像测温篇
  • gitlab、jenkins等应用集成ldap
  • package.json详细字段解释
  • 大数据技术栈 —— Redis与Kafka
  • JavaScript 性能优化实战:从分析到落地的全指南
  • 网络间的通用语言TCP/IP-网络中的通用规则4
  • Apache Doris 在菜鸟的大规模湖仓业务场景落地实践
  • PyTorch自动求导
  • OpenHarmony之打造全场景智联基座的“分布式星链 ”WLAN子系统
  • Java试题-选择题(11)
  • Consul- acl机制!