当前位置: 首页 > news >正文

Flink Stream API - 源码二开详细实现

目录

  • 系统概述
  • 系统架构
  • 核心组件
  • 时序图
  • 架构图
  • 数据流转图
  • 关键代码示例
  • 完整测试场景

系统概述

这是一个基于 Apache Flink 的动态表达式处理系统,支持实时数据流处理和表达式热更新。系统采用操作符-协调器架构模式,实现了分布式环境下的表达式管理和数据处理。该系统现已完成完整的双操作符实现,包含表达式管理和计算执行的完整流程。

核心特性

  • 动态表达式更新: 支持运行时更新计算表达式,无需重启作业
  • 分布式协调: 使用 Flink 的 OperatorCoordinator 机制实现多实例协调
  • 双操作符架构: ExpressionOperator 负责管理,CalculatorOperator 负责计算
  • Aviator 表达式引擎: 支持复杂数学表达式、条件判断和函数调用
  • 数据分区: 基于内容哈希的智能数据分区策略
  • 事件驱动: 支持数据事件和控制事件的混合处理
  • 容错机制: 集成 Flink 的检查点和故障恢复机制
  • 完整状态同步: 实现分布式环境下的表达式变更同步

技术栈

  • Apache Flink 1.18.x: 流处理引擎
  • Java 8+: 开发语言
  • Aviator 5.x: 动态表达式计算引擎,支持数学函数、条件判断
  • Socket: 数据输入源,支持实时数据流
  • Maven: 项目构建工具
  • Lombok: 简化代码编写(可选)

系统架构

整体架构设计

系统采用分层架构设计,实现了完整的双操作符流处理管道:

  1. API层 (api/): 定义基础事件接口 Event
  2. 事件模型层 (model/):
    • events/: 流处理事件(DataEvent、ExpressionEvent、OperatorIdEvent、PartitionedEvent)
    • dto/: 数据传输对象(CalculatorTaskRegister、FlushSuccess、FlushNotifyEvent、PendingExpressionChange)
  3. 核心操作符层 (core/):
    • operator/: ExpressionOperator(表达式管理)、CalculatorOperator(表达式计算)
    • factory/: ExpressionOperatorFactory(操作符工厂)
  4. 协调器层 (coordination/):
    • coordinator/: ExpressionCoordinator(分布式协调器)
    • request/: 协调请求(ExpressionChangeRequest、ReleaseBlockRequest)
    • response/: 协调响应(ExpressionCoordinationResponse)
  5. 示例层 (example/): TestMain(完整的双操作符流处理管道示例)

双操作符架构特点

  • ExpressionOperator: 作为上游操作符,负责表达式管理、事件路由和分布式协调
  • CalculatorOperator: 作为下游操作符,负责实际的表达式计算和结果输出
  • 协调器: 管理两个操作符之间的状态同步和表达式变更协调
  • 事件驱动: 通过统一的事件模型实现操作符间的通信

项目目录结构

org.apache.flink.streaming.examples.lkk.exer_operator/
├── api/                           # 基础接口层
│   └── Event.java                 # 事件基础接口
├── coordination/                  # 协调层
│   ├── coordinator/              # 协调器实现
│   │   ├── ExpressionCoordinator.java
│   │   └── ExpressionCoordinatorProvider.java
│   ├── request/                  # 协调请求
│   │   ├── ExpressionChangeRequest.java
│   │   └── ReleaseBlockRequest.java
│   └── response/                 # 协调响应
│       └── ExpressionCoordinationResponse.java
├── core/                         # 核心操作符层
│   ├── factory/                  # 操作符工厂
│   │   └── ExpressionOperatorFactory.java
│   └── operator/                 # 操作符实现
│       ├── CalculatorOperator.java      # 新增:表达式计算操作符
│       └── ExpressionOperator.java      # 表达式管理操作符
├── model/                        # 数据模型层
│   ├── dto/                      # 数据传输对象
│   │   ├── CalculatorTaskRegister.java  # 新增:计算任务注册
│   │   ├── FlushNotifyEvent.java        # 刷新通知事件
│   │   ├── FlushSuccess.java            # 新增:刷新成功事件
│   │   └── PendingExpressionChange.java # 新增:待处理表达式变更
│   └── events/                   # 事件模型
│       ├── DataEvent.java
│       ├── ExpressionEvent.java
│       ├── OperatorIdEvent.java         # 新增:操作符ID事件
│       └── PartitionedEvent.java
├── example/                      # 示例和测试
│   └── TestMain.java            # 更新:包含完整的双操作符流程

