当前位置: 首页 > ds >正文

Spark学习记录

1、Spark基础介绍

1.1、Spark基础概念

Spark是一种基于内存的快速、通用、可扩展的大数据分析计算引擎

1.2、Spark运行架构

运行过程:

  1. Driver 执行用户程序(Application)的main()方法并创建 SparkContext,与 Cluster Manager 建立通信

  2. Cluster Manager 为用户程序分配计算资源,返回可供使用的 Executor 列表

  3. 获取 Executor 资源后,Spark 会将用户程序代码及依赖包(Application jar)传递给 Executor

  4. 最后,SparkContext 发送 tasks(经拆解后的任务序列)到 Executor,由其执行计算任务并输出结果

Driver的职责:

  • 作业解析与 DAG 生成:

    • 将用户编写的 Spark 程序(如JavaSparkContext)解析为有向无环图(DAG)。

  • 任务调度:

    • 将 DAG 划分为多个阶段(Stage)和任务(Task)。

    • 为每个任务分配资源并调度到 Executor 上执行。

  • 与集群管理器通信:

    • 向集群管理器Cluster Manager申请资源(Executor)。

    • 监控 Executor 的状态和资源使用情况。

  • 结果收集:

    • 接收 Executor 返回的计算结果。

Executor的职责:

  • 执行计算任务:

    • 接收 Driver 发送的任务(如mapreduce等操作)。

    • 在本地执行具体的计算逻辑。

  • 内存管理:

    • 缓存 RDD 和中间计算结果(如通过persist()cache())。

    • 管理任务执行时的内存分配(如堆内 / 堆外内存)。

  • 与 Driver 通信:

    • 向 Driver 汇报任务状态(如完成、失败)。

    • 返回计算结果给 Driver。

Cluster Manager的职责:

  • 资源分配与调度:

    • 接收 Driver 的资源请求,并在集群节点上启动相应的 Executor 进程

  • 节点监控与故障恢复:

    • 监控集群中各节点的健康状态,在节点故障时进行处理(如重新分配资源、重启 Executor)

1.3、Spark生态圈

1.3.1、数据来源

Spark支持多种数据来源。包括文件系统、数据库、结构化数据、实时数据等。

1.3.2、运行模式

1.3.2.1、Local模式(单机)

Local模式就是运行在一台计算机上的模式,通常就是用于在本机上练手和测试。

1.3.2.2、Standalone模式(集群)

Standalone模式是Spark自带的资源调度引擎,构建一个由Master + Worker构成的Spark集群,Spark运行在集群中。

运行过程:

  1. SparkContext连接到Master,向Master注册并申请资源(CPU Core 和Memory);

  2. Master根据SparkContext的资源申请要求和Worker心跳周期内报告的信息决定在哪个Worker上分配资源,并在该Worker上获取资源,然后启动StandaloneExecutorBackend;

  3. StandaloneExecutorBackend向SparkContext注册;

  4. SparkContext将Applicaiton代码发送给StandaloneExecutorBackend;并且SparkContext解析Applicaiton代码,构建DAG图,并提交给DAG Scheduler分解成Stage,然后以Stage提交给Task Scheduler,Task Scheduler负责将Task分配到相应的Worker,最后提交给StandaloneExecutorBackend执行;

  5. StandaloneExecutorBackend会建立Executor线程池,开始执行Task,并向SparkContext报告,直至Task完成。

  6. 所有Task完成后,SparkContext向Master注销,释放资源。

1.3.2.3、Yarn模式(集群)

Spark使用Hadoop的Yarn组件进行资源与任务调度。分为Yarn-client模式和Yarn-cluster模式。

(1)Yarn-client模式(默认)

Yarn-Client模式中,Driver在客户端本地运行,这种模式可以使得Spark Application和客户端进行交互,因为Driver在客户端,所以可以通过webUI访问Driver的状态,默认是http://{IP}:4040访问。

运行过程:

  1. Client向Yarn的ResourceManager申请启动Application Master。同时在SparkContext初始化中将创建DAGScheduler和TASKScheduler等,由于我们选择的是Yarn-Client模式,程序会选择YarnClientClusterScheduler和YarnClientSchedulerBackend;

  2. ResourceManager收到请求后,在集群中选择一个NodeManager,为该应用程序分配第一个Container,要求它在这个Container中启动应用程序的ApplicationMaster,与YARN-Cluster区别的是在该ApplicationMaster不运行SparkContext,只与SparkContext进行联系进行资源的分派;

  3. Client中的SparkContext初始化完毕后,与ApplicationMaster建立通讯,向ResourceManager注册,根据任务信息向ResourceManager申请资源(Container);

  4. 一旦ApplicationMaster申请到资源(也就是Container)后,便与对应的NodeManager通信,要求它在获得的Container中启动启动CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend启动后会向Client中的SparkContext注册并申请Task;

  5. Client中的SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向Driver汇报运行的状态和进度,以让Client随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;

  6. 应用程序运行完成后,Client的SparkContext向ResourceManager申请注销并关闭自己。

(2)Yarn-cluster模式

Yarn-cluster模式中,Driver程序运行在由ResourceManager启动的ApplicationMaster,适用于生产环境。

