当前位置: 首页 > ai >正文

Spark RDD行动算子与共享变量实战:从数据聚合到分布式通信

RDD行动算子:
行动算子就是会触发action的算子,触发action的含义就是真正的计算数据。
1、reduce
import org.apache.spark.{SparkConf, SparkContext}
object value11 {
  def main(args: Array[String]): Unit = {
    // 创建 SparkConf 对象并设置应用名称和运行模式
    val conf = new SparkConf().setAppName("RDDExample").setMaster("local[*]")
    // 创建 SparkContext 对象
    val sc = new SparkContext(conf)

    // 创建包含 1,2,3,4 的 RDD
    val rdd = sc.makeRDD(List(1, 2, 3, 4))
    // 使用 reduce 操作对 RDD 元素求和
    val reduceResult = rdd.reduce(_ + _)
    // 输出结果
    println(reduceResult)

    // 关闭 SparkContext
    sc.stop()
  }
}

2、Foreach
import org.apache.spark.{SparkConf, SparkContext}

object value12 {
  def main(args: Array[String]): Unit = {
    // 创建 SparkConf 对象并设置应用名称和运行模式
    val conf = new SparkConf().setAppName("RDDForeachExample").setMaster("local[*]")
    // 创建 SparkContext 对象
    val sc = new SparkContext(conf)

    // 创建包含 1,2,3,4 的 RDD
    val rdd = sc.makeRDD(List(1, 2, 3, 4))
    // 对 RDD 元素进行遍历并打印
    rdd.foreach(println)

    // 关闭 SparkContext
    sc.stop()
  }
}

3、count
import org.apache.spark.{SparkConf, SparkContext}

object value13{
  def main(args: Array[String]): Unit = {
    // 创建 SparkConf 对象并设置应用名称和运行模式
    val conf = new SparkConf().setAppName("RDDActionExample").setMaster("local[*]")
    // 创建 SparkContext 对象
    val sc = new SparkContext(conf)

    // 创建包含 1,2,3,4 的 RDD
    val rdd = sc.makeRDD(List(1, 2, 3, 4))
    // 使用 count 算子统计 RDD 中元素个数
    val countResult = rdd.count()
    // 打印统计结果
    println(countResult)

    // 关闭 SparkContext
    sc.stop()
  }
}

4、Take
import org.apache.spark.{SparkConf, SparkContext}

object value14 {
  def main(args: Array[String]): Unit = {
    // 创建 SparkConf 对象并设置应用名称和运行模式
    val conf = new SparkConf().setAppName("RDDActionTakeExample").setMaster("local[*]")
    // 创建 SparkContext 对象
    val sc = new SparkContext(conf)

    // 创建包含 1,2,3,4 的 RDD
    val rdd = sc.makeRDD(List(1, 2, 3, 4))
    // 使用 take 算子获取 RDD 的前 2 个元素
    val takeResult = rdd.take(2)
    // 遍历并打印获取到的元素
    takeResult.foreach(println)

    // 关闭 SparkContext
    sc.stop()
  }
}

5、first
import org.apache.spark.{SparkConf, SparkContext}

object value15 {
  def main(args: Array[String]): Unit = {
    // 创建 SparkConf 对象并设置应用名称和运行模式
    val conf = new SparkConf().setAppName("RDDFirstExample").setMaster("local[*]")
    // 创建 SparkContext 对象
    val sc = new SparkContext(conf)

    // 创建包含 1,2,3,4 的 RDD
    val rdd = sc.makeRDD(List(1, 2, 3, 4))
    // 使用 first 算子获取 RDD 中的第一个元素
    val firstResult = rdd.first()
    // 打印获取到的第一个元素
    println(firstResult)

    // 关闭 SparkContext
    sc.stop()
  }
}

6、Aggregate
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.ClassTag

object 、value16{
  def main(args: Array[String]): Unit = {
    // 创建 SparkConf 对象并设置应用名称和运行模式
    val conf = new SparkConf().setAppName("RDDAggregateExample").setMaster("local[*]")
    // 创建 SparkContext 对象
    val sc = new SparkContext(conf)

    // 创建包含 1,2,3,4 且分区数为 8 的 RDD
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 8)

    // 使用 aggregate 算子聚合 RDD 元素,初始值为 0
    val result1: Int = rdd.aggregate(0)(_ + _, _ + _)
    // 使用 aggregate 算子聚合 RDD 元素,初始值为 10
    val result2: Int = rdd.aggregate(10)(_ + _, _ + _)

    // 打印结果
    println(result1)
    println("**********")
    println(result2)

    // 关闭 SparkContext
    sc.stop()
  }
}

