基于 Spark 和 Hadoop 的空气质量数据分析与预测系统
基于 Spark 和 Hadoop 的空气质量数据分析与预测系统
文章目录
- 基于 Spark 和 Hadoop 的空气质量数据分析与预测系统
- 引言
- 项目目标
- 技术栈
- 数据来源
- 系统核心功能
- 1. 用户认证与管理
- 2. 数据总览
- 3. 空气质量分析
- 4. 词云图
- 5. AQI 预测
- 项目结构预览
- 关键代码分享
- 1. Flask 主文件(app.py)
- 2. Spark 数据加载脚本
- 实现亮点
- 总结

引言
空气质量是城市生活中一个备受关注的话题,它直接关系到公众健康和环境可持续性。为了探索这一领域,我开发了一个基于大数据技术的空气质量数据分析与预测系统。该系统利用 Apache Spark 进行高效数据处理,Hadoop Hive 实现数据存储,并结合 Flask 框架和前端技术(如 ECharts 和 Bootstrap)提供直观的交互式 Web 界面。系统基于 2024 年的 4000 多条空气质量数据(来源于天气后报),通过数据分析、可视化展示和机器学习预测,为用户提供全面的空气质量洞察。
在这篇博客中,我将分享项目的目标、技术栈、核心功能、项目结构以及部分关键代码,帮助大家了解这个系统的设计与实现过程。无论你是大数据爱好者、Web 开发者,还是对空气质量研究感兴趣的读者,希望这篇文章能为你带来启发!
项目目标
这个项目的核心目标是通过大数据技术和 Web 开发手段,分析空气质量数据并提供预测功能。具体包括:
- 数据存储与管理:利用 Hadoop Hive 存储和管理大规模空气质量数据。
- 数据分析:通过 Spark 计算关键指标,如城市 AQI 均值、最大值、PM 颗粒物分布等。
- 可视化展示:使用 ECharts 和 Bootstrap 构建交互式图表和页面,展示分析结果。
- 用户交互:提供登录、注册、个人中心等功能,支持用户自定义查询。
- AQI 预测:基于机器学习模型,根据输入的污染物浓度(如 PM2.5、SO2 等)预测 AQI 值。
技术栈
- 后端:Flask(Python Web 框架)
- 数据处理:Apache Spark(指标计算)
- 数据存储:Hadoop Hive(大规模数据管理)
- 前端:Bootstrap(响应式布局)、ECharts(图表展示)
- 数据库:MySQL(用户信息存储)
- 其他:Python 机器学习库(AQI 预测)
数据来源
系统的数据来源于天气后报,包含 2024 年的 4000 多条空气质量记录。每条记录包括城市、日期、AQI、PM2.5、PM10、SO2、NO2、CO、O3 等字段,为分析和预测提供了丰富的基础。
系统核心功能
1. 用户认证与管理
- 登录与注册:用户可以通过注册创建账户并登录系统。
- 个人中心:支持修改密码和管理个人信息。
2. 数据总览
- 展示空气质量数据的概况,包括城市、日期、AQI 等关键信息。
3. 空气质量分析
- 年度分析:展示指定城市的 AQI 最大值、最小值、PM2.5 和 PM10 趋势。
- 月度分析:分析指定月份的 AQI 均值、排名及空气质量优秀天数。
- 气体分析:研究 CO 和 O3 的分布情况。
- 城市分布:通过地图展示各城市的 AQI 数据。
4. 词云图
- 根据空气质量数据的关键词生成词云图,直观呈现高频信息。
5. AQI 预测
- 用户输入 PM2.5、SO2、NO2、O3 的值,系统通过机器学习模型预测 AQI 并给出空气质量等级(如“优”、“良”等)。
项目结构预览
项目的目录结构清晰,分为后端逻辑、前端模板和数据处理模块:
项目根目录
├── app.py # Flask 主文件,定义路由和核心逻辑
├── utils # 工具模块
│ ├── db.py # MySQL 数据库操作
│ ├── public_data_hive.py # Hive 数据获取
│ ├── index_data.py # 首页数据处理
│ ├── air_year_data.py # 年度分析数据
│ ├── air_month_data.py # 月度分析数据
│ ├── air_gas_ana.py # 气体分析数据
│ ├── air_city_ana.py # 城市分布数据
│ ├── word_cloud_data.py # 词云数据生成
│ └── predict_data.py # AQI 预测模型
├── templates # HTML 模板
│ ├── login.html # 登录页面
│ ├── register.html # 注册页面
│ ├── index.html # 首页
│ ├── profile.html # 个人中心
│ ├── data_preview.html # 数据总览
│ ├── air_year_ana.html # 年度分析
│ ├── air_month_ana.html # 月度分析
│ ├── air_gas_ana.html # 气体分析
│ ├── air_city_ana.html # 城市分布
│ ├── air_quality_cloud.html # 词云图
│ └── air_quality_predict.html # AQI 预测
├── static # 静态资源
│ ├── css # 样式文件
│ ├── js # JavaScript 文件
│ └── img # 图片资源
└── data # 数据文件└── data.csv # 空气质量数据
关键代码分享
1. Flask 主文件(app.py)
app.py
是系统的核心,定义了路由和页面逻辑。以下是登录功能的实现:
from flask import Flask, render_template, request, redirect, url_for, session, jsonify
from utils import db, public_data_hive, index_dataapp = Flask(__name__)
app.secret_key = "sdkfjlqjluio23u429037907!@#!@#!@@"@app.route('/login', methods=['GET', 'POST'])
def login():if request.method == 'GET':return render_template('login.html')else:return_dict = {'code': '200', 'msg': '处理成功', 'result': False}cnt = db.query("select count(1) from air_aqi_db.tbl_user where user_name = %s and password = %s",[request.form['userName'], request.form['password']], 'select')if cnt[0][0]:session['userName'] = request.form['userName']return jsonify(return_dict)else:return_dict['code'] = '400'return_dict['msg'] = '用户名和密码不一致'return jsonify(return_dict)
2. Spark 数据加载脚本
Spark 用于从 CSV 文件加载数据并存入 Hive 表:
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import StructType, IntegerType, StringType, FloatTypespark = SparkSession.builder.master('local[*]').appName('sparkSQL') \.config('spark.sql.shuffle.partitions', 2) \.config('spark.sql.warehouse.dir', 'hdfs://hadoop101:8020/user/hive/warehouse') \.enableHiveSupport() \.getOrCreate()schema = StructType() \.add('city', StringType(), nullable=True) \.add('date', StringType(), nullable=True) \.add('airQuality', StringType(), nullable=True) \.add('aqi', IntegerType(), nullable=True) \.add('pm', IntegerType(), nullable=True) \.add('pm10', IntegerType(), nullable=True) \.add('so2', IntegerType(), nullable=True) \.add('no2', IntegerType(), nullable=True) \.add('co', FloatType(), nullable=True) \.add('o3', IntegerType(), nullable=True)df = spark.read.format('csv') \.option("header", True) \.option('sep', ',') \.option('encoding', 'utf-8') \.schema(schema) \.load("file:///opt/workspace/python_workspace/基于spark的空气质量数据分析可视化系统/data/data.csv")df = df.withColumn("id", monotonically_increasing_id()).dropDuplicates().na.drop()
df.write.mode('overwrite').format('hive').saveAsTable('air_aqi_db.air_data', 'parquet')spark.stop()
实现亮点
- 高效数据处理:Spark 的并行计算能力确保了对 4000+ 数据的高效处理。
- 灵活存储:Hive 支持大规模数据存储和快速查询。
- 交互式界面:Flask 结合 ECharts 和 Bootstrap,提供了美观且响应式的用户体验。
- 预测功能:机器学习模型为用户提供了实用的 AQI 预测工具。
总结
通过这个项目,我将大数据技术(Spark、Hadoop)和 Web 开发(Flask、ECharts)结合在一起,打造了一个功能丰富、用户友好的空气质量分析与预测系统。无论是从数据处理到可视化,还是从用户交互到预测功能,这个系统都展示了现代技术的强大潜力。
如果你对这个项目感兴趣,欢迎留言交流!私信分享源码。