运行过程:

  1. Client向Yarn中提交应用程序,包括ApplicationMaster程序、启动ApplicationMaster的命令、需要在Executor中运行的程序等;

  2. ResourceManager收到请求后,在集群中选择一个NodeManager,为该应用程序分配第一个Container,要求它在这个Container中启动应用程序的ApplicationMaster,其中ApplicationMaster进行SparkContext等的初始化;

  3. ApplicationMaster向ResourceManager注册,这样用户可以直接通过ResourceManage查看应用程序的运行状态,然后它将采用轮询的方式通过RPC协议为各个任务申请资源,并监控它们的运行状态直到运行结束;

  4. 一旦ApplicationMaster申请到资源(也就是Container)后,便与对应的NodeManager通信,要求它在获得的Container中启动启动CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend启动后会向ApplicationMaster中的SparkContext注册并申请Task。

  5. ApplicationMaster中的SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向ApplicationMaster汇报运行的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;

  6. 应用程序运行完成后,ApplicationMaster向ResourceManager申请注销并关闭自己。

(3)Yarn-cluster 和 Yarn-client 区别与联系

  1. 从广义上讲,Yarn-cluster适用于生产环境;而Yarn-client适用于交互和调试,也就是希望快速地看到application的输出。

  2. 从深层次的含义讲,Yarn-cluster和Yarn-client模式的区别其实就是Application Master进程的区别,Yarn-cluster模式下,driver运行在AM(Application Master)中,它负责向YARN申请资源,并监督作业的运行状况。当用户提交了作业之后,就可以关掉Client,作业会继续在YARN上运行。然而Yarn-cluster模式不适合运行交互类型的作业。而Yarn-client模式下,Application Master仅仅向YARN请求executor,client会和请求的container通信来调度他们工作,也就是说Client不能离开。

1.3.2.4、Mesos模式(集群)

Spark使用Mesos平台进行资源与任务的调度。

1.3.3、Spark核心模块

(1)Spark Core:Spark的基础组件,提供了分布式数据处理的核心功能,包括任务调度、内存管理、错误恢复、与存储系统交互等模块。还包含了对弹性分布式数据集(Resilient Distributed DataSet,简称RDD)的API定义。

(2)Spark SQL:提供了对结构化数据的处理能力,支持SQL查询和Spark DataFrame API。

(3)Spark Streaming:用于处理实时数据流,支持高吞吐量的数据流处理。它可以将实时数据源如Kafka、Flume等集成到Spark应用中,实现实时数据处理和分析。

(4)Spark MLlib:机器学习库,提供了丰富的机器学习算法,包括分类、回归、聚类、协同过滤等。MLlib支持多种编程语言,如Scala、Java、Python和R。

(5)Spark GraghX:用于图数据处理和分析的库,支持图计算、图算法等。GraphX可以高效地处理大规模图数据,适用于社交网络分析、推荐系统等领域。

1.4、Spark与MapReduce

1.4.1、MapReduce框架

MapReduce 是一种分布式计算编程模型,由 Google 在 2004 年提出,用于大规模数据处理。

Hadoop MapReduce 是其开源实现,成为 Hadoop 生态的核心组件之一,用于在集群上并行处理海量数据。

数据处理流程是从数据源获取数据,经过分析计算后,将结果输出到指定位置,核心是一次计算,不适合迭代计算。

1.4.2、Spark框架

基于内存处理,可以将多个计算任务迭代,中间结果可以不落盘(注:Shuffle是落盘的)

1.4.3、Spark与MapReduce的区别

特性

Spark

MapReduce

开发语言

Java+Scala

Java

编程模型

多种API,多语言接口,多种操作方式(DataFrame、Dataset 和 SQL)

Map 和 Reduce

延迟性

适用场景

迭代计算、实时处理

批处理

容错机制

RDD 血统

检查点和重新计算

资源管理

Stanalone、Yarn、Mesos

Hadoop Yarn

数据处理类型

批处理、流处理、交互式查询

批处理

处理方式

在MR的基础上优化了计算过程

出现的较早,只考虑单一的操作

计算模式

内存计算为主

磁盘计算为主

1.5、Spark快速开始

1.5.1、安装包下载

下载地址:Downloads | Apache Spark

1.5.2、上传并解压安装包

tar -zxvf spark-3.3.1-bin-hadoop3.tgz

1.5.3、执行官方示例

bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[2] \
./examples/jars/spark-examples_2.12-3.3.1.jar \
10

参数解释:

bin/spark-submit:提交spark任务

--class org.apache.spark.examples.SparkPi:要执行程序的主类

--master local[2]:

(1)local:没有指定线程数,则所有计算都运行在一个线程当中,没有任何并行计算

(2)local[K]:指定使用K个Core来运行计算,比如local[2]就是运行2个Core来执行

(3)local[*]:自动帮你按照CPU最多核来设置线程数

./examples/jars/spark-examples_2.12-3.3.1.jar:要运行的程序

10:要运行程序的输入参数

1.5.4、结果截图展示

2、Spark Core模块

Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:

  • RDD : 弹性分布式数据集

  • 累加器:分布式共享只写变量

  • 广播变量:分布式共享只读变量

2.1、RDD

2.1.1、RDD基本概念

RDD叫做弹性分布式数据集,是Spark中最基本的数据抽象。代码中是一个抽象类,它代表一个弹性的、不可变的、可分区的、里面的元素可并行计算的集合。

  • 弹性:

    • 自动进行内存和磁盘数据存储的切换

Spark优先把数据放到内存中,如果内存放不下,就会放到磁盘里面,程序进行自动的存储切换

    • 基于血统的高效容错机制

在RDD进行转换和动作的时候,会形成RDD的Lineage依赖链,当某一个RDD失效的时候,可以通过重新计算上游的RDD来重新生成丢失的RDD数据。

    • Task如果失败会自动进行特定次数的重试

