当前位置: 首页 > backend >正文

Flink 网络消息队列 PrioritizedDeque

PrioritizedDeque 是一个支持元素优先级双端队列(Deque)

它的核心应用场景是在 SingleInputGate 中,用于调度有可用数据的 InputChannel。在 Flink 的数据流中,除了普通的业务数据(Buffer),还存在一些高优先级的控制事件,最典型的就是 CheckpointBarrier。为了保证 Checkpoint 机制的低延迟和正确性,CheckpointBarrier 必须被尽快处理,不能被大量的普通数据阻塞。

PrioritizedDeque 就是为了解决这个问题而设计的。它允许将某些元素(即包含高优先级事件的 InputChannel)标记为 “优先” ,这些优先元素会被排在队列的前面,从而被 SingleInputGate 优先取出和处理。

@Internal
public final class PrioritizedDeque<T> implements Iterable<T> {private final Deque<T> deque = new ArrayDeque<>();private int numPriorityElements;// ...
}
  • @Internal: 表明这是 Flink 内部使用的类,不属于公共 API,可能会在未来版本中发生变化。
  • 内部实现: 它内部包装了一个标准的 java.util.ArrayDeque 来存储所有元素,并通过一个整型变量 numPriorityElements 来记录当前队列头部有多少个是优先元素。

数据结构与核心思想

PrioritizedDeque 的巧妙之处在于它没有使用复杂的堆或者多个队列来实现优先级,而是利用了 ArrayDeque 的特性,在一个队列中维护了两个逻辑分区:

              <-- poll() from here
+------------------------------------+------------------------------------+
| P1 | P2 | ... | Pn                  | E1 | E2 | ... | Em                  |
+------------------------------------+------------------------------------+
|      Priority Elements             |      Non-Priority Elements         |
|      (numPriorityElements 个)      |                                    |
+------------------------------------+------------------------------------+addFirst() for priority      addLast() for non-priority
  • 优先元素区: 位于队列的头部。所有被标记为优先的元素都在这里。
  • 普通元素区: 位于队列的尾部。
  • numPriorityElements: 这个计数器是区分这两个区域的边界

当调用 poll() 方法时,它总是从 deque 的头部取元素。由于优先元素被放在头部,它们自然会被先取走。

添加元素

  • public void add(T element): 添加一个普通(非优先) 元素。

    public void add(T element) {deque.add(element); // 等价于 deque.addLast(element)
    }
    

    这个操作非常高效,直接在底层 ArrayDeque 的尾部添加元素,时间复杂度为 O(1)。

  • public void addPriorityElement(T element): 添加一个优先元素。

    public void addPriorityElement(T element) {// 优先元素很少,这是最常见的优化路径if (numPriorityElements == 0) {deque.addFirst(element);} else if (numPriorityElements == deque.size()) {// 队列里全是优先元素deque.add(element); // 直接加到队尾即可} else {// 这是最坏的情况:队列中既有优先元素,又有普通元素// 1. 把所有现有的优先元素(头部n个)取出来,暂存final ArrayDeque<T> priorPriority = new ArrayDeque<>(numPriorityElements);for (int index = 0; index < numPriorityElements; index++) {priorPriority.addFirst(deque.poll());}// 2. 把新元素加到队头deque.addFirst(element);// 3. 把暂存的旧优先元素再按原顺序加回到队头for (final T priorityEvent : priorPriority) {deque.addFirst(priorityEvent);}}numPriorityElements++;
    }
    

    这个方法的实现体现了其设计哲学:为最常见的场景优化

    • 最佳情况 (O(1)): 当队列中没有优先元素时,直接 addFirst。这是 Flink 运行时期望的常态。
    • 最坏情况 (O(N_priority)): 当队列中已经存在优先元素时,需要先把它们全部移出,再把新元素和旧元素一起放回去。这个操作的成本与已有优先元素的数量成正比。设计者认为优先元素是稀少且短暂存在的,所以这个开销可以接受。

提升优先级

  • public void prioritize(T element): 将一个已经存在于队列中的元素提升为优先元素。
    public void prioritize(T element) {final Iterator<T> iterator = deque.iterator();// 1. 检查是否已经是优先元素,如果是,直接返回for (int i = 0; i < numPriorityElements && iterator.hasNext(); i++) {if (iterator.next() == element) {return;}}// 2. 优化:如果它正好是第一个非优先元素,只需增加计数器即可if (iterator.hasNext() && iterator.next() == element) {numPriorityElements++;return;}// 3. 常规路径:从队列中找到并移除它while (iterator.hasNext()) {if (iterator.next() == element) {iterator.remove();break;}}// 4. 作为优先元素重新添加addPriorityElement(element);
    }
    
    这个方法用在当一个普通的 InputChannel 突然收到了一个 CheckpointBarrier 时。SingleInputGate 需要将这个 Channel 的优先级提升,确保它能被优先处理。这个方法的实现同样包含了对常见情况的优化。