设计模式

1. 操作符-协调器模式
  • 操作符: 处理数据流,执行本地计算
  • 协调器: 管理全局状态,协调多个操作符实例
  • 通信: 通过 RPC 机制实现异步通信
2. 事件驱动架构
  • 统一事件接口: 所有事件实现 HitaEvent 接口
  • 多态处理: 根据事件类型执行不同处理逻辑
  • 类型安全: 编译时类型检查,运行时类型判断
3. 分区策略
  • 哈希分区: 使用 Math.abs(event.hashCode()) % parallelism
  • 一致性保证: 相同内容的事件分配到同一分区
  • 负载均衡: 均匀分布数据到各个并行实例

核心组件

1. 事件模型层 (Event Model)

Event (基础接口)
public interface Event extends Serializable
  • 所有事件的基础接口,确保序列化能力
  • 位置:api/Event.java
  • 替代了之前的 HitaoEvent 接口,命名更加简洁明确
DataEvent (数据事件)
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DataEvent implements Event {private int f1, f2, f3;  // 三个数值字段用于表达式计算
}
  • 包含三个数值字段 (f1, f2, f3) 的业务数据事件
  • 支持 Aviator 表达式中的变量引用:f1、f2、f3
  • 位置:model/events/DataEvent.java
ExpressionEvent (表达式事件)
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ExpressionEvent implements Event {private String expression;  // Aviator 表达式字符串
}
  • 包含计算表达式的控制事件,用于动态更新计算规则
  • 支持复杂的 Aviator 表达式:算术运算、条件判断、数学函数等
  • 位置:model/events/ExpressionEvent.java
OperatorIdEvent (操作符ID事件)
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OperatorIdEvent implements Event {private OperatorID operatorID;  // 操作符唯一标识
}
  • 用于传递操作符ID,建立操作符与协调器的通信关系
  • ExpressionOperator 启动时广播此事件给 CalculatorOperator
  • 位置:model/events/OperatorIdEvent.java
PartitionedEvent (分区事件)
@Data
@AllArgsConstructor
@NoArgsConstructor
public class PartitionedEvent implements Event {private int partitionId;     // 分区IDprivate Event event;         // 原始事件
}
  • 封装分区ID和原始事件的包装类
  • 支持自定义分区策略,确保数据均匀分布
  • 位置:model/events/PartitionedEvent.java

2. 数据传输对象 (DTO)

CalculatorTaskRegister (计算任务注册)
@Data
@AllArgsConstructor
@NoArgsConstructor
public class CalculatorTaskRegister implements OperatorEvent {private int subTaskId;  // 子任务ID
}
  • 计算任务向协调器注册的事件
  • 位置:model/dto/CalculatorTaskRegister.java
FlushSuccess (刷新成功)
@Data
@AllArgsConstructor
@NoArgsConstructor
public class FlushSuccess implements OperatorEvent {private int subtaskId;  // 完成刷新的子任务ID
}
  • 通知协调器刷新操作已成功完成
  • 位置:model/dto/FlushSuccess.java
FlushNotifyEvent (刷新通知)
public class FlushNotifyEvent implements HitaoEvent {// 标记事件,用于通知下游任务进行状态刷新
}
  • 广播给所有下游任务的刷新通知事件
  • 位置:model/dto/FlushNotifyEvent.java