RDD的计算任务如果运行失败,会自动进行任务的重新计算,默认次数是4次。

    • Stage如果失败会自动进行特定次数的重试

如果Job的某个Stage阶段计算失败,框架也会自动进行任务的重新计算,默认次数也是4次。

    • Checkpoint和Persist可主动或被动触发

RDD可以用Persist持久化将RDD缓存到内存或者磁盘,当再次用到该RDD时直接读取就行。也可以将RDD进行检查点,检查点会将数据存储在HDFS中,该RDD的所有父RDD依赖都会被移除。

    • 数据调度弹性

Spark把这个Job执行模型抽象为通用的有向无环图DAG,可以将多Stage的任务串联或并行执行,调度引擎自动处理Stage的失败以及Task的失败。

    • 数据分片的高度弹性

可以根据业务的特征,动态调整数据分片的个数,提升整体的应用执行效率。

  • 不可变性:

    • RDD是不可变的,这意味着一旦创建,其内容就不能被改变。这种设计保证了RDD的只读属性,使得Spark能够在分布式环境中安全地高并发并行处理数据,而不用担心数据在处理过程中的修改。不可变性简化了并行计算的实现,因为它允许Spark优化数据的访问和存储,避免了并发修改带来的复杂性。

  • 分区:

    • RDD是可分区的,这意味着可以将数据集分割成多个分区,每个分区可以被存储在集群中的不同节点上。这种分区机制允许Spark并行处理数据,每个分区可以在不同的节点上同时处理,极大地提高了处理速度和效率。通过合理的分区策略,可以优化数据的本地访问和减少跨节点通信的开销。

  • 并行计算:

    • RDD支持并行计算,这是Spark的核心优势之一。通过对数据进行分区,Spark可以将计算任务分配到多个节点上并行执行。这种并行处理能力使得Spark能够处理大规模数据集,同时保持较高的处理速度和效率。通过使用如mapfilterreduce等操作,可以在不同的分区上独立地执行数据处理逻辑,从而实现高效的并行计算。

2.1.2、RDD创建方式

2.1.2.1、从集合(内存)中创建RDD:parallelize
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("spark");JavaSparkContext jsc = new JavaSparkContext(conf);JavaRDD<String> stringRDD = jsc.parallelize(Arrays.asList("hello", "spark"), 2);//2:代表分区数
2.1.2.2、从外部存储(文件)中创建RDD

由外部存储系统的数据集创建RDD包括:本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、HBase等。

SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("spark");JavaSparkContext jsc = new JavaSparkContext(conf);JavaRDD<String> lineRDD = jsc.textFile("input.text");JavaPairRDD<LongWritable, Text> hadoopRDD =sc.hadoopFile("input", TextInputFormat.class, LongWritable.class, Text.class, 2);
2.1.3.3、从其他RDD中创建

主要是通过一个RDD运算完后,再产生新的RDD。

SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("spark");JavaSparkContext jsc = new JavaSparkContext(conf);JavaRDD<String> stringRDD = jsc.parallelize(Arrays.asList("hello", "spark"), 2);JavaRDD<String> mapRDD = stringRDD.map(s -> s + "123");

2.1.3、分区(Partition)和分区器(Partitioner)

2.1.3.1、分区(Partition)

分区是 RDD(弹性分布式数据集)的基本组成单位,是一个逻辑上的概念,表示数据的分片。每个分区可被单独处理,分布在集群的不同节点上,实现并行计算。

分区数的设置优先级:

1. 优先使用方法参数

jsc.parallelize(Arrays.asList("hello", "spark"), 2);jsc.textFile("input.text", 2);

2. 使用配置参数:spark.default.parallelism

sparkConf.set("spark.default.parallelism", "4");
3. 采用环境默认总核值
sparkConf.setMaster("local[*]");

分区数据的分配规则:

针对集合(内存)中创建的RDD:

val start = ((i * length) / numSlices).toInt

val end = (((i + 1) * length) / numSlices).toInt

i:索引编号,从0开始;length:数据的数组长度;numSlices:分区数

针对外部存储中创建的RDD:

Spark框架文件的操作没有自己的实现的。采用MR库(Hadoop)来实现,当前读取文件的切片数量不是由Spark决定的,而是由Hadoop决定。

注:底层使用函数minPartitions = Math.min(defaultParallelism, 2),以上设置的分区上不一定是最终分区数

2.1.3.2、分区器(Partitioner)

分区器是一种策略,用于决定 RDD 中的键值对(Key-Value)如何被分配到各个分区中。Spark目前支持Hash分区器、Range分区器和用户自定义分区器。Hash分区器为当前的默认分区器。分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle后进入哪个分区和Reduce的个数。

注意:

(1)只有Key-Value类型的pairRDD才有分区器,非Key-Value类型的RDD分区的值是None,数据分布由系统自动管理,通常采用轮询(round-robin)方式分配到不同分区

(2)每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的。

Hash分区器:

对于给定的key,计算其hashCode,并除以分区的个数取余,如果余数小于0,则用余数+分区的个数(否则加0),最后返回的值就是这个key所属的分区ID。

Range分区器:

将一定范围内的数映射到某一个分区内,尽量保证每个分区中数据量均匀,而且分区与分区之间是有序的,一个分区中的元素肯定都比另一个分区内的元素小或者大,但是分区内的元素是不能保证顺序的。

用户自定义分区器:

  1. 定义分区器类:继承org.apache.spark.Partitioner类,并实现其抽象方法。

  2. 实现numPartitions方法:返回分区数量。

  3. 实现getPartition方法:根据键返回其分区ID。

  4. 在操作中使用自定义分区器:在执行如groupByKeyreduceByKey等操作时,通过设置参数来使用自定义分区器。

