当前位置: 首页 > ds >正文

在scala中sparkSQL连接masql并添加新数据

以下是 Scala 中使用 Spark SQL 连接 MySQL 并添加数据的完整代码示例(纯文本):

 

1. 准备连接参数(需替换实际信息)

 

scala

val jdbcUrl = "jdbc:mysql://localhost:3306/test_db?useUnicode=true&characterEncoding=utf-8"  

val tableName = "users" // 目标表名  

val user = "root"  

val password = "your_password"  

val driverClass = "com.mysql.cj.jdbc.Driver" // MySQL 8+ 驱动类(5.x 用 com.mysql.jdbc.Driver)  

 

 

2. 创建 SparkSession

 

scala

import org.apache.spark.sql.SparkSession  

 

val spark = SparkSession.builder()  

  .appName("Spark SQL MySQL Insert")  

  .master("local[*]") // 单机模式,集群改为 "yarn" 等  

  .getOrCreate()  

 

 

3. 生成待插入数据(示例 DataFrame)

 

scala

import spark.implicits._  

 

// 示例数据:插入两条用户记录(假设表结构为 id INT, name STRING, age INT)  

val newData = Seq(  

  (3, "Alice", 28),  

  (4, "Bob", 30)  

).toDF("id", "name", "age")  

 

 

4. 写入数据到 MySQL(追加模式)

 

scala

newData.write.jdbc(  

  url = jdbcUrl,  

  table = tableName,  

  mode = "append", // 写入模式:append(追加)、overwrite(覆盖)等  

  properties = new java.util.Properties() {{  

    setProperty("user", user)  

    setProperty("password", password)  

    setProperty("driver", driverClass)  

  }}  

)  

 

 

关键说明

 

1. 写入模式(mode):

 

-  append :数据追加到现有表(表需存在)。

 

-  overwrite :覆盖现有表(需注意权限和数据安全)。

 

-  ignore :忽略重复数据(需表有唯一约束)。

 

-  failIfExists :表存在时抛出异常(默认模式)。

 

2. 表结构要求:

 

- 目标表需提前创建,字段类型需与 DataFrame 匹配(如  id  对应  INT , name  对应  VARCHAR )。

 

3. 驱动与版本适配:

 

- 若报  ClassNotFoundException ,检查驱动是否正确部署(通过  --jars  参数或放入  $SPARK_HOME/jars/ )。

 

- MySQL 5.x 和 8.x 驱动类名不同,需对应修改  driverClass 。

 

4. 批量写入优化:

 

- 可添加参数  ?rewriteBatchedStatements=true  到  jdbcUrl  中,提升批量插入性能:

scala

val jdbcUrl = "jdbc:mysql://localhost:3306/test_db?useUnicode=true&characterEncoding=utf-8&rewriteBatchedStatements=true"  

 

 

完整代码整合

 

scala

import org.apache.spark.sql.SparkSession  

import spark.implicits._  

 

object SparkMySQLInsert {  

  def main(args: Array[String]): Unit = {  

    // 连接参数  

    val jdbcUrl = "jdbc:mysql://localhost:3306/test_db?useUnicode=true&characterEncoding=utf-8"  

    val tableName = "users"  

    val user = "root"  

    val password = "your_password"  

    val driverClass = "com.mysql.cj.jdbc.Driver"  

 

    // 创建 SparkSession  

    val spark = SparkSession.builder()  

      .appName("Spark SQL MySQL Insert")  

      .master("local[*]")  

      .getOrCreate()  

 

    // 生成待插入数据  

    val newData = Seq(  

      (3, "Alice", 28),  

      (4, "Bob", 30)  

    ).toDF("id", "name", "age")  

 

    // 写入数据  

    newData.write.jdbc(  

      url = jdbcUrl,  

      table = tableName,  

      mode = "append",  

      properties = new java.util.Properties() {{  

        setProperty("user", user)  

        setProperty("password", password)  

        setProperty("driver", driverClass)  

      }}  

    )  

 

    spark.stop()  

  }  

}  

 

 

执行时需通过  spark-submit  命令提交,并指定 MySQL 驱动包:

 

bash

spark-submit --jars /path/to/mysql-connector-java.jar your_app.jar

http://www.xdnf.cn/news/6084.html

相关文章:

  • python使用OpenCV 库将视频拆解为帧并保存为图片
  • 【Mac 从 0 到 1 保姆级配置教程 15】- Python 环境一键安装与配置,就是这么的丝滑
  • 虚拟机Ubuntu系统怎么扩展容量,扩展容量后进不去系统怎么办?
  • python共享内存实际案例,传输opencv frame
  • Python面向对象编程(OOP)深度解析:从封装到继承的多维度实践
  • 【论信息系统项目的资源管理】
  • 【Git】合并和变基的区别
  • windows 强行终止进程,根据端口号
  • 人工智能技术演进:从多模态融合到智能体落地的实践探索
  • uart16550详细说明
  • 使用虚拟机Linux写程序
  • 网站开发过程中样式忽然不显示问题
  • GOOSE协议publisher上传频率
  • Playwright 安装配置文件详解
  • 爆肝整理!软件测试面试题整理(项目+接口问题)
  • OpenCV特征处理全解析:从检测到匹配的完整指南
  • 二分查找算法的思路
  • linq中 List<T>.ForEach() 与 的 Select() 方法区别——CAD c#二次开发
  • HCIP实验(BGP联邦实验)
  • 21.three官方示例+编辑器+AI快速学习webgl_buffergeometry_selective_draw
  • Q1财报持续向好,腾讯音乐如何在不确定中寻找确定性?
  • 如何将两台虚拟机进行搭桥
  • 防重入或并发调用(C++)
  • C语言指针循环使用指南
  • Ansys 产品在Windows系统的卸载(2025R1版)
  • 【Redis】RedLock实现原理
  • 笔试强训(十七)
  • 12.1寸工业液晶屏M121XGV20-N10显示单元技术档案
  • 126.在 Vue 3 中使用 OpenLayers 实现绘制正方形、正三角形、正五边形
  • 使用PHP对接日本股票市场数据