当前位置: 首页 > news >正文

Spark内存管理

文章目录

  • 1.Spark内存消耗来源
  • 2.Spark内存管理模型
    • 2.1 数据缓存空间、框架执行空间以及用户代码空间
    • 2.2 Executor空间划分
      • 2.2.1 JVM堆内内存
      • 2.2.2 JVM堆外内存
  • 3.Spark框架执行内存消耗
    • 3.1 内存共享与竞争
    • 3.2 Shuffle Write
      • 3.2.1 ShuffleWrite分类
      • 3.2.2 无聚合,无排序,且分区数<=200
      • 3.2.3 无聚合,无排序,分区数>200
        • 3.2.3.1 分页的策略和优点
        • 3.2.3.2 Serialized Shuffle使用条件
        • 3.2.3.3 实现细节
        • 3.2.3.4 内存消耗
      • 3.2.4 不需要聚合,需要排序
      • 3.2.5 需要聚合,需要/不需要排序
    • 3.3 Shuffle Read
      • 3.3.1 ShuffleRead分类
      • 3.3.2 无聚合,无排序
      • 3.3.3 无聚合,需要排序
      • 3.3.4 有聚合
  • 4.数据缓存空间
    • 4.1 RDD缓存
      • 4.1.1 MEMORY_ONLY/MEMROY_AND_DISK
      • 4.1.2 MEMORY_ONLY_SER/MEMORY_AND_DISK_SER
      • 4.1.3 off-heap
    • 4.2 广播数据
    • 4.3 task计算结果

1.Spark内存消耗来源

  • 用户代码:比如在map操作中定义数组,字典等。
  • 框架:比如Shuffle阶段进行聚合排序的时候需要用到的数据结构。再比如不同rdd传输时产生的临时存储,比如record1->record1’之后将record1删除。
  • 缓存数据

2.Spark内存管理模型

2.1 数据缓存空间、框架执行空间以及用户代码空间

为了能够平衡用户代码、Shuffle机制中的中间数据,以及数据缓存的内存空间需求,最理想的方法是为三者分配一定的内存配额,并且在运行时根据三者的实际内存用量,动态调整配额比例。Spark将内存划分为三个区域:数据缓存空间、框架执行空间以及用户代码空间。
Framework memory:数据缓存空间和框架执行空间共享了一个大空间,称为Framework memory。Framework memory大小固定,且为数据缓存空间和框架执行空间设置了初始比例,但这个比例可以在应用执行过程中动态调整。

2.2 Executor空间划分

在这里插入图片描述

2.2.1 JVM堆内内存

Reserved Memoryspark.testing.ReservedMemory默认设置为300MB。系统保留内存使用较小的空间存储Spark框架产生的内部对象(如Spark Executor对象,TaskMemoryManager对象等Spark内部对象)。
UserMemory:默认约为40%的内存空间,用于存储用户代码生成的对象。
Framework Memory:包括框架执行空间(Execution Memory)和数据缓存空间(Storage Memory)。spark.memory.fraction总大小默认设为60%的内存空间。当数据缓存空间不足时,可以借用框架执行空间,后续框架执行空间不足时,需要归还该空间。当框架执行空间不足时,可以向数据缓存空间借用空间,但至少要保证数据缓存空间(spark.memory.storageFraction)具有Framework Memory的50%左右,在框架执行时借走的空间不会归还给数据缓存空间,原因是难以代码实现。

2.2.2 JVM堆外内存

  • 为了减少垃圾回收(GC)开销,Spark的统一内存管理机制也允许使用堆外内存。堆外内存类似使用C/C++语言分配的malloc空间,该空间不受JVM垃圾回收机制管理,在结束使用时需要手动释放空间。
  • 因为堆外内存主要存储序列化对象数据,而用户代码处理的是普通Java对象,因此堆外内存只用于框架执行空间和数据缓存空间,而不用于用户代码空间。
  • 如图,堆外内存大小使用过spark.memory.offHeap.size设置,Spark仍然会按照堆内内存使用的spark.memory.storageFraction比例将堆外内存分为框架执行空间和数据缓存空间,而且堆外内存的管理方式和功能与堆内内存的Framework Memory一样。
  • 在运行应用时,Spark会根据应用的Shuffle方式及用户设定的数据缓存级别来决定使用堆内内存还是堆外内存,如后面介绍SerializedShuffle方式可以利用堆外内存来进行Shuffle Write,再如用户使用rdd.persist(OFF_HEAP)后可以将rdd存储到堆外内存。

