Hi, We have been running our job on flink image 1.13.2-stream1-scala_2.12-java11. It's a standalone deployment on a Kubernetes cluster (EKS on AWS which uses EC2 nodes as hosts and also depends on a auto-scaler to adjust the resources cluster wide). After a few mins. (5-20) we see the exception below on taskmanager(s). The job quite busy so we see backpressure on some tasks, though wasn't expecting such a problem under heavy load (we are ok with slow processing and backlog). Neither restarting the task or increasing the resources solved the issue. We always get the the exception below after a period of time which makes the job unstable.
---------------------------------------------------------------------------------------------------
java.util.concurrent.CompletionException:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Could not acquire the minimum required resources.
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown
Source)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown
Source)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown
Source)
at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
at
java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown
Source)
at
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.failRequest(DeclarativeSlotPoolBridge.java:535)
....
Caused by:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Could not acquire the minimum required resources.
---------------------------------------------------------------------------------------------------
We tried different configs. in terms of cpu/mem allocated for the task
managers in Flink configuration. We tried more cpu & mem. after
realized the problem though none of the increases actually solved the
problem. Part of the config we have is below.
taskmanager.numberOfTaskSlots: '4'
kubernetes:
pods:
affinity: null
annotations:
prometheus.io.port: '9249'
prometheus.io.scrape: 'true'
labels: {}
nodeSelector: {}
securityContext: null
logging:
log4jLoggers:
'': INFO
loggingProfile: default
numberOfTaskManagers: 2
parallelism: 8
resources:
jobmanager:
cpu: 2
memory: 2G
taskmanager:
cpu: 2
memory: 8G
Please find the attached the configuration file we use at the moment.
Thanks,
2021-11-04 09:28:53
java.util.concurrent.CompletionException:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Could not acquire the minimum required resources.
at
java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
at
java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown
Source)
at
java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown
Source)
at
java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
at
java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown
Source)
at
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.failRequest(DeclarativeSlotPoolBridge.java:535)
at
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.cancelPendingRequests(DeclarativeSlotPoolBridge.java:128)
at
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.failPendingRequests(DeclarativeSlotPoolBridge.java:362)
at
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.notifyNotEnoughResourcesAvailable(DeclarativeSlotPoolBridge.java:351)
at
org.apache.flink.runtime.jobmaster.JobMaster.notifyNotEnoughResourcesAvailable(JobMaster.java:816)
at jdk.internal.reflect.GeneratedMethodAccessor102.invoke(Unknown
Source)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:301)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Could not acquire the minimum required resources.
flink_config.yml
Description: Binary data