PendingExpressionChange (待处理表达式变更)
public class PendingExpressionChange {ConcurrentHashMap<Integer, Object> registeredCalculatorSubtasks;CompletableFuture<CoordinationResponse> waitReleaseFuture;int flushSuccessCount;
}
  • 管理表达式变更过程中的异步协调
  • 位置:model/dto/PendingExpressionChange.java

3. 核心操作符层

ExpressionOperator (表达式管理操作符)
public class ExpressionOperator extends AbstractStreamOperator<Event>implements OneInputStreamOperator<Event, Event>
  • 主要功能
    • 处理表达式变更事件,与协调器通信
    • 计算数据分区,支持普通分区和广播模式
    • 管理操作符生命周期,广播 OperatorID
    • 协调分布式表达式变更过程
  • 关键方法
    • open(): 启动时广播 OperatorIdEvent 建立通信链路
    • processElement(): 处理不同类型的事件(数据事件、表达式事件)
    • partitionBy(): 计算数据分区,使用哈希策略
    • broadcast(): 广播事件到所有下游分区
    • releaseRequest(): 与协调器通信,等待分布式同步完成
  • 位置:core/operator/ExpressionOperator.java
CalculatorOperator (表达式计算操作符)
public class CalculatorOperator extends AbstractStreamOperator<Double>implements OneInputStreamOperator<Event, Double>
  • 主要功能
    • 使用 Aviator 引擎执行动态表达式计算
    • 向协调器注册自己并参与状态同步
    • 处理不同类型的事件(数据、表达式、控制事件)
    • 输出 Double 类型的计算结果
  • 关键方法
    • processElement(): 根据事件类型执行不同处理逻辑
    • 支持 DataEvent 计算、ExpressionEvent 编译、OperatorIdEvent 注册
    • 支持 FlushNotifyEvent 处理,参与分布式同步
  • Aviator 集成
    • 使用 AviatorEvaluator.compile() 编译表达式
    • 支持变量映射:f1、f2、f3 对应 DataEvent 的字段
    • 支持数学函数、条件判断等复杂表达式
  • 位置:core/operator/CalculatorOperator.java
ExpressionOperatorFactory (操作符工厂)
public class ExpressionOperatorFactory extends SimpleOperatorFactory<Event>implements CoordinatedOperatorFactory<Event>
  • 主要功能
    • 创建和配置 ExpressionOperator
    • 提供协调器集成能力,连接操作符和协调器
    • 管理操作符生命周期
  • 关键方法
    • getCoordinatorProvider(): 提供协调器提供者
    • 集成 Flink 的协调器机制,实现分布式协调
  • 位置:core/factory/ExpressionOperatorFactory.java

4. 协调器层

ExpressionCoordinator (分布式协调器)
public class ExpressionCoordinator implements OperatorCoordinator, CoordinationRequestHandler
  • 主要功能
    • 管理计算任务的注册状态
    • 协调表达式变更过程中的同步操作
    • 处理检查点和故障恢复
  • 关键方法
    • handleEventFromOperator(): 处理来自操作符的事件
    • handleCoordinationRequest(): 处理协调请求
    • checkpointCoordinator(): 检查点状态持久化
  • 位置:coordination/coordinator/ExpressionCoordinator.java
协调请求和响应
  • ExpressionChangeRequest: 表达式变更请求
  • ReleaseBlockRequest: 释放阻塞请求
  • ExpressionCoordinationResponse: 协调响应
  • 位置:coordination/request/coordination/response/

时序图

以下时序图展示了系统中各组件之间的交互流程:
在这里插入图片描述

时序图说明

时序图展示了系统中各组件之间的完整交互流程,包含四个主要阶段:

1. 系统启动阶段 (System Initialization)

目标: 建立操作符间的通信链路和协调器注册

