Connecting to MySQL and Adding New Data with Spark SQL in Scala
The following is a complete code example that uses Spark SQL (Scala) to connect to MySQL and append new rows:
scala
import org.apache.spark.sql.SparkSession

object MySQLSparkExample {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val spark = SparkSession.builder
      .appName("MySQLDataInsertExample")
      .config("spark.master", "local[*]")
      .config("spark.jars.packages", "mysql:mysql-connector-java:8.0.26")
      .getOrCreate()

    // Configure the MySQL connection parameters
    val jdbcUrl = "jdbc:mysql://localhost:3306/your_database?useSSL=false"
    val connectionProperties = new java.util.Properties()
    connectionProperties.setProperty("user", "your_username")
    connectionProperties.setProperty("password", "your_password")
    connectionProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver")

    try {
      // 1. Read the existing data
      val existingData = spark.read.jdbc(jdbcUrl, "employees", connectionProperties)
      println("Existing data:")
      existingData.show()

      // 2. Create the new rows to add
      import spark.implicits._
      val newData = Seq(
        (1001, "John Doe", "Engineering", 5000.0),
        (1002, "Jane Smith", "Marketing", 6000.0)
      ).toDF("id", "name", "department", "salary")

      // 3. Append the new rows to the MySQL table
      newData.write
        .mode("append")
        .jdbc(jdbcUrl, "employees", connectionProperties)
      println("Data added successfully!")

      // 4. Verify the data after the insert
      val updatedData = spark.read.jdbc(jdbcUrl, "employees", connectionProperties)
      println("Data after the insert:")
      updatedData.show()
    } catch {
      case e: Exception =>
        println(s"Operation failed: ${e.getMessage}")
        e.printStackTrace()
    } finally {
      // Stop the SparkSession
      spark.stop()
    }
  }
}
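The example above hard-codes the user name and password in the source. As a minimal sketch of one alternative, the connection properties could be built from environment variables instead. The variable names `MYSQL_USER` and `MYSQL_PASSWORD` and the helper `ConnectionConfig.fromEnv` are hypothetical names chosen for this illustration and are not part of the original example.

```scala
import java.util.Properties

object ConnectionConfig {
  // Builds JDBC connection properties from a map of settings (for example, sys.env).
  // MYSQL_USER and MYSQL_PASSWORD are hypothetical variable names used only in this sketch.
  // When a variable is missing, the placeholder value from the main example is used.
  def fromEnv(env: Map[String, String]): Properties = {
    val props = new Properties()
    props.setProperty("user", env.getOrElse("MYSQL_USER", "your_username"))
    props.setProperty("password", env.getOrElse("MYSQL_PASSWORD", "your_password"))
    props.setProperty("driver", "com.mysql.cj.jdbc.Driver")
    props
  }
}
```

In the main example, `val connectionProperties = ConnectionConfig.fromEnv(sys.env)` would then take the place of the three `setProperty` calls.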
Key configuration notes:
- Dependency configuration:
scala
.config("spark.jars.packages", "mysql:mysql-connector-java:8.0.26")
- JDBC URL format:
scala
jdbc:mysql://<host>:<port>/<database>?useSSL=false
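- Write modes:
  - append: appends rows without deleting existing data
  - overwrite: replaces the table (existing data is removed before the new rows are inserted)
  - errorIfExists: fails if the table already exists (the default)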
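The JDBC URL format listed above can also be assembled programmatically, which helps once more than one query parameter is needed. The following is a small sketch under that assumption. `JdbcUrl.build` is a hypothetical helper, not a Spark or Connector/J API, and it does not URL-encode parameter values.

```scala
object JdbcUrl {
  // Assembles jdbc:mysql://<host>:<port>/<database>?k1=v1&k2=v2
  // Parameters are joined in the order given; values are not URL-encoded in this sketch.
  def build(
      host: String,
      port: Int,
      database: String,
      params: Seq[(String, String)] = Seq.empty
  ): String = {
    val base = s"jdbc:mysql://$host:$port/$database"
    if (params.isEmpty) base
    else base + params.map { case (k, v) => s"$k=$v" }.mkString("?", "&", "")
  }
}
```

For example, `JdbcUrl.build("localhost", 3306, "your_database", Seq("useSSL" -> "false"))` returns the same URL string used in the main example.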
Steps to run:
- Make sure the MySQL service is running
- Create the test table:
sql
CREATE TABLE employees (
  id INT PRIMARY KEY,
  name VARCHAR(50),
  department VARCHAR(50),
  salary DOUBLE
);
- Run the Spark application:
bash
spark-submit --class MySQLSparkExample \
  --master 'local[*]' \
  --packages mysql:mysql-connector-java:8.0.26 \
  your-application.jar
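The `spark-submit` command above expects a packaged `your-application.jar`. One possible way to produce it is an sbt build along the following lines. This is only a sketch: the project name and the Scala and Spark versions shown are assumptions, not values taken from the original example, and should match the cluster you submit to.

```scala
// build.sbt (sketch; name and versions below are assumptions)
name := "mysql-spark-example"
version := "0.1.0"
scalaVersion := "2.12.18"

libraryDependencies ++= Seq(
  // Spark is supplied by spark-submit at run time, so it is marked "provided"
  "org.apache.spark" %% "spark-sql" % "3.5.0" % "provided",
  // Same MySQL driver coordinates as in the --packages option
  "mysql" % "mysql-connector-java" % "8.0.26"
)
```

Running `sbt package` writes the jar under `target/`, and that file can be passed to `spark-submit` in place of `your-application.jar`.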
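Notes:
- Replace the database connection placeholders with your own values:
  - your_database
  - your_username
  - your_password
- If you run into time zone problems, add the following parameter to the URL (use & instead of ? when the URL already has a query string such as ?useSSL=false):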
scala
?serverTimezone=UTC
- Make sure the MySQL user has write permission (SELECT is included because the example also reads the table):
sql
GRANT SELECT, INSERT ON your_database.employees TO 'your_username'@'%';
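- For production environments, consider:
  - Using a connection pool (for example, HikariCP)
  - Enabling SSL encryption
  - Configuring an appropriate retry mechanism
  - Monitoring the database connection status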
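The retry recommendation above could be sketched as a small helper with a fixed delay between attempts. `Retry.withRetry` is a hypothetical helper written for this illustration and is not part of Spark. It only shows the general shape, and real deployments may prefer exponential backoff or retrying only on specific exception types.

```scala
import scala.util.{Failure, Success, Try}

object Retry {
  // Runs `op` up to `maxAttempts` times, sleeping `delayMs` milliseconds after each failure.
  // Returns the first Success, or the last Failure once the attempts are used up.
  def withRetry[T](maxAttempts: Int, delayMs: Long)(op: => T): Try[T] =
    Try(op) match {
      case success @ Success(_) => success
      case Failure(_) if maxAttempts > 1 =>
        Thread.sleep(delayMs)
        withRetry(maxAttempts - 1, delayMs)(op)
      case failure => failure
    }
}
```

A write from the main example could then be wrapped as `Retry.withRetry(3, 2000L) { newData.write.mode("append").jdbc(jdbcUrl, "employees", connectionProperties) }`. Retrying an append is only safe if a failed attempt wrote nothing. With the primary key on `id`, a retry after a partial write would likely fail with a duplicate-key error rather than insert duplicate rows.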