23TaskExecutor初始化
TaskExecutor
初始化流程梳理
1. 核心入口:onStart
方法
TaskExecutor
继承自 RpcEndpoint
,其初始化的核心逻辑始于 onStart
方法,主要负责启动 TaskExecutor 的核心服务并处理启动异常:
@Override
public void onStart() throws Exception {try {startTaskExecutorServices(); // 启动核心服务} catch (Throwable t) {final TaskManagerException exception =new TaskManagerException(String.format("Could not start the TaskExecutor %s", getAddress()), t);onFatalError(exception);throw exception;}startRegistrationTimeout(); // 启动注册超时机制
}
2. 核心服务启动:startTaskExecutorServices
private void startTaskExecutorServices() throws Exception {try {// 1. 连接ResourceManager并注册(通过ResourceManagerLeaderListener)resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());// 2. 初始化任务槽表(TaskSlotTable),指定槽操作的实现类taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());// 3. 启动JobLeader服务,用于跟踪JobManager的leaderjobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());// 4. 初始化文件缓存(FileCache)fileCache =new FileCache(taskManagerConfiguration.getTmpDirectories(),taskExecutorBlobService.getPermanentBlobService());// 5. 加载本地分配快照tryLoadLocalAllocationSnapshots();} catch (Exception e) {handleStartTaskExecutorServicesException(e);}
}
3. TaskManagerRunner
:初始化启动器
TaskManagerRunner
是 TaskExecutor 的启动入口,通过匿名函数 TaskManagerRunner::createTaskExecutorService
创建并启动 TaskExecutor 服务:
public static int runTaskManager(Configuration configuration, PluginManager pluginManager) throws Exception {final TaskManagerRunner taskManagerRunner;try {taskManagerRunner =new TaskManagerRunner(configuration,pluginManager,TaskManagerRunner::createTaskExecutorService); // 匿名函数指定服务创建逻辑taskManagerRunner.start();} catch (Exception exception) {throw new FlinkException("Failed to start the TaskManagerRunner.", exception);}// 等待终止并返回退出码try {return taskManagerRunner.getTerminationFuture().get().getExitCode();} catch (Throwable t) {throw new FlinkException("Unexpected failure during runtime of TaskManagerRunner.",ExceptionUtils.stripExecutionException(t));}
}
TaskManagerRunner
的startTaskManagerRunnerServices
方法会初始化 RPC 服务、高可用服务、 metrics 等基础组件,并最终调用匿名函数创建taskExecutorService
:
private void startTaskManagerRunnerServices() throws Exception {synchronized (lock) {// 初始化RPC系统、线程池、高可用服务等rpcSystem = RpcSystem.load(configuration);executor = Executors.newScheduledThreadPool(...);highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(...);// ... 省略其他基础组件初始化(metrics、Blob服务等)// 调用匿名函数创建TaskExecutor服务taskExecutorService =taskExecutorServiceFactory.createTaskExecutor(this.configuration,this.resourceId.unwrap(),rpcService,highAvailabilityServices,heartbeatServices,metricRegistry,blobCacheService,false,externalResourceInfoProvider,workingDirectory.unwrap(),this,delegationTokenReceiverRepository);}
}
4. TaskExecutor
实例化过程
createTaskExecutorService
方法通过 startTaskManager
实例化 TaskExecutor
,并封装为 TaskExecutorService
:
public static TaskExecutorService createTaskExecutorService(...) throws Exception {final TaskExecutor taskExecutor =startTaskManager(...); // 实例化TaskExecutorreturn TaskExecutorToServiceAdapter.createFor(taskExecutor); // 适配为服务
}
startTaskManager
是实例化 TaskExecutor
的核心方法,主要完成:
- 初始化资源配置(
TaskExecutorResourceSpec
) - 构建任务管理器服务配置(
TaskManagerServicesConfiguration
) - 初始化任务管理器服务(
TaskManagerServices
,包含 IO、Shuffle、任务槽等组件) - 最终创建
TaskExecutor
实例
public static TaskExecutor startTaskManager(...) throws Exception {// 1. 从配置中解析资源规格final TaskExecutorResourceSpec taskExecutorResourceSpec =TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);// 2. 构建服务配置TaskManagerServicesConfiguration taskManagerServicesConfiguration =TaskManagerServicesConfiguration.fromConfiguration(...);// 3. 初始化核心服务(包含TaskSlotTable、IO管理器、Shuffle环境等)TaskManagerServices taskManagerServices =TaskManagerServices.fromConfiguration(...);// 4. 构建TaskExecutor配置TaskManagerConfiguration taskManagerConfiguration =TaskManagerConfiguration.fromConfiguration(...);// 5. 实例化TaskExecutorreturn new TaskExecutor(rpcService,taskManagerConfiguration,highAvailabilityServices,taskManagerServices,// ... 其他参数);
}
5. 与 ResourceManager 的交互
TaskExecutor
启动后需与 ResourceManager 建立连接并注册,核心逻辑在 connectToResourceManager
方法:
private void connectToResourceManager() {// 1. 封装TaskExecutor注册信息(地址、资源ID、端口、资源规格等)final TaskExecutorRegistration taskExecutorRegistration =new TaskExecutorRegistration(getAddress(),getResourceID(),unresolvedTaskManagerLocation.getDataPort(),JMXService.getPort().orElse(-1),hardwareDescription,memoryConfiguration,taskManagerConfiguration.getDefaultSlotResourceProfile(),taskManagerConfiguration.getTotalResourceProfile(),unresolvedTaskManagerLocation.getNodeId());// 2. 创建与ResourceManager的连接并启动注册resourceManagerConnection =new TaskExecutorToResourceManagerConnection(log,getRpcService(),taskManagerConfiguration.getRetryingRegistrationConfiguration(),resourceManagerAddress.getAddress(),resourceManagerAddress.getResourceManagerId(),getMainThreadExecutor(),new ResourceManagerRegistrationListener(),taskExecutorRegistration);resourceManagerConnection.start();
}
- 注册过程通过
TaskExecutorToResourceManagerConnection
的invokeRegistration
调用 ResourceManager 的registerTaskExecutor
方法完成:
@Override
protected CompletableFuture<RegistrationResponse> invokeRegistration(ResourceManagerGateway resourceManager,ResourceManagerId fencingToken,long timeoutMillis) throws Exception {Time timeout = Time.milliseconds(timeoutMillis);return resourceManager.registerTaskExecutor(taskExecutorRegistration, timeout); // 调用RM注册接口
}
- 注册成功后,会通过
createNewRegistration
的onRegistrationSuccess
方法调用establishResourceManagerConnection
:
@Override
public void onRegistrationSuccess(TaskExecutorToResourceManagerConnection connection,TaskExecutorRegistrationSuccess success) {final ResourceID resourceManagerId = success.getResourceManagerId();final InstanceID taskExecutorRegistrationId = success.getRegistrationId();final ClusterInformation clusterInformation = success.getClusterInformation();final ResourceManagerGateway resourceManagerGateway = connection.getTargetGateway();byte[] tokens = success.getInitialTokens();if (tokens != null) {try {log.info("Receive initial delegation tokens from resource manager");delegationTokenReceiverRepository.onNewTokensObtained(tokens);} catch (Throwable t) {log.error("Could not update delegation tokens.", t);ExceptionUtils.rethrowIfFatalError(t);}}runAsync(() -> {// 过滤掉过时的连接//noinspection ObjectEqualityif (resourceManagerConnection == connection) {try {establishResourceManagerConnection(resourceManagerGateway,resourceManagerId,taskExecutorRegistrationId,clusterInformation);} catch (Throwable t) {log.error("Establishing Resource Manager connection in Task Executor failed",t);}}});
}private void establishResourceManagerConnection(ResourceManagerGateway resourceManagerGateway,ResourceID resourceManagerResourceId,InstanceID taskExecutorRegistrationId,ClusterInformation clusterInformation) {// 这一步非常重要,向ResourceManager报告当前TaskExecutor拥有的slot数量final CompletableFuture<Acknowledge> slotReportResponseFuture =resourceManagerGateway.sendSlotReport(getResourceID(),taskExecutorRegistrationId,taskSlotTable.createSlotReport(getResourceID()),Time.fromDuration(taskManagerConfiguration.getRpcTimeout()));slotReportResponseFuture.whenCompleteAsync((acknowledge, throwable) -> {if (throwable != null) {reconnectToResourceManager(new TaskManagerException("Failed to send initial slot report to ResourceManager.",throwable));}},getMainThreadExecutor());// 监控ResourceManager作为心跳目标resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId,new ResourceManagerHeartbeatReceiver(resourceManagerGateway));// 设置传播的blob服务器地址final InetSocketAddress blobServerAddress =new InetSocketAddress(clusterInformation.getBlobServerHostname(),clusterInformation.getBlobServerPort());taskExecutorBlobService.setBlobServerAddress(blobServerAddress);establishedResourceManagerConnection =new EstablishedResourceManagerConnection(resourceManagerGateway,resourceManagerResourceId,taskExecutorRegistrationId);stopRegistrationTimeout();
}
establishResourceManagerConnection
方法主要完成:
- 向 ResourceManager 发送初始 slot 报告,告知自身可用的资源情况
- 建立与 ResourceManager 的心跳监控机制
- 配置 Blob 服务地址,用于后续的文件传输
- 保存已建立的连接信息并停止注册超时计时器
- 注册成功后,会通过
createNewRegistration
的onRegistrationSuccess
方法调用establishResourceManagerConnection
:
6. 任务槽表(TaskSlotTable
)初始化
TaskSlotTable
负责管理 TaskExecutor 的任务槽,其初始化在 TaskManagerServices
中完成:
public static TaskManagerServices fromConfiguration(...) throws Exception {// ... 其他服务初始化final TaskSlotTable<Task> taskSlotTable =createTaskSlotTable(taskManagerServicesConfiguration.getNumberOfSlots(), // 槽数量taskManagerServicesConfiguration.getTaskExecutorResourceSpec(), // 资源规格taskManagerServicesConfiguration.getTimerServiceShutdownTimeout(),taskManagerServicesConfiguration.getPageSize(),ioExecutor);// ... 封装到TaskManagerServices并返回
}
TaskSlotTable
在 startTaskExecutorServices
中被启动,并关联槽操作的实现类 SlotActionsImpl
,用于处理槽的分配、释放等操作。
总结
TaskExecutor
的初始化流程可概括为:
- 由
TaskManagerRunner
启动,通过匿名函数指定服务创建逻辑; - 初始化基础组件(RPC、高可用、metrics 等);
- 调用
onStart
方法启动核心服务,包括连接 ResourceManager、初始化任务槽表等; - 与 ResourceManager 建立连接并注册自身信息(资源、端口等);
- 完成初始化后,通过
TaskSlotTable
管理任务槽,准备接收任务分配。