当前位置: 首页 > ds >正文

Spark 运行流程核心组件(二)任务调度

1、调度策略

参数默认值说明
spark.scheduler.modeFIFO调度策略(FIFO/FAIR)
spark.locality.wait3s本地性降级等待时间
spark.locality.wait.processspark.locality.waitPROCESS_LOCAL 等待时间
spark.locality.wait.nodespark.locality.waitNODE_LOCAL 等待时间
spark.locality.wait.rackspark.locality.waitRACK_LOCAL 等待时间

调度策略:FIFO 按提交顺序处理;FAIR 支持权重分配。

本地化策略: 从 PROCESS_LOCAL 到 ANY 逐级降级,减少数据传输开销

val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value性能最优-> 最差
PROCESS_LOCAL -> NODE_LOCAL -> NO_PREF -> RACK_LOCAL -> ANY1、PROCESS_LOCAL
任务所需的数据就在同一个 Executor 进程的内存中2、NODE_LOCAL
任务所需的数据在同一个物理节点上,但不在同一个 Executor 进程的内存中。3、NO_PREF
没有位置偏好,数据来源本身是均匀分布或位置无关。4、RACK_LOCAL
不在同一个节点上,但在同一个机架5、ANY
在集群的其他机架

2、核心组件

1、Schedulable

可调度实体的接口定义

1、Pool

调度树中的节点,可以是根节点或中间节点

  • 调度模式(FIFO或FAIR)
  • 权重(weight)
  • 最小资源份额(minShare)
  • 运行任务数(runningTasks)
  • 子节点列表(schedulableQueue)

2、TaskSetManager

管理一个TaskSet(一组任务),负责任务调度、本地性处理、失败重试和推测执行

  • 跟踪任务状态(待运行、运行中、已完成)
  • 根据数据本地性选择任务
  • 处理任务失败和重试逻辑
  • 实现推测执行机制

2、TaskScheduler

Spark任务调度的核心实现,协调资源分配和任务调度

  • 接收DAGScheduler提交的TaskSet
  • 管理Executor资源状态
  • 分配任务到可用Executor
  • 处理任务状态更新
  • 实现调度延迟和推测执行策略

3、SchedulableBuilder

调度树构建的抽象基类

1、FIFOSchedulableBuilder

实现先进先出调度策略的构建器

2、FairSchedulableBuilder

实现公平调度策略的构建器

3、核心设计

1、调度树

在这里插入图片描述

  • Spark Standalone支持树形结构的调度池,每个池可以独立配置调度模式(FIFO/FAIR)
特性FIFO调度公平调度
结构单层Pool结构多层Pool树结构
排序方式按提交顺序基于权重/minShare
资源分配独占式按比例共享
适用场景批处理作业多用户/多作业环境
配置方式无需配置XML配置文件定义Pool
  • 作业通过sc.setLocalProperty("spark.scheduler.pool", "poolName")分配到指定的调度池
  • 配置调度池配置文件
<!-- conf/fairscheduler.xml -->
<?xml version="1.0"?>
<allocations><pool name="production"><schedulingMode>FAIR</schedulingMode><weight>1</weight><minShare>2</minShare></pool><pool name="test"><schedulingMode>FIFO</schedulingMode><weight>2</weight><minShare>3</minShare></pool>
</allocations>

1、FIFO

    // 优先级val priority1 = s1.priorityval priority2 = s2.priorityvar res = math.signum(priority1 - priority2)if (res == 0) {val stageId1 = s1.stageIdval stageId2 = s2.stageId// 先进先出res = math.signum(stageId1 - stageId2)}res < 0

2、FAIR

override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {val minShare1 = s1.minShareval minShare2 = s2.minShareval runningTasks1 = s1.runningTasksval runningTasks2 = s2.runningTasks// 1. 满足minShare优先级val s1Needy = runningTasks1 < minShare1val s2Needy = runningTasks2 < minShare2// 2. 比较资源使用比例val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)// 3. 权重比较val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDoubleval taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDoublevar compare = 0if (s1Needy && !s2Needy) {return true} else if (!s1Needy && s2Needy) {return false} else if (s1Needy && s2Needy) {compare = minShareRatio1.compareTo(minShareRatio2)} else {compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)}if (compare < 0) {true} else if (compare > 0) {false} else {s1.name < s2.name}}

2、资源分配与任务调度

  • Executor注册
executorAdded(o.executorId, o.host)
  • Pool排序调度队列
val sortedTaskSets = rootPool.getSortedTaskSetQueue
  • TaskSetManager资源分配请求
val (noDelayScheduleReject, minLocality) = resourceOfferSingleTaskSet(taskSet, currentMaxLocality, shuffledOffers, availableCpus,availableResources, tasks)val (taskDescOption, didReject, index) =taskSet.resourceOffer(execId, host, maxLocality, taskCpus, taskResAssignments)
  • TaskSetManager本地性匹配
var allowedLocality = maxLocalityif (maxLocality != TaskLocality.NO_PREF) {allowedLocality = getAllowedLocalityLevel(curTime)if (allowedLocality > maxLocality) {// We're not allowed to search for farther-away tasksallowedLocality = maxLocality}}
http://www.xdnf.cn/news/18020.html

相关文章:

  • EN/IEC 55015 照明设备的电磁兼容标准安全
  • Docker Compose部署Clickhouse最新版
  • 【LINUX网络】HTTP协议基本结构、搭建自己的HTTP简单服务器
  • 为什么游戏会出现“卡顿”:`clock.tick()` v.s. `clock.get_fps()`
  • 【uni-app】根据角色/身份切换显示不同的 自定义 tabbar
  • 线性代数 · 直观理解矩阵 | 空间变换 / 特征值 / 特征向量
  • CERT/CC警告:新型HTTP/2漏洞“MadeYouReset“恐致全球服务器遭DDoS攻击瘫痪
  • 机械加工元件——工业精密制造的璀璨明珠
  • Day14: Flask太空站搭建指南:从零到超光速的Web开发之旅
  • git clone https://gh.llkk.cc/
  • C++从入门到实战(十九)C++ vector容器及其常用接口
  • 电子电路学习日记
  • qt项目中解决关闭弹窗后执行主界面的信号槽时闪退问题
  • MySql——聚簇索引(主键索引)和非聚簇索索引(非主键索引)引区别(即聚集索引和非聚集索引区别)
  • Java 学习笔记(基础篇2)
  • Docker build创建镜像命令入门教程
  • **超融合架构中的发散创新:探索现代编程语言的挑战与机遇**一、引言随着数字化时代的快速发展,超融合架构已成为IT领域的一种重要趋势
  • ts概念讲解
  • Vue 3 + TypeScript:package.json 示例 / 详细注释说明
  • 基于Java飞算AI的Spring Boot聊天室系统全流程实战
  • 快速部署一个鉴黄服务
  • 前端vue框架
  • 【机器人-开发工具】ROS 2 (4)Jetson Nano 系统Ubuntu22.04安装ROS 2 Humble版本
  • 【Java 后端】Spring Boot 集成 JPA 全攻略
  • Nginx学习笔记(九)—— Nginx Rewrite深度解析
  • 版本更新!FairGuard-Mac加固工具已上线!
  • win11右键菜单改回win10样式
  • Data Augmentation数据增强
  • EtherCAT概念介绍
  • EchoEar喵伴接入小聆AI,MCP服务轻松体验,智能升级!