当前位置: 首页 > ai >正文

写spark程序数据计算( 数据库的计算,求和,汇总之类的)连接mysql数据库,写入计算结果

1. 添加依赖

在项目的 `pom.xml`(Maven)中添加以下依赖:

```xml

<!-- Spark SQL -->

<dependency>

    <groupId>org.apache.spark</groupId>

    <artifactId>spark-sql_2.12</artifactId>

    <version>3.3.0</version>

</dependency>

 

<!-- MySQL Connector -->

<dependency>

    <groupId>mysql</groupId>

    <artifactId>mysql-connector-java</artifactId>

    <version>8.0.33</version>

</dependency>

代码

import org.apache.spark.sql.{SparkSession, SaveMode}

object SparkMySQLDemo {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("SparkMySQLDemo")
      .master("local[*]") // 生产环境需改为集群模式,如 yarn
      .config("spark.sql.shuffle.partitions", "5") // 优化分区数
      .getOrCreate()

    // 设置 MySQL 连接参数
    val jdbcUrl = "jdbc:mysql://localhost:3306/your_database"
    val jdbcUsername = "your_username"
    val jdbcPassword = "your_password"

    try {
      // 从 MySQL 读取数据
      val df = spark.read
        .format("jdbc")
        .option("url", jdbcUrl)
        .option("dbtable", "source_table") // 要读取的表名
        .option("user", jdbcUsername)
        .option("password", jdbcPassword)
        .load()

      // 执行计算(示例:按 category 分组求和)
      val resultDF = df.groupBy("category")
        .agg(
          sum("amount").alias("total_amount"),
          count("*").alias("record_count")
        )

      // 打印计算结果(调试用)
      resultDF.show()

      // 将结果写入 MySQL
      resultDF.write
        .format("jdbc")
        .option("url", jdbcUrl)
        .option("dbtable", "result_table") // 目标表名
        .option("user", jdbcUsername)
        .option("password", jdbcPassword)
        .mode(SaveMode.Append) // 写入模式:覆盖/追加
        .save()

      println("数据写入 MySQL 成功!")
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      spark.stop()
    }
  }
}

http://www.xdnf.cn/news/7058.html

相关文章:

  • COCO数据集神经网络性能现状2025.5.18
  • 【数据结构】2-3-4 单链表的建立
  • 大学量化投资课程
  • C 语言学习笔记(函数)
  • 华为OD机试真题——最小循环子数组 (2025B卷:100分)Java/python/JavaScript/C/C++/GO最佳实现
  • 2025/5/18
  • 非线性1 修改
  • Jsoup库和Apache HttpClient库有什么区别?
  • 数据库DDL
  • 普通用户的服务器连接与模型部署相关记录
  • Qt 信号和槽-核心知识点小结(11)
  • 建一个结合双向长短期记忆网络(BiLSTM)和条件随机场(CRF)的模型
  • 什么是时间戳?怎么获取?有什么用
  • SQL练习(12/81)
  • C++ map容器: 插入操作
  • 一次页面假死分析
  • SpringBoot3+AI
  • Text models —— BERT,RoBERTa, BERTweet,LLama
  • 项目制作流程
  • html文件cdn一键下载并替换
  • 第五部分:阶段项目 4:构建 RESTful API 服务器
  • 创建指定版本的vite项目
  • 《Python星球日记》 第88天:ChatGPT 与 LangChain
  • ADB基本操作和命令
  • 【机器学习】工具入门:飞牛启动Dify Ollama Deepseek
  • 开始学习做游戏,就现在
  • 短剧小程序系统开发源码上架,短剧项目市场分析
  • 【上位机——WPF】布局控件
  • vue3 elementplus tabs切换实现
  • Node.js 实战六:日志系统设计 —— 不只是 console.log,而是可追溯的行为记录链