当前位置: 首页 > news >正文

详解flink性能优化

1. 简介

Apache Flink是一个强大的流处理框架,其性能很大程度上取决于内存的使用效率。在大规模数据处理场景中,合理的内存配置和优化可以显著提升Flink作业的性能和稳定性。本文将深入探讨Flink内存优化的各个方面,包括状态后端选择、内存配置参数、分布式状态管理等。

2. Flink 状态管理与内存

2.1 状态后端选择

Flink提供了多种状态后端,每种都有不同的内存使用特性:

  1. HashMapStateBackend:将状态数据作为Java对象存储在JVM堆内存中

    • 优点:访问速度快(内存级别)
    • 缺点:受集群可用内存限制
    • 适用场景:对性能要求高但状态大小适中的作业
  2. EmbeddedRocksDBStateBackend:将状态存储在TaskManager本地磁盘的RocksDB数据库中

    • 优点:状态大小仅受磁盘空间限制
    • 缺点:相比HashMapStateBackend吞吐量较低
    • 适用场景:状态非常大,需要增量检查点的场景
  3. ForStStateBackend(实验性):基于ForSt项目的分布式状态管理,允许状态存储在远程文件系统上

    • 优点:状态可以存储在远程文件系统(HDFS、S3等),超越本地磁盘容量限制
    • 缺点:仍处于实验阶段,不完全生产就绪
    • 适用场景:超大规模状态,云原生设置

2.2 HashMapStateBackend 内存优化

HashMapStateBackend将所有状态保存在JVM堆内存中,因此优化主要集中在JVM内存管理上:

  • 合理设置TaskManager的堆内存大小

  • 调整JVM垃圾回收参数

  • 避免对象频繁创建和销毁

  • 考虑使用堆外内存减轻GC压力

2.3 RocksDBStateBackend 内存优化

RocksDB状态后端的内存使用更为复杂,Flink提供了多种配置选项来控制其内存使用:

内存管理模式

  • 默认情况下,RocksDB内存配置与Flink的每槽位托管内存匹配
  • 内存在写入路径(MemTable)和读取路径(索引、过滤器、缓存)之间分配

关键内存参数

  • state.backend.rocksdb.memory.write-buffer-ratio:写缓冲区占比(默认:0.5)

  • state.backend.rocksdb.memory.high-prio-pool-ratio:高优先级池占比(默认:0.1)

写入缓冲区配置

  • state.backend.rocksdb.writebuffer.size:内存中构建的数据量(默认:64MB)

  • state.backend.rocksdb.writebuffer.count:内存中构建的最大写缓冲区数量(默认:2)

批量写入优化

  • state.backend.rocksdb.write-batch-size:RocksDB批量写入的最大内存消耗(默认:2MB)

2.4 ForStStateBackend 内存优化

ForSt状态后端是Flink 2.0引入的实验性功能,用于分布式状态管理。它提供了类似RocksDB的内存配置选项,但针对分布式存储进行了优化:

内存管理模式

  • state.backend.forst.memory.managed:是否使用托管内存(默认:true)
  • state.backend.forst.memory.fixed-per-slot:每个槽位的固定内存大小(覆盖托管内存选项)
  • state.backend.forst.memory.fixed-per-tm:每个TaskManager的固定内存大小(集群级别选项)

内存分配比例

  • ate.backend.forst.memory.write-buffer-ratio:写缓冲区占比(默认:0.5)

  • state.backend.forst.memory.high-prio-pool-ratio:高优先级池占比(默认:0.1)

缓存配置

  • state.backend.forst.cache.lru.promote-limit**:LRU缓存提升限制(默认:3)

  • state.backend.forst.block.cache-size:数据块缓存大小(默认:8MB)

3. 内存配置参数详解

3.1 RocksDB 内存参数

RocksDB的内存使用主要分为以下几个部分:

  1. 写入路径内存
  • 写缓冲区(MemTable):用于临时存储写入的数据

  • 配置参数:state.backend.rocksdb.writebuffer.sizestate.backend.rocksdb.writebuffer.count

  1. 读取路径内存
  • 缓存:用于缓存数据块

  • 索引和过滤器:用于加速查询

  • 配置参数:state.backend.rocksdb.memory.high-prio-pool-ratio

  1. 其他内存参数
  • state.backend.rocksdb.thread.num**:并发后台刷新和压缩作业的最大数量(默认:2)

  • state.backend.rocksdb.files.open:DB可以使用的最大打开文件数(默认:-1,表示无限制)

