当前位置: 首页 > news >正文

Spark-SQL 项目

一、项目概述

(一)实验目标

  1. 统计有效数据条数:筛选出uid、phone、addr三个字段均无空值的记录并计数。
  2. 提取用户数量最多的前 20 个地址:按地址分组统计用户数,按降序排序后取前 20 名。

(二)数据说明

  1. 数据格式
    • 输入数据为 JSON 格式,字段包括uid(用户 ID)、phone(电话号码)、addr(地址)。
    • 数据特点:部分记录存在格式不规范问题(如单引号混用、字段值缺失、地址格式不统一,例如 “江苏省 苏州”“广东省 中山” 等),需先清洗转换。
    • 示例数据

json

{"uid":"1000166111","phone":"17703771999","addr":"河南省 南阳"}

{"uid":"1000432103","phone":"15388889881","addr":"云南省 昆明"}

  1. 有效数据定义
    同时满足以下条件的记录:
    • uid不为空(非null且非空字符串);
    • phone不为空(非null且非空字符串);
    • addr不为空(非null且非空字符串)。

二、实验准备

(一)环境配置

  1. 软件依赖
    • Spark 3.x+(需启用 Hive 支持以使用get_json_object函数);
    • 编程语言:Scala/Python(本文以 Scala 为例,Python 代码可通过 PySpark 实现);
    • 配置文件:确保spark.sql.warehouse.dir指向 HDFS 或本地路径(如hdfs://node01:9000/user/hive/warehouse)。
  2. 数据准备
    • 将 JSON 数据保存为文件(如user_data.json),确保每行一个 JSON 对象;
    • 若存在格式错误(如单引号),先用文本处理工具(如sed 's/\'/"/g')统一为双引号。

三、数据处理流程

(一)数据读取与格式转换

1. 读取原始数据

使用 Spark 的 JSON 数据源直接加载数据,自动推断 Schema:

scala

val rawDF = spark.read.json("path/to/user_data.json")

rawDF.printSchema() // 检查字段是否正确解析(可能因格式问题导致字段类型为String)

2. 字段提取与清洗

通过get_json_object函数(Spark SQL 内置函数)解析 JSON 字段,处理不规范格式:

scala

// 方法1:Spark SQL语句(推荐,清晰易读)

rawDF.createOrReplaceTempView("raw_data")

val parsedDF = spark.sql("""

SELECT

get_json_object(raw_data.data, '$.uid') AS uid, -- 提取uid

get_json_object(raw_data.data, '$.phone') AS phone, -- 提取phone

trim(get_json_object(raw_data.data, '$.addr')) AS addr -- 提取addr并去除前后空格

FROM raw_data

""")

// 方法2:DataFrame API(适合编程式处理)

import org.apache.spark.sql.functions.expr

val parsedDF = rawDF.select(

expr("get_json_object(data, '$.uid')").as("uid"),

expr("get_json_object(data, '$.phone')").as("phone"),

expr("trim(get_json_object(data, '$.addr'))").as("addr")

)

(二)统计有效数据条数

1. 筛选有效数据

过滤掉任一字段为空的记录:

scala

val validDF = parsedDF.filter(

col("uid").isNotNull &&

col("phone").isNotNull &&

col("addr").isNotNull

)

或通过SQL语句:

spark.sql("SELECT * FROM parsed_data WHERE uid IS NOT NULL AND phone IS NOT NULL AND addr IS NOT NULL")

2. 计数

scala

val validCount = validDF.count()

println(s"有效数据条数:$validCount")

或通过SQL返回结果:

spark.sql("SELECT COUNT(*) AS valid_data_count FROM valid_data").show()

(三)统计用户数量最多的前 20 个地址

1. 分组聚合

按addr分组,统计每个地址的用户数(直接使用count(*),因uid唯一,也可count(DISTINCT uid),需根据业务需求选择):

scala

val addrGroupDF = validDF.groupBy("addr").count().withColumnRenamed("count", "user_count")

2. 排序与筛选

按用户数降序排序,取前 20 条:

scala

val top20Addresses = addrGroupDF.orderBy(desc("user_count")).limit(20)

top20Addresses.show(false) // 展示结果,地址不换行

3. SQL 完整实现

spark.sql("""

SELECT

addr,

COUNT(*) AS user_count-- 或COUNT(DISTINCT uid)去重统计

FROM valid_data

GROUP BY addr

ORDER BY user_count DESC

LIMIT 20

""").show()

五、扩展与优化建议

(一)数据清洗增强

  1. 地址标准化:使用正则表达式或自定义函数清洗地址(如 “江苏省苏州” 统一为 “江苏省苏州市”);
  2. 手机号格式校验:添加正则表达式过滤无效手机号(如^1[3-9]\d{9}$)。

(二)性能优化

  1. 分区与缓存:对大数据集使用repartition分区,对高频访问的中间表(如validDF)调用cache();
  2. 列式存储:将结果数据保存为 Parquet 格式(validDF.write.parquet("output/valid_data")),提升后续查询效率。

(三)结果输出

将最终结果导出到 HDFS、本地文件或数据库:

scala

top20Addresses.write

.mode("overwrite")

.csv("output/top20_addresses") // 保存为CSV文件

http://www.xdnf.cn/news/88741.html

相关文章:

  • Linux安装后无法启动24天
  • 数据集 | 柑橘果目标检测数据集
  • 大数据开发的基本流程
  • 基于机器学习的房租影响因素分析系统
  • 安卓模拟器绕过检测全解析:雷电、MuMu、蓝叠、逍遥、夜神与WSA完整指南
  • 3.1.1 MaterialDesign中DrawerHost使用案例
  • Kubernetes Docker 部署达梦8数据库
  • 蓝桥杯算法实战分享:C/C++ 题型解析与实战技巧
  • 明远智睿2351开发板:四核1.4G处理器——开启高效能Linux系统新纪元
  • 『不废话』之Python管理工具uv快速入门
  • 【Java】Hibernate的检索策略
  • python的深拷贝浅拷贝(copy /deepcopy )
  • 三维几何变换
  • usb2.0的硬件知识(一)
  • 查看MySql操作日志
  • 布隆过滤器的应用
  • 《Operating System Concepts》阅读笔记:p764-p766
  • 【Axure视频教程】不透明度函数
  • 以下是一个基于 ESP32 - S3 实现消息队列收发测试的 C 例程
  • crontab 定时备份 mysql 数据库
  • CF思维题(cf round 1019 div.2 b题)
  • ADS基本操作之S参数仿真
  • 如何高效优化复杂的SQL查询:以项目发布管理为例
  • Java知识大纲
  • 内存管理之文件内存映射(mmap):外存(磁盘/flash)的文件映射到应用层(跨越内核层)
  • 解析芯片低功耗设计的底层逻辑与实现方法
  • 最新项目笔记
  • Java的反射机制(曼波超易懂图文版)
  • 一洽智能硬件行业解决方案探索与实践
  • 从零开始学Python游戏编程33-指令模式2