当前位置: 首页 > news >正文

sparkSQL读入csv文件写入mysql

下面为你提供一个使用 SparkSQL 读取 CSV 文件并写入 MySQL 数据库的完整示例。这个示例展示了如何配置数据源、处理数据以及将结果写入关系型数据库。

实现步骤

python

运行

from pyspark.sql import SparkSession
from pyspark.sql.functions import col# 创建SparkSession
spark = SparkSession.builder \.appName("CSVtoMySQL") \.config("spark.jars", "/path/to/mysql-connector-java.jar") \.getOrCreate()# 读取CSV文件
csv_file_path = "path/to/your/file.csv"
df = spark.read.csv(csv_file_path,header=True,inferSchema=True,sep=",",quote='"',nullValue="nan"
)# 数据清洗与转换示例
df = df.select(col("id").cast("integer"),col("name").alias("username"),col("age").cast("integer"),col("email")
).dropna(subset=["id", "username"])# 显示数据结构和内容(可选)
print("数据结构:")
df.printSchema()print("数据内容示例:")
df.show(5)# MySQL数据库连接配置
mysql_config = {"url": "jdbc:mysql://localhost:3306/your_database","driver": "com.mysql.cj.jdbc.Driver","dbtable": "your_table","user": "your_username","password": "your_password","batchsize": 1000,  # 批量写入大小"isolationLevel": "NONE",  # 无事务隔离(提高性能)"truncate": "true"  # 写入前清空表(可选)
}# 将DataFrame写入MySQL
try:df.write \.format("jdbc") \.mode("overwrite")  # 可选: append, overwrite, ignore, errorifexists.options(**mysql_config) \.save()print(f"成功将 {df.count()} 条记录写入MySQL表 {mysql_config['dbtable']}")
except Exception as e:print(f"写入MySQL时发生错误: {e}")
finally:# 关闭SparkSessionspark.stop()

关键配置说明

  1. 依赖配置

    • 需要下载 MySQL JDBC 驱动(mysql-connector-java.jar)并通过spark.jars指定路径
    • 确保 Spark 集群所有节点都能访问该 JAR 文件
  2. CSV 读取参数

    • header=True:使用第一行作为列名
    • inferSchema=True:自动推断数据类型
    • sep:字段分隔符
    • quote:引号字符
    • nullValue:指定哪些值应被视为 NULL
  3. 数据处理

    • 示例中进行了类型转换、列重命名和空值过滤
    • 实际应用中可根据需求添加更多转换操作(如聚合、JOIN 等)
  4. MySQL 写入配置

    • mode参数控制写入模式:
      • overwrite:覆盖现有数据
      • append:追加数据
      • ignore:如果表存在则忽略
      • errorifexists:如果表存在则报错(默认)
    • batchsize:控制批量写入的记录数,可优化写入性能
    • isolationLevel:设置事务隔离级别

运行前准备

  1. 创建目标 MySQL 表:

sql

CREATE TABLE your_table (id INT PRIMARY KEY,username VARCHAR(255),age INT,email VARCHAR(255)
);

确保网络连通性:

    • Spark 集群能够访问 MySQL 服务器(通常是 3306 端口)
    • 防火墙已正确配置
  1. 参数替换:

    • 将示例中的数据库连接信息替换为实际值
    • 根据 CSV 文件格式调整读取参数

http://www.xdnf.cn/news/426241.html

相关文章:

  • 基于自动化工具autox.js的抢票(猫眼)
  • P1032 [NOIP 2002 提高组] 字串变换
  • [ctfshow web入门] web72
  • vscode百宝箱工具插件(devtools)
  • 数据可视化图表
  • pe文件二进制解析(用c/c++解析一个二进制pe文件)
  • 网络层试题
  • c语言第一个小游戏:贪吃蛇小游戏05
  • 2025.05.11阿里云机考真题算法岗-第三题
  • java高效实现爬虫
  • SAM 2: Segment Anything in Images and Videos
  • 2025年渗透测试面试题总结-渗透测试红队面试九(题目+回答)
  • kingbase链接数修改、数据备份/还原
  • py7zr解压文件时报错CrcError(crc32, f.crc32, f.filename)
  • 学习黑客Windows 卷影复制服务详解
  • SQL 索引优化指南:原理、知识点与实践案例
  • 深入理解 NumPy:Python 科学计算的基石
  • MCU程序加密保护(一)闪存读写保护法 加密与解密
  • Pycharm的终端执行allure命令出现command not found
  • 【计算机视觉】OpenCV实战项目:基于OpenCV与face_recognition的实时人脸识别系统深度解析
  • 物理:人的记忆是由基本粒子构成的吗?
  • 《类和对象(下)》
  • 抗量子计算攻击的数据安全体系构建:从理论突破到工程实践
  • FFmpeg 与 C++ 构建音视频处理全链路实战(三)—— FFmpeg 内存模型
  • Linux云计算训练营笔记day07(MySQL数据库)
  • 手搓传染病模型(SEIARW)
  • 内核深入学习3——分析ARM32和ARM64体系架构下的Linux内存区域示意图与页表的建立流程
  • Linux系统编程——进程
  • 现代化QML组件开发教程
  • UI-TARS Desktop:用自然语言操控电脑,AI 重新定义人机交互