当前位置: 首页 > ai >正文

Flink Stream API 源码走读 - window 和 sum

本文核心观点

核心观点:WindowedStream 是一个"假流",它比 KeyedStream 更虚,只是一个 API 的过渡器,不是真正意义上的 DataStream,需要调用函数回归。

  1. 虚拟化时刻:从真实流到虚拟流
    KeyedStream<T,K> keyedStream = …; // 半虚拟流
    WindowedStream<T,K,W> windowedStream = keyedStream.window(assigner); // 完全虚拟流

  2. 回归时刻:从虚拟流回到真实流
    windowedStream.sum()
    return input.transform(opName, resultType, operator); // 回到DataStream标准流程

一、window() 方法的特殊性发现

1.1 只有 KeyedStream 才有 window 方法

//  DataStream 上没有 window 方法
DataStream<String> stream = ...;
// stream.window(assigner); // 编译错误!//  只有 KeyedStream 才有 window 方法
KeyedStream<String, String> keyedStream = stream.keyBy(...);
WindowedStream<String, String, TimeWindow> windowedStream = keyedStream.window(assigner);

为什么这样设计?

  • 窗口操作需要基于 Key 进行分组
  • 每个 Key 都有独立的窗口状态
  • 保证相同 Key 的数据进入同一个窗口实例

1.2 KeyedStream 的特殊 API 设计

public class KeyedStream<T, KEY> extends DataStream<T> {// 继承 DataStream 的所有方法:map, filter, flatMap...// KeyedStream 特有的窗口 APIpublic <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner);public WindowedStream<T, KEY, GlobalWindow> countWindow(long size);// KeyedStream 特有的聚合 APIpublic SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function);public SingleOutputStreamOperator<T> sum(int positionToSum);public SingleOutputStreamOperator<T> max(int positionToMax);// ... 其他聚合操作
}

设计理念

  • 继承性:保留 DataStream 的所有基础能力
  • 扩展性:增加基于 Key 的特殊操作
  • 状态性:支持有状态的聚合操作

二、WindowedStream 的"虚拟"本质

2.1 WindowedStream 的创建过程

// KeyedStream.java
public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {return new WindowedStream<>(this, assigner);  // 仅仅是创建对象
}

关键发现:window() 方法没有创建任何 Transformation!

2.2 WindowedStream 的内部结构

public class WindowedStream<T, K, W extends Window> {// 仅有两个成员变量private final KeyedStream<T, K> input;           // 上游流的引用private final WindowOperatorBuilder<T, K, W> builder;  // 算子构建器// 注意:没有继承 DataStream!
}

2.3 WindowedStream 构造函数解析

public WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {this.input = input;  // 保存上游流引用// 创建窗口算子构建器,用于构建窗口操作的核心组件// WindowOperatorBuilder是构建者模式的实现,负责组装窗口操作所需的各种组件this.builder = new WindowOperatorBuilder<>(// 窗口分配器:决定数据元素被分配到哪个窗口// 例如:TumblingEventTimeWindows、SlidingEventTimeWindows等windowAssigner,// 窗口触发器:决定何时触发窗口计算和输出结果// 每种窗口分配器都有其默认的触发器策略// 例如:EventTimeTrigger用于事件时间窗口,ProcessingTimeTrigger用于处理时间窗口windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()),// 执行配置:包含序列化器、并行度等运行时配置信息input.getExecutionConfig(),// 输入数据类型信息:用于序列化和反序列化输入数据input.getType(),// Key选择器:从输入数据中提取分组键,确保相同key的数据进入同一个窗口实例input.getKeySelector(),// Key类型信息:用于序列化和反序列化分组键input.getKeyType());
}

重要理解

  • 构造函数只是组装配置信息,没有创建算子
  • 比 KeyedStream 更"虚",KeyedStream 好歹有个 PartitionTransformation
  • WindowedStream 什么 Transformation 都没有

2.4 WindowedStream 的"虚拟"特性

