当前位置: 首页 > news >正文

Spark SQL 读取 CSV 文件,并将数据写入 MySQL 数据库

在 Spark 中,可以使用 Spark SQL 读取 CSV 文件,并将数据写入 MySQL 数据库。以下是一个完整的示例,展示如何实现这一过程。

环境准备

  1. 安装 MySQL:确保 MySQL 数据库已安装并运行。
  2. 创建 MySQL 数据库和表
    CREATE DATABASE sparkdb;
    USE sparkdb;CREATE TABLE users (id INT AUTO_INCREMENT PRIMARY KEY,name VARCHAR(50),age INT,country VARCHAR(50)
    );
    
  3. 下载 MySQL JDBC 驱动
    • 从 MySQL 官方网站 下载 MySQL JDBC 驱动(mysql-connector-java-x.x.xx.jar)。
    • 将下载的 JAR 文件放置在 Spark 的 jars 目录下(例如 spark-3.3.0/jars/)。

示例代码

以下是一个完整的 Scala 示例代码,展示如何读取 CSV 文件并将其写入 MySQL 数据库:

import org.apache.spark.sql.{SparkSession, DataFrame}object CsvToMySQL {def main(args: Array[String]): Unit = {// 初始化 SparkSessionval spark = SparkSession.builder.appName("CsvToMySQL").master("local[*]").getOrCreate()// 读取 CSV 文件val csvFilePath = "path/to/users.csv" // 替换为你的 CSV 文件路径val df: DataFrame = spark.read.option("header", "true") // 第一行是表头.option("inferSchema", "true") // 自动推断数据类型.csv(csvFilePath)// 查看读取的数据df.show()// 配置 MySQL 数据库连接信息val jdbcUrl = "jdbc:mysql://localhost:3306/sparkdb"val jdbcUser = "root" // 替换为你的 MySQL 用户名val jdbcPassword = "password" // 替换为你的 MySQL 密码val jdbcTable = "users"// 将数据写入 MySQL 数据库df.write.format("jdbc").option("url", jdbcUrl).option("dbtable", jdbcTable).option("user", jdbcUser).option("password", jdbcPassword).mode("append") // 如果表已存在,追加数据.save()// 停止 SparkSessionspark.stop()}
}

示例 CSV 文件

假设你的 CSV 文件 users.csv 内容如下:

name,age,country
Alice,25,China
Bob,30,USA
Charlie,35,Japan
David,40,Germany

运行步骤

  1. 保存代码:将上述代码保存为 CsvToMySQL.scala 文件。
  2. 编译和运行
    • 使用 SBT 或 Maven 构建项目。
    • 在 IntelliJ IDEA 中运行程序。
  3. 验证结果
    • 登录到 MySQL 数据库,检查 sparkdb 数据库中的 users 表,确保数据已正确插入。

注意事项

  1. CSV 文件路径:确保 csvFilePath 指向正确的 CSV 文件路径。
  2. MySQL 用户名和密码:替换为你的实际 MySQL 用户名和密码。
  3. JDBC 驱动:确保 MySQL JDBC 驱动已正确放置在 Spark 的 jars 目录下。
  4. 数据模式:在写入数据库时,mode("append") 表示追加数据。如果需要覆盖表,可以使用 mode("overwrite")
  5. 性能优化:对于大规模数据写入,可以考虑使用批量插入(batchsize)等优化选项。

通过以上步骤,你可以使用 Spark SQL 读取 CSV 文件,并将数据写入 MySQL 数据库。

http://www.xdnf.cn/news/417529.html

相关文章:

  • niushop单商户V5多门店版V5.5.0全插件+商品称重、商家手机端+搭建环境教程
  • Unity引擎源码-物理系统详解-其一
  • centos中libc.so.6No such file的解决方式
  • AI+企业应用级PPT生成(实战)
  • 初识XML
  • 软件测试(概念1)
  • 使用CAS操作实现乐观锁的完整指南
  • C++的历史与发展
  • 原创-业务接口数据监控
  • MyBatis-Plus的批量插入与原生JDBC效率对比
  • git 怎么更改本地的存储的密码
  • 【PostgreSQL数据分析实战:从数据清洗到可视化全流程】金融风控分析案例-10.3 风险指标可视化监控
  • Yarn-概述
  • 用自写的jQuery库+Ajax实现了省市联动
  • 专题三:穷举vs暴搜vs深搜vs回溯vs剪枝(全排列)决策树与递归实现详解
  • 实现 STM32 PWM 输出:原理、配置与应用详解
  • 美学心得(第二百七十六集) 罗国正
  • RDD案例数据清洗
  • 【SpringBoot】从零开始全面解析Spring MVC (一)
  • 飞书配置表数据同步到数据库中
  • 《微机原理与接口技术》第 6 章 半导体存储器
  • 【Python 中文编码】
  • Jupyter Notebook 配置学习笔记
  • Spark缓存-cache
  • 【github】主页显示star和fork
  • Unity3d 打包安卓平台(Android apk)报错Gradle build failed解决方法
  • 多模态RAG与LlamaIndex——1.deepresearch调研
  • STM32 HAL驱动程序 内部Flash
  • ansible进阶版01
  • FFmpeg在Android开发中的核心价值是什么?