详细流程:

  • TestMain 启动: 创建 StreamExecutionEnvironment,配置数据源和操作符
  • ExpressionOperator 初始化:
    • 调用 open() 方法启动操作符
    • 获取自身的 OperatorID
    • 广播 OperatorIdEvent 到所有下游 CalculatorOperator 实例
  • CalculatorOperator 注册:
    • 接收 OperatorIdEvent,获得上游 ExpressionOperator 的 ID
    • 创建 CalculatorTaskRegister 事件,包含自己的子任务 ID
    • 通过协调器网关向 ExpressionCoordinator 发送注册请求
  • ExpressionCoordinator 记录:
    • 接收并处理 CalculatorTaskRegister 事件
    • 将注册的计算任务 ID 存储在 registeredCalculatorSubTasks 映射中
    • 为后续的分布式协调做准备

关键技术点:

  • 使用 Flink 的 OperatorID 机制建立操作符间的身份识别
  • 通过事件广播实现一对多的通信模式
  • 协调器维护全局的任务注册状态
2. 数据事件处理阶段 (Data Event Processing)

目标: 处理业务数据,执行表达式计算

详细流程:

  • 数据输入: 用户通过 Socket 输入格式为 d|f1,f2,f3 的数据
  • 事件解析: TestMain 中的 map 函数将文本解析为 DataEvent 对象
  • ExpressionOperator 处理:
    • 接收 DataEvent,调用 processElement() 方法
    • 执行 partitionBy() 方法计算分区 ID:Math.abs(event.hashCode()) % parallelism
    • 创建 PartitionedEvent 包装原始事件和分区 ID
    • 发送到下游的 CalculatorOperator
  • CalculatorOperator 计算:
    • 接收 PartitionedEvent,提取原始的 DataEvent
    • 检查是否存在当前编译的表达式 (currentExpression)
    • 如果有表达式:
      • 将 DataEvent 的字段映射到变量:f1, f2, f3
      • 使用 Aviator 引擎执行表达式:currentExpression.execute(expressionDataMap)
      • 输出 Double 类型的计算结果
    • 如果无表达式:输出 “当前没有表达式,跳过计算”

关键技术点:

  • 基于哈希的分区策略确保数据均匀分布
  • Aviator 表达式引擎的变量映射和执行机制
  • 条件处理逻辑,支持有/无表达式的不同场景
3. 表达式变更流程 (Expression Change Flow)

目标: 实现分布式环境下的表达式热更新

详细流程:

阶段 3.1 - 协调请求:

  • 表达式输入: 用户输入格式为 e|expression 的表达式事件
  • ExpressionOperator 处理:
    • 接收 ExpressionEvent,提取表达式字符串
    • 创建 ExpressionChangeRequest 协调请求
    • 通过 coordinator.sendRequestToCoordinator() 发送请求
  • ExpressionCoordinator 响应:
    • 接收并处理 ExpressionChangeRequest
    • 创建 PendingExpressionChange 对象管理异步协调状态
    • 初始化 CompletableFuture<CoordinationResponse> 用于后续阻塞释放
    • 返回 ExpressionCoordinationResponse 确认请求接收

阶段 3.2 - 状态同步:

  • ExpressionOperator 广播刷新:
    • 向所有下游 CalculatorOperator 广播 FlushNotifyEvent
    • 通知所有计算任务准备状态同步
  • CalculatorOperator 响应:
    • 接收 FlushNotifyEvent,执行本地状态刷新操作
    • 创建 FlushSuccess 事件,包含自己的子任务 ID
    • 向协调器发送刷新完成通知
  • ExpressionCoordinator 计数:
    • 接收每个 FlushSuccess 事件
    • 更新 flushSuccessCount 计数器
    • 检查是否所有注册的计算任务都已完成刷新

