Hi Prasanna,
Thanks for reaching out to the community. The savepoint was created, but
the job then ended up in an inconsistent state: at least one Execution was
CANCELED instead of FINISHED. That should have triggered a global failover
and restarted the job. The savepoint itself should still be available,
though.
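
As a rough illustration of how to read the response in LOG2: a status id of
COMPLETED only says that the asynchronous operation has ended, not that it
succeeded. The Python sketch below checks for a failure-cause under
operation, as in LOG2. The location field for the success case reflects my
understanding of the REST response, and the function name and the
abbreviated sample body are made up for illustration.

```python
import json


def classify_savepoint_response(body: str) -> str:
    """Classify a savepoint-status response body shaped like LOG2."""
    doc = json.loads(body)
    status = (doc.get("status") or {}).get("id")
    if status != "COMPLETED":
        # The asynchronous operation has not ended yet.
        return "in-progress"
    operation = doc.get("operation") or {}
    if "failure-cause" in operation:
        # This is the case shown in LOG2.
        return "failed"
    if operation.get("location"):
        # Assumption: a successful operation reports the savepoint location.
        return "succeeded"
    return "unknown"


# Abbreviated stand-in for LOG2 (stack trace shortened for illustration).
sample = json.dumps({
    "status": {"id": "COMPLETED"},
    "operation": {
        "failure-cause": {
            "class": "java.util.concurrent.CompletionException",
            "stack-trace": "org.apache.flink.util.FlinkException: "
                           "Inconsistent execution state after stopping "
                           "with savepoint.",
        }
    },
})

print(classify_savepoint_response(sample))
```

For a body shaped like LOG2 this reports "failed", even though the savepoint
data itself may still have been written.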
It would be interesting to investigate in the logs how you ended up there.
Would you be able to share the entire JobManager logs and the TaskManager
logs?
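
As a starting point for that investigation, here is a minimal Python sketch
that scans JobManager log lines for ExecutionGraph state transitions and
lists the subtasks that never reached FINISHED. It assumes one log entry per
line, in chronological order, in the format seen in LOG 1. The function and
helper names are made up, and the sample lines are rebuilt from the LOG 1
excerpt with placeholder execution IDs.

```python
import re
from collections import defaultdict

# Matches lines such as:
#   ... ExecutionGraph [] - Source: KAFKA-SOURCE (1/4) (<32 hex chars>)
#   switched from RUNNING to FINISHED.
TRANSITION = re.compile(
    r"ExecutionGraph\s+\[\] - (?P<name>.+?) \((?P<idx>\d+)/(?P<par>\d+)\) "
    r"\([0-9a-f]{32}\) switched from \w+ to (?P<state>\w+)\."
)


def unfinished_subtasks(log_lines):
    """Map each vertex name to the subtask indexes not last seen FINISHED."""
    parallelism = {}
    last_state = defaultdict(dict)
    for line in log_lines:
        m = TRANSITION.search(line)
        if m is None:
            continue
        name = m["name"]
        parallelism[name] = int(m["par"])
        # Later lines overwrite earlier ones, hence the ordering assumption.
        last_state[name][int(m["idx"])] = m["state"]
    return {
        name: [i for i in range(1, par + 1)
               if last_state[name].get(i) != "FINISHED"]
        for name, par in parallelism.items()
    }


def sample_line(name, idx, par, state="FINISHED"):
    """Build a log line in the LOG 1 format; the execution ID is a placeholder."""
    return (
        "2021-08-25 14:47:33,524 INFO  "
        "org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - "
        f"{name} ({idx}/{par}) ({'0' * 32}) switched from RUNNING to {state}."
    )


SINK = "Map -> async wait operator -> SNS_SINK"
SOURCE = "Source: KAFKA-SOURCE"

# Subtask indexes whose RUNNING -> FINISHED transition appears in LOG 1.
sample = [sample_line(SINK, i, 10) for i in (9, 1, 5, 6, 3, 8, 2, 7)]
sample += [sample_line(SOURCE, i, 4) for i in (1, 3, 2, 4)]

print(unfinished_subtasks(sample))
```

On the transitions visible in the LOG 1 excerpt, this points at sink subtasks
4 and 10, which would match the two slots that were not released. The
excerpt may be incomplete, so the full logs are still needed to confirm.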

Best,
Matthias


On Tue, Aug 31, 2021 at 10:26 AM Prasanna kumar <
prasannakumarram...@gmail.com> wrote:

> Hi,
>
> We have a Publisher job which reads from a Kafka source (parallelism 4) and
> writes to SNS through an async I/O operator (parallelism 10). The Flink
> version is 1.12.2.
>
> During deployment, I stopped this job with a savepoint. Immediately, I saw
> that all slots except 2 were released. The following are the logs from
> right after the job was stopped; refer to *<LOG1>*.
>
> Even after half an hour, those 2 slots had not been released, so I manually
> cancelled the job. We then received the following error; refer to *<LOG2>*.
> *A screenshot is attached too.*
>
> 5-10 minutes after I cancelled it, the leader JobManager crashed.
>
> I have also attached screenshots of the JM metrics: JVM classes loaded, JVM
> heap used (max available is 15 GB), and JVM GC count (using young
> generation GC).
>
> Questions:
> 1) There was no state involved in the job. Does it generally take more than
> half an hour to generate a savepoint?
> 2) Did the savepoint take this long because the JobManager was going into
> some kind of pre-crash state?
> 3) What precautions should be taken before taking a savepoint and
> restarting a job?
> 4) How do we find out whether a JobManager is in a good state? All the
> metrics were green until it crashed.
> 5) I have also attached the JM JVM class-load metric for the last 30 days.
> We are running just 2 jobs throughout, but I see the count has increased to
> 80k+. If a job is killed, would the loaded classes not be cleaned up? Or
> is it just a metric showing the number loaded so far historically, while
> they are cleaned up in the back end?
>
> *LOG 1*
>
> 2021-08-25 14:48:45.539 <AWSEC2IP>
> 2021-08-25 14:48:44,532 INFO
>  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl     [] -
> Releasing idle slot [d1de73724ef75e9b5b45d29b0dd70f5e].
> 2021-08-25 14:48:45.539 <AWSEC2IP>
> 2021-08-25 14:48:44,532 INFO
>  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl     [] -
> Releasing idle slot [e1987aed51d29ec929d87fd483bd9771].
> 2021-08-25 14:48:45.539 <AWSEC2IP>
> 2021-08-25 14:48:44,532 INFO
>  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl     [] -
> Releasing idle slot [8afef84967f04061721b2402d6031f03].
> 2021-08-25 14:48:45.539 <AWSEC2IP>
> 2021-08-25 14:48:44,532 INFO
>  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl     [] -
> Releasing idle slot [f22183076bdf7fcba2497b99f5300c15].
> 2021-08-25 14:48:45.539 <AWSEC2IP>
> 2021-08-25 14:48:44,531 INFO
>  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl     [] -
> Releasing idle slot [f8b0d455cde18d84b08b8e2779311bb8].
> 2021-08-25 14:48:45.539 <AWSEC2IP>
> 2021-08-25 14:48:44,532 INFO
>  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl     [] -
> Releasing idle slot [f34615c7c002d3a0cf6679643982635d].
> 2021-08-25 14:48:44.531 <AWSEC2IP>
> 2021-08-25 14:48:44,531 INFO
>  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl     [] -
> Releasing idle slot [233cbae4c99dd6bf939ca8f405836df4].
> 2021-08-25 14:47:34.537 <AWSEC2IP>
> 2021-08-25 14:47:33,520 INFO  org.apache.flink.runtime.taskmanager.Task
>                  [] - Freeing task resources for Map -> async wait operator
> -> SNS_SINK (6/10)#0 (5a5338ea27890cccfb73c0f7c23aa94e).
> 2021-08-25 14:47:34.537 <AWSEC2IP>
> 2021-08-25 14:47:33,520 INFO  org.apache.flink.runtime.taskmanager.Task
>                  [] - Map -> async wait operator -> SNS_SINK (6/10)#0
> (5a5338ea27890cccfb73c0f7c23aa94e) switched from RUNNING to FINISHED.
> 2021-08-25 14:47:34.537 <AWSEC2IP>
> 2021-08-25 14:47:33,504 INFO
>  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] -
> Un-registering task and sending final execution state FINISHED to
> JobManager for task Source: KAFKA-SOURCE (4/4)#0
> 49fbfeefbec6d6c1bf66df64d2b395f3.
> 2021-08-25 14:47:34.537 <AWSEC2IP>
> 2021-08-25 14:47:33,520 INFO
>  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] -
> Un-registering task and sending final execution state FINISHED to
> JobManager for task Map -> async wait operator -> SNS_SINK (6/10)#0
> 5a5338ea27890cccfb73c0f7c23aa94e.
> 2021-08-25 14:47:33.524 <AWSEC2IP>
> 2021-08-25 14:47:33,524 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Map ->
> async wait operator -> SNS_SINK (9/10) (d88fe1bf433b9db5d5b5fa6a628da558)
> switched from RUNNING to FINISHED.
> 2021-08-25 14:47:33.524 <AWSEC2IP>
> 2021-08-25 14:47:33,524 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Map ->
> async wait operator -> SNS_SINK (1/10) (3db9c21c8932c334588ebc0b5e2b877c)
> switched from RUNNING to FINISHED.
> 2021-08-25 14:47:33.524 <AWSEC2IP>
> 2021-08-25 14:47:33,524 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Map ->
> async wait operator -> SNS_SINK (5/10) (d05a94884d092b07aaac719caa88585a)
> switched from RUNNING to FINISHED.
> 2021-08-25 14:47:33.523 <AWSEC2IP>
> 2021-08-25 14:47:33,523 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source:
> KAFKA-SOURCE (1/4) (c9535044384005f79d01de53a3750474) switched from RUNNING
> to FINISHED.
> 2021-08-25 14:47:33.521 <AWSEC2IP>
> 2021-08-25 14:47:33,521 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Map ->
> async wait operator -> SNS_SINK (6/10) (5a5338ea27890cccfb73c0f7c23aa94e)
> switched from RUNNING to FINISHED.
> 2021-08-25 14:47:33.515 <AWSEC2IP>
> 2021-08-25 14:47:33,514 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Map ->
> async wait operator -> SNS_SINK (3/10) (cf3fecee5443da7a2a18e38f9d845deb)
> switched from RUNNING to FINISHED.
> 2021-08-25 14:47:33.515 <AWSEC2IP>
> 2021-08-25 14:47:33,515 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Map ->
> async wait operator -> SNS_SINK (8/10) (64f302db73ce9887888fd65f806b8c6a)
> switched from RUNNING to FINISHED.
> 2021-08-25 14:47:33.513 <AWSEC2IP>
> 2021-08-25 14:47:33,513 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Map ->
> async wait operator -> SNS_SINK (2/10) (9bd2e86e61d58f3b83c14b68f034a536)
> switched from RUNNING to FINISHED.
> 2021-08-25 14:47:33.513 <AWSEC2IP>
> 2021-08-25 14:47:33,513 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Map ->
> async wait operator -> SNS_SINK (7/10) (f7ef40d3a00285deb7a9400efd4a4b1c)
> switched from RUNNING to FINISHED.
> 2021-08-25 14:47:33.509 <AWSEC2IP>
> 2021-08-25 14:47:33,508 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source:
> KAFKA-SOURCE (3/4) (0b0487af5976c367f1f17254362ae50e) switched from RUNNING
> to FINISHED.
> 2021-08-25 14:47:33.507 <AWSEC2IP>
> 2021-08-25 14:47:33,507 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source:
> KAFKA-SOURCE (2/4) (475f286d0029cba335cc66768e711f28) switched from RUNNING
> to FINISHED.
> 2021-08-25 14:47:33.505 <AWSEC2IP>
> 2021-08-25 14:47:33,505 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source:
> KAFKA-SOURCE (4/4) (49fbfeefbec6d6c1bf66df64d2b395f3) switched from RUNNING
> to FINISHED.
> 2021-08-25 14:47:33.499 <AWSEC2IP>
> 2021-08-25 14:47:33,499 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] -
> Completed checkpoint 520957 for job a0a219e9b8647f14a2ba1766cc6a8ffc (57199
> bytes in 359 ms).
> 2021-08-25 14:47:33.056 <AWSEC2IP>
> 2021-08-25 14:47:33,056 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] -
> Triggering checkpoint 520957 (type=SAVEPOINT_TERMINATE) @ 1629902852888 for
> job a0a219e9b8647f14a2ba1766cc6a8ffc.
> 2021-08-25 14:47:32.888 <AWSEC2IP>
> 2021-08-25 14:47:32,888 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] -
> Completed checkpoint 520956 for job a0a219e9b8647f14a2ba1766cc6a8ffc (57181
> bytes in 317 ms).
> 2021-08-25 14:47:32.863 <AWSEC2IP>
> 2021-08-25 14:47:32,863 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>                 [] - Triggering stop-with-savepoint for job
> a0a219e9b8647f14a2ba1766cc6a8ffc.
>
>
> *LOG2*
>
> {"status":{"id":"COMPLETED"},"operation":{"failure-cause":{"class":"java.util.concurrent.CompletionException","stack-trace":"java.util.concurrent.CompletionException:
> org.apache.flink.util.FlinkException: Inconsistent execution state after
> stopping with savepoint. At least one execution is still in one of the
> following states: CANCELED. A global fail-over is triggered to recover the
> job a0a219e9b8647f14a2ba1766cc6a8ffc.\n\tat
> java.base/java.util.concurrent.CompletableFuture.encodeRelay(Unknown
> Source)\n\tat
> java.base/java.util.concurrent.CompletableFuture.completeRelay(Unknown
> Source)\n\tat
> java.base/java.util.concurrent.CompletableFuture$UniRelay.tryFire(Unknown
> Source)\n\tat
> java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown
> Source)\n\tat
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown
> Source)\n\tat
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:234)\n\tat
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown
> Source)\n\tat
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown
> Source)\n\tat
> java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown
> Source)\n\tat
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown
> Source)\n\tat
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1044)\n\tat
> akka.dispatch.OnComplete.internal(Future.scala:263)\n\tat
> akka.dispatch.OnComplete.internal(Future.scala:261)\n\tat
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)\n\tat
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)\n\tat
> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)\n\tat
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)\n\tat
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)\n\tat
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)\n\tat
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)\n\tat
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)\n\tat
> akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)\n\tat
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)\n\tat
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)\n\tat
> scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)\n\tat
> scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)\n\tat
> scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)\n\tat
> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)\n\tat
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)\n\tat
> akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)\n\tat
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)\n\tat
> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)\n\tat
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)\n\tat
> akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)\n\tat
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)\n\tat
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)\n\tat
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)\n\tat
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)\n\tat
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)\nCaused
> by: org.apache.flink.util.FlinkException: Inconsistent execution state
> after stopping with savepoint. At least one execution is still in one of
> the following states: CANCELED. A global fail-over is triggered to recover
> the job a0a219e9b8647f14a2ba1766cc6a8ffc.\n\tat
> org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationHandlerImpl.terminateExceptionallyWithGlobalFailover(StopWithSavepointTerminationHandlerImpl.java:174)\n\tat
> org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationHandlerImpl.access$500(StopWithSavepointTerminationHandlerImpl.java:54)\n\tat
> org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationHandlerImpl$SavepointCreated.onAnyExecutionNotFinished(StopWithSavepointTerminationHandlerImpl.java:239)\n\tat
> org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationHandlerImpl.handleAnyExecutionNotFinished(StopWithSavepointTerminationHandlerImpl.java:151)\n\tat
> org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationHandlerImpl.handleExecutionsTermination(StopWithSavepointTerminationHandlerImpl.java:112)\n\tat
> java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(Unknown
> Source)\n\tat
> java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown
> Source)\n\tat
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)\n\tat
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)\n\tat
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)\n\tat
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)\n\tat
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)\n\tat
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)\n\tat
> scala.PartialFunction.applyOrElse(PartialFunction.scala:123)\n\tat
> scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)\n\tat
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)\n\tat
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)\n\tat
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat
> akka.actor.Actor.aroundReceive(Actor.scala:517)\n\tat
> akka.actor.Actor.aroundReceive$(Actor.scala:515)\n\tat
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)\n\tat
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)\n\tat
> akka.actor.ActorCell.invoke(ActorCell.scala:561)\n\tat
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)\n\tat
> akka.dispatch.Mailbox.run(Mailbox.scala:225)\n\tat
> akka.dispatch.Mailbox.exec(Mailbox.scala:235)\n\t... 4
> more\n","serialized-throwable":"RANDOMENCRYPTEDSTRING"}}}
>
> *Thanks,*
> *Prasanna.*
>
