SparkSQL-数据提取和保存
(一)需求说明
从给定的user.csv文件中,读入用户数据,过滤掉年龄<18岁的信息,然后把剩余的数据写入mysql数据库中。这里要先去准备mysql数据库。
(二)思路分析
准备工作:
- 建立一个.csv文件,然后添加基本数据。
- 在mysql端建立一个数据表。
- 准备user.csv文件。
- 在mysql中创建数据表,特别注意字符编码的问题
- 编写spark代码:
- 读入csv文件到 dataFrame
- dataFrame做数据筛选
- dataFrame做数据写入到mysql
(三)核心步骤
1. 在mysql中创建数据表
- 打开finalshell,连接hadoop100
- 通过systemctl start mysqld 命令来启动服务
- 通过mysql -uroot -p 来登录mysql
- 通过create table来创建数据表
2. spark代码实现
从csv中读入数据到DataFrame。这里用到的方法是read.csv()
val spark = SparkSession.builder().appName("SparkSQL03").master("local[*]").getOrCreate()
// 读入data/user.csv文件的数据
val df = spark.read
.csv("data/user.csv").toDF("id","name","age")
df.show()
参考代码
import org.apache.spark.sql.SparkSessionimport java.util.Propertiesobject SparkSQL03 {// 连接hadoop100上的mysql数据库,读出spark数据库中的person表中的内容def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("SparkSQL03").master("local[*]").getOrCreate()// 读入data/user.csv文件的数据,插入到mysql数据库中val df = spark.read.csv("data/user.csv").toDF("id","name","age")// 创建一个properties对象,用来存储mysql连接信息val prop = new Properties()prop.setProperty("user","root")prop.setProperty("password","000000")// 过滤掉小于18岁的数据df.filter(df("age") > 18).write.mode("append") // 写入到数据库中.jdbc("jdbc:mysql://hadoop100:3306/spark","person",prop)}}