sparkSQL读入csv文件写入mysql
下面为你提供一个使用 SparkSQL 读取 CSV 文件并写入 MySQL 数据库的完整示例。这个示例展示了如何配置数据源、处理数据以及将结果写入关系型数据库。
实现步骤
python
运行
from pyspark.sql import SparkSession
from pyspark.sql.functions import col# 创建SparkSession
spark = SparkSession.builder \.appName("CSVtoMySQL") \.config("spark.jars", "/path/to/mysql-connector-java.jar") \.getOrCreate()# 读取CSV文件
csv_file_path = "path/to/your/file.csv"
df = spark.read.csv(csv_file_path,header=True,inferSchema=True,sep=",",quote='"',nullValue="nan"
)# 数据清洗与转换示例
df = df.select(col("id").cast("integer"),col("name").alias("username"),col("age").cast("integer"),col("email")
).dropna(subset=["id", "username"])# 显示数据结构和内容(可选)
print("数据结构:")
df.printSchema()print("数据内容示例:")
df.show(5)# MySQL数据库连接配置
mysql_config = {"url": "jdbc:mysql://localhost:3306/your_database","driver": "com.mysql.cj.jdbc.Driver","dbtable": "your_table","user": "your_username","password": "your_password","batchsize": 1000, # 批量写入大小"isolationLevel": "NONE", # 无事务隔离(提高性能)"truncate": "true" # 写入前清空表(可选)
}# 将DataFrame写入MySQL
try:df.write \.format("jdbc") \.mode("overwrite") # 可选: append, overwrite, ignore, errorifexists.options(**mysql_config) \.save()print(f"成功将 {df.count()} 条记录写入MySQL表 {mysql_config['dbtable']}")
except Exception as e:print(f"写入MySQL时发生错误: {e}")
finally:# 关闭SparkSessionspark.stop()
关键配置说明
-
依赖配置:
- 需要下载 MySQL JDBC 驱动(
mysql-connector-java.jar
)并通过spark.jars
指定路径 - 确保 Spark 集群所有节点都能访问该 JAR 文件
- 需要下载 MySQL JDBC 驱动(
-
CSV 读取参数:
header=True
:使用第一行作为列名inferSchema=True
:自动推断数据类型sep
:字段分隔符quote
:引号字符nullValue
:指定哪些值应被视为 NULL
-
数据处理:
- 示例中进行了类型转换、列重命名和空值过滤
- 实际应用中可根据需求添加更多转换操作(如聚合、JOIN 等)
-
MySQL 写入配置:
mode
参数控制写入模式:overwrite
:覆盖现有数据append
:追加数据ignore
:如果表存在则忽略errorifexists
:如果表存在则报错(默认)
batchsize
:控制批量写入的记录数,可优化写入性能isolationLevel
:设置事务隔离级别
运行前准备
- 创建目标 MySQL 表:
sql
CREATE TABLE your_table (id INT PRIMARY KEY,username VARCHAR(255),age INT,email VARCHAR(255)
);
确保网络连通性:
-
- Spark 集群能够访问 MySQL 服务器(通常是 3306 端口)
- 防火墙已正确配置
-
参数替换:
- 将示例中的数据库连接信息替换为实际值
- 根据 CSV 文件格式调整读取参数