当前位置: 首页 > news >正文

RDD案例数据清洗

在 Spark 中,RDD(Resilient Distributed Dataset)是分布式数据集的基本抽象。数据清洗是数据预处理中的一个重要步骤,通常包括去除重复数据、过滤无效数据、转换数据格式等操作。以下是一个使用 RDD 进行数据清洗的完整示例。

示例场景

假设我们有一个包含用户信息的文本文件 users.txt,每行是一个用户记录,格式如下:

user1,25,China
user2,30,USA
user3,invalid,Australia
user4,22,China
user5,28,USA
user6,35,invalid

我们需要对数据进行清洗,包括:

  1. 过滤掉无效的年龄数据(非数字或不在合理范围)。
  2. 过滤掉无效的国家数据(只保留指定的国家,如 ChinaUSA)。
  3. 去除重复的用户记录。

实现步骤

  1. 创建 SparkContext:初始化 Spark 环境。
  2. 读取数据:从文件中加载数据到 RDD。
  3. 数据清洗:过滤无效数据和重复数据。
  4. 保存结果:将清洗后的数据保存到文件。

以下是完整的代码实现:

import org.apache.spark.{SparkConf, SparkContext}object DataCleaning {def main(args: Array[String]): Unit = {// 初始化 Spark 环境val conf = new SparkConf().setAppName("DataCleaning").setMaster("local[*]") // 使用本地模式运行val sc = new SparkContext(conf)// 读取数据val inputPath = "path/to/users.txt"val rawData = sc.textFile(inputPath)// 数据清洗val cleanedData = rawData.map(line => line.split(",")) // 将每行数据分割为数组.filter(arr => arr.length == 3) // 确保每行有三个字段.filter(arr => {// 过滤无效年龄数据val age = try {arr(1).toInt} catch {case _: NumberFormatException => -1}age >= 18 && age <= 100 // 假设年龄范围为 18 到 100}).filter(arr => {// 过滤无效国家数据val country = arr(2)country == "China" || country == "USA"}).map(arr => (arr(0), arr(1), arr(2))) // 转换为元组.distinct() // 去除重复记录// 保存清洗后的数据val outputPath = "path/to/cleaned_users.txt"cleanedData.saveAsTextFile(outputPath)// 停止 SparkContextsc.stop()}
}

代码说明

  1. 初始化 Spark 环境

    • 使用 SparkConf 配置 Spark 应用程序的名称和运行模式(本地模式)。
    • 创建 SparkContext 实例。
  2. 读取数据

    • 使用 sc.textFile 方法从指定路径加载数据到 RDD。
  3. 数据清洗

    • 使用 map 方法将每行数据分割为数组。
    • 使用 filter 方法过滤无效的年龄数据和国家数据。
    • 使用 distinct 方法去除重复记录。
  4. 保存结果

    • 使用 saveAsTextFile 方法将清洗后的数据保存到指定路径。

示例输入和输出

输入文件 users.txt
user1,25,China
user2,30,USA
user3,invalid,Australia
user4,22,China
user5,28,USA
user6,35,invalid
user1,25,China
输出文件 cleaned_users.txt
user1,25,China
user2,30,USA
user4,22,China
user5,28,USA

运行项目

  1. 将上述代码保存为 DataCleaning.scala 文件。
  2. 在 IntelliJ IDEA 中运行该程序。
  3. 查看输出文件 cleaned_users.txt,确保数据清洗结果正确。

通过以上步骤,你可以使用 Spark 的 RDD API 完成数据清洗任务。

http://www.xdnf.cn/news/417205.html

相关文章:

  • 【SpringBoot】从零开始全面解析Spring MVC (一)
  • 飞书配置表数据同步到数据库中
  • 《微机原理与接口技术》第 6 章 半导体存储器
  • 【Python 中文编码】
  • Jupyter Notebook 配置学习笔记
  • Spark缓存-cache
  • 【github】主页显示star和fork
  • Unity3d 打包安卓平台(Android apk)报错Gradle build failed解决方法
  • 多模态RAG与LlamaIndex——1.deepresearch调研
  • STM32 HAL驱动程序 内部Flash
  • ansible进阶版01
  • FFmpeg在Android开发中的核心价值是什么?
  • RAG之大规模解析 PDF 文档全流程实战
  • 开源免费无广告专注PDF编辑、修复和管理工具 办公学术 救星工具
  • 服务器相关
  • Java Web 应用安全响应头配置全解析:从单体到微服务网关的实践
  • Vue 2 项目中配置 Tailwind CSS 和 Font Awesome 的最佳实践,加 daisyUI 安装
  • 存算一体芯片对传统GPU架构的挑战:在GNN训练中的颠覆性实验
  • w~大模型~合集30
  • 【后端】SpringBoot用CORS解决无法跨域访问的问题
  • Go 语言即时通讯系统开发日志-day1:从简单消息收发 Demo 起步
  • Vue使用scale方法实现响应式自适应大屏缩放通用组件详解(附完整代码)
  • cursor Too many报错 显示锁机器码怎么办?也就是Cursor的
  • 101alpha---第10
  • 各类型和字节数组互相转换
  • pyenv无法使用pip解决方案
  • Cyrus-Beck算法的计算方法
  • C++类的继承和派生
  • MYSQL事务原理分析(三)
  • 动作识别笔记