2.1.4、RDD算子

将数据转化为RDD之后,就需要进行RDD的计算,RDD提供了计算方法RDD的方法又称为RDD算子。RDD 支持两种类型的算子:转换算子(transformation) 和动作算子( action)。转换算子可以将已有RDD转换得到一个新的RDD,而动作算子则是基于RDD的计算,并将结果返回给驱动器(driver)。

2.1.4.1、转换算子-transformation

用于从现有 RDD 创建新的 RDD,原有的RDD保持不变,不会触发实际计算。转换算子是惰性的(Lazy),仅记录计算逻辑(DAG),不立即执行。

针对Value类型(部分方法):

方法

解释

示例

map()映射

参数f是一个函数,可以写作匿名子类,它可以接收一个参数。当某个RDD执行map方法时,会遍历该RDD中的每一个数据项,并依次应用f函数,从而产生一个新的RDD。

JavaRDD<String> lineRDD = sc.textFile("input/1.txt");

JavaRDD<String> mapRDD = lineRDD.map(s -> s + "||");

flatMap()扁平化

与map操作类似,将RDD中的每一个元素通过应用f函数依次转换为新的元素,并封装到RDD中。

List<List<Integer>> datas = Arrays.asList(

Arrays.asList(1, 2),

Arrays.asList(3, 4)

);



JavaRDD<List<Integer>> rdd = jsc.parallelize(datas, 2);

JavaRDD<Integer> flatMapRDD = rdd.flatMap(new FlatMapFunction<List<Integer>, Integer>() {

@Override

public Iterator<Integer> call(List<Integer> list) {

return list.iterator();

}

});

mapPartitions()按分区批量处理

对每个分区的数据批量应用函数,函数输入为整个分区的迭代器。

JavaRDD<Integer> integerJavaRDD = parallelize.mapPartitions(new FlatMapFunction<Iterator<Integer>, Integer>() {

@Override

public Iterator<Integer> call(Iterator<Integer> integerIterator) throws Exception {

List<Integer> list = new ArrayList<>();

while (integerIterator.hasNext()) {

Integer next = integerIterator.next();

list.add(next * 2);

}

return list.iterator();

}

});

groupBy()分组

按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。

groupBy会存在shuffle过程

shuffle:将不同的分区数据进行打乱重组的过程

JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);

JavaPairRDD<Integer, Iterable<Integer>> groupByRDD = integerJavaRDD.groupBy((Function<Integer, Integer>) v1 -> v1 % 2);

filter()过滤

接收一个返回值为布尔类型的函数作为参数。当某个RDD调用filter方法时,会对该RDD中每一个元素应用f函数,如果返回值类型为true,则该元素会被添加到新的RDD中。

JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);

JavaRDD<Integer> filterRDD = integerJavaRDD.filter((Function<Integer, Boolean>) v1 -> v1 % 2 == 0);

distinct()去重

对内部的元素去重,并将去重后的元素放到新的RDD中。

JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);

JavaRDD<Integer> distinct = integerJavaRDD.distinct();

sortBy()排序

该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为正序排列。排序后新产生的RDD的分区数与原RDD的分区数一致。Spark的排序结果是全局有序。

JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(5, 8, 1, 11, 20), 2);

JavaRDD<Integer> sortByRDD = integerJavaRDD.sortBy((Function<Integer, Integer>) v1 -> v1, true, 2);

针对Key-Value类型(部分方法):要想使用Key-Value类型的算子首先需要使用mapToPair方法转换为PairRDD

方法

解释

示例

mapValues()

针对于(K,V)形式的类型只对V进行操作

JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);

JavaPairRDD<String, String> pairRDD = integerJavaRDD.mapToPair((PairFunction<Integer, String, String>) integer -> new Tuple2<>(integer.toString(), integer.toString()));

JavaPairRDD<String, String> mapValuesRDD = pairRDD.mapValues((Function<String, String>) v1 -> v1 + "|||");

groupByKey()

groupByKey对每个key进行操作,但只生成一个seq,并不进行聚合。该操作可以指定分区器或者分区数(默认使用HashPartitioner)

JavaRDD<String> integerJavaRDD = sc.parallelize(Arrays.asList("hi","hi","hello","spark" ),2);

JavaPairRDD<String, Integer> pairRDD = integerJavaRDD.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1));

JavaPairRDD<String, Iterable<Integer>> groupByKeyRDD = pairRDD.groupByKey();

reduceByKey()

该操作可以将RDD[K,V]中的元素按照相同的K对V进行聚合。其存在多种重载形式,还可以设置新RDD的分区数。与groupByKey()的区别在于在shuffle之前有combine(预聚合)操作,在不影响业务逻辑的前提下,优先选用reduceByKey()

JavaRDD<String> integerJavaRDD = sc.parallelize(Arrays.asList("hi","hi","hello","spark" ),2);

JavaPairRDD<String, Integer> pairRDD = integerJavaRDD.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1));

JavaPairRDD<String, Integer> result = pairRDD.reduceByKey((Function2<Integer, Integer, Integer>) Integer::sum);

sortByKey()

在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD

JavaPairRDD<Integer, String> javaPairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2<>(4, "a"), new Tuple2<>(3, "c"), new Tuple2<>(2, "d")));

JavaPairRDD<Integer, String> pairRDD = javaPairRDD.sortByKey(false);

2.1.4.2、动作算子-action

行动算子是触发了整个作业的执行。

方法

解释

