[ https://issues.apache.org/jira/browse/FLINK-19985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lu Niu updated FLINK-19985: --------------------------- Description: Recently we had an issue that one flink job went into zombie state after ZK session timeout. During the incident, JM UI cannot load, no checkpoint metrics, but uptime metrics shows job is still running. Checking the log, what happened seemingly was: # ZK session timeout (JobManager HA is enabled) and reconnect, the JobManager is no longer the leader now: # {code:java} org.apache.flink.util.FlinkException: JobManager is no longer the leader.org.apache.flink.util.FlinkException: JobManager is no longer the leader. at org.apache.flink.runtime.jobmaster.JobManagerRunner.revokeJobMasterLeadership(JobManagerRunner.java:391) at org.apache.flink.runtime.jobmaster.JobManagerRunner.lambda$revokeLeadership$5(JobManagerRunner.java:377) at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981) at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2124) at org.apache.flink.runtime.jobmaster.JobManagerRunner.revokeLeadership(JobManagerRunner.java:374) at org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService.notLeader(ZooKeeperLeaderElectionService.java:247) at org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch$8.apply(LeaderLatch.java:640) at org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch$8.apply(LeaderLatch.java:636) at org.apache.flink.shaded.curator.org.apache.curator.framework.listen.ListenerContainer$1.run(ListenerContainer.java:93) at org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297) at org.apache.flink.shaded.curator.org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:85) at org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch.setLeadership(LeaderLatch.java:635) at org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch.handleStateChange(LeaderLatch.java:623) at org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch.access$000(LeaderLatch.java:64) at org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch$1.stateChanged(LeaderLatch.java:82) at org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager$2.apply(ConnectionStateManager.java:259) at org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager$2.apply(ConnectionStateManager.java:255) at org.apache.flink.shaded.curator.org.apache.curator.framework.listen.ListenerContainer$1.run(ListenerContainer.java:93) at org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297) at org.apache.flink.shaded.curator.org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:85) at org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager.processEvents(ConnectionStateManager.java:253) at org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager.access$000(ConnectionStateManager.java:43) at org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager$1.call(ConnectionStateManager.java:111) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {code} 2. For some reason, the JobManager failed to register itself as the leader, leading the whole job into SUSPENDED and then CANCELED mode. exception was: # {code:java} 2020-11-04 18:42:43,523 ERROR org.apache.flink.runtime.rest.handler.job.JobDetailsHandler - Unhandled exception. org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token mismatch: Ignoring message LocalFencedMessage(826fbfb893e4dbce095f5f1b1d75426b, LocalRpcInvocation(requestJob(JobID, Time))) because the fencing token 826fbfb893e4dbce095f5f1b1d75426b did not match the expected fencing token b42ecc60ee45cf65209ab2c2da88473a. at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:81) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive(Actor.scala:517) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) {code} related logs: [https://gist.github.com/qqibrow/27e5c3bce0b2830809bbfd0da8a0f91c] was: Recently we had an issue that one flink job went into zombie state after ZK session timeout. What happened seemingly was: # ZK session timeout (JobManager HA is enabled) and reconnect, the JobManager is no longer the leader now: # {code:java} org.apache.flink.util.FlinkException: JobManager is no longer the leader.org.apache.flink.util.FlinkException: JobManager is no longer the leader. at org.apache.flink.runtime.jobmaster.JobManagerRunner.revokeJobMasterLeadership(JobManagerRunner.java:391) at org.apache.flink.runtime.jobmaster.JobManagerRunner.lambda$revokeLeadership$5(JobManagerRunner.java:377) at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981) at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2124) at org.apache.flink.runtime.jobmaster.JobManagerRunner.revokeLeadership(JobManagerRunner.java:374) at org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService.notLeader(ZooKeeperLeaderElectionService.java:247) at org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch$8.apply(LeaderLatch.java:640) at org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch$8.apply(LeaderLatch.java:636) at org.apache.flink.shaded.curator.org.apache.curator.framework.listen.ListenerContainer$1.run(ListenerContainer.java:93) at org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297) at org.apache.flink.shaded.curator.org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:85) at org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch.setLeadership(LeaderLatch.java:635) at org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch.handleStateChange(LeaderLatch.java:623) at org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch.access$000(LeaderLatch.java:64) at org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch$1.stateChanged(LeaderLatch.java:82) at org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager$2.apply(ConnectionStateManager.java:259) at org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager$2.apply(ConnectionStateManager.java:255) at org.apache.flink.shaded.curator.org.apache.curator.framework.listen.ListenerContainer$1.run(ListenerContainer.java:93) at org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297) at org.apache.flink.shaded.curator.org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:85) at org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager.processEvents(ConnectionStateManager.java:253) at org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager.access$000(ConnectionStateManager.java:43) at org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager$1.call(ConnectionStateManager.java:111) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {code} 2. For some reason, the JobManager failed to register itself as the leader, leading the whole job into SUSPENDED and then CANCELED mode. exception was: # {code:java} 2020-11-04 18:42:43,523 ERROR org.apache.flink.runtime.rest.handler.job.JobDetailsHandler - Unhandled exception. org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token mismatch: Ignoring message LocalFencedMessage(826fbfb893e4dbce095f5f1b1d75426b, LocalRpcInvocation(requestJob(JobID, Time))) because the fencing token 826fbfb893e4dbce095f5f1b1d75426b did not match the expected fencing token b42ecc60ee45cf65209ab2c2da88473a. at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:81) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive(Actor.scala:517) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) {code} related logs: [https://gist.github.com/qqibrow/27e5c3bce0b2830809bbfd0da8a0f91c] > job went into zombie state after ZK session timeout > --------------------------------------------------- > > Key: FLINK-19985 > URL: https://issues.apache.org/jira/browse/FLINK-19985 > Project: Flink > Issue Type: Bug > Affects Versions: 1.9.1 > Reporter: Lu Niu > Priority: Major > > Recently we had an issue that one flink job went into zombie state after ZK > session timeout. During the incident, JM UI cannot load, no checkpoint > metrics, but uptime metrics shows job is still running. Checking the log, > what happened seemingly was: > # ZK session timeout (JobManager HA is enabled) and reconnect, the > JobManager is no longer the leader now: > # > {code:java} > org.apache.flink.util.FlinkException: JobManager is no longer the > leader.org.apache.flink.util.FlinkException: JobManager is no longer the > leader. at > org.apache.flink.runtime.jobmaster.JobManagerRunner.revokeJobMasterLeadership(JobManagerRunner.java:391) > at > org.apache.flink.runtime.jobmaster.JobManagerRunner.lambda$revokeLeadership$5(JobManagerRunner.java:377) > at > java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981) > at > java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2124) > at > org.apache.flink.runtime.jobmaster.JobManagerRunner.revokeLeadership(JobManagerRunner.java:374) > at > org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService.notLeader(ZooKeeperLeaderElectionService.java:247) > at > org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch$8.apply(LeaderLatch.java:640) > at > org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch$8.apply(LeaderLatch.java:636) > at > org.apache.flink.shaded.curator.org.apache.curator.framework.listen.ListenerContainer$1.run(ListenerContainer.java:93) > at > org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297) > at > org.apache.flink.shaded.curator.org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:85) > at > org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch.setLeadership(LeaderLatch.java:635) > at > org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch.handleStateChange(LeaderLatch.java:623) > at > org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch.access$000(LeaderLatch.java:64) > at > org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch$1.stateChanged(LeaderLatch.java:82) > at > org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager$2.apply(ConnectionStateManager.java:259) > at > org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager$2.apply(ConnectionStateManager.java:255) > at > org.apache.flink.shaded.curator.org.apache.curator.framework.listen.ListenerContainer$1.run(ListenerContainer.java:93) > at > org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297) > at > org.apache.flink.shaded.curator.org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:85) > at > org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager.processEvents(ConnectionStateManager.java:253) > at > org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager.access$000(ConnectionStateManager.java:43) > at > org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager$1.call(ConnectionStateManager.java:111) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > {code} > 2. For some reason, the JobManager failed to register itself as the leader, > leading the whole job into SUSPENDED and then CANCELED mode. exception was: > # > {code:java} > 2020-11-04 18:42:43,523 ERROR > org.apache.flink.runtime.rest.handler.job.JobDetailsHandler - Unhandled > exception. > org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token > mismatch: Ignoring message > LocalFencedMessage(826fbfb893e4dbce095f5f1b1d75426b, > LocalRpcInvocation(requestJob(JobID, Time))) because the fencing token > 826fbfb893e4dbce095f5f1b1d75426b did not match the expected fencing token > b42ecc60ee45cf65209ab2c2da88473a. > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:81) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at akka.actor.Actor$class.aroundReceive(Actor.scala:517) > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) > at akka.actor.ActorCell.invoke(ActorCell.scala:561) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) > at akka.dispatch.Mailbox.run(Mailbox.scala:225) > at akka.dispatch.Mailbox.exec(Mailbox.scala:235) > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > {code} > related logs: > [https://gist.github.com/qqibrow/27e5c3bce0b2830809bbfd0da8a0f91c] > -- This message was sent by Atlassian Jira (v8.3.4#803005)