阶段 3.3 - 阻塞释放:

  • ExpressionOperator 等待:
    • 发送 ReleaseBlockRequest 到协调器
    • 等待协调器确认所有任务同步完成
  • ExpressionCoordinator 检查:
    • 检查 flushSuccessCount 是否等于注册任务数量
    • 如果所有任务完成,调用 waitReleaseFuture.complete() 释放阻塞
    • 返回响应给 ExpressionOperator

阶段 3.4 - 表达式广播:

  • ExpressionOperator 广播新表达式:
    • 阻塞释放后,向所有下游广播原始的 ExpressionEvent
  • CalculatorOperator 编译:
    • 接收新的 ExpressionEvent
    • 使用 AviatorEvaluator.compile() 编译表达式
    • 更新本地的 currentExpression 变量
    • 准备处理后续的数据事件

关键技术点:

  • 使用 CompletableFuture 实现异步协调和阻塞控制
  • 分布式计数机制确保所有任务同步完成
  • 原子性操作保证表达式变更的一致性
4. 后续数据处理阶段 (Subsequent Data Processing)

目标: 使用新表达式处理数据,验证热更新效果

详细流程:

  • 数据输入: 用户继续输入数据事件
  • 表达式计算: CalculatorOperator 使用新编译的表达式进行计算
  • 结果输出: 输出使用新表达式计算的结果,验证热更新成功

支持的表达式类型:

  • 基本算术: f1+f2+f3, f1*f2+f3
  • 条件判断: f1>f2?f1+f2:f2+f3
  • 数学函数: math.sqrt(f1*f1+f2*f2)+f3
  • 逻辑运算: f1>10 && f2<20

关键技术点:

  • Aviator 引擎的强大表达式支持能力
  • 热更新的即时生效,无需重启系统
  • 完整的端到端数据处理验证

架构图

以下架构图展示了系统的整体结构和组件关系:
在这里插入图片描述

架构图说明

架构图展示了系统的五个核心层次:

  1. 数据输入层 (黄色): 负责接收用户输入和数据解析
  2. 事件模型层 (蓝色): 定义统一的事件接口体系,包含完整的事件类型
  3. 操作符层 (紫色): 双操作符架构,ExpressionOperator 管理,CalculatorOperator 计算
  4. 协调器层 (绿色): 分布式协调机制,管理表达式变更和状态同步
  5. 数据处理层 (粉色): 分区策略、Aviator 引擎和结果输出

各层之间通过明确的接口进行交互,实现了良好的分层解耦。双操作符架构确保了职责分离和系统的可扩展性。

关键代码示例

1. ExpressionOperator 核心处理逻辑

操作符初始化
@Override
public void open() throws Exception {super.open();// 向下游广播操作符ID,建立通信关系OperatorID expressionOperatorID = getOperatorID();broadcast(new OperatorIdEvent(expressionOperatorID));System.out.println("ExpressionOperator 启动完成,OperatorID: " + expressionOperatorID.toHexString());
}
事件处理逻辑
@Override
public void processElement(StreamRecord<HitaoEvent> element) throws Exception {HitaoEvent hitaoEvent = element.getValue();// 处理表达式变更事件if (hitaoEvent instanceof ExpressionEvent) {ExpressionEvent exp = (ExpressionEvent) hitaoEvent;String expression = exp.getExpression();System.out.println("收到表达式变更事件: " + expression);// 1. 向协调器请求表达式变更ExpressionChangeRequest expressionChangeRequest = new ExpressionChangeRequest(expression);coordinator.sendRequestToCoordinator(operatorID, new SerializedValue<>(expressionChangeRequest));// 2. 向下游广播刷新通知broadcast(new FlushNotifyEvent());// 3. 等待协调器释放阻塞releaseRequest();System.out.println("阻塞被放开,继续处理");// 4. 向下游广播新的表达式broadcast(exp);}// 所有事件都进行分区处理partitionBy(hitaoEvent);
}
数据分区策略
private void partitionBy(HitaoEvent hitaoEvent) {// 计算分区ID:使用 hashCode 取模,并确保结果为正数int partitionId = Math.abs(hitaoEvent.hashCode()) % downStreamParallelism;PartitionedEvent partitionedEvent = new PartitionedEvent(partitionId, hitaoEvent);output.collect(new StreamRecord<>(partitionedEvent));
}private void broadcast(HitaoEvent hitaoEvent) {// 广播到所有下游分区for (int i = 0; i < downStreamParallelism; i++) {PartitionedEvent partitionedEvent = new PartitionedEvent(i, hitaoEvent);output.collect(new StreamRecord<>(partitionedEvent));}
}

