当前位置: 首页 > java >正文

Flink Slot 不足导致任务Pending修复方案

当前有3个虚拟机节点,每个节点配置的slot节点数量是4,${FLINK_HOME}/conf/flink-conf.yaml 关于slot的配置如下:

# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
taskmanager.numberOfTaskSlots: 4state.backend: rocksdb
# checkpoint的时间根据集群性能和数据量确定
execution.checkpointing.interval: 30000
state.checkpoints.dir: hdfs://cdh01:8020/chkpoints
state.backend.incremental: true

其中,${HADOOP_HOME}/etc/hadoop/yarn-site.xml中关于yarn单个容器允许分配的最大内存设置如下:

	<!-- yarn单个容器允许分配的最大最小内存 --><property><name>yarn.scheduler.minimum-allocation-mb</name><value>512</value></property><property><name>yarn.scheduler.maximum-allocation-mb</name><value>4096</value></property><!-- yarn容器允许管理的物理内存大小 --><property><name>yarn.nodemanager.resource.memory-mb</name><value>4096</value></property>

启动yarn-session以Flink session cluster供FLINK SQL以共享方式运行:
$FLINK_HOME/bin/yarn-session.sh -d

启动FLINK SQL Client:
$FLINK_HOME/bin/sql-client.sh embedded -i $FLINK_HOME/conf/sql_client_init.sql -s yarn-session

如图,提交23个 FlinkSQL任务(flink-cdc -> hudi-ods),由于slot数量不够,将导致12个任务之外的剩余任务处于pending的状态(黑紫任务)。
在这里插入图片描述

查看某个pending任务的log,看到报错如下:

2025-xx-xx 21:45:56
java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeoutat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)at org.apache.flink.runtime.scheduler.SharedSlot.cancelLogicalSlotRequest(SharedSlot.java:222)at org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.cancelLogicalSlotRequest(SlotSharingExecutionSlotAllocator.java:164)at org.apache.flink.runtime.scheduler.SharingPhysicalSlotRequestBulk.cancel(SharingPhysicalSlotRequestBulk.java:86)at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkWithTimestamp.cancel(PhysicalSlotRequestBulkWithTimestamp.java:66)at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:91)at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at java.util.concurrent.FutureTask.run(FutureTask.java:266)at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)at akka.actor.Actor.aroundReceive(Actor.scala:517)at akka.actor.Actor.aroundReceive$(Actor.scala:515)at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)at akka.actor.ActorCell.invoke(ActorCell.scala:561)at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)at akka.dispatch.Mailbox.run(Mailbox.scala:225)at akka.dispatch.Mailbox.exec(Mailbox.scala:235)at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeoutat org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86)... 26 more
Caused by: java.util.concurrent.TimeoutException: Timeout has occurred: 300000 ms... 27 more

从报错来看,核心问题是 Flink集群总Slot(12个)不足以支撑23个任务的并发需求,导致11个任务因无法分配Slot而处于Pending状态,最终触发超时(300秒后仍未获取到资源)。解决思路围绕“增加可用Slot”和“优化资源分配”展开,具体方案如下:

一、核心原因拆解

Flink的Slot是任务执行的最小资源单元(每个Slot对应一个TaskManager的一个CPU核心,默认1个Slot/核心),当前12个Slot最多同时运行12个任务。23个任务并发时,超出的11个任务会进入Pending队列,若超过slot.request.timeout(默认300秒)仍未获取到Slot,就会报超时错误。

二、解决方案(按优先级排序)

方案1:增加TaskManager节点或每个节点的Slot数量(最直接)

通过扩展Flink集群的Slot总数,满足23个任务的并发需求(需至少23个Slot)。

1.1 增加每个TaskManager的Slot数量(无需新增虚拟机)

若现有3台虚拟机的CPU资源充足(如每台虚拟机有10核CPU,当前仅用4核),可通过修改配置增加单节点的Slot数:

  1. 停止Flink集群:
    ./bin/stop-cluster.sh
    
  2. 修改flink-conf.yaml配置文件:
    # 每个TaskManager的Slot数量(默认1,根据CPU核数调整,建议不超过物理核数)
    taskmanager.numberOfTaskSlots: 10  # 3台虚拟机 × 10个Slot = 30个Slot,满足23个任务
    

    注意:numberOfTaskSlots不能超过虚拟机的物理CPU核心数(如虚拟机是10核,最大设为10,留1核给系统)。

  3. 重启Flink集群:
    ./bin/start-cluster.sh
    
  4. 验证Slot总数:
    • 访问Flink Web UI(如http://hadoop103:8081),在左侧“Cluster Overview”中查看“Total Slots”是否为30(3×10)。
1.2 新增TaskManager节点(现有节点资源不足时)

若3台虚拟机的CPU已达上限,需新增1台虚拟机作为TaskManager节点:

  1. 在新虚拟机上安装Flink(与现有集群版本一致),并配置flink-conf.yaml
    # 指向现有JobManager的地址(如hadoop103)
    jobmanager.rpc.address: hadoop103
    # 新节点的Slot数量(如8个,3台旧节点×4 + 1台新节点×8 = 20,仍不够可设为11)
    taskmanager.numberOfTaskSlots: 11
    
  2. 在新节点上启动TaskManager:
    ./bin/taskmanager.sh start
    
  3. 验证:Web UI中“Total Slots”是否达到23+。
方案2:优化任务资源分配(避免资源浪费)

若无法增加Slot总数,可通过优化任务的资源配置,减少单个任务的Slot占用,提升Slot利用率。

2.1 降低单个任务的并行度(Parallelism)

Flink任务的并行度默认等于Slot数量,若部分任务无需高并行(如简单查询),可手动降低并行度,减少Slot占用:

-- 执行任务时指定并行度(如设为1,仅占用1个Slot)
Flink SQL> set execution.parallelism = 1;
-- 再执行任务(如查询)
Flink SQL> select * from order_info_cdc;

说明:默认情况下,每个任务的并行度 = parallelism.defaultflink-conf.yaml配置,默认1),若之前设为更高值(如2),会导致单个任务占用多个Slot。

