Flink Stream API - 源码二开详细实现
目录
- 系统概述
- 系统架构
- 核心组件
- 时序图
- 架构图
- 数据流转图
- 关键代码示例
- 完整测试场景
系统概述
这是一个基于 Apache Flink 的动态表达式处理系统,支持实时数据流处理和表达式热更新。系统采用操作符-协调器架构模式,实现了分布式环境下的表达式管理和数据处理。该系统现已完成完整的双操作符实现,包含表达式管理和计算执行的完整流程。
核心特性
- 动态表达式更新: 支持运行时更新计算表达式,无需重启作业
- 分布式协调: 使用 Flink 的 OperatorCoordinator 机制实现多实例协调
- 双操作符架构: ExpressionOperator 负责管理,CalculatorOperator 负责计算
- Aviator 表达式引擎: 支持复杂数学表达式、条件判断和函数调用
- 数据分区: 基于内容哈希的智能数据分区策略
- 事件驱动: 支持数据事件和控制事件的混合处理
- 容错机制: 集成 Flink 的检查点和故障恢复机制
- 完整状态同步: 实现分布式环境下的表达式变更同步
技术栈
- Apache Flink 1.18.x: 流处理引擎
- Java 8+: 开发语言
- Aviator 5.x: 动态表达式计算引擎,支持数学函数、条件判断
- Socket: 数据输入源,支持实时数据流
- Maven: 项目构建工具
- Lombok: 简化代码编写(可选)
系统架构
整体架构设计
系统采用分层架构设计,实现了完整的双操作符流处理管道:
- API层 (
api/
): 定义基础事件接口Event
- 事件模型层 (
model/
):events/
: 流处理事件(DataEvent、ExpressionEvent、OperatorIdEvent、PartitionedEvent)dto/
: 数据传输对象(CalculatorTaskRegister、FlushSuccess、FlushNotifyEvent、PendingExpressionChange)
- 核心操作符层 (
core/
):operator/
: ExpressionOperator(表达式管理)、CalculatorOperator(表达式计算)factory/
: ExpressionOperatorFactory(操作符工厂)
- 协调器层 (
coordination/
):coordinator/
: ExpressionCoordinator(分布式协调器)request/
: 协调请求(ExpressionChangeRequest、ReleaseBlockRequest)response/
: 协调响应(ExpressionCoordinationResponse)
- 示例层 (
example/
): TestMain(完整的双操作符流处理管道示例)
双操作符架构特点
- ExpressionOperator: 作为上游操作符,负责表达式管理、事件路由和分布式协调
- CalculatorOperator: 作为下游操作符,负责实际的表达式计算和结果输出
- 协调器: 管理两个操作符之间的状态同步和表达式变更协调
- 事件驱动: 通过统一的事件模型实现操作符间的通信
项目目录结构
org.apache.flink.streaming.examples.lkk.exer_operator/
├── api/ # 基础接口层
│ └── Event.java # 事件基础接口
├── coordination/ # 协调层
│ ├── coordinator/ # 协调器实现
│ │ ├── ExpressionCoordinator.java
│ │ └── ExpressionCoordinatorProvider.java
│ ├── request/ # 协调请求
│ │ ├── ExpressionChangeRequest.java
│ │ └── ReleaseBlockRequest.java
│ └── response/ # 协调响应
│ └── ExpressionCoordinationResponse.java
├── core/ # 核心操作符层
│ ├── factory/ # 操作符工厂
│ │ └── ExpressionOperatorFactory.java
│ └── operator/ # 操作符实现
│ ├── CalculatorOperator.java # 新增:表达式计算操作符
│ └── ExpressionOperator.java # 表达式管理操作符
├── model/ # 数据模型层
│ ├── dto/ # 数据传输对象
│ │ ├── CalculatorTaskRegister.java # 新增:计算任务注册
│ │ ├── FlushNotifyEvent.java # 刷新通知事件
│ │ ├── FlushSuccess.java # 新增:刷新成功事件
│ │ └── PendingExpressionChange.java # 新增:待处理表达式变更
│ └── events/ # 事件模型
│ ├── DataEvent.java
│ ├── ExpressionEvent.java
│ ├── OperatorIdEvent.java # 新增:操作符ID事件
│ └── PartitionedEvent.java
├── example/ # 示例和测试
│ └── TestMain.java # 更新:包含完整的双操作符流程
设计模式
1. 操作符-协调器模式
- 操作符: 处理数据流,执行本地计算
- 协调器: 管理全局状态,协调多个操作符实例
- 通信: 通过 RPC 机制实现异步通信
2. 事件驱动架构
- 统一事件接口: 所有事件实现 HitaEvent 接口
- 多态处理: 根据事件类型执行不同处理逻辑
- 类型安全: 编译时类型检查,运行时类型判断
3. 分区策略
- 哈希分区: 使用
Math.abs(event.hashCode()) % parallelism
- 一致性保证: 相同内容的事件分配到同一分区
- 负载均衡: 均匀分布数据到各个并行实例
核心组件
1. 事件模型层 (Event Model)
Event (基础接口)
public interface Event extends Serializable
- 所有事件的基础接口,确保序列化能力
- 位置:
api/Event.java
- 替代了之前的
HitaoEvent
接口,命名更加简洁明确
DataEvent (数据事件)
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DataEvent implements Event {private int f1, f2, f3; // 三个数值字段用于表达式计算
}
- 包含三个数值字段 (f1, f2, f3) 的业务数据事件
- 支持 Aviator 表达式中的变量引用:f1、f2、f3
- 位置:
model/events/DataEvent.java
ExpressionEvent (表达式事件)
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ExpressionEvent implements Event {private String expression; // Aviator 表达式字符串
}
- 包含计算表达式的控制事件,用于动态更新计算规则
- 支持复杂的 Aviator 表达式:算术运算、条件判断、数学函数等
- 位置:
model/events/ExpressionEvent.java
OperatorIdEvent (操作符ID事件)
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OperatorIdEvent implements Event {private OperatorID operatorID; // 操作符唯一标识
}
- 用于传递操作符ID,建立操作符与协调器的通信关系
- ExpressionOperator 启动时广播此事件给 CalculatorOperator
- 位置:
model/events/OperatorIdEvent.java
PartitionedEvent (分区事件)
@Data
@AllArgsConstructor
@NoArgsConstructor
public class PartitionedEvent implements Event {private int partitionId; // 分区IDprivate Event event; // 原始事件
}
- 封装分区ID和原始事件的包装类
- 支持自定义分区策略,确保数据均匀分布
- 位置:
model/events/PartitionedEvent.java
2. 数据传输对象 (DTO)
CalculatorTaskRegister (计算任务注册)
@Data
@AllArgsConstructor
@NoArgsConstructor
public class CalculatorTaskRegister implements OperatorEvent {private int subTaskId; // 子任务ID
}
- 计算任务向协调器注册的事件
- 位置:
model/dto/CalculatorTaskRegister.java
FlushSuccess (刷新成功)
@Data
@AllArgsConstructor
@NoArgsConstructor
public class FlushSuccess implements OperatorEvent {private int subtaskId; // 完成刷新的子任务ID
}
- 通知协调器刷新操作已成功完成
- 位置:
model/dto/FlushSuccess.java
FlushNotifyEvent (刷新通知)
public class FlushNotifyEvent implements HitaoEvent {// 标记事件,用于通知下游任务进行状态刷新
}
- 广播给所有下游任务的刷新通知事件
- 位置:
model/dto/FlushNotifyEvent.java
PendingExpressionChange (待处理表达式变更)
public class PendingExpressionChange {ConcurrentHashMap<Integer, Object> registeredCalculatorSubtasks;CompletableFuture<CoordinationResponse> waitReleaseFuture;int flushSuccessCount;
}
- 管理表达式变更过程中的异步协调
- 位置:
model/dto/PendingExpressionChange.java
3. 核心操作符层
ExpressionOperator (表达式管理操作符)
public class ExpressionOperator extends AbstractStreamOperator<Event>implements OneInputStreamOperator<Event, Event>
- 主要功能:
- 处理表达式变更事件,与协调器通信
- 计算数据分区,支持普通分区和广播模式
- 管理操作符生命周期,广播 OperatorID
- 协调分布式表达式变更过程
- 关键方法:
open()
: 启动时广播 OperatorIdEvent 建立通信链路processElement()
: 处理不同类型的事件(数据事件、表达式事件)partitionBy()
: 计算数据分区,使用哈希策略broadcast()
: 广播事件到所有下游分区releaseRequest()
: 与协调器通信,等待分布式同步完成
- 位置:
core/operator/ExpressionOperator.java
CalculatorOperator (表达式计算操作符)
public class CalculatorOperator extends AbstractStreamOperator<Double>implements OneInputStreamOperator<Event, Double>
- 主要功能:
- 使用 Aviator 引擎执行动态表达式计算
- 向协调器注册自己并参与状态同步
- 处理不同类型的事件(数据、表达式、控制事件)
- 输出 Double 类型的计算结果
- 关键方法:
processElement()
: 根据事件类型执行不同处理逻辑- 支持 DataEvent 计算、ExpressionEvent 编译、OperatorIdEvent 注册
- 支持 FlushNotifyEvent 处理,参与分布式同步
- Aviator 集成:
- 使用
AviatorEvaluator.compile()
编译表达式 - 支持变量映射:f1、f2、f3 对应 DataEvent 的字段
- 支持数学函数、条件判断等复杂表达式
- 使用
- 位置:
core/operator/CalculatorOperator.java
ExpressionOperatorFactory (操作符工厂)
public class ExpressionOperatorFactory extends SimpleOperatorFactory<Event>implements CoordinatedOperatorFactory<Event>
- 主要功能:
- 创建和配置 ExpressionOperator
- 提供协调器集成能力,连接操作符和协调器
- 管理操作符生命周期
- 关键方法:
getCoordinatorProvider()
: 提供协调器提供者- 集成 Flink 的协调器机制,实现分布式协调
- 位置:
core/factory/ExpressionOperatorFactory.java
4. 协调器层
ExpressionCoordinator (分布式协调器)
public class ExpressionCoordinator implements OperatorCoordinator, CoordinationRequestHandler
- 主要功能:
- 管理计算任务的注册状态
- 协调表达式变更过程中的同步操作
- 处理检查点和故障恢复
- 关键方法:
handleEventFromOperator()
: 处理来自操作符的事件handleCoordinationRequest()
: 处理协调请求checkpointCoordinator()
: 检查点状态持久化
- 位置:
coordination/coordinator/ExpressionCoordinator.java
协调请求和响应
- ExpressionChangeRequest: 表达式变更请求
- ReleaseBlockRequest: 释放阻塞请求
- ExpressionCoordinationResponse: 协调响应
- 位置:
coordination/request/
和coordination/response/
时序图
以下时序图展示了系统中各组件之间的交互流程:
时序图说明
时序图展示了系统中各组件之间的完整交互流程,包含四个主要阶段:
1. 系统启动阶段 (System Initialization)
目标: 建立操作符间的通信链路和协调器注册
详细流程:
- TestMain 启动: 创建 StreamExecutionEnvironment,配置数据源和操作符
- ExpressionOperator 初始化:
- 调用
open()
方法启动操作符 - 获取自身的 OperatorID
- 广播
OperatorIdEvent
到所有下游 CalculatorOperator 实例
- 调用
- CalculatorOperator 注册:
- 接收
OperatorIdEvent
,获得上游 ExpressionOperator 的 ID - 创建
CalculatorTaskRegister
事件,包含自己的子任务 ID - 通过协调器网关向 ExpressionCoordinator 发送注册请求
- 接收
- ExpressionCoordinator 记录:
- 接收并处理
CalculatorTaskRegister
事件 - 将注册的计算任务 ID 存储在
registeredCalculatorSubTasks
映射中 - 为后续的分布式协调做准备
- 接收并处理
关键技术点:
- 使用 Flink 的 OperatorID 机制建立操作符间的身份识别
- 通过事件广播实现一对多的通信模式
- 协调器维护全局的任务注册状态
2. 数据事件处理阶段 (Data Event Processing)
目标: 处理业务数据,执行表达式计算
详细流程:
- 数据输入: 用户通过 Socket 输入格式为
d|f1,f2,f3
的数据 - 事件解析: TestMain 中的 map 函数将文本解析为
DataEvent
对象 - ExpressionOperator 处理:
- 接收
DataEvent
,调用processElement()
方法 - 执行
partitionBy()
方法计算分区 ID:Math.abs(event.hashCode()) % parallelism
- 创建
PartitionedEvent
包装原始事件和分区 ID - 发送到下游的 CalculatorOperator
- 接收
- CalculatorOperator 计算:
- 接收
PartitionedEvent
,提取原始的DataEvent
- 检查是否存在当前编译的表达式 (
currentExpression
) - 如果有表达式:
- 将 DataEvent 的字段映射到变量:
f1
,f2
,f3
- 使用 Aviator 引擎执行表达式:
currentExpression.execute(expressionDataMap)
- 输出 Double 类型的计算结果
- 将 DataEvent 的字段映射到变量:
- 如果无表达式:输出 “当前没有表达式,跳过计算”
- 接收
关键技术点:
- 基于哈希的分区策略确保数据均匀分布
- Aviator 表达式引擎的变量映射和执行机制
- 条件处理逻辑,支持有/无表达式的不同场景
3. 表达式变更流程 (Expression Change Flow)
目标: 实现分布式环境下的表达式热更新
详细流程:
阶段 3.1 - 协调请求:
- 表达式输入: 用户输入格式为
e|expression
的表达式事件 - ExpressionOperator 处理:
- 接收
ExpressionEvent
,提取表达式字符串 - 创建
ExpressionChangeRequest
协调请求 - 通过
coordinator.sendRequestToCoordinator()
发送请求
- 接收
- ExpressionCoordinator 响应:
- 接收并处理
ExpressionChangeRequest
- 创建
PendingExpressionChange
对象管理异步协调状态 - 初始化
CompletableFuture<CoordinationResponse>
用于后续阻塞释放 - 返回
ExpressionCoordinationResponse
确认请求接收
- 接收并处理
阶段 3.2 - 状态同步:
- ExpressionOperator 广播刷新:
- 向所有下游 CalculatorOperator 广播
FlushNotifyEvent
- 通知所有计算任务准备状态同步
- 向所有下游 CalculatorOperator 广播
- CalculatorOperator 响应:
- 接收
FlushNotifyEvent
,执行本地状态刷新操作 - 创建
FlushSuccess
事件,包含自己的子任务 ID - 向协调器发送刷新完成通知
- 接收
- ExpressionCoordinator 计数:
- 接收每个
FlushSuccess
事件 - 更新
flushSuccessCount
计数器 - 检查是否所有注册的计算任务都已完成刷新
- 接收每个
阶段 3.3 - 阻塞释放:
- ExpressionOperator 等待:
- 发送
ReleaseBlockRequest
到协调器 - 等待协调器确认所有任务同步完成
- 发送
- ExpressionCoordinator 检查:
- 检查
flushSuccessCount
是否等于注册任务数量 - 如果所有任务完成,调用
waitReleaseFuture.complete()
释放阻塞 - 返回响应给 ExpressionOperator
- 检查
阶段 3.4 - 表达式广播:
- ExpressionOperator 广播新表达式:
- 阻塞释放后,向所有下游广播原始的
ExpressionEvent
- 阻塞释放后,向所有下游广播原始的
- CalculatorOperator 编译:
- 接收新的
ExpressionEvent
- 使用
AviatorEvaluator.compile()
编译表达式 - 更新本地的
currentExpression
变量 - 准备处理后续的数据事件
- 接收新的
关键技术点:
- 使用
CompletableFuture
实现异步协调和阻塞控制 - 分布式计数机制确保所有任务同步完成
- 原子性操作保证表达式变更的一致性
4. 后续数据处理阶段 (Subsequent Data Processing)
目标: 使用新表达式处理数据,验证热更新效果
详细流程:
- 数据输入: 用户继续输入数据事件
- 表达式计算: CalculatorOperator 使用新编译的表达式进行计算
- 结果输出: 输出使用新表达式计算的结果,验证热更新成功
支持的表达式类型:
- 基本算术:
f1+f2+f3
,f1*f2+f3
- 条件判断:
f1>f2?f1+f2:f2+f3
- 数学函数:
math.sqrt(f1*f1+f2*f2)+f3
- 逻辑运算:
f1>10 && f2<20
关键技术点:
- Aviator 引擎的强大表达式支持能力
- 热更新的即时生效,无需重启系统
- 完整的端到端数据处理验证
架构图
以下架构图展示了系统的整体结构和组件关系:
架构图说明
架构图展示了系统的五个核心层次:
- 数据输入层 (黄色): 负责接收用户输入和数据解析
- 事件模型层 (蓝色): 定义统一的事件接口体系,包含完整的事件类型
- 操作符层 (紫色): 双操作符架构,ExpressionOperator 管理,CalculatorOperator 计算
- 协调器层 (绿色): 分布式协调机制,管理表达式变更和状态同步
- 数据处理层 (粉色): 分区策略、Aviator 引擎和结果输出
各层之间通过明确的接口进行交互,实现了良好的分层解耦。双操作符架构确保了职责分离和系统的可扩展性。
关键代码示例
1. ExpressionOperator 核心处理逻辑
操作符初始化
@Override
public void open() throws Exception {super.open();// 向下游广播操作符ID,建立通信关系OperatorID expressionOperatorID = getOperatorID();broadcast(new OperatorIdEvent(expressionOperatorID));System.out.println("ExpressionOperator 启动完成,OperatorID: " + expressionOperatorID.toHexString());
}
事件处理逻辑
@Override
public void processElement(StreamRecord<HitaoEvent> element) throws Exception {HitaoEvent hitaoEvent = element.getValue();// 处理表达式变更事件if (hitaoEvent instanceof ExpressionEvent) {ExpressionEvent exp = (ExpressionEvent) hitaoEvent;String expression = exp.getExpression();System.out.println("收到表达式变更事件: " + expression);// 1. 向协调器请求表达式变更ExpressionChangeRequest expressionChangeRequest = new ExpressionChangeRequest(expression);coordinator.sendRequestToCoordinator(operatorID, new SerializedValue<>(expressionChangeRequest));// 2. 向下游广播刷新通知broadcast(new FlushNotifyEvent());// 3. 等待协调器释放阻塞releaseRequest();System.out.println("阻塞被放开,继续处理");// 4. 向下游广播新的表达式broadcast(exp);}// 所有事件都进行分区处理partitionBy(hitaoEvent);
}
数据分区策略
private void partitionBy(HitaoEvent hitaoEvent) {// 计算分区ID:使用 hashCode 取模,并确保结果为正数int partitionId = Math.abs(hitaoEvent.hashCode()) % downStreamParallelism;PartitionedEvent partitionedEvent = new PartitionedEvent(partitionId, hitaoEvent);output.collect(new StreamRecord<>(partitionedEvent));
}private void broadcast(HitaoEvent hitaoEvent) {// 广播到所有下游分区for (int i = 0; i < downStreamParallelism; i++) {PartitionedEvent partitionedEvent = new PartitionedEvent(i, hitaoEvent);output.collect(new StreamRecord<>(partitionedEvent));}
}
2. CalculatorOperator 计算逻辑
事件处理和表达式计算
@Override
public void processElement(StreamRecord<HitaoEvent> element) throws Exception {PartitionedEvent partitionedEvent = (PartitionedEvent) element.getValue();HitaoEvent event = partitionedEvent.getEvent();if (event instanceof DataEvent) {DataEvent dataEvent = (DataEvent) event;// 使用 Aviator 引擎执行表达式计算if (currentExpression != null) {expressionDataMap.put("f1", (double) dataEvent.getF1());expressionDataMap.put("f2", (double) dataEvent.getF2());expressionDataMap.put("f3", (double) dataEvent.getF3());double result = (double) currentExpression.execute(expressionDataMap);output.collect(new StreamRecord<>(result));System.out.println("计算结果: f1=" + dataEvent.getF1() +", f2=" + dataEvent.getF2() +", f3=" + dataEvent.getF3() +" => " + result);}} else if (event instanceof OperatorIdEvent) {// 接收操作符ID并向协调器注册OperatorIdEvent operatorIdEvent = (OperatorIdEvent) event;this.expressionOperatorID = operatorIdEvent.getOperatorID();CalculatorTaskRegister register = new CalculatorTaskRegister(selfSubTaskId);coordinator.sendOperatorEventToCoordinator(expressionOperatorID, new SerializedValue<>(register));} else if (event instanceof ExpressionEvent) {// 编译新的表达式ExpressionEvent expressionEvent = (ExpressionEvent) event;this.currentExpression = AviatorEvaluator.compile(expressionEvent.getExpression());System.out.println("表达式编译完成: " + expressionEvent.getExpression());} else if (event instanceof FlushNotifyEvent) {// 发送刷新完成通知coordinator.sendOperatorEventToCoordinator(expressionOperatorID,new SerializedValue<>(new FlushSuccess(selfSubTaskId)));}
}
3. ExpressionCoordinator 协调逻辑
协调请求处理
@Override
public CompletableFuture<CoordinationResponse> handleCoordinationRequest(CoordinationRequest request) {if (request instanceof ExpressionChangeRequest) {CompletableFuture<CoordinationResponse> waitReleaseFuture = new CompletableFuture<>();System.out.println("收到 expression change request");// 创建待处理的表达式变更pendingExpressionChange = new PendingExpressionChange(registeredCalculatorSubTasks, waitReleaseFuture);return CompletableFuture.completedFuture(new ExpressionCoordinationResponse());} else if (request instanceof ReleaseBlockRequest) {System.out.println("收到 release block request");// 返回等待所有任务完成的 Futurereturn pendingExpressionChange.waitReleaseFuture;}throw new IllegalArgumentException("请求类型不支持: " + request.getClass());
}
操作符事件处理
@Override
public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) throws Exception {if (event instanceof FlushSuccess) {FlushSuccess success = (FlushSuccess) event;int subtaskId = success.getSubtaskId();System.out.println("收到一个 flush success 事件,子任务ID: " + subtaskId);if (pendingExpressionChange != null) {pendingExpressionChange.flushSuccess(subtaskId);}} else if (event instanceof CalculatorTaskRegister) {CalculatorTaskRegister register = (CalculatorTaskRegister) event;int subTaskId = register.getSubTaskId();System.out.println("收到一个calculator sub task注册事件: " + subTaskId);registeredCalculatorSubTasks.put(subTaskId, "");}
}
4. 完整的流处理管道
TestMain 中的管道构建
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 1. 创建数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 6666);// 2. 解析文本为事件对象SingleOutputStreamOperator<Event> events = stream.map(s -> {String[] split = s.split("\\|");if (split.length < 2) {throw new IllegalArgumentException("Invalid input format: " + s +". Expected format: 'd|10,20,30' or 'e|f1-f2*f3'");}if ("d".equals(split[0])) {// 解析数据事件: d|10,20,30String[] fields = split[1].split(",");if (fields.length < 3) {throw new IllegalArgumentException("Invalid data format: " + s +". Expected format: 'd|field1,field2,field3'");}return new DataEvent(Integer.parseInt(fields[0]),Integer.parseInt(fields[1]),Integer.parseInt(fields[2]));} else {// 解析表达式事件: e|f1+f2*f3return new ExpressionEvent(split[1]);}});// 3. 设置下游计算任务的并行度int calculatorTaskParallelism = 2;// 4. 添加表达式管理操作符SingleOutputStreamOperator<Event> partitionedEvents = events.transform("expression management operator",TypeInformation.of(Event.class),new ExpressionOperatorFactory(new ExpressionOperator(calculatorTaskParallelism)));// 5. 配置自定义分区策略DataStream<Event> partitionedStream = partitionedEvents.partitionCustom(new Partitioner<Integer>() {@Overridepublic int partition(Integer key, int numPartitions) {return key % numPartitions;}},new KeySelector<Event, Integer>() {@Overridepublic Integer getKey(Event value) throws Exception {PartitionedEvent event = (PartitionedEvent) value;return event.getPartitionId();}});// 6. 添加表达式计算操作符SingleOutputStreamOperator<Double> calculatorOperator = partitionedStream.transform("calculator operator",TypeInformation.of(Double.class),new CalculatorOperator()).setParallelism(calculatorTaskParallelism);// 7. 输出计算结果calculatorOperator.print().setParallelism(calculatorTaskParallelism);// 8. 启动 Flink 作业执行env.execute("Flink Expression Processing System");
}
完整测试场景
完整的端到端测试流程
测试环境准备
# 1. 确保 Java 环境# 2. 编译项目
cd flink-release-1.18
mvn clean compile -pl flink-examples/flink-examples-streaming# 3. 启动 Flink 作业(在一个终端)
idea 中启动# 4. 启动数据输入(在另一个终端)
nc -l 6666
完整测试序列
步骤1: 验证系统启动
# 预期日志输出
ExpressionOperator 启动完成,OperatorID: xxx
CalculatorOperator 启动完成,子任务ID: 0
CalculatorOperator 启动完成,子任务ID: 1
收到 expression 管理算子的 算子ID: xxx
向协调器注册完成,子任务ID: 0
向协调器注册完成,子任务ID: 1
步骤2: 测试无表达式的数据处理
# 输入数据
d|10,20,30# 预期输出
当前没有表达式,跳过计算
# 系统正常处理但不输出计算结果
步骤3: 设置表达式并测试计算
# 输入表达式
e|f1+f2+f3# 预期日志
收到表达式变更事件: f1+f2+f3
收到 expression change request
发送 FlushSuccess 事件,子任务ID: 0
发送 FlushSuccess 事件,子任务ID: 1
所有 subtask flush 都完成
阻塞被放开,继续处理
收到新的表达式: f1+f2+f3
表达式编译完成: f1+f2+f3# 输入数据
d|10,20,30# 预期输出
计算结果: f1=10, f2=20, f3=30 => 60.0
60.0
步骤4: 动态表达式更新测试
# 更新表达式
e|f1*f2+f3# 输入数据
d|10,20,30# 预期输出
计算结果: f1=10, f2=20, f3=30 => 230.0
230.0
步骤5: 复杂表达式测试
# 条件表达式
e|f1>f2?f1+f2:f2+f3# 测试数据1
d|30,20,10
# 输出: 50.0 (30>20为真,所以30+20)# 测试数据2
d|10,20,30
# 输出: 50.0 (10>20为假,所以20+30)# 数学函数表达式
e|math.sqrt(f1*f1+f2*f2)+f3# 测试数据
d|3,4,5
# 输出: 10.0 (sqrt(9+16)+5 = 5+5)
步骤6: 并行处理验证
# 快速输入多个数据,验证并行处理
d|1,2,3
d|4,5,6
d|7,8,9# 预期输出(可能乱序,因为并行处理)
计算结果: f1=1, f2=2, f3=3 => 6.0
计算结果: f1=4, f2=5, f3=6 => 15.0
计算结果: f1=7, f2=8, f3=9 => 24.0
6.0
15.0
24.0
适用场景
虽然是个demo,但是可以帮助我们更深入的理解flink的源码,为构建灵活、可扩展、高性能的实时数据处理系统提供了完整的技术参考和实现方案。可以扩展应用到下面场景
- 实时规则引擎: 需要动态更新处理规则的场景,如业务规则变更
- 流式计算平台: 需要灵活配置计算逻辑的数据处理系统
- 监控告警系统: 需要实时调整告警规则的监控平台
- 推荐系统: 需要动态调整推荐算法的实时推荐场景
- 金融风控: 需要实时调整风控规则的金融系统
- IoT数据处理: 需要动态调整传感器数据处理逻辑的物联网系统
- 实时数据分析: 需要动态调整分析指标和计算公式的场景
- A/B测试平台: 需要实时切换不同算法版本的实验平台
返回目录
Flink 源码系列 - 前言