示例

collect()

以数组Array的形式返回数据集的所有元素

JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);

List<Integer> collect = integerJavaRDD.collect();

count()

返回RDD中元素的个数

JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);

long count = integerJavaRDD.count();

first()

返回RDD中的第一个元素

JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);

Integer first = integerJavaRDD.first();

take()

返回一个由RDD的前n个元素组成的数组

JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);

List<Integer> list = integerJavaRDD.take(3);

countByKey()

统计每种key的个数

JavaPairRDD<String, Integer> pairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2<>("a", 8), new Tuple2<>("b", 8), new Tuple2<>("a", 8), new Tuple2<>("d", 8)));

Map<String, Long> map = pairRDD.countByKey();

saveAsTextFile()

将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它转换为文件中的文本,每条记录占一行,适用于存储普通文本或 JSON 等可解析的字符串数据。

JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);

integerJavaRDD.saveAsTextFile("output");

saveAsObjectFile()

将RDD中的元素序列化成对象,存储到文件中(二进制格式保存),适用于存储复杂对象(如自定义类或复杂结构数据),但需配合 Spark 提供的序列化框架(如 Kryo 序列化)使用。

JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);

integerJavaRDD.saveAsObjectFile("output1");

foreach()

遍历RDD中每一个元素

JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),4);

integerJavaRDD.foreach((VoidFunction<Integer>) System.out::println);

foreachPartition()

遍历RDD中每一个分区

JavaRDD<Integer> parallelize = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);

parallelize.foreachPartition((VoidFunction<Iterator<Integer>>) integerIterator -> {

// 一次处理一个分区的数据

while (integerIterator.hasNext()) {

Integer next = integerIterator.next();

System.out.println(next);

}

});

2.1.4.3、混洗算子-shuffle

混洗算子(shuffle operator)指的是那些导致Spark重新分配数据以达到重新组织数据的目的的操作。Shuffle操作通常发生在需要基于键(key)对数据进行重新分组或聚合时,比如在执行groupByKeyreduceByKeyaggregateByKeyjoin(特别是大key的join)等操作时。

示例:在执行groupByKey时,Spark需要将具有相同键的数据聚合到同一个节点上,这就需要将数据从一个节点移动到包含所有具有该键数据的节点上。

优化混洗操作的策略

  • 减少混洗的数据量:尽量减少需要混洗的数据量。例如,通过在map阶段进行过滤,减少后续混洗操作的数据量。

  • 使用合适的分区策略:合理设置分区数可以减少混洗的开销。例如,在执行groupByKey之前使用repartition(重新分配数据,会触发shuffle)coalesce(合并分区,不会触发shuffle)来调整数据的分区数。

  • 避免宽依赖:尽量避免宽依赖(wide dependency),即尽量避免在混洗操作中使用大宽度的键。例如,使用mapToPairreduceByKey组合代替groupByKey,因为reduceByKey通常有更好的性能。

  • 广播变量:对于小数据集的join操作,使用广播变量可以减少混洗的需求。

  • 使用缓存:对频繁使用的数据进行缓存可以减少混洗操作的重复执行。

2.1.5、RDD依赖关系

2.1.5.1、RDD血缘关系

RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。通过.toDebugString()方法查看RDD血缘关系。

(8) ParallelCollectionRDD[0] at parallelize at SparkLineage.java:30 []**************mapToPair依赖parallelize****************(8) MapPartitionsRDD[1] at mapToPair at SparkLineage.java:33 []| ParallelCollectionRDD[0] at parallelize at SparkLineage.java:30 []**************reduceByKey依赖mapToPair****************(8) ShuffledRDD[2] at reduceByKey at SparkLineage.java:36 []+-(8) MapPartitionsRDD[1] at mapToPair at SparkLineage.java:33 []| ParallelCollectionRDD[0] at parallelize at SparkLineage.java:30 []
2.1.5.2、宽依赖

宽依赖表示同一个父RDD的Partition被多个子RDD的Partition依赖(只能是一对多),会引起Shuffle。宽依赖的算子包括sort、reduceByKey、groupByKey、join和调用rePartition函数的任何操作,在不影响业务的情况下,应避免使用。

2.1.5.3、窄依赖

窄依赖表示每一个父RDD的Partition最多被子RDD的一个Partition使用(一对一or多对一)。

2.1.5.4、DAG有向无环图

DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。例如,DAG记录了RDD的转换过程和任务的阶段。

2.1.5.5、RDD任务划分
  1. Application:初始化一个SparkContext即生成一个Application;

  2. Job:一个Action算子就会生成一个Job;

  3. Stage:Stage等于宽依赖的个数加1;

  4. Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数。

注意:Application->Job->Stage->Task每一层都是1对n的关系。

2.1.6、序列化

在实际开发中我们往往需要自己定义一些对于RDD的操作,那么此时需要注意的是,初始化工作是在Driver端进行的,而实际运行程序是在Executor端进行的,这就涉及到了跨进程通信,是需要序列化的。

示例:

在以下示例中,若User未实现implements Serializable接口,会报错SparkException: Task not serializable和java.io.NotSerializableException: com.example.spark.User


User zhangsan = new User("zhangsan", 13);User lisi = new User("lisi", 13);JavaRDD<User> userJavaRDD = sc.parallelize(Arrays.asList(zhangsan, lisi), 2);JavaRDD<User> mapRDD = userJavaRDD.map((Function<User, User>) v1 -> new User(v1.getName(), v1.getAge() + 1));mapRDD. collect().forEach(System.out::println);

