当前位置: 首页 > news >正文

Spark mapGroups 函数详解与多种用法示例

mapGroups 是 Spark 中一个强大的分组操作函数,它允许你对每个分组应用自定义逻辑并返回一个结果。以下是多个使用简单样例数据的具体用法示例。

基础示例数据

假设我们有一个简单的学生成绩数据集:

// 创建示例DataFrame
val studentScores = Seq(("Math", "Alice", 85),("Math", "Bob", 92),("Math", "Charlie", 78),("Science", "Alice", 88),("Science", "Bob", 95),("Science", "Charlie", 82),("English", "Alice", 90),("English", "Bob", 87),("English", "Charlie", 91)
).toDF("subject", "name", "score")// 按科目分组
val grouped = studentScores.groupByKey(row => row.getAs[String]("subject"))

示例 1: 计算每科平均分

val subjectAverages = grouped.mapGroups { (subject, iterator) =>var total = 0var count = 0while (iterator.hasNext) {val row = iterator.next()total += row.getAs[Int]("score")count += 1}(subject, if (count > 0) total.toDouble / count else 0.0)
}.toDF("subject", "average_score")subjectAverages.show()

输出结果:

+--------+------------------+
| subject|     average_score|
+--------+------------------+
|   Math|              85.0|
|Science|              88.33|
|English|              89.33|
+--------+------------------+

示例 2: 找出每科最高分和学生

val topScores = grouped.mapGroups { (subject, iterator) =>var maxScore = Int.MinValuevar topStudent = ""while (iterator.hasNext) {val row = iterator.next()val score = row.getAs[Int]("score")val name = row.getAs[String]("name")if (score > maxScore) {maxScore = scoretopStudent = name}}(subject, topStudent, maxScore)
}.toDF("subject", "top_student", "top_score")topScores.show()

输出结果:

+--------+-----------+---------+
| subject|top_student|top_score|
+--------+-----------+---------+
|   Math|        Bob|       92|
|Science|        Bob|       95|
|English|    Charlie|       91|
+--------+-----------+---------+

示例 3: 计算每科成绩分布(统计各分数段人数)

val scoreDistribution = grouped.mapGroups { (subject, iterator) =>var excellent = 0  // 90-100var good = 0       // 80-89var average = 0    // 70-79var below = 0      // <70while (iterator.hasNext) {val row = iterator.next()val score = row.getAs[Int]("score")if (score >= 90) excellent += 1else if (score >= 80) good += 1else if (score >= 70) average += 1else below += 1}(subject, excellent, good, average, below)
}.toDF("subject", "excellent", "good", "average", "below_70")scoreDistribution.show()

输出结果:

+--------+---------+----+-------+-------+
| subject|excellent|good|average|below_70|
+--------+---------+----+-------+-------+
|   Math|        1|   1|      1|      0|
|Science|        1|   2|      0|      0|
|English|        1|   2|      0|      0|
+--------+---------+----+-------+-------+

示例 4: 为每科生成成绩报告

val subjectReports = grouped.mapGroups { (subject, iterator) =>var students = List[String]()var scores = List[Int]()var total = 0var count = 0while (iterator.hasNext) {val row = iterator.next()val name = row.getAs[String]("name")val score = row.getAs[Int]("score")students = name :: studentsscores = score :: scorestotal += scorecount += 1}val average = if (count > 0) total.toDouble / count else 0.0val maxScore = if (scores.nonEmpty) scores.max else 0val minScore = if (scores.nonEmpty) scores.min else 0s"Subject: $subject | Students: ${students.mkString(", ")} | " +s"Average: $average | Max: $maxScore | Min: $minScore"
}.toDF("report")subjectReports.show(false)

输出结果:

+---------------------------------------------------------------------------+
|report                                                                     |
+---------------------------------------------------------------------------+
|Subject: Math | Students: Charlie, Bob, Alice | Average: 85.0 | Max: 92 | Min: 78|
|Subject: Science | Students: Charlie, Bob, Alice | Average: 88.33 | Max: 95 | Min: 82|
|Subject: English | Students: Charlie, Bob, Alice | Average: 89.33 | Max: 91 | Min: 87|
+---------------------------------------------------------------------------+

示例 5: 计算每科成绩的标准差

val subjectStdDev = grouped.mapGroups { (subject, iterator) =>var scores = List[Double]()var sum = 0.0var count = 0// 第一次遍历:计算平均值while (iterator.hasNext) {val row = iterator.next()val score = row.getAs[Int]("score").toDoublescores = score :: scoressum += scorecount += 1}if (count == 0) {(subject, 0.0)} else {val mean = sum / count// 第二次遍历:计算方差var variance = 0.0scores.foreach(score => {variance += Math.pow(score - mean, 2)})variance /= count// 计算标准差val stdDev = Math.sqrt(variance)(subject, stdDev)}
}.toDF("subject", "std_dev")subjectStdDev.show()

