SparkSQL基本操作
以下是 Spark SQL 的基本操作总结,涵盖数据读取、转换、查询、写入等核心功能:
一、初始化 SparkSession
scala
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("Spark SQL Demo")
.master("local[*]") // 本地模式(集群用 `spark://host:port`)
.getOrCreate()
// 导入隐式转换(用于 DataFrame 与 RDD 互转)
import spark.implicits._
二、数据读取
1. 读取文件(CSV/JSON/Parquet等)
scala
// 读取 CSV(带表头)
val csvDF = spark.read
.option("header", "true")
.option("inferSchema", "true") // 自动推断数据类型
.csv("路径/文件.csv")
// 读取 JSON
val jsonDF = spark.read.json("路径/文件.json")
// 读取 Parquet(Spark 原生格式,高效)
val parquetDF = spark.read.parquet("路径/文件.parquet")
2. 读取数据库(如 MySQL)
scala
val jdbcDF = spark.read.format("jdbc")
.option("url", "jdbc:mysql://host:port/db")
.option("dbtable", "表名")
.option("user", "用户名")
.option("password", "密码")
.load()
3. 从 RDD 创建 DataFrame
scala
// 示例:RDD 转 DataFrame(通过 case class 推断 Schema)
case class Person(id: Int, name: String, age: Int)
val peopleRDD = spark.sparkContext.parallelize(Seq(Person(1, "Alice", 25), Person(2, "Bob", 30)))
val peopleDF = peopleRDD.toDF() // 自动使用 case class 字段作为列名
三、基本数据操作
1. 查看数据
scala
df.show() // 打印前20行(默认)
df.show(false) // 不截断长字符串
df.printSchema() // 查看表结构
df.describe().show() // 统计摘要(均值、计数等)
2. 列操作
scala
// 选择列
df.select("name", "age").show()
// 新增列(表达式计算)
import org.apache.spark.sql.functions._
val dfWithNewColumn = df.withColumn("age_plus_1", col("age") + 1)
// 重命名列
val renamedDF = df.withColumnRenamed("old_name", "new_name")
// 删除列
val filteredDF = df.drop("column_to_drop")
3. 行过滤与排序
scala
// 过滤行(where/filter 等价)
df.filter(col("age") > 18).show()
df.where("age > 18 AND name LIKE 'A%'").show()
// 排序(asc/desc)
df.orderBy(col("age").desc, "name").show() // 按年龄降序、姓名升序
4. 分组与聚合
scala
import org.apache.spark.sql.functions._
// 分组统计(如计算每个年龄段的人数)
df.groupBy("age")
.agg(
count("*").alias("count"), // 计数
avg("score").alias("avg_score") // 平均值
).show()
// 窗口函数(如按年龄分区排序)
import org.apache.spark.sql.window.Window
val windowSpec = Window.partitionBy("age").orderBy(col("score").desc)
df.withColumn("rank", rank().over(windowSpec)).show()
四、Spark SQL 查询(SQL 语法)
1. 注册临时视图
scala
df.createOrReplaceTempView("people") // 注册为临时视图(会话级)
2. 执行 SQL 查询
scala
val sqlResult = spark.sql("""
SELECT name, age
FROM people
WHERE age > 25
ORDER BY age DESC
""")
sqlResult.show()
3. 全局临时视图(跨会话)
scala
df.createGlobalTempView("global_people") // 全局视图,需用 `global_temp.表名` 访问
spark.sql("SELECT * FROM global_temp.global_people").show()
五、数据写入
1. 保存为文件
scala
// 保存为 CSV(覆盖模式)
df.write.mode("overwrite") // 模式:overwrite/append/ignore/replace
.option("header", "true")
.csv("路径/输出.csv")
// 保存为 Parquet(压缩高效)
df.write.parquet("路径/输出.parquet")
2. 写入数据库(如 MySQL)
scala
df.write.format("jdbc")
.option("url", "jdbc:mysql://host:port/db")
.option("dbtable", "表名")
.option("user", "用户名")
.option("password", "密码")
.mode("append") // 追加数据
.save()
3. 保存为 Hive 表
scala
df.write.saveAsTable("hive_table") // 需提前启用 Hive 支持(spark.sql.catalogImplementation = hive)
六、数据类型与转换
1. 常用数据类型
- 基础类型: IntegerType 、 StringType 、 DoubleType 、 TimestampType
- 复杂类型: ArrayType 、 MapType 、 StructType (嵌套结构)
2. 类型转换
scala
import org.apache.spark.sql.functions._
// 字符串转整数
val castDF = df.withColumn("age_str", col("age").cast("string"))
// 时间格式转换
val timestampDF = df.withColumn("date", to_date(col("timestamp_col"), "yyyy-MM-dd"))
七、性能优化技巧
1. 使用 Parquet 格式:列式存储,压缩率高,查询更快。
2. 分区表:按日期/类别分区( partitionBy ),减少数据扫描范围。
3. 缓存数据: df.cache() 避免重复计算(适用于多次查询的数据集)。
4. 广播小表: spark.sql.autoBroadcastJoinThreshold 设置小表广播阈值(默认 10MB)。
八、停止 SparkSession
scala
spark.stop() // 释放资源
通过以上操作,可实现数据的读取、处理、分析和存储。实际应用中可结合业务需求灵活组合函数,或通过 Spark UI( http://localhost:4040 )监控作业执行情况。