Flink Slot 不足导致任务Pending修复方案
当前有3个虚拟机节点,每个节点配置的slot节点数量是4,${FLINK_HOME}/conf/flink-conf.yaml 关于slot的配置如下:
# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
taskmanager.numberOfTaskSlots: 4state.backend: rocksdb
# checkpoint的时间根据集群性能和数据量确定
execution.checkpointing.interval: 30000
state.checkpoints.dir: hdfs://cdh01:8020/chkpoints
state.backend.incremental: true
其中,${HADOOP_HOME}/etc/hadoop/yarn-site.xml中关于yarn单个容器允许分配的最大内存设置如下:
<!-- yarn单个容器允许分配的最大最小内存 --><property><name>yarn.scheduler.minimum-allocation-mb</name><value>512</value></property><property><name>yarn.scheduler.maximum-allocation-mb</name><value>4096</value></property><!-- yarn容器允许管理的物理内存大小 --><property><name>yarn.nodemanager.resource.memory-mb</name><value>4096</value></property>
启动yarn-session以Flink session cluster供FLINK SQL以共享方式运行:
$FLINK_HOME/bin/yarn-session.sh -d
启动FLINK SQL Client:
$FLINK_HOME/bin/sql-client.sh embedded -i $FLINK_HOME/conf/sql_client_init.sql -s yarn-session
如图,提交23个 FlinkSQL任务(flink-cdc -> hudi-ods),由于slot数量不够,将导致12个任务之外的剩余任务处于pending的状态(黑紫任务)。
查看某个pending任务的log,看到报错如下:
2025-xx-xx 21:45:56
java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeoutat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)at org.apache.flink.runtime.scheduler.SharedSlot.cancelLogicalSlotRequest(SharedSlot.java:222)at org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.cancelLogicalSlotRequest(SlotSharingExecutionSlotAllocator.java:164)at org.apache.flink.runtime.scheduler.SharingPhysicalSlotRequestBulk.cancel(SharingPhysicalSlotRequestBulk.java:86)at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkWithTimestamp.cancel(PhysicalSlotRequestBulkWithTimestamp.java:66)at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:91)at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at java.util.concurrent.FutureTask.run(FutureTask.java:266)at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)at akka.actor.Actor.aroundReceive(Actor.scala:517)at akka.actor.Actor.aroundReceive$(Actor.scala:515)at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)at akka.actor.ActorCell.invoke(ActorCell.scala:561)at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)at akka.dispatch.Mailbox.run(Mailbox.scala:225)at akka.dispatch.Mailbox.exec(Mailbox.scala:235)at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeoutat org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86)... 26 more
Caused by: java.util.concurrent.TimeoutException: Timeout has occurred: 300000 ms... 27 more
从报错来看,核心问题是 Flink集群总Slot(12个)不足以支撑23个任务的并发需求,导致11个任务因无法分配Slot而处于Pending状态,最终触发超时(300秒后仍未获取到资源)。解决思路围绕“增加可用Slot”和“优化资源分配”展开,具体方案如下:
一、核心原因拆解
Flink的Slot是任务执行的最小资源单元(每个Slot对应一个TaskManager的一个CPU核心,默认1个Slot/核心),当前12个Slot最多同时运行12个任务。23个任务并发时,超出的11个任务会进入Pending队列,若超过slot.request.timeout
(默认300秒)仍未获取到Slot,就会报超时错误。
二、解决方案(按优先级排序)
方案1:增加TaskManager节点或每个节点的Slot数量(最直接)
通过扩展Flink集群的Slot总数,满足23个任务的并发需求(需至少23个Slot)。
1.1 增加每个TaskManager的Slot数量(无需新增虚拟机)
若现有3台虚拟机的CPU资源充足(如每台虚拟机有10核CPU,当前仅用4核),可通过修改配置增加单节点的Slot数:
- 停止Flink集群:
./bin/stop-cluster.sh
- 修改
flink-conf.yaml
配置文件:# 每个TaskManager的Slot数量(默认1,根据CPU核数调整,建议不超过物理核数) taskmanager.numberOfTaskSlots: 10 # 3台虚拟机 × 10个Slot = 30个Slot,满足23个任务
注意:
numberOfTaskSlots
不能超过虚拟机的物理CPU核心数(如虚拟机是10核,最大设为10,留1核给系统)。 - 重启Flink集群:
./bin/start-cluster.sh
- 验证Slot总数:
- 访问Flink Web UI(如
http://hadoop103:8081
),在左侧“Cluster Overview”中查看“Total Slots”是否为30(3×10)。
- 访问Flink Web UI(如
1.2 新增TaskManager节点(现有节点资源不足时)
若3台虚拟机的CPU已达上限,需新增1台虚拟机作为TaskManager节点:
- 在新虚拟机上安装Flink(与现有集群版本一致),并配置
flink-conf.yaml
:# 指向现有JobManager的地址(如hadoop103) jobmanager.rpc.address: hadoop103 # 新节点的Slot数量(如8个,3台旧节点×4 + 1台新节点×8 = 20,仍不够可设为11) taskmanager.numberOfTaskSlots: 11
- 在新节点上启动TaskManager:
./bin/taskmanager.sh start
- 验证:Web UI中“Total Slots”是否达到23+。
方案2:优化任务资源分配(避免资源浪费)
若无法增加Slot总数,可通过优化任务的资源配置,减少单个任务的Slot占用,提升Slot利用率。
2.1 降低单个任务的并行度(Parallelism)
Flink任务的并行度默认等于Slot数量,若部分任务无需高并行(如简单查询),可手动降低并行度,减少Slot占用:
-- 执行任务时指定并行度(如设为1,仅占用1个Slot)
Flink SQL> set execution.parallelism = 1;
-- 再执行任务(如查询)
Flink SQL> select * from order_info_cdc;
说明:默认情况下,每个任务的并行度 =
parallelism.default
(flink-conf.yaml
配置,默认1),若之前设为更高值(如2),会导致单个任务占用多个Slot。
2.2 合并小任务(减少任务总数)
若20个任务中有多个相似的轻量任务(如多个简单查询),可合并为1个任务,减少Slot需求:
- 例:将多个
select * from table where ...
合并为union all
查询:
合并后1个任务仅占用1个Slot,替代原来的2个任务。select * from table1 where type=1 union all select * from table1 where type=2;
方案3:配置Slot共享组(Share Group)
Flink支持“Slot共享组”,允许不同任务共享同一个Slot(仅适用于非密集型任务),提升Slot利用率。
- 在创建表或执行任务时,指定共享组名称:
-- 执行任务时指定共享组(如所有查询共享“query_group”) Flink SQL> set execution.share-group = query_group; -- 执行多个任务,会共享同一个Slot(需确保任务非CPU密集型) Flink SQL> select * from table1; Flink SQL> select * from table2;
- 注意:共享组仅适合轻量任务(如简单查询),若任务是CPU/内存密集型(如大表聚合),共享会导致性能下降。
方案4:延长Slot请求超时时间(临时规避)
若任务可接受等待(如非实时任务),可延长slot.request.timeout
,给Pending任务更多时间等待释放的Slot:
- 修改
flink-conf.yaml
:# Slot请求超时时间(单位:毫秒,设为10分钟=600000ms) slot.request.timeout: 600000
- 重启Flink集群生效。
注意:这只是临时方案,若Slot总数始终不足,任务仍会最终失败,需配合方案1/2彻底解决。
三、验证与监控
- 查看任务状态:
- 访问Flink Web UI,在“Jobs”页面查看Pending任务是否已分配Slot(状态从“Pending”变为“Running”)。
- 监控Slot使用:
- 在Web UI“Cluster Overview”中查看“Used Slots”和“Available Slots”,确保Available Slots ≥ 0(无任务Pending)。
四、生产环境建议
- 优先方案1:生产环境中,核心任务需保证资源充足,建议Slot总数预留20%-30%冗余(如20个任务,配置28个Slot),避免突发任务导致Pending。
- 规范并行度配置:根据任务复杂度设置并行度(如大表处理设为4-8,简单查询设为1-2),避免盲目使用高并行度浪费资源。
- 使用YARN/K8s动态资源:若Flink部署在YARN/K8s上,可开启动态资源分配(
yarn.containers.dynamic
/K8s的Pod动态扩缩容),自动根据任务需求增减Slot,无需手动配置。
通过以上方案,可彻底解决Slot不足导致的任务Pending问题,确保所有任务正常运行。