输出结果:

+--------+------------------+
| subject|           std_dev|
+--------+------------------+
|   Math| 5.88784057761515|
|Science|5.507570547286102|
|English|1.699673171197595|
+--------+------------------+

示例 6: 为每个科目创建自定义摘要

val customSummaries = grouped.mapGroups { (subject, iterator) =>// 收集所有数据val data = iterator.toList.map(row => (row.getAs[String]("name"), row.getAs[Int]("score")))// 排序val sorted = data.sortBy(-_._2)// 计算统计量val scores = sorted.map(_._2)val average = scores.sum.toDouble / scores.lengthval median = if (scores.length % 2 == 1) {scores(scores.length / 2)} else {(scores(scores.length / 2 - 1) + scores(scores.length / 2)) / 2.0}// 创建自定义摘要val summary = Map("subject" -> subject,"top_student" -> sorted.head._1,"top_score" -> sorted.head._2,"average" -> average,"median" -> median,"student_count" -> scores.length)summary
}.toDF("summary")customSummaries.show(false)

输出结果:

+-----------------------------------------------------------------------------------------+
|summary                                                                                  |
+-----------------------------------------------------------------------------------------+
|Map(subject -> Math, top_student -> Bob, top_score -> 92, average -> 85.0, median -> 85.0, student_count -> 3)|
|Map(subject -> Science, top_student -> Bob, top_score -> 95, average -> 88.33, median -> 88.0, student_count -> 3)|
|Map(subject -> English, top_student -> Charlie, top_score -> 91, average -> 89.33, median -> 90.0, student_count -> 3)|
+-----------------------------------------------------------------------------------------+

注意事项

  1. 内存使用mapGroups 会将整个分组的数据加载到内存中,因此对于非常大的分组,可能会导致内存不足的问题。

  2. 性能考虑:对于简单的聚合操作(如求和、计数),使用 Spark 内置的聚合函数通常比 mapGroups 更高效。

  3. 数据倾斜:如果某些分组特别大,可能会导致任务执行时间过长。

  4. 迭代器使用mapGroups 提供的迭代器只能遍历一次,如果需要多次访问数据,需要先将其转换为列表或数组。

  5. 类型安全:使用 mapGroups 时,返回的数据类型需要与预期的输出类型匹配,否则可能会在运行时出现错误。

mapGroups 是一个非常灵活的函数,适用于需要自定义复杂分组逻辑的场景。通过上述示例,你可以看到它可以用于各种统计计算、数据转换和报告生成任务。

http://www.xdnf.cn/news/1398295.html

相关文章:

  • Java面试-MyBatis篇
  • 执行一条Select语句流程
  • python pyqt5开发DoIP上位机【诊断回复的函数都是怎么调用的?】
  • Jedis、Lettuce、Redisson 技术选型对比
  • 【前端教程】HTML 基础界面开发
  • Dify工作流之合同信息提取
  • 【74LS112JK触发器三进制】2022-10-8
  • 常量指针与指针常量习题(一)
  • 每日算法题【二叉树】:二叉树的最大深度、翻转二叉树、平衡二叉树
  • GROMACS 安装:详细教程来袭
  • 上层协议依赖TCP
  • 【系列10】端侧AI:构建与部署高效的本地化AI模型 第9章:移动端部署实战 - iOS
  • pdf转ofd之移花接木
  • 面试 八股文 经典题目 - Mysql部分(一)
  • jsqlparser(六):TablesNamesFinder 深度解析与 SQL 格式化实现
  • Java中使用正则表达式的正确打开方式
  • 在Kotlin中安全的管理资源
  • ⸢ 叁 ⸥ ⤳ 默认安全:概述与建设思路
  • Vue2之axios在脚手架中的使用以及前后端交互
  • MongoDB 聚合管道(Aggregation)高级用法:数据统计与分析
  • destoon8.0根据模块生成html地图
  • Go 语言面试指南:常见问题及答案解析
  • Excel工作技巧
  • 【自然语言处理与大模型】多机多卡分布式微调训练的有哪些方式
  • 【Python】并发编程(一)
  • 网络工程师软考选择题精讲与解题技巧
  • Ubuntu系统下交叉编译Android的X264库
  • 【Qt开发】按钮类控件(一)-> QPushButton
  • 互联网大厂面试:大模型应用开发岗位核心技术点解析
  • LeetCode54螺旋矩阵算法详解