从源码中学习Java面向对象的多态
前言:为什么要从源码学习多态?
很多开发者对多态的理解停留在教科书层面:父类引用指向子类对象,运行时动态绑定。但在真实的企业级框架中,多态是如何被巧妙运用的?它解决了什么实际问题?
本文将通过分析Apache Flink源码中的多态机制,带您深入理解多态在大型框架中的实战运用,从调用链路、设计模式到架构思维,全方位掌握多态的精髓。
📋 源码验证说明:
本文中的所有关键代码示例都来自Apache Flink 1.18的真实源码,包括:
AbstractUdfStreamOperator.java
:用户函数算子的基类实现StreamingFunctionUtils.java
:多态调用的工具类(第188-191行,第117-121行)- 文中的多态调用机制
instanceof CheckpointedFunction
确实存在于源码中读者可以在Flink源码中找到对应的实现,验证文章内容的真实性。
1. 多态的真实面貌:一个完整的调用链路
让我们从一个真实的场景开始:当Flink需要恢复算子状态时,框架代码是如何"找到"并调用用户自定义的状态恢复方法的?
1.1 问题场景
用户实现了一个自定义的Source函数:
public class MySourceFunction implements SourceFunction<String>, CheckpointedFunction {private List<String> buffer = new ArrayList<>();private ListState<String> historyState;@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {// 🎯 这个方法是如何被框架调用的?ListStateDescriptor<String> descriptor = new ListStateDescriptor<>("history", String.class);historyState = context.getOperatorStateStore().getListState(descriptor);if (context.isRestored()) {for (String item : historyState.get()) {buffer.add(item);}}}@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {historyState.clear();for (String item : buffer) {historyState.add(item);}}
}
表面上看,框架代码和用户代码是完全分离的,那么Flink是如何在适当的时机调用用户的initializeState
方法的呢?
1.2 多态调用链路解析
让我们追踪完整的调用链路,看多态机制是如何工作的:
第一层:接口定义契约
// 在StreamOperatorStateHandler中
public interface CheckpointedStreamOperator {void initializeState(StateInitializationContext context) throws Exception;void snapshotState(StateSnapshotContext context) throws Exception;
}
这个接口定义了状态管理的契约,所有需要状态管理的算子都必须实现这个接口。
第二层:抽象类提供框架实现
public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>, CheckpointedStreamOperator {@Overridepublic void initializeState(StateInitializationContext context) throws Exception {// 🎯 默认空实现,子类可以重写}
}
第三层:具体算子类实现业务逻辑
// AbstractUdfStreamOperator.java - Flink源码中的真实实现
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>extends AbstractStreamOperator<OUT> {protected final F userFunction; // 包装用户函数@Overridepublic void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);// 🎯 委托给工具类处理用户函数的状态恢复StreamingFunctionUtils.restoreFunctionState(context, userFunction);}
}
真正的多态调用在工具类中实现:
// StreamingFunctionUtils.java - 真实源码第188-191行
private static boolean tryRestoreFunction(StateInitializationContext context, Function userFunction) throws Exception {// 🎯 这里是真实的多态调用!if (userFunction instanceof CheckpointedFunction) {((CheckpointedFunction) userFunction).initializeState(context);return true;}// 支持旧版本的ListCheckpointed接口if (context.isRestored() && userFunction instanceof ListCheckpointed) {// ... 处理旧版本接口}return false;
}
第四层:用户实现具体逻辑
// 用户的MySourceFunction最终被调用
public class MySourceFunction implements SourceFunction<String>, CheckpointedFunction {@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {// 🎯 用户的状态恢复逻辑}
}
1.3 多态机制的关键作用点
在这个调用链路中,多态机制在以下几个关键点发挥作用:
作用点1:接口多态
// 在StreamOperatorStateHandler中
public void initializeOperatorState(CheckpointedStreamOperator streamOperator) {// streamOperator的实际类型可能是SourceOperator、MapOperator等streamOperator.initializeState(context); // 🎯 多态分发
}
作用点2:继承多态
// 实际传入的可能是各种不同的AbstractStreamOperator子类
AbstractStreamOperator operator = getOperator();
operator.initializeState(context); // 🎯 调用子类重写的方法
作用点3:实现多态
// 用户函数可能实现不同的接口
if (userFunction instanceof CheckpointedFunction) {((CheckpointedFunction) userFunction).initializeState(context); // 🎯 接口实现多态
}
2. 多态解决的核心问题:解耦与扩展
2.1 框架与用户代码的解耦
问题:框架代码不应该依赖具体的用户实现,但又需要在适当时机调用用户逻辑。
多态解决方案:
// 框架代码只依赖接口,不依赖具体实现
public class StateManager {public void restoreState(CheckpointedStreamOperator operator) {// 不管operator的具体类型,都可以调用operator.initializeState(context); // 多态调用}
}
2.2 算子类型的统一处理
问题:Flink有数百种不同类型的算子(Source、Map、Filter、Sink等),如何统一管理?
多态解决方案:
public class OperatorChain {private List<StreamOperator<?>> operators; // 统一的接口类型public void initializeAllOperators() {for (StreamOperator<?> operator : operators) {// 🎯 不管具体类型,统一调用operator.initializeState(stateInitializer);}}
}
2.3 插件化扩展能力
问题:如何让用户能够自由扩展算子功能,而不修改框架代码?
多态解决方案:
// 用户可以实现各种不同的算子
public class MyCustomOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> {@Overridepublic void processElement(StreamRecord<String> element) throws Exception {// 🎯 用户自定义逻辑,框架无需关心具体实现output.collect(element);}
}
3. 多态设计模式在源码中的应用
3.1 策略模式 + 多态
Flink中的StateBackend选择就是典型的策略模式+多态应用:
// 状态后端的抽象接口
public abstract class StateBackend implements Serializable {public abstract CheckpointableKeyedStateBackend<K> createKeyedStateBackend(Environment env,JobID jobID,String operatorIdentifier,TypeSerializer<K> keySerializer,int numberOfKeyGroups,KeyGroupRange keyGroupRange,TaskKvStateRegistry kvStateRegistry,CloseableRegistry cancelStreamRegistry) throws Exception;
}// 具体策略实现
public class HashMapStateBackend extends StateBackend {@Overridepublic CheckpointableKeyedStateBackend<K> createKeyedStateBackend(...) {return new HeapKeyedStateBackend<>(...); // 🎯 内存实现}
}public class RocksDBStateBackend extends StateBackend {@Overridepublic CheckpointableKeyedStateBackend<K> createKeyedStateBackend(...) {return new RocksDBKeyedStateBackend<>(...); // 🎯 RocksDB实现}
}// 使用时的多态调用
StateBackend stateBackend = getConfiguredStateBackend(); // 运行时确定具体类型
KeyedStateBackend backend = stateBackend.createKeyedStateBackend(...); // 🎯 多态调用
3.2 模板方法模式 + 多态
AbstractUdfStreamOperator使用了模板方法模式,真实源码如下:
// AbstractUdfStreamOperator.java - 快照状态的真实实现
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {super.snapshotState(context);// 🎯 委托给工具类处理用户函数的状态快照StreamingFunctionUtils.snapshotFunctionState(context, getOperatorStateBackend(), userFunction);
}
真正的多态调用在工具类中:
// StreamingFunctionUtils.java - 真实源码第117-121行
private static boolean trySnapshotFunctionState(StateSnapshotContext context, OperatorStateBackend backend, Function userFunction)throws Exception {// 🎯 这里是真实的多态调用!if (userFunction instanceof CheckpointedFunction) {((CheckpointedFunction) userFunction).snapshotState(context);return true;}// 支持旧版本的ListCheckpointed接口if (userFunction instanceof ListCheckpointed) {// ... 处理旧版本快照逻辑}return false;
}
3.3 代理模式 + 多态
算子对用户函数的包装使用了代理模式:
public class StreamMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>implements OneInputStreamOperator<IN, OUT> {@Overridepublic void processElement(StreamRecord<IN> element) throws Exception {// 🎯 代理调用用户函数OUT result = userFunction.map(element.getValue()); // 多态调用用户实现output.collect(element.replace(result));}
}
参考资料:
- Apache Flink 1.18 源码
- Java Language Specification
- Effective Java (Joshua Bloch)
- Design Patterns: Elements of Reusable Object-Oriented Software