当前位置: 首页 > java >正文

Flink 滑动窗口实战:从 KeyedProcessFunction 到 AggregateFunction WindowFunction 的完整旅程

一、业务背景

我们要在 Flink 实时流上统计 每个用户-品牌组合最近 1 小时的最晚行为时间,并且每 5 分钟更新一次结果。
数据来自 Kafka,事件类型为 CartEvent

public class CartEvent {public String userId;public String brandId;public long   ts;      // 事件时间戳
}

二、三种实现模式对比速查表

方案适用场景优点缺点
KeyedProcessFunction需要完全自己管理定时器、状态最灵活,可定制任何逻辑代码量大,易出错
AggregateFunction + WindowFunction(老 API)预聚合后再做最终加工性能好,API 简单只能拿到预聚合结果
Reduce/Aggregate + ProcessWindowFunction(新 API)预聚合后还能拿到窗口元数据兼具性能与信息相对复杂

三、KeyedProcessFunction 版本(最底层)

3.1 核心思想

  • MapState<String, Long> 保存每个用户-品牌的最晚时间。
  • 注册事件时间定时器,1 小时后触发输出。
  • 每来一条数据就更新状态并重新注册定时器(滑动要靠我们自己做)。

3.2 代码骨架

public class MaxTimeProcessFuncextends KeyedProcessFunction<String, CartEvent, Tuple3<String,String,Long>> {// 状态:userId_brandId -> maxTsprivate MapState<String, Long> maxTsState;@Overridepublic void open(Configuration parameters) throws Exception {MapStateDescriptor<String, Long> desc =new MapStateDescriptor<>("maxTs", String.class, Long.class);maxTsState = getRuntimeContext().getMapState(desc);}@Overridepublic void processElement(CartEvent value,Context ctx,Collector<Tuple3<String,String,Long>> out) throws Exception {String key = value.userId + "\001" + value.brandId;Long old = maxTsState.get(key);if (old == null || value.ts > old) {maxTsState.put(key, value.ts);}// 每 5 min 滑动 -> 注册“窗口结束时间”定时器long windowEnd = ctx.timestamp() - ctx.timestamp() % 300_000L + 300_000L;ctx.timerService().registerEventTimeTimer(windowEnd);}@Overridepublic void onTimer(long timestamp,OnTimerContext ctx,Collector<Tuple3<String,String,Long>> out) throws Exception {for (Map.Entry<String, Long> e : maxTsState.entries()) {String[] arr = e.getKey().split("\001");out.collect(Tuple3.of(arr[0], arr[1], e.getValue()));}// 可选:清理 1h 之前的状态}
}

缺点:得自己算窗口边界、清理过期状态,容易踩坑。


四、滑动窗口 + AggregateFunction(推荐)

4.1 需求映射

  • 窗口长度 = 1 小时
  • 滑动间隔 = 5 分钟

4.2 AggregateFunction 签名解析