序列化方式:Java序列化(默认)、Kryo序列化(推荐)

Java的序列化能够序列化任何的类。但是比较重,序列化后对象的体积也比较大。Spark出于性能的考虑,Spark2.0开始支持另外一种Kryo序列化机制。Kryo速度是Serializable的10倍。当RDD在Shuffle数据的时候,简单数据类型、数组和字符串类型已经在Spark内部使用Kryo来序列化。

示例:

引入依赖(Spark已内置)


<dependency><groupId>com.esotericsoftware</groupId><artifactId>kryo</artifactId><version>5.6.1</version></dependency>

代码中修改序列化机制和注册要序列化的类

SparkConf conf = new SparkConf();conf.setMaster("local[*]");conf.setAppName("sparkCore");// 替换默认的序列化机制conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");// 注册需要使用kryo序列化的自定义类conf.registerKryoClasses(new Class[]{Class.forName("com.atguigu.bean.User")});

2.1.7、RDD持久化

2.1.7.1、RDD Cache缓存

RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以序列化的形式缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action算子时,该RDD将会被缓存在计算节点的内存中,并供后面重用。注意:cache操作会增加血缘关系,不改变原有的血缘关系

默认存储级别是 StorageLevel.MEMORY_ONLY(以Java序列化方式存到内存里)。完整的存储级别列表如下:

存储级别

含义

MEMORY_ONLY

以未序列化的 Java 对象形式将 RDD 存储在 JVM 内存中。如果RDD不能全部装进内存,那么将一部分分区缓存,而另一部分分区将每次用到时重新计算。这个是Spark的RDD的默认存储级别。

MEMORY_AND_DISK

以未序列化的Java对象形式存储RDD在JVM中。如果RDD不能全部装进内存,则将不能装进内存的分区放到磁盘上,然后每次用到的时候从磁盘上读取。

MEMORY_ONLY_SER

以序列化形式存储 RDD(每个分区一个字节数组)。通常这种方式比未序列化存储方式要更省空间,尤其是如果你选用了一个比较好的序列化协议(fast serializer),但是这种方式也相应的会消耗更多的CPU来读取数据。

MEMORY_AND_DISK_SER

和 MEMORY_ONLY_SER 类似,只是当内存装不下的时候,会将分区的数据吐到磁盘上,而不是每次用到都重新计算。

DISK_ONLY

RDD 数据只存储于磁盘上。

MEMORY_ONLY_2, MEMORY_AND_DISK_2等

和上面没有”_2″的级别相对应,只不过每个分区数据会在两个节点上保存两份副本。

缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。

设置缓存示例:

mapRDD.cache();mapRDD.persist(StorageLevel.MEMORY_ONLY());
2.1.7.2、RDD CheckPoint检查点

由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。检查点的目的是将RDD中间结果写入磁盘。检查点存储路径通常是存储在HDFS等容错、高可用的文件系统。检查点数据存储格式为:二进制的文件。检查点触发时间和Cache缓存一样,不会马上被执行,必须执行Action操作才能触发。检查点会切断血缘关系。而且检查点为了数据安全,会从血缘关系的最开始执行一遍。推荐做法是先进行Cache缓存操作,再进行CheckPoint检查点操作。

sparkContext.setCheckpointDir("hdfs://hadoop102:9000/spark/checkpoint");//设置检查点路径mapRDD.cache();//执行cache方法,缓存数据mapRDD.checkpoint();//设置检查点
2.1.7.3、缓存和检查点的的区别
  • Cache缓存只是将数据保存起来,不切断血缘依赖。Checkpoint检查点切断血缘依赖。

  • Cache缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint的数据通常存储在HDFS等容错、高可用的文件系统,可靠性高。

  • 建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job只需从Cache缓存中读取数据即可,否则需要再从头计算一次RDD。

  • 如果使用完了缓存,可以通过unpersist()方法释放缓存。

2.2、累加器

累加器:分布式共享只写变量

累加器支持在所有节点间进行累加操作(如计数或求和),但仅允许驱动程序(Driver Program)读取最终结果。

2.2.1、系统累加器

Spark内置了三种类型的Accumulator,分别是LongAccumulator用来累加整数型,DoubleAccumulator用来累加浮点型,CollectionAccumulator用来累加集合元素。

代码示例:

LongAccumulator longAccumulator = sc.sc().longAccumulator();// LongAccumulator: 数值型累加DoubleAccumulator doubleAccumulator = sc.sc().doubleAccumulator();// DoubleAccumulator: 小数型累加CollectionAccumulator<Integer> collectionAccumulator = sc.sc().collectionAccumulator();// CollectionAccumulator:集合累加List<Integer> integers = parallelize.map((Function<Integer, Integer>) v1 -> {longAccumulator.add(v1);doubleAccumulator.add(v1);collectionAccumulator.add(v1);return v1;}).collect();System.out.println(collectionAccumulator.value());//driver端获取累加值System.out.println(longAccumulator.value());//driver端获取累加值System.out.println(doubleAccumulator.value());//driver端获取累加值

2.2.2、自定义累加器

当内置的Accumulator无法满足要求时,可以继承AccumulatorV2实现自定义的累加器。

代码示例:

1. 继承AccumulatorV2,实现相关方法