流类型虚拟化程度特性描述
DataStream🟢 真实流✅ 有 Transformation
✅ 支持链式调用
✅ 可直接执行
KeyedStream🟡 半虚拟流✅ 有 PartitionTransformation
✅ 支持链式调用
✅ 支持窗口API
⚠️ 无实际算子
WindowedStream🔴 完全虚拟流❌ 无 Transformation
❌ 断开链式调用
✅ 只支持窗口聚合API
⚠️ 纯过渡器

WindowedStream 的特殊性

  1. 不继承 DataStream - 彻底断开链式调用
  2. 纯 API 过渡器 - 只是工具类,不是真正的流
  3. 强制聚合 - 必须调用聚合操作才能回到正常流
  4. 临时状态 - 无法直接使用,必须转换

WindowedStream 的特殊性

  1. 不继承 DataStream - 彻底断开链式调用
  2. 纯 API 过渡器 - 只是工具类,不是真正的流
  3. 强制聚合 - 必须调用聚合操作才能回到正常流
  4. 临时状态 - 无法直接使用,必须转换

三、sum() 方法的完整解析

3.1 sum() 方法的调用链

// WindowedStream.java - 入口方法
public SingleOutputStreamOperator<T> sum(int positionToSum) {// 创建内置的求和聚合器return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig()));
}// aggregate 方法 - 中转
private SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregator) {return reduce(aggregator);  // 转发给 reduce
}

关键理解

  • sum() 只是一个便利方法
  • 内部使用 Flink 预定义的 SumAggregator
  • 最终还是调用 reduce() 方法

3.2 SumAggregator 的本质

// SumAggregator 的继承关系
public class SumAggregator<T> extends AggregationFunction<T> implements ReduceFunction<T> {private final int positionToSum;  // 要求和的字段位置// 实现具体的求和逻辑
}

重要发现

  • SumAggregator 就是一个 ReduceFunction
  • 与用户自定义的 MapFunction 地位完全相同
  • Flink 内部预写好的函数,用户也可以自己实现

3.3 reduce() 方法的三层重载

// 第一层:只有 ReduceFunction(我们的入口)
public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function) {function = input.getExecutionEnvironment().clean(function);  // 清理函数return reduce(function, new PassThroughWindowFunction<>());  // 添加默认 WindowFunction
}// 第二层:ReduceFunction + WindowFunction
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction,WindowFunction<T, R, K, W> function) {// 推断输出类型TypeInformation<R> resultType = getWindowFunctionReturnType(function, inputType);return reduce(reduceFunction, function, resultType);  // 继续传递
}// 第三层:完整参数(最终实现)
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction,WindowFunction<T, R, K, W> function,TypeInformation<R> resultType) {// 1. 清理函数(序列化检查)function = input.getExecutionEnvironment().clean(function);reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);// 2. 生成算子名称和描述final String opName = builder.generateOperatorName();final String opDescription = builder.generateOperatorDescription(reduceFunction, function);// 3. 通过 builder 根据function 创建WindowOperatorOneInputStreamOperator<T, R> operator = builder.reduce(reduceFunction, function);// 4. 根据Operator 创建 OperatorFactory -> transformation -> DataStreamreturn input.transform(opName, resultType, operator).setDescription(opDescription);
}

重载链的设计目的

  • 逐步补充参数:从简单到复杂
  • 提供默认值:PassThroughWindowFunction 作为默认窗口函数
  • 类型推断:自动推断输出类型
  • 函数清理:确保函数可序列化

3.4 PassThroughWindowFunction 的巧妙设计

// 第一层 reduce 方法中的关键一行
return reduce(function, new PassThroughWindowFunction<>());

PassThroughWindowFunction 的作用

// PassThroughWindowFunction 的简化实现
public class PassThroughWindowFunction<T, K, W extends Window>implements WindowFunction<T, T, K, W> {@Overridepublic void apply(K key, W window, Iterable<T> input, Collector<T> out) {// 直接透传,不做任何处理for (T element : input) {out.collect(element);}}
}

为什么需要 PassThroughWindowFunction?

