当前位置: 首页 > ops >正文

Flink Stream API 源码走读 - map 和 flatMap

概述

本文深入分析了 Flink 中 map()flatMap() 方法的源码实现,展示了从 Function 到 Operator 再到 Transformation 的完整转换流程。

前置知识回顾

DataStream 的核心结构

public class DataStream<T> {protected final StreamExecutionEnvironment environment;  // 执行环境protected final Transformation<T> transformation;        // 转换操作
}

重要理解:

  • 每个 DataStream 都包含一个 Transformation
  • Transformation 持有上一个 Transformation 的引用,形成链条
  • 执行环境 environment 在整个调用链中保持不变

map() 方法源码分析

1. map 方法的重载链

// 用户调用入口
dataStream.map(new MapFunction<String, Integer>() {@Overridepublic Integer map(String value) throws Exception {return value.length();}
})
map(MapFunction<T,R> mapper)
抽取返回类型
TypeExtractor.getMapReturnTypes()
map(mapper, outType)
transform('Map', outType, new StreamMap(mapper))

2. 类型信息抽取

public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {// 1. 抽取输出类型信息TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(), Utils.getCallLocationName(), true);// 2. 调用重载方法return map(mapper, outType);
}

关键点:

  • clean(mapper) - 对用户函数进行序列化检查
  • TypeExtractor.getMapReturnTypes() - 抽取 MapFunction 的返回类型
  • 解决 Java 泛型擦除问题

3. Function → Operator 转换

public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, TypeInformation<R> outputType) {// 将 MapFunction 封装成 StreamMap Operatorreturn transform("Map", outputType, new StreamMap<>(clean(mapper)));
}

转换过程:

  • MapFunction (用户逻辑) → StreamMap (算子)
  • StreamMap 继承自 AbstractUdfStreamOperator
  • 这是第二个核心概念:Operator(算子)

4. transform 方法分析

