Can you share the logs with us (ideally on DEBUG if available) from the
affected TaskManager and JobManager?
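In case it helps, DEBUG logging can usually be enabled by adjusting the Log4j 2 properties file the cluster uses; for a Kubernetes session deployment that is typically the log4j-console.properties shipped in the Flink configuration ConfigMap (file and ConfigMap names may differ in your setup). A minimal sketch, assuming the default file layout:

    # raise the overall level (verbose, but captures everything)
    rootLogger.level = DEBUG

    # or, more targeted, only the TaskExecutor / slot handling
    logger.taskexecutor.name = org.apache.flink.runtime.taskexecutor
    logger.taskexecutor.level = DEBUG

After changing the configuration, the JobManager/TaskManager pods need to pick it up (e.g. by restarting them).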
On 19/08/2021 08:29, Ivan Yang wrote:
Dear Flink community,
I recently ran into this issue at job startup. It happens from time to
time. Here is the exception from the job manager:
2021-08-17 01:21:01,944 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Defence raw
event prod05_analytics_output -> String to JSON -> (Sink: Malformed JSON Sink, ThreatSight Filter -> Raw Json
to ThreatSight Json -> ThreatSight Json to String -> Sink: ThreatSight Sink, Json to CDCA -> (mssp eventlog
filter 1 -> mssp eventlog filter 2 -> CDCA to eventlog json -> Flat Map, Sink: Parquet Sink Event Time), mssp
json filter -> action filter -> raw json to action json, Filter -> Json to ResponseAlarm, Filter -> json
to SecurityEvent) (542/626) (a7be17221c0726a67679091062cfa8dc) switched from DEPLOYING to FAILED on
172.1.200.173:6122-856ad2 @ ip-172-1-200-173.ec2.internal (dataPort=6121).
org.apache.flink.util.FlinkException: Could not mark slot 58af05c3109a0fe8f96ea8936c0783a4 active.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$handleAcceptedSlotOffers$18(TaskExecutor.java:1561) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_302]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_302]
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_302]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.1.jar:1.13.1]
2021-08-17 01:21:01,945 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Discarding the results produced by task execution a7be17221c0726a67679091062cfa8dc.
2021-08-17 01:21:01,945 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task 7ac3b50525c642dc419b976cfaf0ee0e_541.
2021-08-17 01:21:01,999 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 3150 tasks should be restarted to recover the failed task 7ac3b50525c642dc419b976cfaf0ee0e_541.
2021-08-17 01:21:02,012 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job Event Router prod05 (0f10bafc07e48918f955fb22cbaa5735) switched from state RUNNING to RESTARTING.
I am using a Kubernetes deployment in session mode. We have 4 jobs running in the
cluster; the 3 small jobs have never had this problem. The issue only happens to the one
large job, whose parallelism is about 10 times higher than that of the smaller jobs. Can
someone help me understand the root cause of this issue and how to avoid it?
Thanks,
Ivan