自定义分区器-基础
(一)什么是分区
在 Spark 里,弹性分布式数据集(RDD)是核心的数据抽象,它是不可变的、可分区的、里面的元素并行计算的集合。RDD 会被划分成多个分区,每个分区就是一个数据集片段,这些分区可以分布在集群的不同节点上。
分区是 Spark 进行并行计算的基本单位,Spark 会对每个分区的数据并行处理,以此提升计算效率。
(二)默认分区的情况
从不同数据源创建 RDD 的默认分区情况。
- 从集合创建 RDD(使用 parallelize 方法)
当使用 parallelize 方法从一个集合创建 RDD 时,默认分区数通常取决于集群的配置。在本地模式下,默认分区数等于本地机器的 CPU 核心数;在集群模式下,默认分区数由 spark.default.parallelism 配置项决定。
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
val conf = new SparkConf().setAppName("DefaultPartitionExample").setMaster("local")
val sc = new SparkContext(conf)
val data = Seq(1, 2, 3, 4, 5)
val rdd = sc.parallelize(data)
println(s"默认分区数: ${rdd.partitions.length}")
sc.stop()
2.从外部存储(如文件)创建 RDD(使用 textFile 方法)
当使用 textFile 方法从外部存储(如 HDFS、本地文件系统等)读取文件创建 RDD 时,默认分区数通常由文件的块大小决定。对于 HDFS 文件,默认分区数等于文件的块数。例如,一个 128MB 的文件在 HDFS 上被分成 2 个 64MB 的块,那么创建的 RDD 默认分区数就是 2。
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
val conf = new SparkConf().setAppName("DefaultPartitionFileExample").setMaster("local")
val sc = new SparkContext(conf)
// 假设文件存在于本地
val rdd = sc.textFile("path/to/your/file.txt")
println(s"默认分区数: ${rdd.partitions.length}")
sc.stop()
(三)查看 RDD 分区个数的方法
通过访问 partitions 属性的长度来获取,如上述 Scala 示例中的 rdd.partitions.length。
注意事项
你可以通过 spark.default.parallelism 配置项来调整默认分区数。在创建 SparkContext 之前,可以通过 SparkConf 来设置该配置项,例如:
val conf = new SparkConf().setAppName("CustomParallelismExample")
.setMaster("local")
.set("spark.default.parallelism", "4")
val sc = new SparkContext(conf)
val data = Seq(1, 2, 3, 4, 5)
val rdd = sc.parallelize(data)
println(s"自定义分区数: ${rdd.partitions.length}")
sc.stop()