2. CalculatorOperator 计算逻辑

事件处理和表达式计算
@Override
public void processElement(StreamRecord<HitaoEvent> element) throws Exception {PartitionedEvent partitionedEvent = (PartitionedEvent) element.getValue();HitaoEvent event = partitionedEvent.getEvent();if (event instanceof DataEvent) {DataEvent dataEvent = (DataEvent) event;// 使用 Aviator 引擎执行表达式计算if (currentExpression != null) {expressionDataMap.put("f1", (double) dataEvent.getF1());expressionDataMap.put("f2", (double) dataEvent.getF2());expressionDataMap.put("f3", (double) dataEvent.getF3());double result = (double) currentExpression.execute(expressionDataMap);output.collect(new StreamRecord<>(result));System.out.println("计算结果: f1=" + dataEvent.getF1() +", f2=" + dataEvent.getF2() +", f3=" + dataEvent.getF3() +" => " + result);}} else if (event instanceof OperatorIdEvent) {// 接收操作符ID并向协调器注册OperatorIdEvent operatorIdEvent = (OperatorIdEvent) event;this.expressionOperatorID = operatorIdEvent.getOperatorID();CalculatorTaskRegister register = new CalculatorTaskRegister(selfSubTaskId);coordinator.sendOperatorEventToCoordinator(expressionOperatorID, new SerializedValue<>(register));} else if (event instanceof ExpressionEvent) {// 编译新的表达式ExpressionEvent expressionEvent = (ExpressionEvent) event;this.currentExpression = AviatorEvaluator.compile(expressionEvent.getExpression());System.out.println("表达式编译完成: " + expressionEvent.getExpression());} else if (event instanceof FlushNotifyEvent) {// 发送刷新完成通知coordinator.sendOperatorEventToCoordinator(expressionOperatorID,new SerializedValue<>(new FlushSuccess(selfSubTaskId)));}
}

3. ExpressionCoordinator 协调逻辑

协调请求处理
@Override
public CompletableFuture<CoordinationResponse> handleCoordinationRequest(CoordinationRequest request) {if (request instanceof ExpressionChangeRequest) {CompletableFuture<CoordinationResponse> waitReleaseFuture = new CompletableFuture<>();System.out.println("收到 expression change request");// 创建待处理的表达式变更pendingExpressionChange = new PendingExpressionChange(registeredCalculatorSubTasks, waitReleaseFuture);return CompletableFuture.completedFuture(new ExpressionCoordinationResponse());} else if (request instanceof ReleaseBlockRequest) {System.out.println("收到 release block request");// 返回等待所有任务完成的 Futurereturn pendingExpressionChange.waitReleaseFuture;}throw new IllegalArgumentException("请求类型不支持: " + request.getClass());
}
操作符事件处理
@Override
public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) throws Exception {if (event instanceof FlushSuccess) {FlushSuccess success = (FlushSuccess) event;int subtaskId = success.getSubtaskId();System.out.println("收到一个 flush success 事件,子任务ID: " + subtaskId);if (pendingExpressionChange != null) {pendingExpressionChange.flushSuccess(subtaskId);}} else if (event instanceof CalculatorTaskRegister) {CalculatorTaskRegister register = (CalculatorTaskRegister) event;int subTaskId = register.getSubTaskId();System.out.println("收到一个calculator sub task注册事件: " + subTaskId);registeredCalculatorSubTasks.put(subTaskId, "");}
}

4. 完整的流处理管道

TestMain 中的管道构建
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 1. 创建数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 6666);// 2. 解析文本为事件对象SingleOutputStreamOperator<Event> events = stream.map(s -> {String[] split = s.split("\\|");if (split.length < 2) {throw new IllegalArgumentException("Invalid input format: " + s +". Expected format: 'd|10,20,30' or 'e|f1-f2*f3'");}if ("d".equals(split[0])) {// 解析数据事件: d|10,20,30String[] fields = split[1].split(",");if (fields.length < 3) {throw new IllegalArgumentException("Invalid data format: " + s +". Expected format: 'd|field1,field2,field3'");}return new DataEvent(Integer.parseInt(fields[0]),Integer.parseInt(fields[1]),Integer.parseInt(fields[2]));} else {// 解析表达式事件: e|f1+f2*f3return new ExpressionEvent(split[1]);}});// 3. 设置下游计算任务的并行度int calculatorTaskParallelism = 2;// 4. 添加表达式管理操作符SingleOutputStreamOperator<Event> partitionedEvents = events.transform("expression management operator",TypeInformation.of(Event.class),new ExpressionOperatorFactory(new ExpressionOperator(calculatorTaskParallelism)));// 5. 配置自定义分区策略DataStream<Event> partitionedStream = partitionedEvents.partitionCustom(new Partitioner<Integer>() {@Overridepublic int partition(Integer key, int numPartitions) {return key % numPartitions;}},new KeySelector<Event, Integer>() {@Overridepublic Integer getKey(Event value) throws Exception {PartitionedEvent event = (PartitionedEvent) value;return event.getPartitionId();}});// 6. 添加表达式计算操作符SingleOutputStreamOperator<Double> calculatorOperator = partitionedStream.transform("calculator operator",TypeInformation.of(Double.class),new CalculatorOperator()).setParallelism(calculatorTaskParallelism);// 7. 输出计算结果calculatorOperator.print().setParallelism(calculatorTaskParallelism);// 8. 启动 Flink 作业执行env.execute("Flink Expression Processing System");
}

