常见的会触发 Shuffle 的操作和方法
推荐阅读:
用统计零花钱的例子解释:Shuffle 是啥?-CSDN博客
用学生成绩统计的例子理解 Shuffle 的底层逻辑(Spark)_shuffle 逻辑-CSDN博客
1. 显式触发 Shuffle 的操作
操作 | 说明 | 示例 |
---|---|---|
groupByKey() | 按 Key 分组,收集所有值到同一分区 | rdd.groupByKey() |
reduceByKey() | 按 Key 聚合(虽然会局部聚合,但仍需跨分区汇总) | rdd.reduceByKey(_ + _) |
join() | 关联两个 RDD/DataFrame(若未预先分区对齐) | rdd1.join(rdd2) |
distinct() | 去重(内部通过 groupBy 实现) | rdd.distinct() |
repartition() | 强制重新分区(如增加分区数) | rdd.repartition(100) |
sortBy() / sortByKey() | 全局排序(需将所有数据按 Key 重新分区) | rdd.sortByKey() |
cogroup() | 多数据集按 Key 联合分组 | rdd1.cogroup(rdd2) |
intersection() | 取两个 RDD 的交集(需 Shuffle 去重) | rdd1.intersection(rdd2) |
aggregateByKey() | 按 Key 聚合(需跨分区汇总) | rdd.aggregateByKey(0)(_ + _, _ + _) |
2. 隐式触发 Shuffle 的场景
(1) 宽依赖(Wide Dependency)
-
定义:父 RDD 的一个分区数据被子 RDD 的多个分区依赖(如
groupByKey
、join
)。 -
示例:
val rdd = sc.parallelize(1 to 100) val grouped = rdd.groupBy(x => x % 10) // 按余数分组,触发 Shuffle
(2) 数据倾斜(Data Skew)
-
现象:某些分区的数据量远大于其他分区,导致 Shuffle 时部分节点负载过高。
-
示例:
val skewedRdd = rdd.filter(x => x % 1000 == 0) // 假设过滤后某些 Key 数据极多
3. 为什么这些操作需要 Shuffle?
以 reduceByKey
为例:
-
局部聚合:每个分区内先对相同 Key 的值进行预聚合(如求和)。
-
跨分区汇总:将不同分区中相同 Key 的聚合结果传输到同一个分区,最终合并。
-
例如,Key 为 A 的数据分布在分区1和分区2,需将两个分区的 A 数据合并。
-
4. 如何避免或优化 Shuffle?
优化方法 | 说明 | 示例 |
---|---|---|
提前局部聚合 | 用 reduceByKey 替代 groupByKey | rdd.reduceByKey(_ + _) |
调整分区策略 | 用 repartition 或 coalesce 控制分区数 | rdd.coalesce(10) |
广播小数据集 | 用 broadcast 代替 join (小表广播到大表) | broadcast(smallRdd) |
预分区(Partitioning) | 对频繁使用的 RDD 提前分区(如 partitionBy ) | rdd.partitionBy(new HashPartitioner(100)) |
避免数据倾斜 | 对倾斜 Key 加盐(Salt)或拆分处理 | rdd.map(key => (key + "_salt", value)) |
一句话总结
Shuffle 操作 = 跨分区重新洗牌数据,常见于分组、排序、关联等场景。优化核心是 减少数据传输量 和 避免不必要的数据移动。