  • 接口统一:WindowOperator 需要 ReduceFunction + WindowFunction 两个函数
  • 透明传递:用户只想要聚合结果,不需要额外处理
  • 适配器模式:将单一的 ReduceFunction 适配为完整的窗口处理流程
用户调用sum
只有ReduceFunction
SumAggregator
自动添加
PassThroughWindowFunction
WindowOperator需要的
完整函数对

五、回到 DataStream 的标准流程

5.1 关键的回归时刻

// WindowedStream 的最后一步 - 回到正轨!
return input.transform(opName, resultType, operator);

这一行代码的重要性

  • inputKeyedStream(继承自 DataStream
  • 调用的是 DataStream.transform() 方法
  • WindowedStream 完成使命,回到标准流程

5.2 transform() 方法的标准处理

// DataStream.java - 标准的 transform 方法
public <R> SingleOutputStreamOperator<R> transform(String operatorName,TypeInformation<R> outTypeInfo,OneInputStreamOperator<T, R> operator) {// 包装算子为工厂return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}

5.3 doTransform() 的核心逻辑

protected <R> SingleOutputStreamOperator<R> doTransform(...) {// 1. 创建物理 TransformationOneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(this.transformation,    // 上游:PartitionTransformation (keyBy产生的)operatorName,          // "Window(TumblingEventTimeWindows(5000), EventTimeTrigger, SumAggregator, PassThroughWindowFunction)"operatorFactory,       // SimpleOperatorFactory(WindowOperator)outTypeInfo,          // 输出类型信息environment.getParallelism(),  // 并行度false);               // 不是并行度敏感的// 2. 创建新的 DataStreamSingleOutputStreamOperator<R> returnStream =new SingleOutputStreamOperator<>(environment, resultTransform);// 3. 添加到执行环境 - 重要!getExecutionEnvironment().addOperator(resultTransform);return returnStream;
}

关键步骤解析

  1. 创建物理 Transformation:包含真正的算子
  2. 构建新的 DataStream:恢复正常的流
  3. 注册到环境:只有物理 Transformation 才会被注册

六、调用时序图

在这里插入图片描述

导航链接

上节链接:Flink Stream API 源码走读 - keyBy

下节预告:Flink Stream API 源码走读 - print

http://www.xdnf.cn/news/17911.html

相关文章:

  • 前端开发入门书籍推荐:Vue.js 3与前端基础的完美组合
  • 九尾狐未来机械锂晶核
  • 数据结构:满二叉树 (Full Binary Tree) 和 完全二叉树 (Complete Binary Tree)
  • Three.js三大组件:场景(Scene)、相机(Camera)、渲染器(Renderer)
  • tree组件(几种不同分叉树Vue3)
  • 免费万能电子书格式转换器!Neat Converter支持 ePub、Azw3、Mobi、Doc、PDF、TXT 文件的相互转换。
  • 【图像算法 - 15】智能行李识别新高度:基于YOLO12实例分割与OpenCV的精准检测(附完整代码)
  • React手撕组件和Hooks总结
  • springboot项目单独对数据源配置加解密
  • 编程基础之字符串——过滤多余的空格
  • B3844 [GESP样题 二级] 画正方形
  • CPP多线程2:多线程竞争与死锁问题
  • 复合机器人食品分拣生产线:一体化控制系统引领高效柔性新食代
  • 硬核北京 | 2025世界机器人大会“破圈”,工业智能、康养科技…… 亦庄上演“机器人总动员”
  • Java 多线程教程
  • 心路历程-三个了解敲开linux的大门
  • 第三十七天(js前端数据加密和混淆)
  • 设计模式之静态代理
  • 拒绝造轮子(C#篇)使用SqlSugar实现数据库的访问
  • KingbaseES高可用架构深度解析——从读写分离到异地灾备的全方位守护
  • Vue2.x核心技术与实战(一)
  • Flutter InheritedWidget 详解:从生命周期到数据流动的完整解析
  • 《探索IndexedDB实现浏览器端UTXO模型的前沿技术》
  • Blackwell 和 Hopper 架构的 GPGPU 新功能全面综述
  • debian 13 显示中文字体 不再显示菱形块 终端显示中文
  • 【121页PPT】锂膜产业MESERP方案规划建议(附下载方式)
  • week1-[循环嵌套]画正方形
  • hex文件结构速查
  • Java研学-SpringCloud(三)
  • LCR 076. 数组中的第 K 个最大元素