当前位置: 首页 > news >正文

DolphinScheduler 3.2.0 Worker启动核心源码解析

目录

一、概览

二、Worker 启动入口 run()

三、启动 RPC 服务

3.1 Netty RPC Server(接收请求)

3.2 Netty RPC Client(发送结果)

四、加载 Task 插件

五、注册中心客户端启动

六、Worker 管理线程启动

七、消息重试线程启动

八、RPC 请求到任务执行的完整链路

九、总结


一、概览

在 DolphinScheduler 中,Worker 负责从 Master 接收调度命令、执行具体的 Task,并将执行结果通过消息回传给 Master。Worker 在 JVM 中启动后,需要依次完成以下关键组件的初始化和启动:

  • Netty RPC 服务端(workerRpcServer)和客户端(workerRpcClient

  • 任务插件管理器(taskPluginManager

  • 注册中心客户端(workerRegistryClient

  • Worker 管理线程(调度任务分发)

  • 消息重试线程(保证消息可靠性)

最终,Worker 进入“消息接收 → 任务封装 → 入队等待 → 线程池执行 → 结果回传”的循环流程,持续响应 Master 的分发请求并执行任务。


二、Worker 启动入口 run()

Worker 的入口方法标记为 @PostConstruct,即在 Spring 容器完成 Bean 注入后自动调用。其源码如下:

@PostConstruct
public void run() {// 1. 启动 rpc 服务this.workerRpcServer.start();this.workerRpcClient.start();// 2. 加载插件this.taskPluginManager.loadPlugin();// 3. 启动注册中心客户端this.workerRegistryClient.setRegistryStoppable(this);this.workerRegistryClient.start();// 4. 启动管理线程this.workerManagerThread.start();// 5. 启动消息重试线程this.messageRetryRunner.start();
}

第1步:启动 RPC 服务(服务端 + 客户端),用于接收 Master 下发的分发消息以及向 Master 发送执行结果。 – 第2步:通过 SPI 加载 User 自定义或内置的 TaskChannelFactory,实现对不同 Task 类型的支持。 – 第3步:启动注册中心客户端,将自己注册到 Zookeeper(或其他注册中心)上,以便 Master 可发现并下发任务。 – 第4步:启动 Worker 管理线程,用于不断从等待队列取出任务并提交给线程池执行。 – 第5步:启动消息重试线程,负责对由于网络抖动或 Master 未及时 ACK 而需要重试发送的消息进行重发。


三、启动 RPC 服务

3.1 Netty RPC Server(接收请求)

public void start() {NettyServerConfig serverConfig = workerConfig.getWorkerRpcServerConfig();serverConfig.setListenPort(workerConfig.getListenPort());nettyRemotingServer = new NettyRemotingServer(serverConfig);for (WorkerRpcProcessor processor : workerRpcProcessors) {nettyRemotingServer.registerProcessor(processor);}this.nettyRemotingServer.start();
}
  1. 读取配置:从 workerConfig 中获取 RPC 服务端配置,包括端口、线程池大小等。

  2. 构造 NettyRemotingServer:底层基于 Netty 封装的服务器类,用于接收网络消息。

  3. 注册 Processor:通过 nettyRemotingServer.registerProcessor 将各类消息处理器(如 WorkerTaskDispatchProcessor)绑定到不同的消息类型上。

  4. 启动服务器:调用 Netty 的 bind(port).sync(),启动 BossGroup 和 WorkerGroup,监听客户端连接。

底层实现重点在 NettyRemotingServer.start(),核心伪码如下:

if (isStarted.compareAndSet(false, true)) {serverBootstrap.group(bossGroup, workGroup).channel(getServerSocketChannelClass()).childHandler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel ch) {initNettyChannel(ch);}});serverBootstrap.bind(listenPort).sync();
}
  • BossGroup 负责接收连接,WorkerGroup 负责处理 I/O 读写。

  • 通过 ChannelInitializer 向 Pipeline 中注入解码器、编码器、心跳检测等 Handler。

3.2 Netty RPC Client(发送结果)

public void start() {this.nettyRemotingClient = new NettyRemotingClient(workerConfig.getWorkerRpcClientConfig());for (WorkerRpcProcessor processor : workerRpcProcessors) {this.nettyRemotingClient.registerProcessor(processor);}
}

NettyRemotingClient 构造时会初始化:

  • EventLoopGroup(Epoll 或 NIO)

  • Callback Executor:处理响应回调

  • Response Future Executor:扫描超时的未回包请求

其启动逻辑:

bootstrap.group(workerGroup).channel(getSocketChannelClass()).option(TCP_NODELAY, clientConfig.isTcpNoDelay()).handler(new ChannelInitializer<SocketChannel>() {public void initChannel(SocketChannel ch) {ch.pipeline().addLast(new IdleStateHandler(...)).addLast(new NettyDecoder(), clientHandler, encoder);}});
responseFutureExecutor.scheduleWithFixedDelay(ResponseFuture::scanFutureTable, 0, 1, TimeUnit.SECONDS);
isStarted.set(true);
  • 连接池与重连策略:客户端在发送消息时会根据连接状态自动重连或重建 Channel。

  • 心跳机制:通过 IdleStateHandler 定期向 Master 发送心跳,保持连接活跃。

  • 超时重试ResponseFuture 定时扫描未收到回包的请求,触发超时失败或重试。


四、加载 Task 插件

Worker 支持多种 Task 类型(Shell、Spark、MapReduce……),这些逻辑都通过 SPI 插件化管理:

public void loadPlugin() {PrioritySPIFactory<TaskChannelFactory> factoryLoader= new PrioritySPIFactory<>(TaskChannelFactory.class);for (Map.Entry<String, TaskChannelFactory> entry : factoryLoader.getSPIMap().entrySet()) {String name = entry.getKey();TaskChannelFactory factory = entry.getValue();taskChannelFactoryMap.put(name, factory);taskChannelMap.put(name, factory.create());}
}
  1. PrioritySPIFactory 利用 ServiceLoader.load(spiClass) 扫描 classpath 下所有 META-INF/services/... 配置的实现。

  2. 冲突处理:若多个插件使用同一标识(t.getIdentify().getName()),则通过 resolveConflict 按优先级或版本选择。

  3. 实例化 Factory 并调用 create(),生成具体 TaskChannel,用于执行阶段构建执行命令、日志采集等。

插件加载完成后,Worker 便可根据 Task 类型动态路由到对应的执行实现。


五、注册中心客户端启动

Worker 需要向统一的注册中心(如 Zookeeper)注册自己的可用性信息,Master 才能发现并调度任务给它。

public void start() {registry();registryClient.addConnectionStateListener(new WorkerConnectionStateListener(workerConfig, strategy));
}
private void registry() {WorkerHeartBeat hb = workerHeartBeatTask.getHeartBeat();String path = workerConfig.getWorkerRegistryPath();registryClient.remove(path);registryClient.persistEphemeral(path, JSONUtils.toJsonString(hb));workerHeartBeatTask.start();
}
  • persistEphemeral:将自身地址、可用线程数等写入 ${registryRoot}/workers/${workerAddress},并由 ZK 管理生命周期。

  • 心跳定时任务workerHeartBeatTask.start() 定时刷新节点数据/TTL,保证长连接下的可用性信息及时更新。

  • 连接监听:若与注册中心断开,WorkerConnectionStateListener 可触发重连或优雅退出。


六、Worker 管理线程启动

Worker 启动后,用一个独立线程不断从“等待提交队列”中取出任务并提交给线程池。

public void run() {while (!ServerLifeCycleManager.isStopped()) {if (!ServerLifeCycleManager.isRunning()) {Thread.sleep(SLEEP_TIME);}if (getThreadPoolQueueSize() <= workerExecThreads) {WorkerDelayTaskExecuteRunnable task = waitSubmitQueue.take();workerExecService.submit(task);} else {incOverloadCount();Thread.sleep(SLEEP_TIME);}}
}
  • waitSubmitQueue:阻塞队列,存放待执行的 WorkerDelayTaskExecuteRunnable 实例。

  • 背压机制:当线程池队列已满(> workerExecThreads)时,先统计过载指标,再稍后重试。

  • workerExecService:底层是带监听能力的 ListeningExecutorService,执行完毕后可注册回调处理日志和结果汇总。


七、消息重试线程启动

Worker 执行过程中需要向 Master 回传任务执行结果、日志位点、心跳等消息。若网络抖动导致消息发送失败,需重试:

public void run() {while (!ServerLifeCycleManager.isStopped()) {if (needToRetryMessages.isEmpty()) {Thread.sleep(MESSAGE_RETRY_WINDOW);}long now = System.currentTimeMillis();for (entry in needToRetryMessages) {for (message in entry.getValue()) {if (now - message.getSendTime() > RETRY_WINDOW) {message.setSendTime(now);messageSenderMap.get(type).sendMessage(message);}}}Thread.sleep(SLEEP_TIME);}
}
  • needToRetryMessages:按 TaskInstanceId 分类的待重试消息列表。

  • 定时扫描:以 MESSAGE_RETRY_WINDOW(如 30 秒)为周期,判断消息是否超时,若超时则重新调用对应的 messageSender 去向 Master 派发。

  • 幂等与日志:每次重试都会刷新发送时间,并输出重试日志,保证 Master 端能最终收到消息或记录重试失败。


