当前位置: 首页 > news >正文

Spark中的堆外和堆内内存以及内部行数据表示UnsafeRow

背景

本文基于 Spark v4.0.0
写此文章的目的是为了说明 Spark计算引擎在堆内和堆外内存的处理,以及在内部行处理的优化。

分析

堆外和堆内内存

Spark中的内存分为执行内存和存储内存,
执行内存用来在用来缓存在shufle过程中的一些数据,比如说 shuffle 用来排序的内存。
存储内存用来缓存RDD数据以及broadcast的数据等。

具体的代码在 MemoryManager 中:

 private[memory] final val tungstenMemoryAllocator: MemoryAllocator = {tungstenMemoryMode match {case MemoryMode.ON_HEAP => MemoryAllocator.HEAPcase MemoryMode.OFF_HEAP => MemoryAllocator.UNSAFE}}
...MemoryAllocator UNSAFE = new UnsafeMemoryAllocator();MemoryAllocator HEAP = new HeapMemoryAllocator();
  • 执行内存中如何被使用
    ShuffleExternalSorter举例,在进行shuffle数据写入的时候,会经过如下数据流:
UnsafeShuffleWriter.write||\/
insertRecordIntoSorter||\/
ShuffleExternalSorter.insertRecord||\/
acquireNewPageIfNecessary||\/
allocatePage||\/
TaskMemoryManager.allocatePage||\/
memoryManager.tungstenMemoryAllocator().allocate(acquired)||\/
MemoryAllocator.allocate // UnsafeMemoryAllocator 或者 HeapMemoryAllocator
  • 存储内存如何被使用
    以缓存RDD的结果为例,经过的数据流如下:
 Executor.run||\/env.blockManager.putByte||\/BlockStoreUpdater.save()||\/saveSerializedValuesToMemoryStore||\/MemoryManager.putBytes||\/MemoryManager.acquireStorageMemory||\/storagePool.acquireMemory // StorageMemoryPool(this, MemoryMode.ON_HEAP)或者StorageMemoryPool(this, MemoryMode.OFF_HEAP)
  • 堆内堆外内存的分配和释放

UnsafeMemoryAllocator.allocate 的方法调用Unsafe.allocateMemory的方法,得到一个native的地址offeset:

 long address = Platform.allocateMemory(size);MemoryBlock memory = new MemoryBlock(null, address, size);

UnsafeMemoryAllocator.free直接调用 Platform.freeMemory(memory.offset);进行内存的释放:

Platform.freeMemory(memory.offset);

HeapMemoryAllocator.allocate的方法直接new一个Long类型的数组:

long[] array = new long[numWords];
MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);

HeapMemoryAllocator.free的方法是通过释放java对象的引用来达到释放该内存对象的目的:

 memory.setObjAndOffset(null, 0);LinkedList<WeakReference<long[]>> pool =bufferPoolsBySize.computeIfAbsent(alignedSize, k -> new LinkedList<>());pool.add(new WeakReference<>(array));

并且通过弱引用来引用该对象,方便二次利用(如果该对象没有被gc释放的话)。

UnsafeRow

