当前位置: 首页 > news >正文

Spark,数据提取和保存

以下是使用 Spark 进行数据提取(读取)和保存(写入)的常见场景及代码示例(基于 Scala/Java/Python,不含图片操作):
 
一、数据提取(读取)
 
1. 读取文件数据(文本/CSV/JSON/Parquet 等)
 
Scala
 
scala   
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Data Read")
  .getOrCreate()

// 读取 CSV(含表头)
val csvDf = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true") // 自动推断数据类型
  .load("path/to/csv/file.csv")

// 读取 JSON
val jsonDf = spark.read.json("path/to/json/file.json")

// 读取 Parquet(Spark 原生格式,高效)
val parquetDf = spark.read.parquet("path/to/parquet/dir")
 
 
Python
 
python   
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Data Read").getOrCreate()

# 读取 CSV
csv_df = spark.read.csv("path/to/csv/file.csv", header=True, inferSchema=True)

# 读取 JSON
json_df = spark.read.json("path/to/json/file.json")

# 读取 Parquet
parquet_df = spark.read.parquet("path/to/parquet/dir")
 
 
2. 读取数据库数据(如 MySQL/Hive)
 
Scala(以 MySQL 为例)
 
scala   
val jdbcDf = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://host:port/db?useSSL=false")
  .option("dbtable", "table_name")
  .option("user", "username")
  .option("password", "password")
  .load()
 
 
Python(以 Hive 为例,需启用 Hive 支持)
 
python   
# 读取 Hive 表(需在 SparkSession 中启用 Hive)
hive_df = spark.sql("SELECT * FROM hive_table")
 
 
二、数据保存(写入)
 
1. 保存为文件(CSV/JSON/Parquet 等)
 
Scala
 
scala   
// 保存为 CSV(覆盖模式,含表头)
csvDf.write.format("csv")
  .option("header", "true")
  .mode("overwrite") // 模式:overwrite/append/ignore/errorIfExists
  .save("output/csv_result")

// 保存为 Parquet(分区存储,提升查询性能)
parquetDf.write.partitionBy("category") // 按字段分区
  .mode("append")
  .parquet("output/parquet_result")
 
 
Python
 
python   
# 保存为 JSON
json_df.write.json("output/json_result", mode="overwrite")

# 保存为 Parquet(指定压缩格式)
parquet_df.write.parquet("output/parquet_result", compression="snappy")
 
 
2. 保存到数据库(如 MySQL/Hive)
 
Scala(以 MySQL 为例)
 
scala   
jdbcDf.write.format("jdbc")
  .option("url", "jdbc:mysql://host:port/db?useSSL=false")
  .option("dbtable", "target_table")
  .option("user", "username")
  .option("password", "password")
  .mode("append") // 追加模式
  .save()
 
 
Python(以 Hive 为例)
 
python   
# 保存为 Hive 表(需启用 Hive 支持)
hive_df.write.saveAsTable("hive_target_table", mode="overwrite")
 
 
三、关键参数说明
 
1. 读取模式(文件)
 
-  inferSchema : 是否自动推断数据类型(适用于 CSV/JSON,需读取少量数据,影响性能)。
 
-  header : CSV 是否包含表头( true/false )。
 
2. 写入模式( mode )
 
-  overwrite : 覆盖已有数据。
 
-  append : 追加到现有数据。
 
-  ignore : 忽略写入(不报错)。
 
-  errorIfExists : 存在则报错(默认)。
 
3. 数据库连接
 
- 需添加对应数据库驱动(如 MySQL 的  mysql-connector-java )。
 
- 对于大规模数据,建议使用分区并行写入(如  option("numPartitions", "4") )。
 
四、典型场景示例
 
场景:从 MySQL 读取数据,清洗后保存为 Parquet
 
scala   
// 读取 MySQL 数据
val mysqlDf = spark.read.jdbc(
  url = "jdbc:mysql://host:port/source_db",
  dbtable = "source_table",
  properties = Map("user" -> "u", "password" -> "p")
)

// 数据清洗(示例:过滤空值)
val cleanedDf = mysqlDf.na.drop("any")

// 保存为 Parquet(按日期分区)
cleanedDf.write.partitionBy("date")
  .parquet("output/cleaned_data")
 
 
通过以上方法,可灵活使用 Spark 完成数据提取和保存任务,支持多种数据源和格式。

http://www.xdnf.cn/news/516151.html

相关文章:

  • LearnOpenGL---着色器
  • 板凳-------Mysql cookbook学习 (三)
  • Qwen3数据集格式化指南:从对话模板到推理模式,结合Unsloth实战演练
  • 高压BOOST芯片-TPQ80302
  • <前端小白> 前端网页知识点总结
  • 脚本一键完成alist直接在windows上进行磁盘映射为本地磁盘webdav
  • jqGrid冻结列错行问题,将冻结表格(悬浮表格)与 正常表格进行高度同步
  • 计算机网络概要
  • Oracle 内存优化
  • 给easyui的textbox绑定回车事件
  • 翻译:20250518
  • Go 后端中双 token 的实现模板
  • 需求与实际业务需求脱节,怎么办?
  • 安卓端互动娱乐房卡系统调试实录:从UI到协议的万字深拆(第一章)
  • QT学习3
  • Socket.IO是什么?适用哪些场景?
  • 基于马尔可夫链的状态转换,用概率模型预测股市走势
  • 2025年- H31-Lc139- 242.回文链表(快慢指针)---java版--需2刷
  • 新型太空电梯——半摆卫星太空电梯 的设计与验证
  • 【Python数据处理系列】输入txt,读取特定字符转换成特定csv数据并输出
  • PointNet++:点云处理的升级版算法
  • WebSocket实时双向通信:从基础到实战
  • 3:OpenCV—视频播放
  • 彻底解决docker代理配置与无法拉取镜像问题
  • 第二章 苍穹外卖
  • Git基础原理和使用
  • 区间带边权并查集,XY4060泄露的测试点
  • elementplus menu 设置 activeindex
  • GO语言语法---For循环、break、continue
  • 计算机组成与体系结构:Snooping-Based Protocols(监听式协议)