Hi,

We have been running our job on the Flink image 1.13.2-stream1-scala_2.12-java11. It is a standalone deployment on a Kubernetes cluster (EKS on AWS, which uses EC2 nodes as hosts and relies on an auto-scaler to adjust resources cluster-wide). After a few minutes (5-20) we see the exception below on the taskmanager(s). The job is quite busy, so we see backpressure on some tasks, but we were not expecting such a problem under heavy load (we are OK with slow processing and a backlog). Neither restarting the task nor increasing the resources solved the issue. We always get the exception below after a period of time, which makes the job unstable.
---------------------------------------------------------------------------------------------------
java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
    at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.failRequest(DeclarativeSlotPoolBridge.java:535)
    ....
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
---------------------------------------------------------------------------------------------------

We tried different CPU/memory allocations for the task managers in the Flink configuration. After noticing the problem we tried more CPU and memory, but none of the increases solved it. Part of the config we have is below.

taskmanager.numberOfTaskSlots: '4'
kubernetes:
  pods:
    affinity: null
    annotations:
      prometheus.io.port: '9249'
      prometheus.io.scrape: 'true'
    labels: {}
    nodeSelector: {}
    securityContext: null
logging:
  log4jLoggers:
    '': INFO
  loggingProfile: default
numberOfTaskManagers: 2
parallelism: 8
resources:
  jobmanager:
    cpu: 2
    memory: 2G
  taskmanager:
    cpu: 2
    memory: 8G

Please find attached the configuration file we use at the moment.

Thanks,
2021-11-04 09:28:53
java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
    at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.failRequest(DeclarativeSlotPoolBridge.java:535)
    at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.cancelPendingRequests(DeclarativeSlotPoolBridge.java:128)
    at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.failPendingRequests(DeclarativeSlotPoolBridge.java:362)
    at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.notifyNotEnoughResourcesAvailable(DeclarativeSlotPoolBridge.java:351)
    at org.apache.flink.runtime.jobmaster.JobMaster.notifyNotEnoughResourcesAvailable(JobMaster.java:816)
    at jdk.internal.reflect.GeneratedMethodAccessor102.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:301)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
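
For reference, the slot arithmetic implied by the config above can be sketched as follows. This is only a rough sanity check, not a diagnosis: it assumes default slot sharing, so the job needs as many slots as its parallelism, and the function name `slot_headroom` is made up for illustration. Whether a taskmanager pod is actually being lost (for example when the auto-scaler removes a node) is a guess that would need to be confirmed from the pod events.

```python
def slot_headroom(num_task_managers: int, slots_per_tm: int, parallelism: int) -> int:
    """Return spare slots; a negative value means the job cannot be fully scheduled.

    Assumes default slot sharing, i.e. required slots == job parallelism.
    """
    return num_task_managers * slots_per_tm - parallelism


# Values taken from the config above: 2 taskmanagers, 4 slots each, parallelism 8.
print(slot_headroom(2, 4, 8))  # both taskmanagers registered -> 0 spare slots
print(slot_headroom(1, 4, 8))  # one taskmanager missing -> -4, four slots short
```

With these numbers there is no spare capacity, so a single missing taskmanager would leave the job four slots short of what it requests.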
Attachment: flink_config.yml