2.2 合并小任务(减少任务总数)

若20个任务中有多个相似的轻量任务(如多个简单查询),可合并为1个任务,减少Slot需求:

  • 例:将多个select * from table where ...合并为union all查询:
    select * from table1 where type=1
    union all
    select * from table1 where type=2;
    
    合并后1个任务仅占用1个Slot,替代原来的2个任务。
方案3:配置Slot共享组(Share Group)

Flink支持“Slot共享组”,允许不同任务共享同一个Slot(仅适用于非密集型任务),提升Slot利用率。

  1. 在创建表或执行任务时,指定共享组名称:
    -- 执行任务时指定共享组(如所有查询共享“query_group”)
    Flink SQL> set execution.share-group = query_group;
    -- 执行多个任务,会共享同一个Slot(需确保任务非CPU密集型)
    Flink SQL> select * from table1;
    Flink SQL> select * from table2;
    
  2. 注意:共享组仅适合轻量任务(如简单查询),若任务是CPU/内存密集型(如大表聚合),共享会导致性能下降。
方案4:延长Slot请求超时时间(临时规避)

若任务可接受等待(如非实时任务),可延长slot.request.timeout,给Pending任务更多时间等待释放的Slot:

  1. 修改flink-conf.yaml
    # Slot请求超时时间(单位:毫秒,设为10分钟=600000ms)
    slot.request.timeout: 600000
    
  2. 重启Flink集群生效。

注意:这只是临时方案,若Slot总数始终不足,任务仍会最终失败,需配合方案1/2彻底解决。

三、验证与监控

  1. 查看任务状态:
    • 访问Flink Web UI,在“Jobs”页面查看Pending任务是否已分配Slot(状态从“Pending”变为“Running”)。
  2. 监控Slot使用:
    • 在Web UI“Cluster Overview”中查看“Used Slots”和“Available Slots”,确保Available Slots ≥ 0(无任务Pending)。

四、生产环境建议

  • 优先方案1:生产环境中,核心任务需保证资源充足,建议Slot总数预留20%-30%冗余(如20个任务,配置28个Slot),避免突发任务导致Pending。
  • 规范并行度配置:根据任务复杂度设置并行度(如大表处理设为4-8,简单查询设为1-2),避免盲目使用高并行度浪费资源。
  • 使用YARN/K8s动态资源:若Flink部署在YARN/K8s上,可开启动态资源分配(yarn.containers.dynamic/K8s的Pod动态扩缩容),自动根据任务需求增减Slot,无需手动配置。

通过以上方案,可彻底解决Slot不足导致的任务Pending问题,确保所有任务正常运行。

http://www.xdnf.cn/news/18696.html

相关文章:

  • VirtualBox 中安装 Ubuntu 22.04
  • 基于Java、GeoTools与PostGIS的对跖点求解研究
  • 如何快速对接印度股票市场数据API?完整开发指南
  • Solidity学习笔记
  • MATLAB实现CNN-GRU-Attention时序和空间特征结合-融合注意力机制混合神经网络模型的风速预测
  • AI Agent全栈开发流程推荐(全栈开发步骤)
  • Kubernetes v1.34 前瞻:资源管理、安全与可观测性的全面进化
  • 【和春笋一起学C++】(三十五)类的使用实例
  • 1.Spring Boot:超越配置地狱,重塑Java开发体验
  • 逆光场景识别率↑76%!陌讯多模态融合算法在手机拍照识别的落地实践​
  • centos安装jenkins
  • 校园跑腿小程序源码 | 跑腿便利店小程序 含搭建教程
  • bun + vite7 的结合,孕育的 Robot Admin 【靓仔出道】(十八)
  • 目标检测数据集 第005期-基于yolo标注格式的PCB组件检测数据集(含免费分享)
  • JavaScript数据结构详解
  • 智元精灵GO1 agibot数据转换Lerobot通用格式数据脚本
  • [创业之路-567]:数字技术、数字产品、数字资产、数字货币、数字企业、数字经济、数字世界、数字人生、数字智能、数字生命
  • 大模型知识--Function Calls
  • element-plus穿梭框transfer的调整
  • 【实习总结】快速上手Git:关键命令整理
  • AI版权保护破局内容行业痛点:侵权识别效率升89%+维权周期缩至45天,区块链存证成关键
  • vue中 computed vs methods
  • unity热更新总结
  • Linux的线程概念与控制
  • CTFshow系列——命令执行web49-52
  • 基于深度学习的眼疾识别系统:从血细胞分类到病理性近视检测
  • 计算机网络:聊天室(UDP)
  • 用户和组笔记
  • 大数据毕业设计选题推荐-基于大数据的北京市医保药品数据分析系统-Spark-Hadoop-Bigdata
  • 基于角色的访问控制(RBAC)研究与Go语言实现