当前位置: 首页 > ops >正文

生产环境Spark Structured Streaming实时数据处理应用实践分享

封面图片

生产环境Spark Structured Streaming实时数据处理应用实践分享

一、业务场景描述

我们所在的电商平台需要实时监控用户行为数据(如点击、下单、支付等),基于事件级别的流式数据进行实时统计、会话聚合、漏斗分析,并将结果推送到Dashboard和报表存储。原有系统使用的Storm+Kafka方案在高并发时存在容错难、状态管理复杂、维护成本高的问题。

核心需求:

  • 低延迟:端到端处理延迟控制在2秒以内。
  • 可伸缩:能水平扩展,应对峰值10万条/秒消息吞吐。
  • 容错性:任务失败自动重启且保证端到端数据不丢失。
  • 状态管理:支持有状态聚合(窗口、会话)和超大状态存储。

二、技术选型过程

我们对主流实时计算框架进行了对比:

| 框架 | 延迟 | 状态管理 | 易用性 | 扩展性 | 社区成熟度 | | ---- | ---- | ---- | ---- | ---- | ---- | | Apache Storm | 500ms~1s | 需自行实现State Store | 开发复杂 | 高 | 高 | | Apache Flink | 200ms~500ms | 内置强大状态管理 | 编程模型复杂 | 高 | 高 | | Spark Structured Streaming | 1s~2s | 使用Checkpoint and WAL,可容错 | API友好,基于Spark SQL | 高 | 高 | | Apache Kafka Streams | <1s | 基于RocksDB,状态管理受限 | 与Kafka耦合高 | 中 | 中 |

综合考虑团队技术栈和运维成本,我们最终选定Spark Structured Streaming:

  • 与现有Spark Batch集群共用资源。
  • 编程模型统一,SQL/DS/Lambda API支持灵活。
  • Checkpoint与WAL机制简化状态管理,集成HDFS持久化状态。

三、实现方案详解

3.1 项目结构

├── pom.xml
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com.company.streaming
│   │   │       ├── App.java
│   │   │       └── utils
│   │   │           └── KafkaOffsetManager.java
│   │   └── resources
│   │       └── application.conf
└── README.md

3.2 核心配置(application.conf)

spark.app.name=RealTimeUserBehavior
spark.master=yarn
spark.sql.shuffle.partitions=200
spark.streaming.backpressure.enabled=true
spark.checkpoint.dir=hdfs://namenode:8020/app/checkpoints/structured-streaming
kafka.bootstrap.servers=broker1:9092,broker2:9092
kafka.topic.user=topic_user_behavior
kafka.group.id=user_behavior_group

3.3 主入口代码(App.java)

