当前位置: 首页 > backend >正文

在 PySpark 中解锁窗口函数的力量,实现高级数据转换

本篇文章Mastering PySpark Window Functions: A Practical Guide to Time-Based Analytics适合数据分析和工程师入门了解PySpark的窗口函数。文章的亮点在于详细介绍了窗口函数的基本概念及其在销售数据分析中的实际应用,帮助读者理解如何进行复杂的数据计算而无需多次连接或聚合。


文章目录

  • 1 理解窗口函数:基础
  • 2 搭建分析管道
  • 3 客户级别聚合:理解历史模式
  • 4 滚动窗口:捕捉时间趋势
  • 5 关键概念解释
  • 6 月度滞后特征:季节性模式分析
  • 7 性能优化技巧
  • 8 行业级别基准
  • 9 结论


PySpark logo image

窗口函数是 Apache Spark 中最强大但却未被充分利用的功能之一。它们允许您对与当前行相关的行执行复杂的计算,而无需昂贵的连接或多次聚合。在这篇文章中,我们将通过一个销售分析场景来探讨窗口函数的实际应用。

1 理解窗口函数:基础

可以将窗口函数视为一种在处理每个单独行时“窥视”相邻行的方式。与将多行合并为一行的常规聚合不同,窗口函数会为每个输入行返回一个结果,同时考虑一个相关行的“窗口”。

窗口函数的基本组成包括:

  • Partition By(按分区):将行分组到逻辑分区中
  • Order By(按排序):定义每个分区内的排序
  • Frame(框架):指定分区内要包含在计算中的行

2 搭建分析管道

让我们从一个包含交易记录的销售数据集开始。我们将使用各种窗口函数技术来构建预测支付延迟的特征。

from pyspark.sql import functions as F
from pyspark.sql.window import WindowsalesDF = salesDF.withColumn('transaction_day', F.dayofmonth(F.col('transaction_date')))
salesDF = salesDF.withColumn('transaction_month', F.month(F.col('transaction_date')))
salesDF = salesDF.withColumn('transaction_year', F.year(F.col('transaction_date')))
salesDF = salesDF.withColumn('day_of_week', F.dayofweek(F.col('transaction_date')) - 1)
salesDF = salesDF.withColumn('payment_day', F.dayofmonth(F.col('payment_due_date')))
salesDF = salesDF.withColumn('payment_month', F.month(F.col('payment_due_date')))
salesDF = salesDF.withColumn('payment_year', F.year(F.col('payment_due_date')))
salesDF = salesDF.withColumn('payment_day_of_week', F.dayofweek(F.col('payment_due_date')) - 1)

3 客户级别聚合:理解历史模式

在深入了解窗口函数之前,我们通常需要客户级别的统计数据。这些数据为理解当前行为是典型还是异常提供了背景。

salesDF = salesDF.join(salesDF.groupBy('client_id', 'transaction_type').agg(F.mean('delay_days').alias('client_delay_average'),F.expr('sum(delay_days * invoice_amount) / sum(invoice_amount)').alias('client_delay_weighted_avg'),F.stddev('delay_days').alias('client_delay_stddev'),F.sqrt(F.try_divide(F.sum(F.col('invoice_amount') * F.pow(F.col('delay_days'), 2)),F.sum('invoice_amount')) -F.pow(F.try_divide(F.sum(F.col('invoice_amount') * F.col('delay_days')),F.sum('invoice_amount')), 2)).alias('client_delay_weighted_stddev'),F.expr('percentile_approx(delay_days, 0.5)').alias('client_delay_median'),F.count('delay_days').alias('client_transaction_count')),on=['client_id', 'transaction_type'],how='left'
)

加权标准差的计算可能看起来很复杂,但它使用的是数学公式:E[X2]−(E[X])2\sqrt{E[X^2] - (E[X])^2}E[X2](E[X])2,其中较大的交易对标准差计算的影响更大。

4 滚动窗口:捕捉时间趋势

这就是窗口函数真正发挥作用的地方。滚动窗口允许我们计算滑动时间段内的指标,捕捉客户行为中的趋势和季节性。

time_windows = [30, 90, 365]for days in time_windows:rolling_window = (Window.partitionBy('client_id', 'transaction_type').orderBy(F.col('transaction_date').cast("timestamp").cast("long")).rangeBetween(-days * 86400, -1))salesDF = salesDF.withColumn(f'delay_rolling_avg_{days}d',F.avg('delay_days').over(rolling_window))salesDF = salesDF.withColumn(f'delay_rolling_std_{days}d',F.stddev('delay_days').over(rolling_window))salesDF = salesDF.withColumn(f'delay_rolling_weighted_avg_{days}d',F.try_divide(F.sum(F.col('delay_days') * F.col('invoice_amount')).over(rolling_window),F.sum(F.col('invoice_amount')).over(rolling_window)))salesDF = salesDF.withColumn(f'delay_rolling_weighted_std_{days}d',F.sqrt(F.try_divide(F.sum(F.col('invoice_amount') * F.pow(F.col('delay_days'), 2)).over(rolling_window),F.sum(F.col('invoice_amount')).over(rolling_window)) -F.pow(F.try_divide(F.sum(F.col('invoice_amount') * F.col('delay_days')).over(rolling_window),F.sum(F.col('invoice_amount')).over(rolling_window)), 2)))

5 关键概念解释

