当前位置: 首页 > news >正文

结构化数据处理

有一个名为 user.csv 的 CSV 文件,包含以下列:id(编号)、name(姓名)、age(年龄)。筛选出年龄大于18岁的数据,添加到mysql数据库中。

​​​​​​

order_id,product_id,product_name,quantity,price,user_id,order_date

1,101,iPhone 15,2,999.99,1001,2024-01-01

2,102,Samsung Galaxy S24,3,899.99,1002,2024-01-02

3,103,MacBook Pro,1,2499.99,1003,2024-01-03

4,104,PlayStation 5,2,499.99,1004,2024-01-04

5,105,Xbox Series X,1,499.99,1005,2024-01-05

6,101,iPhone 15,1,999.99,1006,2024-01-06

7,102,Samsung Galaxy S24,2,899.99,1007,2024-01-07

8,103,MacBook Pro,3,2499.99,1008,2024-01-08

9,104,PlayStation 5,1,499.99,1009,2024-01-09

10,105,Xbox Series X,2,499.99,1010,2024-01-10

 

思路分析

这是一个非常标准的二维表,我们可以使用sparkSQL中的dataFrame来完成相关的。

DataFrame的概念

DataFrame 是一种分布式数据集,类似于传统数据库中的二维表格,具有行和列的结构。它是 RDD 的一种高级抽象,提供了更丰富的操作接口和优化的执行引擎。

DataFrame 中的每一列都有一个名称和数据类型,这使得数据的处理更加结构化和类型安全。

DataFrame 可以从多种数据源创建,如文件(CSV、JSON、Parquet 等)、数据库(MySQL、Hive 等)和 RDD。

格式

DataFrame 由行(Row)和列(Column)组成。每一行代表一条记录,每一列代表一个属性。

可以通过 printSchema() 方法查看 DataFrame 的结构信息,包括列名、数据类型等。

使用

使用DataFrame处理结构化数据有两种方式: 1.是DSL编程; 2. SQL 语句

(四)SQL风格操作DataFrame

使用sql风格操作的前提是将DataFrame注册成一个临时表,在程序中直接使用spark.sql()方式执行SQL查询,结果将作为一个DataFrame返回。

val spark = SparkSession.builder().appName("SparkSQL01").master("local[*]").getOrCreate()
// 读取 CSV 文件并创建 DataFrame
val df= spark.read
.option("header", "true").option("inferSchema", "true").csv("ecommerce_data.csv")
df.registerTempTable("t_per")
spark.sql("select * from t_per where quantity > 5").show()

需要提前导入相关的包。

<dependency>

            <groupId>org.scala-lang</groupId>

            <artifactId>scala-library</artifactId>

            <version>2.12.15</version>

        </dependency>

            <dependency>

                <groupId>org.apache.spark</groupId>

                <artifactId>spark-sql_2.12</artifactId>

                <version>3.3.1</version>

            </dependency>

 

DSL风格操作DataFrame

下面我们通过一个例子来看看DataFrame的基本使用。我们从.csv文件中读入内容,创建一个DataFrame,然后调用它的相关方法:printSchema和show。

// 读取 CSV 文件并创建 DataFrame
val ecommerceDF = spark.read.option("header", "true").option("inferSchema", "true").csv("ecommerce_data.csv")// 显示 DataFrame 的基本信息
ecommerceDF.printSchema()
// 显示 DataFrame 的前几行数据
ecommerceDF.show()  
// 筛选出购买数量大于 5 的订单
val filteredDF = ecommerceDF.filter($"quantity" > 5)
filteredDF.show()  
(六)需求-按商品号分组,计算每个商品的销售总量和销售总额

// 按商品编号分组,计算每个商品的销售总量和销售总额

val groupedDF = ecommerceDF.groupBy($"product_id").agg(sum($"quantity").as("total_quantity"), sum($"quantity" * $"price").as("total_sales"))
groupedDF.show()  

解释代码:

groupBy($"product_id"):按商品编号进行分组。

agg(sum($"quantity").as("total_quantity"), sum($"quantity" * $"price").as("total_sales")):对每个分组进行聚合操作,计算销售总量和销售总额,并分别命名为 total_quantity 和 total_sales。

需求-找出销售总额最高的前3个商品

Hadoop是一个由Apache基金会所开发的分布式系统基础架构,主要解决海量数据集的存储分析计算问题。广义上讲,Hadoop是一个更广泛的概念-- Hadoop生态圈。

// 找出销售总额最高的前 3 个商品
val top3Products = groupedDF.orderBy($"total_sales".desc).limit(3)
top3Products.show()  

解释代码:

orderBy($"total_sales".desc):按销售总额降序排序。

limit(3):取前 3 条记录。

http://www.xdnf.cn/news/432883.html

相关文章:

  • GPU服务器集群部署
  • 【越狱检测】HSF: Defending against Jailbreak Attacks with Hidden State Filtering
  • c语言第一个小游戏:贪吃蛇小游戏06
  • 逃离 AI 困境:保障 “说不” 的权利,守护数字自由
  • Selenium自动化测试
  • git cherry-pick和git stash命令详解
  • Python爬虫如何应对网站的反爬加密策略?
  • 第九届御网杯网络安全大赛初赛WP
  • 多线程与并发之进程
  • Focal Loss 原理详解及 PyTorch 代码实现
  • 运行Spark程序-在shell中运行
  • 思路解析:第一性原理解 SQL
  • 2025.5.13山东大学软件学院计算机图形学期末考试回忆版本
  • msyql8.0.xx忘记密码解决方法
  • 2025.05.11阿里云机考真题算法岗-第二题
  • 重置集群(有异常时)
  • Spring 集成 SM4(国密对称加密)
  • Springboot | 如何上传文件
  • ros2-node
  • SpringBoot--springboot简述及快速入门
  • 2025年全国青少年信息素养大赛初赛模拟测试网站崩了的原因及应对比赛流程
  • SparkSQL操作Mysql
  • 1995-2022年各省能源消费总量数据(万吨标煤)
  • UDS诊断----------$11诊断服务
  • 【YOLO模型】参数全面解读
  • JavaWeb 前端开发
  • 优化的代价(AI编码带来的反思)-来自Grok
  • 基于TouchSocket实现WebSocket自定义OpCode扩展协议
  • day19-线性表(顺序表)(链表I)
  • 操作系统:内存管理