当前位置: 首页 > backend >正文

Spark处理过程--案例数据清洗

(一)需求说明
假设你有一个包含用户信息的文本文件,每行格式为 姓名,年龄,性别,需要清洗掉年龄为空或者非数字的行。

以下是 10 条符合上述示例中数据格式(姓名,年龄,性别)的测试数据,包含了一些可能需要清洗掉的无效数据,你可以将其保存为一个文本文件,用于测试上面的数据清洗程序。

【教师展示要清洗的数据,请同学们观察,并回答应该要清洗掉哪些数据?】

张三,25,男
李四,,女
王五,30,男
赵六,a,女
孙七,35,男
周八,40,女
吴九,abc,男
郑十,45,女
王十,50,男
李二,55,女
这里面:“李四” 的年龄为空,“赵六” 和 “吴九” 的年龄不是有效的数字,在执行数据清洗程序时,这些行应该会被过滤掉。

(二)思路分析
好了,问题描述清楚了,下面我们来看看实现的思路。

【老师提问,并点名找人回答】:实现这个问题的思路是什么?

【总结思路,并现场写下来】

读入文件
对每一行数据进行分析
字段拆分,拆分出年龄这个字段
判断
如果它不是数字或者缺失,则忽略这条数据
否则保存
(三)难点突破
好了,思路有了,现在我们看看里边涉及到的具体功能要怎么实现。

【老师列出功能点,并请同学回答,如何实现如下功能】:

如何读入txt文件?
如何拆分出一行中的年龄?
如何使用过滤算子(filter)?
如何判断是否是整数?
【老师总结要点】

读入txt文件。 val lines = sc.textFile(inputFile)
对拆分出一行中的年龄。val fields = line.split(",")  fields(0)
过滤算子中,函数返回为false,就会被过滤掉,函数返回为true,就会被保留下来。
使用正则表达式。/\d/
经过了前面的分析,下面我们来开始写代码来实现功能。

(四)功能实现
【老师边讲解,边演示】

创建新的maven项目。
创建input文件夹,在input下新建记事本文件,内容就是前面的实例数据。
在src下创建新的scala文件,开始写功能代码。
【老师一边写整体的注释,一边引导同学回答】

(五)参考代码
import org.apache.spark.{SparkConf, SparkContext}
 
object DataFilter {
 
  //思路
  //1.读取数据,读入文本文件
  //2.对于文件中的每一行
  //拆分出年龄,判断是不是数字,是就保留记录“45”,“abc”(正则表达式\d+)
  //3.把过滤之后的内容,保存到文件saveAsTextFile
  def main(args: Array[String]): Unit = {
    //创建Spark
    val conf = new SparkConf().setAppName("DataFilter").setMaster("local[*]")
    val sc = new SparkContext(conf)
 
    val rdd = sc.textFile("input/file.txt")
 
    //过滤之后的rdd
    var rdd1 = rdd.filter(line => {
      //拆分年龄
      val age = line.split(",")(1)
      //判断是否是数字
      println(age)
//      age.matches("\\d+")//返回值是一个boolean
      true
    })
 
    //将所有的分区的数据合并到一个分区
    rdd1 = rdd1.coalesce(1)
    //保存到文件中
    rdd1.saveAsTextFile("output")
 
  }
 
}

可以使用coalesce(1)这个方法可以让结果全部保存在一个文件中。

val singlePartitionRDD = cleanedLines.coalesce(1)
    // 保存清洗后的数据到文件
    val outputPath = "path/to/your/output/file.txt"
    singlePartitionRDD.saveAsTextFile(outputPath)
    // 停止 SparkContext
    sc.stop()

http://www.xdnf.cn/news/6111.html

相关文章:

  • 大模型越狱:技术漏洞与安全挑战——从原理到防御
  • 正向代理与反向代理区别及应用
  • 威廉・巴拉德与格理集团:在高科技浪潮中的洞察与前行
  • 【极兔快递Java社招】一面复盘|数据库+线程池+AQS+中间件面面俱到
  • 【Linux网络】————详解TCP三次握手四次挥手
  • vue3:十三、分类管理-表格--slot插槽详细说明---表格内拼接字段、tag标签
  • 怎么查看当前vue项目,要求的node.js版本
  • Oracle — PL-SQL
  • JT/T 808 各版本协议字段级别对比与解析适配建议
  • NACOS基于长链接的⼀致性模型
  • 将navicat与parcharm链接
  • 2025年中国DevOps工具选型指南:主流平台能力横向对比
  • Go语言空白导入的作用与用途
  • 【SSL部署与优化​】​​如何为网站启用HTTPS:从Let‘s Encrypt免费证书到Nginx配置​​
  • 城市生命线综合管控系统解决方案-守护城市生命线安全
  • AWS CloudTrail日志跟踪启用
  • 【计算机视觉】OpenCV实战项目:GraspPicture 项目深度解析:基于图像分割的抓取点检测系统
  • 学习51单片机01(安装开发环境)
  • 机器学习基础课程-6-课程实验
  • 精益数据分析(57/126):创业移情阶段的核心要点与实践方法
  • 前端3D动画库
  • 《隐私计算:数据安全与隐私保护的新希望》
  • Spring的Validation,这是一套基于注解的权限校验框架
  • 使用libUSB-win32的简单读写例程参考
  • zookeeper本地部署
  • 存储扇区分配表:NAND Flash与SD NAND(贴片式SD卡)的架构差异
  • spark数据压缩
  • Linux动态库与静态库
  • 通用软件项目技术报告 - 导读IV(终)
  • leetcode二叉树相关题目复习(C语言版)