Spark缓存--cache方法
在Spark 中,cache()
是用于优化计算性能的核心方法之一,但它有许多细节需要深入理解。以下是关于 cache()
的详细技术解析:
1. cache()
的本质
-
简化的
persist()
:cache()
是persist(StorageLevel.MEMORY_ONLY)
的快捷方式,将数据以反序列化对象的形式存储在内存中。 -
惰性操作:调用
cache()
后,数据不会立即缓存,只有在首次触发行动操作(如count()
,show()
,collect()
)时才会执行缓存。 -
存储级别:默认使用
MEMORY_ONLY
,若内存不足,未缓存的分区会在后续需要时重新计算。
2. 底层工作原理
缓存过程
-
血缘(Lineage)记录:Spark 记录 RDD/DataFrame 的血缘关系(即生成该数据的操作步骤)。
-
首次计算:当首次触发行动操作时,Spark 根据血缘执行计算,并将结果按分区缓存在内存中。
-
后续复用:后续操作直接读取缓存数据,跳过血缘中的计算步骤。
缓存失效
-
手动释放:调用
unpersist()
立即释放缓存。 -
自动清理:Spark 根据 LRU(最近最少使用)策略自动清理缓存,当内存不足时,最早未使用的缓存分区会被移除。
3. 存储级别的关键细节
cache()
对应的 MEMORY_ONLY
存储级别特性:
特性 | 说明 |
---|---|
序列化 | 数据以反序列化 Java 对象形式存储,读写速度快,但内存占用高。 |
内存溢出处理 | 内存不足时,直接丢弃未缓存的分区,后续需要时重新计算(不会写入磁盘)。 |
容错性 | 缓存数据丢失时(如节点故障),Spark 根据血缘重新计算。 |
4. 何时使用 cache()
?
适用场景
-
重复使用:同一数据集被多次用于不同操作(如多阶段机器学习流水线)。
-
迭代计算:如 PageRank、梯度下降等需要多次遍历数据的算法。
-
交互式分析:在 Spark Shell 中多次查询同一数据集。
不适用场景
-
单次使用:数据仅用一次时,缓存反而浪费资源。
-
内存不足:数据远大于可用内存时,
MEMORY_ONLY
会导致频繁重计算,应改用MEMORY_AND_DISK
。
代码示例
// 使用 cache 的情况
val cachedRDD = largeRDD.map(complexTransformation).cache()
// 第一次触发行动算子,计算并统计时间
val startTime3 = System.currentTimeMillis()
val result3 = cachedRDD.collect()
val endTime3 = System.currentTimeMillis()
println(s"使用 cache 第一次计算耗时: ${endTime3 - startTime3} 毫秒")
// 第二次触发行动算子,计算并统计时间
val startTime4 = System.currentTimeMillis()
val result4 = cachedRDD.collect()
val endTime4 = System.currentTimeMillis()
println(s"使用 cache 第二次计算耗时: ${endTime4 - startTime4} 毫秒")
println(s"spark.local.dir 的值: ${conf.get("spark.local.dir")}")
sc.stop()