3.Spark框架执行内存消耗

3.1 内存共享与竞争

  • 一个Executor中存在多个task,假设一个Executor申请了cpu_nums个cpu,那么Executor中活跃的task数目就在[0, cpu_nums]之间。
  • 为了公平性,每个task所使用的空间被均分,一个task的空间大小被控制在[1/2N, 1/N]内,N为当前活跃的task数。
    在这里插入图片描述

3.2 Shuffle Write

3.2.1 ShuffleWrite分类

  • BypassMergeSortShuffle-Writer:无聚合,无排序,partition数不超过200。
  • Serialized ShuffleWriter:无聚合,无排序,partition数大于200。
  • SortShuffleWriter(KeyOrdering=true):无聚合,要排序。基于数组。
  • SortShuffleWriter(mapSideCombine=true):有聚合。基于哈希表。
    在这里插入图片描述
    其中1,3,4都是使用堆内内存来聚合,排序record对象的,属于Unserialized Shuffle方式。这种方式处理的record为普通的Java对象,有较大的内存开销,也会造成较大的JVM垃圾回收开销。
    Seriralized Shuffle处理序列化之后的数据,原理是直接在内存中操作序列化之后的对象,在ShuffleWrite中只用于无map端聚合,无排序,partition大于200的情况中。

3.2.2 无聚合,无排序,且分区数<=200

在这里插入图片描述
每个task内存消耗为,BufferSize(默认32KB)*partition number。

3.2.3 无聚合,无排序,分区数>200

在这里插入图片描述
当分区数量大于200时,不再为每个分区分配一个buffer,而是将计算出的KV放到一个大数组中,将数组中的record根据partitionId进行排序,然后输出即可。然后record是一个java对象,占用空间较大,并且会频繁GC。SerializedShuffle将对象序列化后放到可分页存储的数组中,序列化可以减少存储开销,分页可以利用碎片内存

3.2.3.1 分页的策略和优点

分页的策略和优点包括:

  • 序列化后的record占用的内存空间小。
  • 不需要连续的内存空间。
  • 排序效率高。对序列化后的record按partitionId进行排序时,排序的不是record本身,而是record序列化后字节数组的指针(元数据)。由于直接基于二进制数据进行操作,所以在这里面没有序列化和反序列化的过程,内存和GC开销降低。
  • 可以使用cache-efficient sort等优化技术,提高排序性能。
  • 可以使用堆外内存,分页也可以方便统一管理堆内内存和堆外内存。
3.2.3.2 Serialized Shuffle使用条件
  • 无聚合和排序操作。
  • 使用的序列化类(serializer)支持序列化Value的位置互换功能(relocation of serialized Value),目前KryoSerializer和Spark SQL的custom serializers都支持该功能。
  • 分区个数小于16 777 216。
  • 单个Serialized record小于128MB。
3.2.3.3 实现细节

过Serliazed Shuffle采用了分页技术,像操作系统一样将内存空间划分为Page,每个Page大小在1MB~64MB,既可以在堆内内存上分配,也可以在堆外内存上分配。Page由Executor中的TaskMemoryManager对象来管理,TaskMemoryManager包含一个PageTable,可以最多寻址8192个Page。
如上图:
spark将输出的每个record进行序列化并放在申请的某一个page中,然后将该record的partitionId,所在的PageNum,以及Page中的offset放到PointerArray中,然后通过partitionId进行排序。当所有page的总大小超过了task的内存界限时,会将Page中的record按partition进行排序并spill到磁盘上。最后进行归并即可。
更具体的细节:

  • 对于新来的一个record,首先将其序列化后存到一个serBuffer中(1MB),然后将serBuffer中序列化的record放到ShuffleExternalSorter的Page中进行排序。
  • 插入和排序方法是,首先分配一个LongArray来保存record的指针,指针为64位,前24位存储record的partitionId,中间13位存储record所在的Page Num,后27位存储record在该Page中的偏移量。也就是说LongArray最多可以管理2 (13+27)=8192×128MB=1TB的内存。
  • 随着record不断地插入Page中,如果LongArray不够用或Page不够用,则会通过allocatePage()向TaskMemoryManager申请,如果申请不到,就启动spill()程序,将中间结果spill到磁盘上。
  • 最后再由UnsafeShuffleWriter进行统一的merge。Page由TaskMemoryManager管理和分配,可以存放在堆内内存或者堆外内存。