取出元素

  • public T poll(): 从队列头部取出一个元素。

    @Nullable
    public T poll() {final T polled = deque.poll(); // 等价于 deque.pollFirst()if (polled != null && numPriorityElements > 0) {numPriorityElements--;}return polled;
    }
    

    这个操作非常简单直接。它总是从 deque 的头部取元素。如果取出的元素是一个优先元素(即 numPriorityElements > 0),则将计数器减一。这个操作的时间复杂度是 O(1)。

  • public T peek(): 查看队头元素但不移除。

    @Nullable
    public T peek() {return deque.peek(); // 等价于 deque.peekFirst()
    }
    

    同样是 O(1) 操作。

总结

PrioritizedDeque 是一个为特定场景(Flink 网络输入调度)高度优化的数据结构。

  • 设计目标: 在一个队列中同时管理优先和非优先元素,并保证优先元素总是被先处理。
  • 核心思想: 使用单个 ArrayDeque 和一个计数器 numPriorityElements 来划分出两个逻辑区域,避免了管理多个队列的复杂性。
  • 性能特点:
    • 非优先元素的操作(添加、轮询)非常高效,均为 O(1)。
    • 优先元素的操作在大多数情况下(没有其他优先元素时)也是 O(1)。
    • 在最坏情况下(需要移动多个已存在的优先元素),性能会下降,但这种场景被认为是稀有的。
  • 应用场景: 它是 SingleInputGate 实现 CheckpointBarrier 等控制事件优先处理的关键。当一个 InputChannel 收到 CheckpointBarrier 时,SingleInputGate 会调用 prioritize 或 addPriorityElement 将该 Channel 放入 PrioritizedDeque 的优先区,从而保证 Task 能及时响应 Checkpoint,避免因数据积压导致 Checkpoint 超时。

总而言之,PrioritizedDeque 是 Flink 系统设计中“为常见路径优化”思想的一个绝佳范例。

http://www.xdnf.cn/news/19919.html

相关文章:

  • C52单片机独立按键模块,中断系统,定时器计数器以及蜂鸣器
  • OpenLayers常用控件 -- 章节三:鼠标位置坐标显示控件教程
  • 多线程入门到精通系列: 从操作系统到 Java 线程模型
  • 快鹭云业财一体化系统技术解析:低代码+AI如何破解数据孤岛难题
  • 飞算JavaAI开发在线图书借阅平台全记录:从0到1的实践指南
  • 【C++】详解形参和实参:别再傻傻分不清
  • Android adb shell命令分析应用内存占用
  • 2025全国大学生数学建模C题保姆级思路模型(持续更新):NIPT 的时点选择与胎儿的异常判定
  • Trae + MCP : 一键生成专业封面——从概念到落地的全链路实战
  • java对接物联网设备(一)——使用okhttp网络工具框架对接标准API接口
  • SVN和Git两种版本管理系统对比
  • Hunyuan-MT-7B模型介绍
  • 使用Vue.js和WebSocket打造实时库存仪表盘
  • window使用ffmep工具,加自定义脚本执行视频转码成h264(运营人员使用)
  • P13929 [蓝桥杯 2022 省 Java B] 山 题解
  • 第三方网站测评:【WEB应用文件包含漏洞(LFI/RFI)的测试步骤】
  • 神经网络模型介绍
  • LeetCode 3132.找出与数组相加的整数2
  • 机器学习算法在Backtrader策略稳定性中的作用分析
  • pytorch可视化工具(训练评估:Tensorboard、swanlab)
  • c#编写的应用程序调用不在同一文件夹下的DLL
  • OpenLayers 入门篇教程 -- 章节三 :掌控地图的视野和交互
  • 下一代自动驾驶汽车系统XIL验证方法
  • 【Doris入门】Doris数据表模型使用指南:核心注意事项与实践
  • select, poll, epoll
  • PyTorch 损失函数与优化器全面指南:从理论到实践
  • 论文理解:Reflexion: Language Agents with Verbal Reinforcement Learning
  • 【正则表达式】 正则表达式运算法优先级的先后是怎么排序的?
  • 【Pytest】解决Pytest中Teardown钩子的TypeError:实例方法与类方法的调用差异
  • Java中最常用的设计模式