当前位置: 首页 > ops >正文

Spark处理过程-案例数据清洗

需求说明

准备十条符合包含用户信息的文本文件,每行格式为 姓名,年龄,性别,需要清洗掉年龄为空或者非数字的行

例如:

张三,25,男

李四,,女

王五,30,男

赵六,a,女

孙七,35,男

周八,40,女

吴九,abc,男

郑十,45,女

王十,50,男

李二,55,女

思路分析

  1. 读入文件
  2. 对每一行数据进行分析
    1. 字段拆分,拆分出年龄这个字段
    2. 判断
      • 如果它不是数字或者缺失,则忽略这条数据
      • 否则保存

(三) 代码展示

import org.apache.spark.{SparkConf, SparkContext}

object DataCleaning {

  def main(args: Array[String]): Unit = {

    // 创建 SparkConf 对象

    val conf = new SparkConf().setAppName("DataCleaning").setMaster("local[*]")

    // 创建 SparkContext 对象

    val sc = new SparkContext(conf)

 

    // 读取文本文件,创建 RDD

    val inputFile = "input/file.txt"

    val lines = sc.textFile(inputFile)

 

    // 数据清洗操作

    val cleanedLines = lines.filter(line => { // 使用filter算子

      val fields = line.split(",")

      if (fields.length == 3) {

        val age = fields(1).trim

        age.matches("\\d+")

      } else {

        false

      }

    })
      // 输出清洗后的数据
       cleanedLines.collect().foreach(println)

 

    // 停止 SparkContext

    sc.stop()

  }

}

拓展:如何把清洗之后的数据保存到一个文件中。

可以使用coalesce(1)这个方法可以让结果全部保存在一个文件中。

代码如下:

val singlePartitionRDD = cleanedLines.coalesce(1)

    // 保存清洗后的数据到文件

    val outputPath = "path/to/your/output/file.txt"

    singlePartitionRDD.saveAsTextFile(outputPath)

    // 停止 SparkContext

    sc.stop()

http://www.xdnf.cn/news/4781.html

相关文章:

  • Linux命令行参数注入详解
  • 深入剖析ThreadLocal:原理、应用与最佳实践
  • 笔试强训——第七周
  • 前端三大件---CSS
  • 塔能空压系统节能方案:为华东某电子厂降耗赋能
  • JavaSE核心知识点02面向对象编程02-02(封装、继承、多态)
  • 基于LLM的全自动视频生成工具:MoneyPrinterTurbo 技术解析
  • CAN总线通讯接口卡:工业通信的核心桥梁
  • wails3学习-runtime:Window无边框设置
  • 数据结构(四)——栈的应用—数制转换
  • Java线程阻塞方法LockSupport.park()/Thread.sleep()/Object.wait()详解:原理、区别
  • java实战(第六篇):统计投票信息
  • [特殊字符]【深圳金融科技交流会】AI大模型如何重塑资管新生态?一线实战干货来了!
  • 极简远程革命:打破公网桎梏,重塑数字生活新体验
  • Vue 3.0中Treeshaking特性
  • Nacos源码—6.Nacos升级gRPC分析二
  • 从SiC到数字孪生:PSR芯片的技术迭代与未来布局
  • windows10 系统显示mov文件格式缩略图
  • 使用Hyper-V 安装Windows11操作系统
  • 缺乏实体人形机器人的主流高精度仿真方案
  • Matlab 分数阶PID控制
  • 数组和指针典型例题合集(一维数组、字符数组、二维数组)
  • python: 列表切片
  • Python Cookbook-7.8 使用 Berkeley DB 数据库
  • 最优化方法Python计算:有约束优化应用——线性Lasso回归预测器
  • 【Go】优化文件下载处理:从多级复制到零拷贝流式处理
  • PyTorchVideo实战:从零开始构建高效视频分类模型
  • 单片机自动排列上料控制程序 下
  • MySQL基础关键_012_事务
  • Modbus RTU 转 PROFINE 网关