3.2 ForState 内存参数

ForState状态后端的内存配置与RocksDB类似,但增加了一些针对分布式存储的特定参数:

  1. 基本内存配置
  • state.backend.forst.memory.managed:是否使用托管内存

  • state.backend.forst.memory.fixed-per-slot:每个槽位的固定内存大小

  • state.backend.forst.memory.fixed-per-tm:每个TaskManager的固定内存大小

    2 . 内存分配比例

  • state.backend.forst.memory.write-buffer-ratio:写缓冲区占比

  • state.backend.forst.memory.high-prio-pool-ratio:高优先级池占比

  1. 索引和过滤器配置
  • state.backend.forst.memory.partitioned-index-filters:是否使用分区索引/过滤器(默认:true)

  • state.backend.forst.use-bloom-filter:是否为新创建的SST文件使用布隆过滤器(默认:false)

  • state.backend.forst.bloom-filter.bits-per-key:布隆过滤器每个键使用的位数(默认:10.0)

  1. 执行器配置
  • state.backend.forst.executor.inline-coordinator:是否让任务线程作为协调线程(默认:false)
  • state.backend.forst.executor.inline-write:是否在协调线程内执行写请求(默认:true)

4. 分布式状态管理与内存优化

Flink 2.0引入的分布式状态管理(Disaggregated State Management)是一项重要的内存优化技术,特别适用于超大规模状态场景:

  1. 分布式状态管理的优势

    • 无限状态大小:状态大小仅受外部存储系统限制
    • 稳定资源使用:状态存储在外部存储中,检查点操作非常轻量级
    • 快速恢复:恢复时无需下载状态,恢复时间与状态大小无关
    • 灵活性:可以根据需求轻松选择不同的外部存储系统或I/O性能级别
    • 成本效益:外部存储通常比本地磁盘更便宜,可以独立调整计算资源和存储资源
  2. 分布式状态管理的组成部分

    • ForSt 状态后端:将状态存储在外部存储系统中,也可以利用本地磁盘进行缓存和缓冲
    • 新状态 API:引入异步状态读写的新状态API(State V2),对于克服访问分布式状态时的高网络延迟至关重要
    • SQL 支持:许多SQL算子已重写以支持分布式状态管理和异步状态访问
  3. 分布式状态管理的配置

    • 默认情况下,ForSt状态后端将状态存储在检查点目录中,这样可以实现轻量级检查点和快速恢复

    • 可以通过配置state.backend.forst.primary-dir指定不同的主存储位置

    • ForSt使用本地磁盘进行缓存和缓冲,缓存粒度为整个文件

  4. 文件缓存策略

    • 基于大小的限制:当缓存大小超过限制时,会驱逐最旧的文件
    • 基于保留空间的限制:当磁盘上的保留空间不足时,会驱逐最旧的文件
    • 相关配置:
    • state.backend.forst.cache.size-based-limit: 1GB
    • state.backend.forst.cache.reserve-size: 10GB
  5. 同步与异步状态访问

    • 默认情况下,ForSt仅在使用异步API(State V2)时才会分散状态
    • 使用同步状态API时,ForSt默认仅作为本地状态存储
    • 可以通过配置state.backend.forst.sync.enforce-local: false让同步API的操作也存储在远程

5. 内存优化最佳实践

5.1 内存分配策略

  1. 托管内存与固定内存
  • 对于RocksDB和ForSt状态后端,建议使用托管内存模式(默认开启)

  • 托管内存模式下,状态后端会自动配置自身使用任务槽的托管内存预算

  • 如果需要更精细的控制,可以使用固定内存模式:

    • state.backend.forst.memory.fixed-per-slot:每个槽位固定内存

    • state.backend.forst.memory.fixed-per-tm:每个TaskManager固定内存

  1. 内存比例分配

    • 写缓冲区与缓存内存的比例分配对性能影响很大

    • 默认配置:写缓冲区占50%,缓存内存占50%(其中高优先级池占缓存内存的10%)

    • 读密集型作业可以增加缓存内存比例

    • 写密集型作业可以增加写缓冲区比例

  2. 内存参数验证

  • 统会验证内存参数的合法性,例如写缓冲区比例和高优先级池比例之和不能超过1.0

  • 非法的内存配置会导致异常

5.2 缓存优化