这个UnsafeRow是Spark InternalRow的一种实现,他的作用

  1. 是减少 JVM GC 的压力
  2. 将 JVM 对象序列化为字节数组进行存储,且不影响正常的数据操作,减少了数据存储内存,可以精确计算内存的使用情况
  3. 减少了 Shuffle 过程中序列化和反序列化的的消耗
  • 减少GC压力
    相比SpecificInternalRow / GenericInternalRow 都是以Array进行存储的的,而且Scala中所有的数据类型都是对象,没有java中原生类型的概念。
    比如说 Int final abstract class Int extends _root_.scala.AnyVal 其实是一个抽象类。
    这种采用了对象存储的方式,会天然存在GC的问题
    而 UnsafeRow 采用字节数组的形式,只有单个字节数组的对象,不像SpecificInternalRow 中的的每一个字段都是可以GC的对象,这样在GC标记可达对象阶段,可以减少时间

  • 不影响正常的数据操作,减少了数据存储内存,精确计算内存的使用情况
    BaseGenericInternalRow都有对应的 setXXXgetXXX方法,用来设置或者获取对应数据类型的值,
    UnsafeRow 也有 setXXXgetXXX方法,背后是对应Unsafe.putXXXUnsafe.getXXX方法,所以说不影响正常的数据操作
    正常的jvm对象在堆中会包含对象头,实例数据,对齐填充等信息,而unsafeRow只存储对应的数据本身,节约了额外信息
    由于UnsafeRow只存储了数据,对应的占用大小就可以直接计算出来; 而jvm对象不容易计算,因为对象中的字段可能还会包含其他的引用对象

  • 减少了 Shuffle 过程中序列化和反序列化的的消耗
    在Shuffle write写磁盘阶段,会将对应的Row数据序列化,对于是 UnsafeRow 这种序列化的话,直接把当前的byte数组的数据写入就行(参考UnsafeShuffleWriter.write方法实现),用的 ShuffleExchangeExec.serializer 的序列化 ,也就是 UnsafeRowSerializer
    而不像 JavaSerializer 这种序列化,还需要重新序列化整个对象

在shuffle 读阶段,会将数据反序列化为 内部Row, 对于 UnsafeRow 的话,直接读取对应的的字节数据到 UnsafeRow 就行了(参考 ShuffleManager.getReader 实现),用的是ShuffleExchangeExec.serializer.deserializeStream.asKeyValueIterator方法.

对于转换的问题可以参考SparkSQL InternalRow

http://www.xdnf.cn/news/1452151.html

相关文章:

  • S 3.3深度学习--卷积神经网络--代码
  • (A题|烟幕干扰弹的投放策略)2025年高教杯全国大学生数学建模国赛解题思路|完整代码论文集合
  • 【mmcv自己理解】
  • “全结构化录入+牙位可视化标记”人工智能化python编程路径探析
  • 新电脑硬盘如何分区?3个必知技巧避免“空间浪费症”!
  • 如何监控员工的电脑?7款实用的员工电脑管理软件,探索高效管理捷径!
  • cursor+python轻松实现电脑监控
  • 【嵌入式DIY实例-ESP32篇】-倾斜弹跳球游戏
  • 小程序缓存数据字典
  • Android 项目:画图白板APP开发(三)——笔锋(多 Path 叠加)
  • 当液态玻璃计划遭遇反叛者:一场 iOS 26 界面的暗战
  • 用 Rust + Actix-Web 打造“Hello, WebSocket!”——从握手到回声,只需 50 行代码
  • Energy期刊论文学习——基于集成学习模型的多源域迁移学习方法用于小样本实车数据锂离子电池SOC估计
  • 邮件如何防泄密?这10个电子邮件安全解决方案真的好用,快收藏
  • Windows+Docker一键部署CozeStudio私有化,保姆级
  • 15、Docker构建前端镜像并运行
  • 计算机大数据毕业设计推荐:基于Spark的新能源汽车保有量可视化分析系统
  • 配置阿里云 YUM 源指南
  • IPV6之DHCPv6服务器和中继代理和前缀代理服务器客户端
  • 高并发商城 商品为了防止超卖,都做了哪些努力?
  • PostgreSQL18-FDW连接的 SCRAM 直通身份验证
  • 当便捷遇上复杂,低代码的路该怎么走?
  • Linux 基础IO-从 “一切皆文件” 到自定义 libc 缓冲区
  • fastmcp2.0的传输方式
  • DFT:从RL的视角修正SFT损失的权重
  • 【高分论文密码】大尺度空间模拟预测与数字制图
  • Django事务
  • Leetcode 240. 搜索二维矩阵 II 矩阵 / 二分
  • 垃圾回收,几种GC算法及GC机制
  • 数据库中事务、指令、写法解读