7、save 相关算子
import org.apache.spark.{SparkConf, SparkContext}

object value17 {
  def main(args: Array[String]): Unit = {
    // 创建 SparkConf 对象并设置应用名称和运行模式
    val conf = new SparkConf().setAppName("RDDSaveExample").setMaster("local[*]")
    // 创建 SparkContext 对象
    val sc = new SparkContext(conf)

    // 创建包含 1,2,3,4 的 RDD
    val rdd = sc.makeRDD(List(1, 2, 3, 4))

    // 保存成 Text 文件
    rdd.saveAsTextFile("Spark-core/output/output")
    // 序列化成对象保存到文件
    rdd.saveAsObjectFile("Spark-core/output/output1")

    // 关闭 SparkContext
    sc.stop()
  }
}

实现原理
累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在
Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,传回 Driver 端进行 merge。
import org.apache.spark.{SparkConf, SparkContext}

object value18 {
  def main(args: Array[String]): Unit = {
    // 创建 SparkConf 对象并设置应用名称和运行模式
    val conf = new SparkConf().setAppName("AccumulatorExample").setMaster("local[*]")
    // 创建 SparkContext 对象
    val sc = new SparkContext(conf)

    // 创建包含 1,2,3,4,5 的 RDD
    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5))
    // 声明累加器
    val sum = sc.longAccumulator("sum")

    rdd.foreach { num =>
      // 使用累加器
      sum.add(num)
    }

    // 获取累加器的值
    println("sum = " + sum.value)

    // 关闭 SparkContext
    sc.stop()
  }
}
实现原理
广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个
或多个 Spark 操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,
广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark 会为每个任务
分别发送。
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast

object  value19{
  def main(args: Array[String]): Unit = {
    // 创建SparkConf对象并设置应用名称和运行模式
    val conf = new SparkConf().setAppName("BroadcastExample").setMaster("local[*]")
    // 创建SparkContext对象
    val sparkContext = new SparkContext(conf)

    // 创建RDD
    val rdd1 = sparkContext.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("d", 4)), 4)
    // 定义列表
    val list = List(("a", 4), ("b", 5), ("c", 6), ("d", 7))
    // 创建广播变量
    val broadcast: Broadcast[List[(String, Int)]] = sparkContext.broadcast(list)

    // 对RDD进行转换
    val resultRDD = rdd1.map { case (key, num) =>
      var num2 = 0
      for ((k, v) <- broadcast.value) {
        if (k == key) {
          num2 = v
        }
      }
      (key, (num, num2))
    }

    // 收集并打印结果
    resultRDD.collect().foreach(println)

    // 关闭SparkContext
    sparkContext.stop()
  }
}

http://www.xdnf.cn/news/2272.html

相关文章:

  • 基于SpringBoot+PostgreSQL+ROS Java库机器人数据可视化管理系统
  • 热红外遥感在火情监测中有什么作用?
  • 深入Java JVM常见问题及解决方案
  • Java位运算符大全
  • 亚组风险比分析与可视化
  • OceanBase单机重启和配置修改
  • 再学GPIO(一)
  • 汽车制造行业如何在数字化转型中抓住机遇?
  • springboot不连接数据库启动(原先连接了mysql数据库)
  • 【Redis】Redis Zset实现原理:跳表+哈希表的精妙设计
  • C++初阶-STL简介
  • 怎样给MP3音频重命名?是时候管理下电脑中的音频文件名了
  • FlinkUpsertKafka深度解析
  • 重温TCP通信过程
  • C++ 类与对象(中)—— 默认成员函数与运算符重载的深度解析:构造函数,析构函数,拷贝构造函数,赋值运算符重载,普通取地址重载,const取地址重载
  • 【项目篇之垃圾回收】仿照RabbitMQ模拟实现消息队列
  • HTTP header Cookie 和 Set-Cookie
  • 系统架构师---基于规则的系统架构
  • FreeBSD可以不经过windows服务器访问windows机器上的共享文件吗?
  • PID程序实现
  • 高速系统设计理论基础
  • (done) 吴恩达版提示词工程 4. 摘要 (生成摘要,指定信息摘要,提取指定信息,多条评论摘要)
  • 什么是智能导诊知识库?
  • Pinia 详细解析:Vue3 的状态管理利器
  • 【油猴脚本 2】bilibili 视频合集标题搜索
  • 软件维护类型四大类型(IEEE 14764 标准)
  • Java基础 4.26
  • Dijkstra‘s Algorithm Implementation
  • Compose笔记(十九)--NestedScroll
  • Pygame核心概念解析:Surface、Clock与事件循环