Range(范围)与 Rows(行)窗口:我们使用 rangeBetween(-days * 86400, -1) 而不是 rowsBetween(),因为我们想要一个基于时间的窗口。这确保我们能够精确地捕获指定天数的数据,而与交易频率无关。

加权计算:通过按发票金额对指标进行加权,我们赋予了较大交易更高的重要性,这通常能更好地代表客户的支付行为。

排除当前行:将 -1 作为上限可以排除当前交易,从而防止预测模型中的数据泄露。

6 月度滞后特征:季节性模式分析

为了进行长期趋势分析,我们可以创建月度聚合并生成滞后特征以捕捉季节性模式。

monthlyDF = salesDF.groupBy("client_id", "transaction_type", "transaction_year", "transaction_month"
).agg(F.expr('sum(delay_days * invoice_amount) / sum(invoice_amount)').alias('monthly_weighted_delay_avg'),F.sqrt(F.try_divide(F.sum(F.col('invoice_amount') * F.pow(F.col('delay_days'), 2)),F.sum('invoice_amount')) -F.pow(F.try_divide(F.sum(F.col('invoice_amount') * F.col('delay_days')),F.sum('invoice_amount')), 2)).alias('monthly_weighted_delay_std')
)monthly_window = Window.partitionBy("client_id", "transaction_type") \.orderBy("transaction_year", "transaction_month")for lag_months in range(1, 13):monthlyDF = monthlyDF.withColumn(f"delay_avg_lag_{lag_months}m",F.lag("monthly_weighted_delay_avg", lag_months).over(monthly_window))monthlyDF = monthlyDF.withColumn(f"delay_std_lag_{lag_months}m",F.lag("monthly_weighted_delay_std", lag_months).over(monthly_window))salesDF = salesDF.join(monthlyDF,on=["client_id", "transaction_type", "transaction_year", "transaction_month"],how="left"
)

7 性能优化技巧

分区策略:始终根据逻辑上对数据进行分组的高基数列进行分区。这可以最大限度地减少数据混洗。

窗口框架优化:使用尽可能限制性的框架。无界窗口开销大且通常不必要。

缓存:当对同一数据集执行多个窗口操作时,考虑缓存中间结果。

salesDF.cache()

8 行业级别基准

不要忘记创建行业或细分市场级别的基准进行比较:

salesDF = salesDF.join(salesDF.groupBy('industry_sector', "transaction_type").agg(F.expr('sum(delay_days * invoice_amount) / sum(invoice_amount)').alias('industry_delay_avg'),F.sqrt(F.try_divide(F.sum(F.col('invoice_amount') * F.pow(F.col('delay_days'), 2)),F.sum('invoice_amount')) -F.pow(F.try_divide(F.sum(F.col('invoice_amount') * F.col('delay_days')),F.sum('invoice_amount')), 2)).alias('industry_delay_std')),on=['industry_sector', 'transaction_type'],how='left'
)

9 结论

窗口函数解锁了 PySpark 中复杂的分析能力,使您能够为机器学习和高级分析创建丰富的特征集。关键在于理解何时使用不同类型的窗口:

  • 无界窗口:用于累积指标
  • 基于范围的窗口:用于时间序列分析
  • 基于行的窗口:用于排名和百分位数
  • 滞后函数:用于趋势和季节性检测

通过将这些技术与适当的分区和优化策略相结合,您可以构建健壮、可扩展的分析管道,捕捉数据中复杂的时间模式。

开始在您自己的数据集中尝试这些模式,您很快就会发现窗口函数在将原始数据转化为可操作洞察方面的真正力量。

http://www.xdnf.cn/news/19682.html

相关文章:

  • 数控机床相邻轨迹最大过渡速度计算方法介绍
  • 【Kubernetes】知识点2
  • 【数学建模学习笔记】时间序列分析:LSTM
  • Vue 3 + TypeScript 现代前端开发最佳实践(2025版指南)
  • 【完整源码+数据集+部署教程】PHC桩实例分割系统源码和数据集:改进yolo11-Faster-EMA
  • 黄金金融期货数据API对接技术文档
  • nmap扫描端口,netstat
  • 土地退化相关
  • Axure: 平滑折线图
  • Apache Doris:重塑湖仓一体架构的高效计算引擎
  • 文件页的预取逻辑
  • 小兔鲜儿项目
  • 树莓派网页监控
  • 从 Arm Compiler 5 迁移到 Arm Compiler 6
  • 2025 随身 WIFI 行业报告:从拼参数到重体验,华为 / 格行 / 中兴技术差异化路径解析
  • 梳理一下 @types/xxx
  • java面试中经常会问到的多线程问题有哪些(基础版)
  • think
  • ubuntu系统设置中文失败问题
  • grpc-swift-2 学习笔记
  • 均匀分布直线阵的常规波束形成方位谱和波束图
  • (Arxiv-2025)ConceptMaster:基于扩散 Transformer 模型的多概念视频定制,无需测试时微调
  • 【2025终极对决】Python三大后端框架Django vs FastAPI vs Robyn,你的选择将决定项目生死?
  • [光学原理与应用-366]:ZEMAX - 用成像原理说明人眼为什么能看清物体?
  • 两款超实用办公插件推荐:Excel聚光灯与Word公文排版
  • MySQL 多表查询方法
  • Spring Boot 全局字段处理最佳实践
  • mysql初学者练习题(从基础到进阶,相关数据sql脚本在最后)
  • 59.螺旋矩阵II
  • 框架-SpringMVC-1