当前位置: 首页 > news >正文

在scala中使用sparkSQL读入csv文件

以下是使用 Spark SQL(Scala)读取 CSV 文件的完整代码示例:

scala

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._object CSVReadExample {def main(args: Array[String]): Unit = {// 创建SparkSessionval spark = SparkSession.builder.appName("CSVReadExample").config("spark.master", "local[*]").getOrCreate()try {// 方法1:自动推断模式val df1 = spark.read.option("header", "true")  // 第一行是否包含列名.option("inferSchema", "true")  // 自动推断数据类型.csv("path/to/your/file.csv")println("方法1:自动推断模式")df1.printSchema()df1.show()// 方法2:指定自定义模式val customSchema = StructType(Array(StructField("id", IntegerType, nullable = true),StructField("name", StringType, nullable = true),StructField("age", IntegerType, nullable = true),StructField("salary", DoubleType, nullable = true)))val df2 = spark.read.option("header", "true").schema(customSchema)  // 使用自定义模式.csv("path/to/your/file.csv")println("方法2:指定自定义模式")df2.printSchema()df2.show()// 方法3:读取多文件val df3 = spark.read.option("header", "true").csv("path/to/your/files/*.csv")  // 读取目录下所有CSV文件println("方法3:读取多文件")df3.count()df3.show()// 执行SQL查询示例df2.createOrReplaceTempView("people")val result = spark.sql("SELECT name, age FROM people WHERE age > 30")result.show()} catch {case e: Exception =>println(s"读取CSV失败: ${e.getMessage}")e.printStackTrace()} finally {// 关闭SparkSessionspark.stop()}}
}

常用 CSV 读取选项:

scala

spark.read.option("header", "true")  // 是否有表头.option("delimiter", ",")  // 分隔符,默认为逗号.option("quote", "\"")    // 引号字符,默认为双引号.option("escape", "\\")   // 转义字符.option("nullValue", "null")  // 指定空值表示.option("dateFormat", "yyyy-MM-dd")  // 日期格式.option("inferSchema", "true")  // 是否自动推断模式.csv("path/to/file.csv")

处理特殊情况:

  1. 处理引号包含的分隔符

    scala

    .option("quote", "\"")
    .option("escape", "\"")
    
  2. 处理包含换行符的字段

    scala

    .option("multiline", "true")
    
  3. 处理不同编码的文件

    scala

    .option("charset", "UTF-8")
    

执行步骤:

  1. 准备示例 CSV 文件 people.csv

    csv

    id,name,age,salary
    1,Alice,25,5000.0
    2,Bob,30,6000.0
    3,Charlie,35,7500.0
    
  2. 运行 Spark 应用:

    bash

    spark-submit --class CSVReadExample \--master local[*] \your-application.jar
    
  3. 也可以在 Spark Shell 中交互式运行:

    bash

    spark-shell
    
     

    然后粘贴代码片段执行

性能优化建议:

  1. 禁用自动推断模式(如果已知模式):

    scala

    .schema(customSchema)
    .option("inferSchema", "false")  // 提高性能
    
  2. 分区并行读取

    scala

    // 增加分区数提高并行度
    val df = spark.read.csv("path/to/file.csv").repartition(10)
    
  3. 使用列剪枝

    scala

    // 只选择需要的列
    df.select("name", "age")
    
  4. 过滤数据

    scala

    // 尽早过滤数据减少内存占用
    df.filter($"age" > 30)
    

错误处理:

  1. 处理格式错误

    scala

    .option("mode", "DROPMALFORMED")  // 丢弃格式错误的记录
    .option("mode", "PERMISSIVE")    // 将错误字段设为null
    .option("mode", "FAILFAST")      // 遇到错误立即失败
    
  2. 自定义错误处理

    scala

    import org.apache.spark.sql.Rowval df = spark.read.csv("path/to/file.csv")
    val validRows = df.rdd.filter { row =>try {// 自定义验证逻辑row.getString(1).length > 0} catch {case e: Exception => false}
    }
    

http://www.xdnf.cn/news/430903.html

相关文章:

  • python中的进程锁与线程锁
  • Mysql 事物
  • React状态管理-对state进行保留和重置
  • FCB文件疑问+求助:01 百度网盘视频自动生成AI笔记pdf会出现对应fcb文件-作用待详解
  • FFmpeg3.4 libavcodec协议框架增加新的decode协议
  • INFINI Console 纳管 Elasticsearch 9(一):指标监控、数据管理、DSL 语句执行
  • 深入理解 C++ 标准模板库(STL):从基础到实践
  • 不用mathtype将word中的公式修改成新罗马字体(加编号)
  • Android设备是否满足硬件要求
  • R-tree详解
  • 快速幂算法详解
  • 【前端】【JavaScript】【总复习】四万字详解JavaScript知识体系
  • 【C++进阶篇】二叉搜索树的实现(赋源码)
  • 国产大模型「五强争霸」,决战AGI!
  • upload-labs通关笔记-第3关 文件上传之黑名单绕过
  • 数据结构(2)线性表-顺序表
  • 二次封装 el-dialog 组件:打造更灵活的对话框解决方案
  • VUE_UI组件的二次封装
  • Redis Cluster 集群搭建和集成使用的详细步骤示例
  • 微信小程序分包策略:优化加载性能与用户体验
  • 使用Kubernetes实现零停机部署
  • android抓包踩坑记录
  • linux系统如何将采集的串口数据存储到txt
  • TCP首部格式及三次握手四次挥手
  • 操作系统导论——第29章 基于锁的并发数据结构
  • 【25软考网工】第六章(5)应用层安全协议
  • 讯联云库项目开发日志(一)
  • 记录算法笔记(2025.5.13)二叉树的最大深度
  • 基于STM32、HAL库的ADAU1701JSTZ-RL音频接口芯片驱动程序设计
  • flink的TaskManager 内存模型