完整测试场景

完整的端到端测试流程

测试环境准备
# 1. 确保 Java 环境# 2. 编译项目
cd flink-release-1.18
mvn clean compile -pl flink-examples/flink-examples-streaming# 3. 启动 Flink 作业(在一个终端)
idea 中启动# 4. 启动数据输入(在另一个终端)
nc -l 6666
完整测试序列

步骤1: 验证系统启动

# 预期日志输出
ExpressionOperator 启动完成,OperatorID: xxx
CalculatorOperator 启动完成,子任务ID: 0
CalculatorOperator 启动完成,子任务ID: 1
收到 expression 管理算子的 算子ID: xxx
向协调器注册完成,子任务ID: 0
向协调器注册完成,子任务ID: 1

步骤2: 测试无表达式的数据处理

# 输入数据
d|10,20,30# 预期输出
当前没有表达式,跳过计算
# 系统正常处理但不输出计算结果

步骤3: 设置表达式并测试计算

# 输入表达式
e|f1+f2+f3# 预期日志
收到表达式变更事件: f1+f2+f3
收到 expression change request
发送 FlushSuccess 事件,子任务ID: 0
发送 FlushSuccess 事件,子任务ID: 1
所有 subtask flush 都完成
阻塞被放开,继续处理
收到新的表达式: f1+f2+f3
表达式编译完成: f1+f2+f3# 输入数据
d|10,20,30# 预期输出
计算结果: f1=10, f2=20, f3=30 => 60.0
60.0

步骤4: 动态表达式更新测试

# 更新表达式
e|f1*f2+f3# 输入数据
d|10,20,30# 预期输出
计算结果: f1=10, f2=20, f3=30 => 230.0
230.0

步骤5: 复杂表达式测试

