
Spark Execution Flow Core Components (Part 3): Task Execution

I. Startup Modes

1. Standalone


  1. Resource request: the Driver asks the Master for Executor resources
  2. Executor launch: the Master schedules Workers to start the Executors
  3. Registration: each Executor registers directly with the Driver (a minimal driver sketch follows this list)
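For concreteness, here is a minimal driver that exercises this flow; the master URL, core cap, memory size, and app name are placeholder assumptions, not values from this article:

```scala
import org.apache.spark.sql.SparkSession

// Minimal driver wired to a standalone Master (spark://master-host:7077 is a
// placeholder address). The Driver requests resources from the Master, the
// Master schedules Workers to start Executors, and the Executors register
// back with this Driver.
object StandaloneDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("standalone-demo")
      .master("spark://master-host:7077")     // standalone Master URL (assumed host)
      .config("spark.cores.max", "4")         // caps total cores across Executors
      .config("spark.executor.memory", "2g")
      .getOrCreate()

    // Any action forces tasks to be scheduled onto the registered Executors.
    println(spark.sparkContext.parallelize(1 to 100).sum())
    spark.stop()
  }
}
```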

2. YARN


  1. The Driver asks the YARN ResourceManager (RM) for an ApplicationMaster (AM) container
  2. The RM picks a NodeManager (NM) to launch the AM (in yarn-client mode the AM is only a resource proxy and runs no user code)
  3. The AM registers with the RM
  4. The AM requests Executor containers from the RM as needed
  5. The RM allocates containers on multiple NMs
  6. Each NM launches an ExecutorBackend process
  7. Registration: the Executors register with the Driver, which lives inside the AM in yarn-cluster mode (a submission sketch follows this list)
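A minimal sketch of the yarn-client variant, assuming HADOOP_CONF_DIR points at the cluster's configuration; the executor count and sizes are placeholders:

```scala
import org.apache.spark.sql.SparkSession

// yarn-client sketch: this JVM is the Driver, and the AM in the cluster acts
// only as a resource proxy (step 2 above). For yarn-cluster, submit with
// `spark-submit --master yarn --deploy-mode cluster` instead, and the Driver
// runs inside the AM container.
val spark = SparkSession.builder()
  .appName("yarn-demo")
  .master("yarn")
  .config("spark.executor.instances", "4")  // Executor containers the AM will request
  .config("spark.executor.memory", "4g")    // sizes each NM container
  .getOrCreate()
```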

II. Core Components for Executor-Side Task Execution

  1. Driver-side components (their hand-off to the Executor side is sketched after this list)
    • CoarseGrainedSchedulerBackend: communicates with the Executors
    • TaskSchedulerImpl: core task-scheduling logic
    • DAGScheduler: DAG scheduling and Stage management
    • BlockManagerMaster: coordinator for the block managers
    • MapOutputTrackerMaster: tracks Shuffle map output
  2. Executor-side components
    • CoarseGrainedExecutorBackend: the Executor's communication endpoint
    • Executor: the task-execution engine
    • TaskRunner: wraps each task in an execution thread
    • BlockManager: manages local data blocks
    • ShuffleManager: controls Shuffle reads and writes
    • ExecutorSource: metrics reporting
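A toy model of how the Driver-side components hand tasks to the Executor side. Every `*Like` type here is a hypothetical stand-in invented for illustration, not a Spark class:

```scala
// Toy sketch of the Driver-side pipeline: DAGScheduler -> TaskSchedulerImpl ->
// CoarseGrainedSchedulerBackend -> (RPC) -> CoarseGrainedExecutorBackend.
case class TaskDesc(taskId: Long, stageId: Int, payload: Array[Byte])

trait SchedulerBackendLike {                              // stands in for CoarseGrainedSchedulerBackend
  def launchTask(task: TaskDesc): Unit                    // would serialize and send LaunchTask over RPC
}

class TaskSchedulerLike(backend: SchedulerBackendLike) {  // stands in for TaskSchedulerImpl
  def submitTasks(tasks: Seq[TaskDesc]): Unit = tasks.foreach(backend.launchTask)
}

class DagSchedulerLike(ts: TaskSchedulerLike) {           // stands in for DAGScheduler
  def submitStage(stageId: Int, numPartitions: Int): Unit = {
    // One task per partition of the stage.
    val tasks = (0 until numPartitions).map(p => TaskDesc(p.toLong, stageId, Array.empty[Byte]))
    ts.submitTasks(tasks)
  }
}
```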

III. Core Flow of Executor-Side Task Execution

1. Task Reception and Initialization

  • CoarseGrainedExecutorBackend receives the task
```scala
case LaunchTask(data) =>
  if (executor == null) {
    exitExecutor(1, "Received LaunchTask command but executor was null")
  } else {
    val taskDesc = TaskDescription.decode(data.value)
    logInfo(log"Got assigned task ${MDC(LogKeys.TASK_ID, taskDesc.taskId)}")
    executor.launchTask(this, taskDesc)
  }
```
  • Executor launches the task
```scala
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  val taskId = taskDescription.taskId
  val tr = createTaskRunner(context, taskDescription)
  runningTasks.put(taskId, tr)
  val killMark = killMarks.get(taskId)
  if (killMark != null) {
    tr.kill(killMark._1, killMark._2)
    killMarks.remove(taskId)
  }
  threadPool.execute(tr)
  if (decommissioned) {
    log.error(s"Launching a task while in decommissioned state.")
  }
}
```
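Note the kill-mark check: it closes the race where a KillTask request arrives before the TaskRunner has been registered in runningTasks, so such a task is killed immediately on launch instead of slipping through.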

2. Task Execution

  • TaskRunner.run
```scala
// 1. Class loading and dependency management
updateDependencies(
  taskDescription.artifacts.files,
  taskDescription.artifacts.jars,
  taskDescription.artifacts.archives,
  isolatedSession)

// 2. Deserialize the task
task = ser.deserialize[Task[Any]](
  taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)

// 3. Memory management
val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
task.setTaskMemoryManager(taskMemoryManager)

// 4. Run the task
val value = Utils.tryWithSafeFinally {
  val res = task.run(
    taskAttemptId = taskId,
    attemptNumber = taskDescription.attemptNumber,
    metricsSystem = env.metricsSystem,
    cpus = taskDescription.cpus,
    resources = resources,
    plugins = plugins)
  threwException = false
  res
} {
  // Release block locks
  val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
  // Release managed memory
  val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
  if (freedMemory > 0 && !threwException) {
    val errMsg = log"Managed memory leak detected; size = " +
      log"${LogMDC(NUM_BYTES, freedMemory)} bytes, ${LogMDC(TASK_NAME, taskName)}"
    if (conf.get(UNSAFE_EXCEPTION_ON_MEMORY_LEAK)) {
      throw SparkException.internalError(errMsg.message, category = "EXECUTOR")
    } else {
      logWarning(errMsg)
    }
  }
  if (releasedLocks.nonEmpty && !threwException) {
    val errMsg =
      log"${LogMDC(NUM_RELEASED_LOCKS, releasedLocks.size)} block locks" +
        log" were not released by ${LogMDC(TASK_NAME, taskName)}\n" +
        log" ${LogMDC(RELEASED_LOCKS, releasedLocks.mkString("[", ", ", "]"))})"
    if (conf.get(STORAGE_EXCEPTION_PIN_LEAK)) {
      throw SparkException.internalError(errMsg.message, category = "EXECUTOR")
    } else {
      logInfo(errMsg)
    }
  }
}

// 5. Report status back to the Driver
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
```
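statusUpdate ships the serialized result back over RPC. In current Spark versions, results larger than spark.task.maxDirectResultSize are not sent inline: they are written to the local BlockManager and only a block reference travels to the Driver, while results whose accumulated size exceeds spark.driver.maxResultSize cause the job to fail.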
  • Task.run
```scala
// With hadoop.caller.context.enabled = true, attach a caller context to the
// HDFS audit log for troubleshooting, e.g. tracing a surge of small files
// back to the Spark task that wrote them.
new CallerContext(
  "TASK",
  SparkEnv.get.conf.get(APP_CALLER_CONTEXT),
  appId,
  appAttemptId,
  jobId,
  Option(stageId),
  Option(stageAttemptId),
  Option(taskAttemptId),
  Option(attemptNumber)).setCurrentContext()

// Launch the task with its TaskContext listeners
context.runTaskWithListeners(this)
```

3. Shuffle Handling

  • ShuffleMapTask

Prepares Shuffle data for the downstream Stage (the map-side output) and produces a MapStatus recording where the output lives and how large it is.

```scala
val threadMXBean = ManagementFactory.getThreadMXBean
val deserializeStartTimeNs = System.nanoTime()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
  threadMXBean.getCurrentThreadCpuTime
} else 0L
val ser = SparkEnv.get.closureSerializer.newInstance()
// Deserialize the RDD and the ShuffleDependency from the broadcast task binary
val rddAndDep = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
  ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
  threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
} else 0L

val rdd = rddAndDep._1
val dep = rddAndDep._2
// While we use the old shuffle fetch protocol, we use partitionId as mapId in the
// ShuffleBlockId construction.
val mapId = if (SparkEnv.get.conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {
  partitionId
} else {
  context.taskAttemptId()
}
dep.shuffleWriterProcessor.write(rdd.iterator(partition, context), dep, mapId, partitionId, context)
```
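The write call returns a MapStatus through the TaskRunner to the Driver, which records it in MapOutputTrackerMaster (section II), so reduce-side tasks of the next Stage can locate and fetch these shuffle blocks.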
  • ResultTask
```scala
override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val threadMXBean = ManagementFactory.getThreadMXBean
  val deserializeStartTimeNs = System.nanoTime()
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
  _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L
  func(context, rdd.iterator(partition, context))
}
```
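For instance, rdd.collect() submits ResultTasks whose func essentially does iter.toArray over each partition's iterator; the per-partition arrays come back as the tasks' results and are concatenated on the Driver.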

IV. Core Communication Mechanism

| Message | Direction | Content |
| --- | --- | --- |
| RegisterExecutor | Executor → Driver | executorId, hostPort |
| RegisteredExecutor | Driver → Executor | acknowledges successful registration |
| LaunchTask | Driver → Executor | serialized TaskDescription |
| StatusUpdate | Executor → Driver | taskId, state, result data |
| KillTask | Driver → Executor | kills the specified task |
| StopExecutor | Driver → Executor | tells the Executor to shut down |
| Heartbeat | Executor → Driver | heartbeat plus metrics data |
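To make the directions concrete, here is a toy model of this protocol. The real messages live in Spark's CoarseGrainedClusterMessages and travel over its Netty-based RPC; the field shapes below are simplified assumptions:

```scala
// Toy model of the control-plane messages in the table above; not Spark source.
sealed trait ExecutorToDriver
case class RegisterExecutor(executorId: String, hostPort: String) extends ExecutorToDriver
case class StatusUpdate(taskId: Long, state: String, resultData: Array[Byte]) extends ExecutorToDriver
case class Heartbeat(executorId: String, metrics: Map[String, Long]) extends ExecutorToDriver

sealed trait DriverToExecutor
case object RegisteredExecutor extends DriverToExecutor
case class LaunchTask(serializedTaskDescription: Array[Byte]) extends DriverToExecutor
case class KillTask(taskId: Long, reason: String) extends DriverToExecutor
case object StopExecutor extends DriverToExecutor
```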

V. Executor Thread Model

```
Executor JVM Process
├── CoarseGrainedExecutorBackend (Netty RPC)
├── ThreadPool (cached thread pool)
│   ├── TaskRunner 1
│   ├── TaskRunner 2
│   └── ...
├── BlockManager
│   ├── MemoryStore (on-heap/off-heap)
│   └── DiskStore
└── ShuffleManager
    ├── SortShuffleWriter
    └── UnsafeShuffleWriter
```
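A sketch of that task thread pool, assuming Spark's behavior of an unbounded cached pool of daemon threads named "Executor task launch worker-N"; the sketch itself is illustrative, not Spark source:

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

// Illustrative stand-in for the Executor's task pool: an unbounded cached
// pool, one daemon thread per in-flight TaskRunner.
object TaskPoolSketch {
  private val counter = new AtomicInteger(0)
  private val factory: ThreadFactory = (r: Runnable) => {
    val t = new Thread(r, s"Executor task launch worker-${counter.getAndIncrement()}")
    t.setDaemon(true)
    t
  }
  val pool = Executors.newCachedThreadPool(factory)

  def main(args: Array[String]): Unit = {
    (1 to 3).foreach { i =>
      pool.execute(() => println(s"${Thread.currentThread().getName} running task $i"))
    }
    pool.shutdown()
  }
}
```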