块缓存配置:

  • 缓存用于存储数据块,对读取性能影响很大

    • 默认块缓存大小为8MB,可以根据需要调整
    • 配置参数:state.backend.forst.block.cache-size

    LRU 缓存策略优化

  • ForSt使用LRU(最近最少使用)策略管理缓存

    • 可以通过state.backend.forst.cache.lru.promote-limit配置热链接块的提升限制
    • 默认值为3,表示当热链接中的块被移动到冷链接的次数达到3次时,该块将被阻止提升到LRU列表的头部

    布隆过滤器优化

  • 布隆过滤器可以加速键值查找,减少不必要的磁盘访问

    • 默认情况下布隆过滤器是禁用的,可以通过state.backend.forst.use-bloom-filter启用
    • 启用后,可以通过state.backend.forst.bloom-filter.bits-per-key配置每个键使用的位数(默认10.0)

    分区索引和过滤器

  • 启用分区索引和过滤器可以减少内存使用并提高查询效率

    • 默认情况下已启用(state.backend.forst.memory.partitioned-index-filters为true)
    • 分区索引将SST文件的索引/过滤器块分割成更小的块,并在它们上添加一个顶层索引
    • 读取时,只有顶层索引被加载到内存中,按需加载所需的分区

5.3 Checkpoint 与内存优化

增量检查点

  • 增量检查点只存储自上次检查点以来的状态变化,而不是完整状态

  • 对于大状态作业,显著减少检查点完成时间

  • 配置方法:execution.checkpointing.incremental: true

  • RocksDB和ForSt状态后端都支持增量检查点

执行器线程配置

  • ForSt状态后端提供了执行器线程配置选项,可以优化内存使用和性能

  • state.backend.forst.executor.inline-coordinator:是否让任务线程作为协调线程(默认false)

  • state.backend.forst.executor.inline-write:是否在协调线程内执行写请求(默认true)

写入批处理优化

  • 批量写入可以减少I/O操作,提高写入性能

  • RocksDB配置参数:state.backend.rocksdb.write-batch-size(默认2MB)

  • ForSt配置参数:state.backend.forst.write-batch-size(默认2MB)

6. 总结

Flink内存优化是提高作业性能和稳定性的关键。通过选择合适的状态后端、调整内存配置参数、优化缓存策略等方法,可以显著提升Flink作业的性能。对于不同规模和特性的作业,应采用不同的优化策略:

  • 小规模状态:使用HashMapStateBackend,关注JVM内存优化
  • 中等规模状态:使用EmbeddedRocksDBStateBackend,优化RocksDB内存参数
  • 超大规模状态:使用ForStStateBackend,结合异步状态API和分布式存储
http://www.xdnf.cn/news/1481455.html

相关文章:

  • docker使用nginxWebUI配置
  • OSG工具集
  • Python错误测试与调试——文档测试
  • ElemenetUI之常用小组件
  • Elasticsearch面试精讲 Day 10:搜索建议与自动补全
  • GEE:基于自定义的年度时序数据集进行LandTrendr变化检测
  • Qt UDP通信学习
  • 《sklearn机器学习——模型的持久性》joblib 和 pickle 进行模型保存和加载
  • python的数据结构
  • redission实现读写锁的原理
  • TDengine 时间函数 WEEKDAY() 用户手册
  • 【PCIe EP 设备入门学习专栏 -- 8 PCIe EP 架构详细介绍】
  • dask.dataframe.shuffle.set_index中获取 divisions 的步骤分析
  • 单例模式(巨通俗易懂)普通单例,懒汉单例的实现和区别,依赖注入......
  • 【C++题解】DFS和BFS
  • leetcode 75 颜色分类
  • OS项目构建效能改进策划方案
  • 神马 M60S++ 238T矿机参数解析:高效SHA-256算法比拼
  • Docker加速下载镜像的配置指南
  • 计算机网络:物理层---数据通信基础知识
  • 【C++ 11 模板类】tuple 元组
  • 嵌入式笔记系列——UART:TTL-UART、RS-232、RS-422、RS-485
  • 旧电脑改造linux服务器2:安装系统
  • 软考中级习题与解答——第二章_程序语言与语言处理程序(3)
  • AD渗透中服务账号相关攻击手法总结(Kerberoasting、委派)
  • 数据仓库概要
  • 【selenium】网页元素找不到?从$(‘[placeholder=“手机号“]‘)说起
  • PyQt5 入门(上):开启 GUI 编程之旅
  • python advance -----object-oriented
  • URI与URL区别:资源ID和地址差异