当前位置: 首页 > ds >正文

在scala中使用sparkSQL连接MySQL并添加新数据

以下是使用 Spark SQL(Scala)连接 MySQL 并添加新数据的完整代码示例:

scala

import org.apache.spark.sql.SparkSessionobject MySQLSparkExample {def main(args: Array[String]): Unit = {// 创建SparkSessionval spark = SparkSession.builder.appName("MySQLDataInsertExample").config("spark.master", "local[*]").config("spark.jars.packages", "mysql:mysql-connector-java:8.0.26").getOrCreate()// 配置MySQL连接参数val jdbcUrl = "jdbc:mysql://localhost:3306/your_database?useSSL=false"val connectionProperties = new java.util.Properties()connectionProperties.setProperty("user", "your_username")connectionProperties.setProperty("password", "your_password")connectionProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver")try {// 1. 读取现有数据示例val existingData = spark.read.jdbc(jdbcUrl, "employees", connectionProperties)println("现有数据:")existingData.show()// 2. 创建要添加的新数据import spark.implicits._val newData = Seq((1001, "John Doe", "Engineering", 5000.0),(1002, "Jane Smith", "Marketing", 6000.0)).toDF("id", "name", "department", "salary")// 3. 将新数据追加到MySQL表newData.write.mode("append").jdbc(jdbcUrl, "employees", connectionProperties)println("数据添加成功!")// 4. 验证添加后的数据val updatedData = spark.read.jdbc(jdbcUrl, "employees", connectionProperties)println("添加后的数据:")updatedData.show()} catch {case e: Exception =>println(s"操作失败: ${e.getMessage}")e.printStackTrace()} finally {// 关闭SparkSessionspark.stop()}}
}

关键配置说明:

  1. 依赖配置

    scala

    .config("spark.jars.packages", "mysql:mysql-connector-java:8.0.26")
    
  2. JDBC URL 格式

    scala

    jdbc:mysql://<host>:<port>/<database>?useSSL=false
    
  3. 写入模式

    • append:追加数据(不会删除现有数据)
    • overwrite:覆盖表(先删除再插入)
    • errorIfExists:如果表存在则报错(默认)

执行步骤:

  1. 确保 MySQL 服务已启动
  2. 创建测试表:

    sql

    CREATE TABLE employees (id INT PRIMARY KEY,name VARCHAR(50),department VARCHAR(50),salary DOUBLE
    );
    
  3. 运行 Spark 应用:

    bash

    spark-submit --class MySQLSparkExample \--master local[*] \--packages mysql:mysql-connector-java:8.0.26 \your-application.jar
    

注意事项:

  1. 替换数据库连接参数:

    • your_database
    • your_username
    • your_password
  2. 如果遇到时区问题,可在 URL 中添加:

    scala

    ?serverTimezone=UTC
    
  3. 确保 MySQL 用户有写入权限:

    sql

    GRANT INSERT ON your_database.employees TO 'your_username'@'%';
    
  4. 对于生产环境,建议:

    • 使用连接池(如 HikariCP)
    • 启用 SSL 加密
    • 配置适当的重试机制
    • 监控数据库连接状态
http://www.xdnf.cn/news/5906.html

相关文章:

  • uniapp-商城-56-后台 新增商品(弹窗属性继续分析)
  • 解构认知边界:论万能方法的本体论批判与方法论重构——基于跨学科视阈的哲学-科学辩证
  • Node.js 中的 URL 模块
  • sql 备份表a数据到表b
  • 论文精读:YOLO-UniOW: Efficient Universal Open-World Object Detection
  • 【Pandas】pandas DataFrame cumprod
  • 一文理清人工智能,机器学习,深度学习的概念
  • TCP协议十大核心特性深度解析:构建可靠传输的基石
  • 标贝科技:大模型领域数据标注的重要性与标注类型分享
  • Python格式化字符串学习笔记
  • 如何使用远程桌面控制电脑
  • 网页禁止粘贴的解决方法(以学习通网页为例)
  • puppy系统详解
  • 中国古代史4
  • Android中ConstraintLayout约束布局使用详解
  • 虚拟主机与独立服务器:哪个更好?
  • MFCC特征提取及Griffin-Lim算法(librosa实现)
  • 使用 AddressSanitizer 检测栈内存越界错误
  • 如何配置本机host文件
  • Power BI 实操案例,将度量值转化为切片器(动态切换分析指标)
  • 在Text-to-SQL任务中应用过程奖励模型
  • 【Python】Python常用数据类型详解
  • cursor 如何在项目内自动创建规则
  • uniapp-商城-54-后台 新增商品(页面布局)
  • Linux异步通知机制详解
  • TongWeb7.0常用-D参数说明
  • python标准库--sys - 系统相关功能在算法比赛的应用
  • 无人机信号线被电磁干扰导致停机
  • mplayer使用详解
  • JDK 安装与配置