3.2.3.4 内存消耗

PointerArray、存储record的Page、sort算法所需的额外空间,总大小不超过task的内存限制。需要注意的是,单个数据结构(如PointerArray、serialized record)不能同时使用堆内内存和堆外内存,因此Serialized Shuffle使用堆外内存最大的问题是,在Shuffle Write时不能同时利用堆内内存和堆外内存,可能会造成更多的spill次数。

3.2.4 不需要聚合,需要排序

建立一个Array,根据partition+key进行排序。在这里插入图片描述
最大的内存消耗是存储record的数组PartitionedPairBuffer,占用堆内内存,具有扩容能力,但大小不超过task的内存限制。

3.2.5 需要聚合,需要/不需要排序

在这里插入图片描述
HashMap在堆内分配,需要消耗大量内存。如果HashMap存放不下,则会先扩容为两倍大小,如果还存放不下,就将HashMap中的record排序后spill到磁盘上。放入堆内HashMap或buffer中的record大小,如果超过task的内存限制,那么会spill到磁盘上。该Shuffle方式的优点是通用性强、对分区个数也无限制,缺点是内存消耗高(record是普通Java对象)、不能使用堆外内存。

3.3 Shuffle Read

3.3.1 ShuffleRead分类

在这里插入图片描述

  • 无聚合且无排序的情况:采用基于buffer获取数据并直接处理的方式,适用的典型操作如partitionBy()。
  • 无聚合但需要排序的情况:采用基于数组排序的方式,适用的典型操作如sortByKey()。
  • 有聚合的情况:采用基于HashMap聚合的方式,适用的典型操作如reduceByKey()。

这3种方式都是利用堆内内存来完成数据处理的,属于UnSerialized Shuffle方式。

3.3.2 无聚合,无排序

只包含一个大小为spark.reducer.maxSizeInFlight=48MB的缓冲区。

3.3.3 无聚合,需要排序

在这里插入图片描述
下游的task不断获取上游task输出的record,经过缓冲后,将record依次输出到一个Array结构(PartitionedPairBuffer)中。然后,对Array中的record按照Key进行排序,并将排序结果输出或者传递给下一步操作。

3.3.4 有聚合

在这里插入图片描述
由于Shuffle Read端获取的是各个上游task的输出数据,用于数据聚合的HashMap结构会消耗大量内存,而且只能使用堆内内存。当然,HashMap的内存消耗量也与record中不同Key的个数及聚合函数的复杂度相关。HashMap 具有扩容和spill到磁盘上的功能,支持小规模到大规模数据的聚合。

4.数据缓存空间

数据缓存空间只存放三种数据:rdd缓存数据,广播数据,以及task的计算结果。
另外,还有几种临时空间,如用于反序列化(展开iterator为Array[])的临时空间、用于存放Netty网络数据传输的临时空间等。
在这里插入图片描述
数据缓存空间也可以同时存放在堆内和堆外,而且由task共享。不同的是,每个task的存储空间并没有被限制为1/ N 。

4.1 RDD缓存

在这里插入图片描述

4.1.1 MEMORY_ONLY/MEMROY_AND_DISK

在这里插入图片描述

  • memoryStore持有一个链表(LinkedHashMap)来存储和管理缓存的RDD partition。在链表中,Key的形式是(rddId= m ,partitionId= n ),表示其Value存储的数据来自RDD m 的第 n 个分区。
  • Value是该partition的引用,引用指向一个名为DeserializedMemoryEntry的对象。该对象包含一个Vector,里面存放了partition中的record。
  • 由于缓存级别没有被设置为序列化存储,这些record以普通Java对象的方式存放在Vector中。一个Executor中可能同时运行多个task,因此,链表被多个task共用,即数据缓存空间由多个task共享。

数据缓存空间的内存消耗由存放到其中的RDD record大小决定,即等于所有task缓存的RDD partition的record总大小。

4.1.2 MEMORY_ONLY_SER/MEMORY_AND_DISK_SER

与MEMORY_ONLY的实现方式基本相同,唯一不同是,这里的partition中的record以序列化的方式存储在一个ChunkedByteBuffer(不连续的ByteBuffer数组)中。使用不连续的ByteBuffer数组的目的是方便分配和回收。因为如果record非常多,序列化后就需要一个非常大的数组来存储,而此时的内存空间如果没有连续的一大块空间,就无法存储。在之前的MEMORY_ONLY模式中不存在这个问题,因为单个普通Java对象可以存放在内存中的任意位置。
在这里插入图片描述
内存消耗:由存储的record总大小决定,即等于所有task缓存的RDD partition的record序列化后的总大小。