class BigIntegerAccumulator extends AccumulatorV2<BigInteger, BigInteger> {private BigInteger num = BigInteger.ZERO;public BigIntegerAccumulator() {}public BigIntegerAccumulator(BigInteger num) {this.num = new BigInteger(num.toString());}//检查当前值是否为零@Overridepublic boolean isZero() {return num.compareTo(BigInteger.ZERO) == 0;}//创建当前累加器的副本@Overridepublic AccumulatorV2<BigInteger, BigInteger> copy() {return new BigIntegerAccumulator(num);}//重置累加器的值(通常用于重新开始累加)@Overridepublic void reset() {num = BigInteger.ZERO;}//向累加器添加数值@Overridepublic void add(BigInteger num) {this.num = this.num.add(num);}//将当前值与本地值相加,并更新本地累加器的值@Overridepublic void merge(AccumulatorV2<BigInteger, BigInteger> other) {num = num.add(other.value());}//返回当前累加器的值,仅在Driver端可用@Overridepublic BigInteger value() {return num;}}

2. 创建自定义Accumulator的实例,然后在SparkContext上注册它

// 直接new自定义的累加器BigIntegerAccumulator bigIntegerAccumulator = new BigIntegerAccumulator();// 然后在SparkContext上注册一下sparkContext.register(bigIntegerAccumulator, "bigIntegerAccumulator");

2.3、广播变量

广播变量:分布式共享只读变量。

广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个Spark Task操作使用。

代码示例:

final JavaRDD<String> rdd = sparkContext.parallelize(Arrays.asList("Hello", "Spark", "Hadoop", "Flink", "Spark", "Hadoop"));List<String> okList = Arrays.asList("Spark", "Hadoop");//假设一个大对象final Broadcast<List<String>> broadcast = sparkContext.broadcast(okList);//构建一个广播变量final JavaRDD<String> filterRDD = rdd.filter(s -> broadcast.value().contains(s)//broadcast.value()拉取广播变量的值);

3、Spark SQL模块

3.1、概述

Spark SQL是用于结构化数据处理的Spark模块。与基本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。在内部,Spark SQL使用这些额外的信息来执行额外的优化。与Spark SQL交互的方式有多种,包括SQL和Dataset API。计算结果时,使用相同的执行引擎,与您用于表达计算的API/语言无关。

3.2、Spark SQL特点

  • 易整合

无缝的整合了SQL查询和Spark编程。

Dataset<Row> ds = sparkSession.read().json("data/user.json");//将数据模型转换成表,方便SQL的使用ds.createOrReplaceTempView("user");//使用SQL文的方式操作数据String sql = "select avg(age) from user";Dataset<Row> sqlDS = sparkSession.sql(sql);//展示数据模型的效果sqlDS.show();
  • 统一的数据访问方式

使用相同的方式连接不同的数据源。

Dataset<Row> ds = sparkSession.read().json("data/user.json");//将数据模型转换成表,方便SQL的使用ds.createOrReplaceTempView("user");
  • 兼容Hive

在已有的仓库上直接运行SQL或者HQL。

SparkSession sparkSession = SparkSession.builder().enableHiveSupport() //启用Hive的支持.master("local[*]").appName("SparkSQL").getOrCreate();sparkSession.sql("show tables").show();sparkSession.sql("create table user_info(name String,age bigint)");
  • 标准的数据连接

通过JDBC或者ODBC来连接。

Dataset<Row> jdbc = sparkSession.read().jdbc("jdbc:mysql://localhost:3306/gmall", "activity_info", properties);//连接mysql数据库

3.3、Spark SQL发展历程

RDD(Spark1.0)=》Dataframe(Spark1.3)=》Dataset(Spark1.6)

如果同样的数据都给到这三个数据结构,他们分别计算之后,都会给出相同的结果。不同的是他们的执行效率和执行方式。在现在的版本中,Dataset性能最好,已经成为了唯一使用的接口。其中Dataframe已经在底层被看做是特殊泛型的DataSet<Row>。

RDD、DataFrame、DataSet三者的共性:

  • 三者都是Spark平台下的分布式弹性数据集,为处理超大型数据提供便利。

  • 三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action行动算子如foreach时,三者才会开始遍历运算。

  • 三者有许多共同的函数,如filter,排序等。

  • 三者都会根据Spark的内存情况自动缓存运算。

  • 三者都有分区的概念。

3.4、Spark SQL编程

3.3.1、SparkSession

在老的版本中,SparkSQL提供两种SQL查询起始点:

  • 一个叫SQLContext,用于Spark自己提供的SQL查询;