# 条件表达式
e|f1>f2?f1+f2:f2+f3# 测试数据1
d|30,20,10
# 输出: 50.0 (30>20为真,所以30+20)# 测试数据2
d|10,20,30
# 输出: 50.0 (10>20为假,所以20+30)# 数学函数表达式
e|math.sqrt(f1*f1+f2*f2)+f3# 测试数据
d|3,4,5
# 输出: 10.0 (sqrt(9+16)+5 = 5+5)

步骤6: 并行处理验证

# 快速输入多个数据,验证并行处理
d|1,2,3
d|4,5,6
d|7,8,9# 预期输出(可能乱序,因为并行处理)
计算结果: f1=1, f2=2, f3=3 => 6.0
计算结果: f1=4, f2=5, f3=6 => 15.0
计算结果: f1=7, f2=8, f3=9 => 24.0
6.0
15.0
24.0

适用场景

虽然是个demo,但是可以帮助我们更深入的理解flink的源码,为构建灵活、可扩展、高性能的实时数据处理系统提供了完整的技术参考和实现方案。可以扩展应用到下面场景

  • 实时规则引擎: 需要动态更新处理规则的场景,如业务规则变更
  • 流式计算平台: 需要灵活配置计算逻辑的数据处理系统
  • 监控告警系统: 需要实时调整告警规则的监控平台
  • 推荐系统: 需要动态调整推荐算法的实时推荐场景
  • 金融风控: 需要实时调整风控规则的金融系统
  • IoT数据处理: 需要动态调整传感器数据处理逻辑的物联网系统
  • 实时数据分析: 需要动态调整分析指标和计算公式的场景
  • A/B测试平台: 需要实时切换不同算法版本的实验平台

返回目录

Flink 源码系列 - 前言

http://www.xdnf.cn/news/1326331.html

相关文章:

  • 低延迟、跨平台与可控性:直播SDK的模块化价值解析
  • 基于 PyTorch 模型训练优化、FastAPI 跨域配置与 Vue 响应式交互的手写数字识别
  • 微服务架构的演进:从 Spring Cloud Netflix 到云原生新生态
  • 如何在 uBlock Origin 中忽略指定网站的某一个拦截规则
  • 数字孪生 :提高制造生产力的智能方法
  • 当宠物机器人装上「第六感」:Deepoc 具身智能如何重构宠物机器人照看逻辑
  • 常见的软件图片缩放,算法如何选择?
  • 当机器猫遇上具身智能:一款能读懂宠物心思的AI守护者
  • ISIS高级特性
  • 驱动开发系列66 - glCompileShader实现 - GLSL中添加内置函数
  • MySQL练习题50题(附带详细教程)
  • 【GNSS定位原理及算法杂记5】​​​​PPK(后处理动态定位)深度解析:后处理的艺术与 RTK 的互补
  • ListBoxes使得在专为灵活性和易用性设计
  • GaussDB 中 alter default privileges 的使用示例
  • 从数据表到退磁:Ansys Maxwell中N48磁体磁化指南
  • GaussDB 八种常规锁介绍
  • MoonBit Perals Vol.06: Moonbit 与 LLVM 共舞 (上):编译前端实现
  • CloudDM 新增支持 GaussDB 与 openGauss:国产数据库管理更高效
  • wx小游戏canvas能力封装
  • 推理还是训练 || KV缓存和CoT技术
  • 云原生堡垒机渗透测试场景
  • 应急响应常见命令
  • 代码随想录刷题——字符串篇(七)
  • ChatBI驱动的智能商业决策:奥威BI的深度实践
  • Java多线程:线程创建、安全、同步与线程池
  • 常见的 Bash 命令及简单脚本
  • C语言实战:从零开始编写一个通用配置文件解析器
  • SpringAI——向量存储(vector store)
  • 电子电气架构 --- 软件项目成本估算
  • UE5 PCG 笔记(一)