八、RPC 请求到任务执行的完整链路

  1. Master 通过 RPC 调用 WorkerRpcClient.sendMessageTaskDispatchRequest 发往指定 Worker。

  2. Worker Netty Server 的 NettyServerHandler.channelRead() 接收消息,调用 processReceived()

    pair = processors.get(msg.getType());
    pair.getRight().submit(() -> processor.process(channel, msg));
  3. WorkerTaskDispatchProcessor.process() 反序列化 TaskDispatchRequest,构建 TaskExecutionContext 并缓存。

  4. 检查是否需要延时执行(delayTime);若需延时,则先发送延时执行消息给 Master,跳过入队。

  5. 通过 WorkerTaskExecuteRunnableFactoryBuilder 构造 WorkerDelayTaskExecuteRunnable,并调用:

    if (!workerManager.offer(runnable)) {sendDispatchRejectResult();
    } else {sendDispatchSuccessResult();
    }
  6. workerManager.offer() 根据满载策略将 Runnable 放入 waitSubmitQueue,等待管理线程消费。

  7. 管理线程取出后提交给线程池执行,进入 WorkerDelayTaskExecuteRunnable.run(),真正执行 TaskChannel 的 execute()

  8. Task 执行过程中通过 workerRpcClient 向 Master 发送日志、状态更新等消息;若发送失败,加入 needToRetryMessages 由重试线程处理。

  9. Task 完成后,最终调用 messageSenderMap.get(TASK_EXECUTE_RESULT).sendMessage(resultMessage),Master 收到后更新实例状态。


九、总结

DolphinScheduler Worker 的启动流程,实际上是一套“轻量级调度节点”从初始化、服务注册、插件加载,到任务接收、入队执行、结果回传以及消息可靠性保障的完整闭环。

  • 可插拔:SPI 机制加载 TaskChannel 工厂,支持按需扩展。

  • 高并发:Netty + 线程池,实现主从分离、异步解耦。

  • 可靠性:注册中心心跳、RPC 心跳、消息重试,保证节点可用性和消息可靠交互。

  • 可监控:丰富的度量指标(队列长度、过载次数、重试次数),有助于运维监控和自动扩缩容。

http://www.xdnf.cn/news/1094167.html

相关文章:

  • C/C++ 高频八股文面试题1000题(二)
  • EPLAN 电气制图(六):结构盒与设备管理器核心概念(基础知识选看)
  • Shader面试题100道之(41-60)
  • 【视频观看系统】- 技术与架构选型
  • 家庭网络中的服务器怎么对外提供服务?
  • NumPy-广播机制深入理解
  • 技术开发栈中 URL地址末尾加不加 “/“ 有什么区别?
  • Vue 中mounted 生命周期钩子的执行时机和 v-for 的渲染顺序
  • Mysql中的日志-undo/redo/binlog详解
  • Hexo + Butterfly + Vercel 完整个人Blog部署指南
  • 17.Spring Boot的Bean详解(新手版)
  • TCP的可靠传输机制
  • 正点原子学习 用户权限管理
  • 汽车工业制造领域与数字孪生技术的关联性研究​
  • Python数据分析案例|从模拟数据到可视化:零售门店客流量差异分析全流程
  • 17-C#封装,继承,多态与重载
  • PyTorch数据准备:从基础Dataset到高效DataLoader
  • Hadoop(一)
  • 操作系统核心技术剖析:从Android驱动模型到鸿蒙微内核的国产化实践
  • C++随机打乱函数:简化源码与原理深度剖析
  • 3 STM32单片机-delay延时驱动
  • (八)PS识别:使用 Python 自动化生成图像PS数据集
  • 智慧物流管理:动作识别与包装检测的协同突破
  • Python标准库:时间与随机数全解析
  • 方差、协方差和协方差矩阵
  • TCP/IP常用协议
  • Dify升级到1.5.1详细操作步骤,规避和RAGFlow的镜像冲突问题
  • 神经网络基础及API使用详解
  • 零知开源——STM32F407VET6驱动SHT41温湿度传感器完整教程
  • Linux的 `test`命令(或等价中括号写法 `[空格expression空格]`)的用法详解. 笔记250709