Spark中的堆外和堆内内存以及内部行数据表示UnsafeRow
背景
本文基于 Spark v4.0.0
写此文章的目的是为了说明 Spark计算引擎在堆内和堆外内存的处理,以及在内部行处理的优化。
分析
堆外和堆内内存
Spark中的内存分为执行内存和存储内存,
执行内存用来在用来缓存在shufle过程中的一些数据,比如说 shuffle 用来排序的内存。
存储内存用来缓存RDD数据以及broadcast的数据等。
具体的代码在 MemoryManager 中:
private[memory] final val tungstenMemoryAllocator: MemoryAllocator = {tungstenMemoryMode match {case MemoryMode.ON_HEAP => MemoryAllocator.HEAPcase MemoryMode.OFF_HEAP => MemoryAllocator.UNSAFE}}
...MemoryAllocator UNSAFE = new UnsafeMemoryAllocator();MemoryAllocator HEAP = new HeapMemoryAllocator();
- 执行内存中如何被使用
以ShuffleExternalSorter
举例,在进行shuffle数据写入的时候,会经过如下数据流:
UnsafeShuffleWriter.write||\/
insertRecordIntoSorter||\/
ShuffleExternalSorter.insertRecord||\/
acquireNewPageIfNecessary||\/
allocatePage||\/
TaskMemoryManager.allocatePage||\/
memoryManager.tungstenMemoryAllocator().allocate(acquired)||\/
MemoryAllocator.allocate // UnsafeMemoryAllocator 或者 HeapMemoryAllocator
- 存储内存如何被使用
以缓存RDD的结果为例,经过的数据流如下:
Executor.run||\/env.blockManager.putByte||\/BlockStoreUpdater.save()||\/saveSerializedValuesToMemoryStore||\/MemoryManager.putBytes||\/MemoryManager.acquireStorageMemory||\/storagePool.acquireMemory // StorageMemoryPool(this, MemoryMode.ON_HEAP)或者StorageMemoryPool(this, MemoryMode.OFF_HEAP)
- 堆内堆外内存的分配和释放
UnsafeMemoryAllocator.allocate 的方法调用Unsafe.allocateMemory
的方法,得到一个native的地址offeset:
long address = Platform.allocateMemory(size);MemoryBlock memory = new MemoryBlock(null, address, size);
UnsafeMemoryAllocator.free直接调用 Platform.freeMemory(memory.offset);
进行内存的释放:
Platform.freeMemory(memory.offset);
HeapMemoryAllocator.allocate的方法直接new一个Long类型的数组:
long[] array = new long[numWords];
MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
HeapMemoryAllocator.free的方法是通过释放java对象的引用来达到释放该内存对象的目的:
memory.setObjAndOffset(null, 0);LinkedList<WeakReference<long[]>> pool =bufferPoolsBySize.computeIfAbsent(alignedSize, k -> new LinkedList<>());pool.add(new WeakReference<>(array));
并且通过弱引用来引用该对象,方便二次利用(如果该对象没有被gc释放的话)。
UnsafeRow
这个UnsafeRow是Spark InternalRow的一种实现,他的作用
- 是减少 JVM GC 的压力
- 将 JVM 对象序列化为字节数组进行存储,且不影响正常的数据操作,减少了数据存储内存,可以精确计算内存的使用情况
- 减少了 Shuffle 过程中序列化和反序列化的的消耗
-
减少GC压力
相比SpecificInternalRow
/GenericInternalRow
都是以Array进行存储的的,而且Scala中所有的数据类型都是对象,没有java中原生类型的概念。
比如说 Intfinal abstract class Int extends _root_.scala.AnyVal
其实是一个抽象类。
这种采用了对象存储的方式,会天然存在GC的问题
而 UnsafeRow 采用字节数组的形式,只有单个字节数组的对象,不像SpecificInternalRow 中的的每一个字段都是可以GC的对象,这样在GC标记可达对象阶段,可以减少时间 -
不影响正常的数据操作,减少了数据存储内存,精确计算内存的使用情况
像BaseGenericInternalRow
都有对应的setXXX
和getXXX
方法,用来设置或者获取对应数据类型的值,
UnsafeRow 也有setXXX
和getXXX
方法,背后是对应Unsafe.putXXX
和Unsafe.getXXX
方法,所以说不影响正常的数据操作
正常的jvm对象在堆中会包含对象头,实例数据,对齐填充等信息,而unsafeRow只存储对应的数据本身,节约了额外信息
由于UnsafeRow只存储了数据,对应的占用大小就可以直接计算出来; 而jvm对象不容易计算,因为对象中的字段可能还会包含其他的引用对象 -
减少了 Shuffle 过程中序列化和反序列化的的消耗
在Shuffle write写磁盘阶段,会将对应的Row数据序列化,对于是 UnsafeRow 这种序列化的话,直接把当前的byte数组的数据写入就行(参考UnsafeShuffleWriter.write方法实现),用的ShuffleExchangeExec.serializer
的序列化 ,也就是UnsafeRowSerializer
而不像 JavaSerializer 这种序列化,还需要重新序列化整个对象
在shuffle 读阶段,会将数据反序列化为 内部Row, 对于 UnsafeRow 的话,直接读取对应的的字节数据到 UnsafeRow 就行了(参考 ShuffleManager.getReader 实现),用的是ShuffleExchangeExec.serializer.deserializeStream.asKeyValueIterator
方法.
对于转换的问题可以参考SparkSQL InternalRow