结构化数据处理
有一个名为 user.csv 的 CSV 文件,包含以下列:id(编号)、name(姓名)、age(年龄)。筛选出年龄大于18岁的数据,添加到mysql数据库中。
order_id,product_id,product_name,quantity,price,user_id,order_date
1,101,iPhone 15,2,999.99,1001,2024-01-01
2,102,Samsung Galaxy S24,3,899.99,1002,2024-01-02
3,103,MacBook Pro,1,2499.99,1003,2024-01-03
4,104,PlayStation 5,2,499.99,1004,2024-01-04
5,105,Xbox Series X,1,499.99,1005,2024-01-05
6,101,iPhone 15,1,999.99,1006,2024-01-06
7,102,Samsung Galaxy S24,2,899.99,1007,2024-01-07
8,103,MacBook Pro,3,2499.99,1008,2024-01-08
9,104,PlayStation 5,1,499.99,1009,2024-01-09
10,105,Xbox Series X,2,499.99,1010,2024-01-10
(二)思路分析
这是一个非常标准的二维表,我们可以使用sparkSQL中的dataFrame来完成相关的。
(三)DataFrame的概念
DataFrame 是一种分布式数据集,类似于传统数据库中的二维表格,具有行和列的结构。它是 RDD 的一种高级抽象,提供了更丰富的操作接口和优化的执行引擎。
DataFrame 中的每一列都有一个名称和数据类型,这使得数据的处理更加结构化和类型安全。
DataFrame 可以从多种数据源创建,如文件(CSV、JSON、Parquet 等)、数据库(MySQL、Hive 等)和 RDD。
格式
DataFrame 由行(Row)和列(Column)组成。每一行代表一条记录,每一列代表一个属性。
可以通过 printSchema() 方法查看 DataFrame 的结构信息,包括列名、数据类型等。
使用
使用DataFrame处理结构化数据有两种方式: 1.是DSL编程; 2. SQL 语句
(四)SQL风格操作DataFrame
使用sql风格操作的前提是将DataFrame注册成一个临时表,在程序中直接使用spark.sql()方式执行SQL查询,结果将作为一个DataFrame返回。
val spark = SparkSession.builder().appName("SparkSQL01").master("local[*]").getOrCreate() // 读取 CSV 文件并创建 DataFrame val df= spark.read .option("header", "true").option("inferSchema", "true").csv("ecommerce_data.csv") df.registerTempTable("t_per") spark.sql("select * from t_per where quantity > 5").show()
需要提前导入相关的包。
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.15</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.3.1</version>
</dependency>
(五)DSL风格操作DataFrame
下面我们通过一个例子来看看DataFrame的基本使用。我们从.csv文件中读入内容,创建一个DataFrame,然后调用它的相关方法:printSchema和show。
// 读取 CSV 文件并创建 DataFrame
val ecommerceDF = spark.read.option("header", "true").option("inferSchema", "true").csv("ecommerce_data.csv")// 显示 DataFrame 的基本信息
ecommerceDF.printSchema()
// 显示 DataFrame 的前几行数据
ecommerceDF.show()
// 筛选出购买数量大于 5 的订单
val filteredDF = ecommerceDF.filter($"quantity" > 5)
filteredDF.show()
(六)需求-按商品号分组,计算每个商品的销售总量和销售总额
// 按商品编号分组,计算每个商品的销售总量和销售总额
val groupedDF = ecommerceDF.groupBy($"product_id").agg(sum($"quantity").as("total_quantity"), sum($"quantity" * $"price").as("total_sales"))
groupedDF.show()
解释代码:
groupBy($"product_id"):按商品编号进行分组。
agg(sum($"quantity").as("total_quantity"), sum($"quantity" * $"price").as("total_sales")):对每个分组进行聚合操作,计算销售总量和销售总额,并分别命名为 total_quantity 和 total_sales。
(七)需求-找出销售总额最高的前3个商品
Hadoop是一个由Apache基金会所开发的分布式系统基础架构,主要解决海量数据集的存储和分析计算问题。广义上讲,Hadoop是一个更广泛的概念-- Hadoop生态圈。
// 找出销售总额最高的前 3 个商品
val top3Products = groupedDF.orderBy($"total_sales".desc).limit(3)
top3Products.show()
解释代码:
orderBy($"total_sales".desc):按销售总额降序排序。
limit(3):取前 3 条记录。