public class MaxTimeAggimplements AggregateFunction<CartEvent, Long, Long> {@Overridepublic Long createAccumulator() {return Long.MIN_VALUE;         // 初始极小值}@Overridepublic Long add(CartEvent value, Long acc) {return Math.max(acc, value.ts); // 每条数据进来取最大}@Overridepublic Long getResult(Long acc) {return acc;                    // 窗口触发时返回最大时间}@Overridepublic Long merge(Long acc1, Long acc2) {return Math.max(acc1, acc2);   // 用于会话窗口或多线程合并}
}
泛型含义
CartEventIN:输入事件
LongACC:累加器,只保存当前最大值
LongOUT:最终输出

注意:AggregateFunction 只能拿到 窗口内聚合后的结果拿不到窗口元数据(起止时间、key 等),所以需要再配合 WindowFunction。


五、AggregateFunction + WindowFunction 组合

5.1 主流程

DataStream<CartEvent> source = env.addSource(...);source.keyBy(e -> e.userId + "\001" + e.brandId).window(SlidingProcessingTimeWindows.of(Time.hours(1), Time.minutes(5))).aggregate(new MaxTimeAgg(), new WindowEndFunc()).print();

5.2 WindowFunction 把窗口信息补回来

public class WindowEndFuncimplements WindowFunction<Long, WindowResult, String, TimeWindow> {@Overridepublic void apply(String key,TimeWindow window,Iterable<Long> maxTsIt,Collector<WindowResult> out) {Long maxTs = maxTsIt.iterator().next();  // 只有一条String[] arr = key.split("\001");out.collect(new WindowResult(arr[0], arr[1], maxTs,"brand", "T"));}
}

5.3 WindowResult POJO

public class WindowResult {public String userId;public String value;   // brandIdpublic long   time;    // 最晚行为时间public String dimType; // 维度类型public String list;    // 附加字段public WindowResult() {}public WindowResult(String userId, String value,long time, String dimType, String list) {this.userId  = userId;this.value   = value;this.time    = time;this.dimType = dimType;this.list    = list;}@Overridepublic String toString() {return "WindowResult{" +"userId='" + userId + '\'' +", value='" + value + '\'' +", time=" + time +", dimType='" + dimType + '\'' +", list='" + list + '\'' +'}';}
}

六、时间与输出节奏

现象解释
作业启动后 1 小时内 看不到任何结果必须等第一条窗口完整闭合(1 小时)。
之后 每 5 分钟 都会输出一次滑动窗口每滑动一格(5 min)触发一次。

七、调优 & 踩坑小结

  1. 状态 TTL
    滑动窗口会产生大量过期窗口,建议设置 StateTtlConfig 清理 1.5 小时前的状态。

  2. 并行度
    如果 key 量大,可以把 userId_brandId 做二次分区,避免热点 key。

  3. 迟到数据
    允许迟到 1 分钟:allowedLateness(Time.minutes(1))

  4. 空闲数据源
    使用 WatermarkStrategy.withIdleness() 防止空闲分区不触发窗口。


八、一句话总结

  • 想最省事 —— 直接 aggregate(new MaxTimeAgg(), new WindowEndFunc())
  • 想最灵活 —— 用 KeyedProcessFunction 自己管定时器和状态
  • 想兼顾性能与信息 —— 用 AggregateFunction + ProcessWindowFunction(新 API)

至此,我们已经把 Flink 滑动窗口的三种写法、时间语义、状态管理和调优思路全部串了一遍。祝实战愉快!

http://www.xdnf.cn/news/18793.html

相关文章:

  • 交换机是如何同时完成帧统计与 BER/FEC 分析的
  • leetcode LCR 012.寻找数组的中心下标
  • 59 C++ 现代C++编程艺术8-智能指针
  • IO多路转接(select方案)
  • 测试用例如何评审?
  • `mysql_query()` 数据库查询函数
  • 如何监控ElasticSearch的集群状态?
  • THM trypwnme2
  • 【广告系列】流量归因模型
  • LeetCode热题100--102. 二叉树的层序遍历--中等
  • 云计算学习笔记——Linux用户和组的归属权限管理、附加权限、ACL策略管理篇
  • CentOS安装Jenkins全流程指南
  • 【大白话解析】 OpenZeppelin 的 ECDSA 库:以太坊签名验证安全工具箱(附源代码)
  • 零基础也能写博客:cpolar简化Docsify远程发布流程
  • 自学嵌入式第二十七天:Linux系统编程-进程
  • MQTT 协议模型:客户端、 broker 与主题详解(二)
  • Java 学习笔记(基础篇10)
  • Qwen2-Plus与DeepSeek-V3深度测评:从API成本到场景适配的全面解析
  • Coze用户账号设置修改用户头像-后端源码
  • 大模型的多机多卡训练
  • 09-数据存储与服务开发
  • 深度学习分类网络初篇
  • react+taro打包到不同小程序
  • Nginx与Apache:Web服务器性能大比拼
  • Docker:技巧汇总
  • 连锁零售排班难?自动排班系统来解决
  • Swiper属性全解析:快速掌握滑块视图核心配置!(2.3补充细节,详细文档在uniapp官网)
  • 从C语言到数据结构:保姆级顺序表解析
  • 数据库之两段锁协议相关理论及应用
  • 前端开发:详细介绍npm、pnpm和cnpm分别是什么,使用方法以及之间有哪些关系