package com.company.streaming;import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.Trigger;public class App {public static void main(String[] args) throws Exception {SparkSession spark = SparkSession.builder().appName("RealTimeUserBehavior").getOrCreate();// 从Kafka读取原始数据Dataset<Row> raw = spark.readStream().format("kafka").option("kafka.bootstrap.servers", spark.sparkContext().getConf().get("kafka.bootstrap.servers")).option("subscribe", spark.sparkContext().getConf().get("kafka.topic.user")).option("startingOffsets", "latest").load();// 解析JSON并选取字段Dataset<Row> userEvents = raw.selectExpr("CAST(value AS STRING) as json").select(org.apache.spark.sql.functions.from_json(org.apache.spark.sql.functions.col("json"),DataSchema.eventSchema()).as("data")).select("data.*");// 实时会话聚合:10分钟无操作认为会话结束Dataset<Row> sessions = userEvents.withWatermark("eventTime", "2 minutes").groupBy(org.apache.spark.sql.functions.window(org.apache.spark.sql.functions.col("eventTime"),"10 minutes", "5 minutes"),org.apache.spark.sql.functions.col("userId")).agg(org.apache.spark.sql.functions.count("eventType").alias("eventCount"),org.apache.spark.sql.functions.min("eventTime").alias("startTime"),org.apache.spark.sql.functions.max("eventTime").alias("endTime"));// 输出到HDFS OR 更新到外部系统sessions.writeStream().outputMode(OutputMode.Update()).trigger(Trigger.ProcessingTime("30 seconds")).option("path", "hdfs://namenode:8020/app/output/user_sessions").option("checkpointLocation", spark.sparkContext().getConf().get("spark.checkpoint.dir") + "/sessions").start().awaitTermination();}
}

3.4 关键工具类(KafkaOffsetManager.java)

package com.company.streaming.utils;// 省略:管理Kafka手动提交offset、读写Zookeeper存储偏移量

四、踩过的坑与解决方案

  1. 状态膨胀导致Checkpoint文件过大:

    • 方案:定期做State TTL清理,结合Spark 3.1.1+的state cleanup策略。
  2. Kafka消费位点重复或丢失:

    • 方案:使用KafkaOffsetManager手动管理,结合幂等写入目标系统保证At-Least-Once语义。
  3. 延迟抖动:

    • 方案:开启backpressure,限制最大并行度,并合理调整Trigger频率。
  4. Driver内存溢出:

    • 方案:提升driver内存,拆分业务流程;或将部分轻量计算迁移至Executors。

五、总结与最佳实践

  • 合理规划Checkpoint和WAL存储目录,避免与业务数据混淆。
  • 利用Spark监控UI实时观察批次时长、shuffle写入、延迟指标。
  • 结合PeriodicStateCleanup+Watermark确保有状态算子状态可控。
  • 抽象共通工具类(KafkaOffsetManager、JSON解析、公用Schema),提高代码可维护性。
  • 复杂业务可拆分成多个流式子作业,下游合并结果,增强可扩展性。

通过以上实践,我们成功将平台数据实时处理延迟稳定在1.2秒左右,作业稳定运行10+节点集群一个季度零故障。

http://www.xdnf.cn/news/18975.html

相关文章:

  • ZArchiver解压器:强大的安卓解压缩工具
  • 数据结构 第三轮
  • 使用 Dify 和 LangBot 搭建飞书通信机器人
  • Elasticsearch AI 语义搜索(semantic_text)
  • 群晖Nas上使用工具rsync工具usb同步数据
  • 国际期货Level2分时Tick历史行情数据处理分析
  • Vue2+Element 初学
  • 如何备份 TECNO 手机上的短信
  • position属性
  • rabbitmq学习笔记 ----- 多级消息延迟始终为 20s 问题排查
  • 2025最新uni-app横屏适配方案:微信小程序全平台兼容实战
  • Java开发MongoDB常见面试题及答案
  • DQL单表查询相关函数
  • 【WPF】WPF 自定义控件实战:从零打造一个可复用的 StatusIconTextButton (含避坑指南)
  • 安卓开发---BaseAdapter(定制ListView的界面)
  • 中文PDF解析工具测评与选型指南
  • js AbortController 实现中断接口请求
  • 【面试场景题】三阶段事务提交比两阶段事务提交的优势是什么
  • 《C++进阶之STL》【AVL树】
  • 基于 GPT-OSS 的成人自考口语评测 API 开发全记录
  • 数据分析编程第七步:分析与预测
  • Qt节点编辑器设计与实现:动态编辑与任务流可视化(一)
  • 【拍摄学习记录】07-影调、直方图量化、向右向左
  • 经典扫雷游戏实现:从零构建HTML5扫雷游戏
  • 【Python】Python 实现 PNG 转 ICO 图标转换工具
  • LightGBM 在金融逾期天数预测任务中的经验总结
  • Qt自定义聊天消息控件ChatMessage:初步实现仿微信聊天界面
  • Linux之Shell编程(一)
  • Linux笔记12——shell编程基础-6
  • Swift 解法详解 LeetCode 365:水壶问题