4.1.3 off-heap

该缓存模式的存储方式与MEMORY_ONLY_SER/MEMORY_AND_DISK_SER模式基本相同,需要缓存的partition中的record也是以序列化的方式存储在一个ChunkedByteBuffer(不连续的ByteBuffer数组)中的,只是存放位置是堆外内存。
在这里插入图片描述
内存消耗:存放到OFF-HEAP中的partition的原始大小。

4.2 广播数据

Broadcast默认使用类似BT下载的TorrentBroadcast方式。
在这里插入图片描述

  • 需要广播的数据一般预先存储在Driver端,Spark在Driver端将要广播的数据划分大小为spark.Broadcast.blockSize=4MB的数
    据块(block)。
  • 然后赋予每个数据块一个blockId为BroadcastblockId(id,“piece”+ i ),id表示block的编号,piece表示被划分后的第几个block。
  • 之后,使用类似BT的方式将每个block广播到每个Executor中。Executor接收到每个block数据块后,将其放到堆内的数据缓存空间的ChunkedByteBuffer里面,缓存模式为MEMORY_AND_DISK_SER。

内存消耗:序列化后的Broadcast block总大小。
内存不足:Broadcast data的存放方式是内存+磁盘,内存不足时放入磁盘。

4.3 task计算结果

许多应用需要在Driver端收集task的计算结果并进行处理,如调用了rdd.collect()的应用。当task的输出结果大小超过spark.task.maxDirectResultSize=1MB且小于1GB时,需要先将每个task的输出结果缓存到执行该task的Executor中,存放模式是MEMORY_AND_DISK_SER,然后Executor将task的输出结果发送到Driver端进一步处理。
在这里插入图片描述
内存消耗:序列化后的task输出结果大小,不超过1GB。在Executor中一般运行多个task,如果每个task都占用了1GB以上的话,则会引起Executor的数据缓存空间不足。
内存不足:因为缓存方式是内存+磁盘,所以在内存不足时放入磁盘。

http://www.xdnf.cn/news/1406971.html

相关文章:

  • Nginx 502 Bad Gateway:从 upstream 日志到 FastCGI 超时复盘
  • 腾讯浑元最新技术:具有表征对齐的多模态扩散,用于高保真拟音音频生成
  • 【嵌入式DIY实例】-空中鼠标
  • LeetCode算法日记 - Day 27: 计算右侧小于当前元素的个数、翻转对
  • 高校心理教育辅导系统的设计与实现|基于SpringBoot高校心理教育辅导系统的设计与实现
  • USB虚拟化应用5:VirtualFIDO2 虚拟硬件安全密钥,智能卡,yubico,支持X,FB,GITHUB等各种网站双重认证,让你的账户登录绝对安全
  • 在集群级别应用 Pod 安全标准
  • opencv 梯度提取
  • 数据化管理是什么意思?企业该如何进行数据化管理
  • 《SVA断言系统学习之路》【01】即时断言概览
  • 北京博乐科技有限公司2025届程序技术类笔试题
  • 性能测试工具-SkyWalking
  • 元宇宙与旅游产业:虚实融合的文旅新体验
  • Python毕业设计推荐:基于Django+MySQL的养老社区服务管理系统
  • 从 WPF 到 Avalonia 的迁移系列实战篇4:控件模板与 TemplatedControl
  • UniApp 基础开发第一步:HBuilderX 安装与环境配置
  • 【AI智能体技术】如何学习多智能体系统知识并实现SOTA算法?
  • SDL3.0 学习随笔:其一
  • 自底向上了解CPU的运算
  • 嵌入式常见架构
  • 【MYSQL】从混乱到清晰:联合查询帮你打通数据孤岛
  • 算法:插入排序
  • 公益免费二级域名
  • 解锁Tensor Core性能:深入探索CUDA Warp矩阵操作
  • Junior Engineer浅谈CAS
  • 【百度】C++开发(25届提前批 一面)面经
  • 时序数据库
  • GitHub 热榜项目 - 日榜(2025-08-31)
  • 使用cursor claude sonnet4的一些感受
  • PY32F002不小心设置了SWD复用的恢复