public <R> SingleOutputStreamOperator<R> transform(String operatorName,TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {// 将 Operator 包装成 OperatorFactoryreturn doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}

重要概念:

  • SimpleOperatorFactory - 简单算子工厂
  • 算子工厂是对算子的进一步包装
  • 提供 getOperator()createStreamOperator() 方法

5. doTransform 核心逻辑

protected <R> SingleOutputStreamOperator<R> doTransform(String operatorName,TypeInformation<R> outTypeInfo,StreamOperatorFactory<R> operatorFactory) {// 1. 创建 OneInputTransformationOneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(this.transformation,    // 上一个 transformationoperatorName,          // 算子名称 "Map"operatorFactory,       // 算子工厂outTypeInfo,          // 输出类型environment.getParallelism(),  // 并行度false                 // 是否并行配置);// 2. 创建新的 DataStreamSingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);// 3. 将 transformation 添加到执行环境getExecutionEnvironment().addOperator(resultTransform);return returnStream;
}

核心概念转换流程

MapFunction
(用户逻辑)
StreamMap
(算子)
SimpleOperatorFactory
(算子工厂)
OneInputTransformation
(单输入转换)
SingleOutputStreamOperator
(新的DataStream)

转换详解

  1. Function → Operator

    MapFunction<String, Integer> mapper = ...;
    StreamMap<String, Integer> operator = new StreamMap<>(mapper);
    
  2. Operator → OperatorFactory

    SimpleOperatorFactory<Integer> factory = SimpleOperatorFactory.of(operator);
    
  3. OperatorFactory → Transformation

    OneInputTransformation<String, Integer> transformation = new OneInputTransformation<>(this.transformation, "Map", factory, outType, parallelism, false);
    
  4. Transformation → DataStream

    SingleOutputStreamOperator<Integer> result = new SingleOutputStreamOperator<>(environment, transformation);
    

OneInputTransformation 详解

为什么叫 OneInput?

public class OneInputTransformation<IN, OUT> extends Transformation<OUT> {private final Transformation<IN> input;  // 只有一个输入// ...
}

命名含义:

  • OneInput - 表示只有一个输入流
  • 对应的还有 TwoInputTransformation (如 join、union 操作)
  • 体现了不同算子的输入特性

Transformation 链条

SourceTransformation
(socketTextStream)
OneInputTransformation
(map)
OneInputTransformation
(flatMap)
OneInputTransformation
(filter)
SinkTransformation
(print)

flatMap() 方法源码分析

1. flatMap 与 map 的相似性

public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {// 1. 抽取输出类型TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper), getType(), Utils.getCallLocationName(), true);// 2. 调用重载方法return flatMap(flatMapper, outType);
}public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {// 将 FlatMapFunction 封装成 StreamFlatMap Operatorreturn transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper)));
}

2. 与 map 的差异

特性mapflatMap
函数接口MapFunction<T, R>FlatMapFunction<T, R>
算子实现StreamMapStreamFlatMap
输出特性1对1映射1对多映射
算子名称“Map”“Flat Map”

3. 相同的处理流程

FlatMapFunction
StreamFlatMap
SimpleOperatorFactory
OneInputTransformation
SingleOutputStreamOperator

流程一致性:

  • 都经过相同的 doTransform 方法
  • 都创建 OneInputTransformation
  • 都返回 SingleOutputStreamOperator

执行环境的 Transformation 管理

addOperator 方法

// 在 doTransform 中调用
getExecutionEnvironment().addOperator(resultTransform);
// StreamExecutionEnvironment 中的实现
public class StreamExecutionEnvironment {private final List<Transformation<?>> transformations = new ArrayList<>();public void addOperator(Transformation<?> transformation) {transformations.add(transformation);}
}

重要作用:

  • 将每个 Transformation 添加到环境的列表中
  • 为后续生成 JobGraph 做准备
  • 形成完整的 Transformation 树

命名问题的吐槽

容易混淆的命名

  1. SingleOutputStreamOperator

    // 实际上是个 DataStream,不是 Operator!
    public class SingleOutputStreamOperator<T> extends DataStream<T>
    
  2. addOperator vs addTransformation

    // 方法名叫 addOperator,实际添加的是 Transformation
    environment.addOperator(transformation);  // 应该叫 addTransformation
    

建议:

  • 忽略这些命名问题,理解本质
  • SingleOutputStreamOperator 就是 DataStream
  • 重点关注概念转换流程

链式调用的实现

返回值分析

DataStream<String> source = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<Integer> mapped = source.map(...);      // 返回 DataStream
SingleOutputStreamOperator<String> flatMapped = mapped.flatMap(...); // 返回 DataStream
KeyedStream<String, String> keyed = flatMapped.keyBy(...);          // 返回 KeyedStream

链式调用原理:

  • 每个操作都返回某种形式的 DataStream
  • SingleOutputStreamOperator 继承自 DataStream
  • 所有 API 方法都定义在 DataStream

为什么叫 SingleOutput?

public class SingleOutputStreamOperator<T> extends DataStream<T>

命名含义:

  • SingleOutput - 表示只有一个输出流
  • 区别于可能有多个输出的算子
  • 体现了算子的输出特性

总结

核心流程回顾

  1. 用户调用 dataStream.map(mapFunction)
  2. 类型抽取 通过 TypeExtractor 获取返回类型
  3. Function→OperatorMapFunction 封装成 StreamMap
  4. Operator→Factory 将算子包装成 SimpleOperatorFactory
  5. Factory→Transformation 创建 OneInputTransformation
  6. Transformation→DataStream 创建新的 SingleOutputStreamOperator
  7. 环境管理Transformation 添加到执行环境

设计模式体现

  • 装饰器模式: Function → Operator → Factory → Transformation → DataStream
  • 工厂模式: SimpleOperatorFactory 封装算子创建逻辑
  • 建造者模式: 逐步构建复杂的 Transformation 对象

关键技术点

  • 类型安全: 通过 TypeInformation 解决泛型擦除
  • 链式调用: 每个操作返回新的 DataStream
  • 延迟执行: 只构建 Transformation 树,不立即执行
  • 统一抽象: map 和 flatMap 使用相同的处理框架

下节预告

Flink Stream API 源码走读 - keyby


重要提醒:

  • 忽略混淆的命名,关注核心概念
  • SingleOutputStreamOperator 本质就是 DataStream
  • 重点理解 Function → Operator → Transformation → DataStream 的转换流程
http://www.xdnf.cn/news/17769.html

相关文章:

  • KNN(k近邻算法)
  • Chrome插件开发实战:从架构到发布全流程
  • 准备用Qt6 重写音视频会议系统服务端
  • 开源 Arkts 鸿蒙应用 开发(十五)自定义绘图控件--仪表盘
  • 开源 Arkts 鸿蒙应用 开发(十六)自定义绘图控件--波形图
  • 【Javaweb学习|黑马笔记|Day1】初识,入门网页,HTML-CSS|常见的标签和样式|标题排版和样式、正文排版和样式
  • 前端css学习笔记6:盒子模型
  • 国内著名AI搜索优化专家孟庆涛发表《AI搜索内容可信度评估综合指南》
  • liteflow
  • Vue3中的ref与reactive全面解析:如何正确选择响应式声明方式
  • Java List 集合详解(ArrayList、LinkedList、Vector)
  • 水印消失术!JavaAI深度学习去水印技术深度剖析
  • 传输层协议TCP(3)
  • Flink Stream API 源码走读 - socketTextStream
  • 集成电路学习:什么是Machine Learning机器学习
  • 从单机到分布式:用飞算JavaAI构建可扩展的TCP多人聊天系统
  • 【力扣56】合并区间
  • easyexcel模板导出Map数据时空值列被下一行列非空数据覆盖
  • 从零开始的云计算生活——第四十三天,激流勇进,kubernetes模块之Pod资源对象
  • 使用Docker和Miniconda3搭建YOLOv13开发环境
  • 深入解析 Spring IOC 容器在 Web 环境中的启动机制
  • 小知识:for of,for in与forEach
  • Spark Shuffle机制原理
  • 图论(5)最小生成树算法
  • 计算机视觉Open-CV
  • OpenCV图像处理2:边界填充与平滑滤波实战
  • 23.Linux : ftp服务及配置详解
  • C语言指针使用
  • Python网络爬虫(二) - 解析静态网页
  • 【ai写代码】lua-判断表是否被修改