  • 一个叫HiveContext,用于连接Hive的查询。

SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。

SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的。

//第一种方式SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");SparkContext sc = new SparkContext(conf);SparkSession sparkSession = new SparkSession(sc);//第二种方式SparkSession sparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").enableHiveSupport() // 关键:开启Hive功能.getOrCreate();//第三种方式SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();

3.3.3、Spark SQL入门

//构建环境对象SparkSession sparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate();//对接文件数据源时,会将文件中的一行数据封装为Row对象Dataset<Row> ds = sparkSession.read().json("data/user.json");//可以转换成RDD对象JavaRDD<Row> javaRDD = ds.javaRDD();//将数据模型转换成临时视图,方便SQL的使用ds.createOrReplaceTempView("user");//使用SQL文的方式操作数据String sql = "select * from user";//执行sql查询Dataset<Row> sqlDS = sparkSession.sql(sql);//展示数据模型的效果sqlDS.show();//DSL语法ds.select("*").show();//DSL语法ds.select("name","age").where(ds.col("name").equalTo("张三")).show();//释放资源sparkSession.close();

3.3.4、数据的加载与保存

3.3.4.1、CSV文件
Dataset<Row> csv = sparkSession.read().option("header", "true") //配置是否第一行为表头(列名).option("sep","_") //配置分隔符,默认是英文的逗号(,).csv("data/user.csv");csv.write().mode(SaveMode.Append)//追加模式,不影响原有文件。Overwrite:覆盖原有文件;Ignore:有文件则不操作;ErrorIfExists:存在文件就报错.option("header", "true") //配置第一行是否是表头.csv("output");
3.3.4.2、JSON文件
Dataset<Row> json = sparkSession.read().json("data/user.json");json.write().json("output");
3.3.4.3、Parquet文件(列式存储文件)
Dataset<Row> csv = sparkSession.read().json("data/user.json");csv.write().parquet("output");
3.3.4.4、对接MySQL
Properties properties = new Properties();properties.setProperty("user","root");properties.setProperty("password","000000");Dataset<Row> jdbc = sparkSession.read().jdbc("jdbc:mysql://hadoop102:3306/gmall?useSSL=false&useUnicode=true&characterEncoding=UTF-8&allowPublicKeyRetrieval=true", "activity_info", properties);jdbc.show();

3.3.6、用户自定义函数

3.3.6.1、UDF

用于处理单行数据并返回单个值。其输入和输出为1:1关系,即输入一行数据后输出单个结果。

sparkSession.udf().register("prefixName", new UDF1<String, String>() {@Overridepublic String call(String name) throws Exception {return "Name:" + name;}}, DataTypes.StringType);String sql = "select prefixName(name) from user";Dataset<Row> sqlDS = sparkSession.sql(sql);
3.3.6.2、UDAF

用于处理多行数据并返回单个聚合结果,通常用于统计、分组汇总等场景。

第一步:自定义UDAF函数

//自定义缓冲区的数据类型@Data@AllArgsConstructorpublic class AvgAgeBuffer implements Serializable {private Long total;private Long cnt;}//1. 创建自定义的【公共】类//2. 继承 org.apache.spark.sql.expressions.Aggregator//3. 设定泛型// IN : 输入数据类型// BUFF : 缓冲区的数据类型// OUT : 输出数据类型//4. 重写方法public class MyAvgAgeUDAF extends Aggregator<Long, AvgAgeBuffer, Long> {@Override// TODO 缓冲区的初始化操作public AvgAgeBuffer zero() {return new AvgAgeBuffer(0L, 0L);}@Override// TODO 将输入的年龄和缓冲区的数据进行聚合操作public AvgAgeBuffer reduce(AvgAgeBuffer buffer, Long in) {buffer.setTotal(buffer.getTotal() + in);buffer.setCnt(buffer.getCnt() + 1);return buffer;}@Override// TODO 合并缓冲区的数据public AvgAgeBuffer merge(AvgAgeBuffer b1, AvgAgeBuffer b2) {b1.setTotal(b1.getTotal() + b2.getTotal());b1.setCnt(b1.getCnt() + b2.getCnt());return b1;}@Override// TODO 计算最终结果public Long finish(AvgAgeBuffer buffer) {return buffer.getTotal() / buffer.getCnt();}@Overridepublic Encoder<AvgAgeBuffer> bufferEncoder() {return Encoders.bean(AvgAgeBuffer.class);}@Overridepublic Encoder<Long> outputEncoder() {return Encoders.LONG();}}

第二步:使用

sparkSession.udf().register("avgAge", udaf(new MyAvgAgeUDAF(), Encoders.LONG()));String sql = "select avgAge(age) from user";Dataset<Row> sqlDS = sparkSession.sql(sql);

http://www.xdnf.cn/news/19068.html

相关文章:

  • Unity 客户端和服务器端 基于网络的账户管理系统
  • 除自身以外数组的乘积是什么意思
  • 【OpenGL】LearnOpenGL学习笔记16 - 帧缓冲(FBO)、渲染缓冲(RBO)
  • 【JUC】——线程池
  • 点评项目(Redis中间件)第一部分Redis基础
  • docker run 后报错/bin/bash: /bin/bash: cannot execute binary file总结
  • 边缘计算:一场由物理定律发起的“计算革命”
  • 预测模型及超参数:2.传统机器学习:PLS及其改进
  • HarmonyOS 高效数据存储全攻略:从本地优化到分布式实战
  • 从 GRIT 到 WebUI:Chromium 内置资源加载与前端展示的完整链路解析
  • AI Agent 发展趋势与架构演进
  • 稳敏双态融合架构--架构师的练就
  • 【MES】工业4.0智能制造数字化工厂(数字车间、MES、ERP)解决方案:智能工厂体系架构、系统集成以及智能设计、生产、管理、仓储物流等
  • uvloop深度实践:从原理到高性能异步应用实战
  • http请求能支持多大内容的请求
  • 通义万相音频驱动视频模型Wan2.2-S2V重磅开源
  • 安卓接入通义千问AI的实现记录
  • 欧盟《人工智能法案》生效一年主要实施进展概览(二)
  • React 组件命名规范:为什么必须大写首字母蛊傲
  • 【Datawhale之Happy-LLM】Encoder-only模型篇 task05精华~
  • 计算神经科学数学建模编程深度前沿方向研究(下)
  • 医疗AI时代的生物医学Go编程:高性能计算与精准医疗的案例分析(一)
  • 卷积神经网络CNN
  • Xposed框架实战指南:从原理到你的第一个模块
  • 面试之JVM
  • Java并发编程深度解析:从互斥锁到StampedLock的高性能锁优化之路
  • 计算机视觉:从 “看见” 到 “理解”,解锁机器感知世界的密码
  • 嵌入式(day34) http协议
  • 快速了解卷积神经网络
  • 【软考论文】论DevOps及其应用