Flink Stream API 源码走读 - map 和 flatMap
概述
本文深入分析了 Flink 中 map()
和 flatMap()
方法的源码实现,展示了从 Function 到 Operator 再到 Transformation 的完整转换流程。
前置知识回顾
DataStream 的核心结构
public class DataStream<T> {protected final StreamExecutionEnvironment environment; // 执行环境protected final Transformation<T> transformation; // 转换操作
}
重要理解:
- 每个
DataStream
都包含一个Transformation
Transformation
持有上一个Transformation
的引用,形成链条- 执行环境
environment
在整个调用链中保持不变
map() 方法源码分析
1. map 方法的重载链
// 用户调用入口
dataStream.map(new MapFunction<String, Integer>() {@Overridepublic Integer map(String value) throws Exception {return value.length();}
})
2. 类型信息抽取
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {// 1. 抽取输出类型信息TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(), Utils.getCallLocationName(), true);// 2. 调用重载方法return map(mapper, outType);
}
关键点:
clean(mapper)
- 对用户函数进行序列化检查TypeExtractor.getMapReturnTypes()
- 抽取 MapFunction 的返回类型- 解决 Java 泛型擦除问题
3. Function → Operator 转换
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, TypeInformation<R> outputType) {// 将 MapFunction 封装成 StreamMap Operatorreturn transform("Map", outputType, new StreamMap<>(clean(mapper)));
}
转换过程:
MapFunction
(用户逻辑) →StreamMap
(算子)StreamMap
继承自AbstractUdfStreamOperator
- 这是第二个核心概念:Operator(算子)
4. transform 方法分析
public <R> SingleOutputStreamOperator<R> transform(String operatorName,TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {// 将 Operator 包装成 OperatorFactoryreturn doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}
重要概念:
SimpleOperatorFactory
- 简单算子工厂- 算子工厂是对算子的进一步包装
- 提供
getOperator()
和createStreamOperator()
方法
5. doTransform 核心逻辑
protected <R> SingleOutputStreamOperator<R> doTransform(String operatorName,TypeInformation<R> outTypeInfo,StreamOperatorFactory<R> operatorFactory) {// 1. 创建 OneInputTransformationOneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(this.transformation, // 上一个 transformationoperatorName, // 算子名称 "Map"operatorFactory, // 算子工厂outTypeInfo, // 输出类型environment.getParallelism(), // 并行度false // 是否并行配置);// 2. 创建新的 DataStreamSingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);// 3. 将 transformation 添加到执行环境getExecutionEnvironment().addOperator(resultTransform);return returnStream;
}
核心概念转换流程
转换详解
-
Function → Operator
MapFunction<String, Integer> mapper = ...; StreamMap<String, Integer> operator = new StreamMap<>(mapper);
-
Operator → OperatorFactory
SimpleOperatorFactory<Integer> factory = SimpleOperatorFactory.of(operator);
-
OperatorFactory → Transformation
OneInputTransformation<String, Integer> transformation = new OneInputTransformation<>(this.transformation, "Map", factory, outType, parallelism, false);
-
Transformation → DataStream
SingleOutputStreamOperator<Integer> result = new SingleOutputStreamOperator<>(environment, transformation);
OneInputTransformation 详解
为什么叫 OneInput?
public class OneInputTransformation<IN, OUT> extends Transformation<OUT> {private final Transformation<IN> input; // 只有一个输入// ...
}
命名含义:
OneInput
- 表示只有一个输入流- 对应的还有
TwoInputTransformation
(如 join、union 操作) - 体现了不同算子的输入特性
Transformation 链条
flatMap() 方法源码分析
1. flatMap 与 map 的相似性
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {// 1. 抽取输出类型TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper), getType(), Utils.getCallLocationName(), true);// 2. 调用重载方法return flatMap(flatMapper, outType);
}public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {// 将 FlatMapFunction 封装成 StreamFlatMap Operatorreturn transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper)));
}
2. 与 map 的差异
特性 | map | flatMap |
---|---|---|
函数接口 | MapFunction<T, R> | FlatMapFunction<T, R> |
算子实现 | StreamMap | StreamFlatMap |
输出特性 | 1对1映射 | 1对多映射 |
算子名称 | “Map” | “Flat Map” |
3. 相同的处理流程
流程一致性:
- 都经过相同的
doTransform
方法 - 都创建
OneInputTransformation
- 都返回
SingleOutputStreamOperator
执行环境的 Transformation 管理
addOperator 方法
// 在 doTransform 中调用
getExecutionEnvironment().addOperator(resultTransform);
// StreamExecutionEnvironment 中的实现
public class StreamExecutionEnvironment {private final List<Transformation<?>> transformations = new ArrayList<>();public void addOperator(Transformation<?> transformation) {transformations.add(transformation);}
}
重要作用:
- 将每个
Transformation
添加到环境的列表中 - 为后续生成 JobGraph 做准备
- 形成完整的 Transformation 树
命名问题的吐槽
容易混淆的命名
-
SingleOutputStreamOperator
// 实际上是个 DataStream,不是 Operator! public class SingleOutputStreamOperator<T> extends DataStream<T>
-
addOperator vs addTransformation
// 方法名叫 addOperator,实际添加的是 Transformation environment.addOperator(transformation); // 应该叫 addTransformation
建议:
- 忽略这些命名问题,理解本质
SingleOutputStreamOperator
就是DataStream
- 重点关注概念转换流程
链式调用的实现
返回值分析
DataStream<String> source = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<Integer> mapped = source.map(...); // 返回 DataStream
SingleOutputStreamOperator<String> flatMapped = mapped.flatMap(...); // 返回 DataStream
KeyedStream<String, String> keyed = flatMapped.keyBy(...); // 返回 KeyedStream
链式调用原理:
- 每个操作都返回某种形式的
DataStream
SingleOutputStreamOperator
继承自DataStream
- 所有 API 方法都定义在
DataStream
中
为什么叫 SingleOutput?
public class SingleOutputStreamOperator<T> extends DataStream<T>
命名含义:
SingleOutput
- 表示只有一个输出流- 区别于可能有多个输出的算子
- 体现了算子的输出特性
总结
核心流程回顾
- 用户调用
dataStream.map(mapFunction)
- 类型抽取 通过
TypeExtractor
获取返回类型 - Function→Operator 将
MapFunction
封装成StreamMap
- Operator→Factory 将算子包装成
SimpleOperatorFactory
- Factory→Transformation 创建
OneInputTransformation
- Transformation→DataStream 创建新的
SingleOutputStreamOperator
- 环境管理 将
Transformation
添加到执行环境
设计模式体现
- 装饰器模式: Function → Operator → Factory → Transformation → DataStream
- 工厂模式:
SimpleOperatorFactory
封装算子创建逻辑 - 建造者模式: 逐步构建复杂的 Transformation 对象
关键技术点
- 类型安全: 通过
TypeInformation
解决泛型擦除 - 链式调用: 每个操作返回新的
DataStream
- 延迟执行: 只构建 Transformation 树,不立即执行
- 统一抽象: map 和 flatMap 使用相同的处理框架
下节预告
Flink Stream API 源码走读 - keyby
重要提醒:
- 忽略混淆的命名,关注核心概念
SingleOutputStreamOperator
本质就是DataStream
- 重点理解 Function → Operator → Transformation → DataStream 的转换流程