[ https://issues.apache.org/jira/browse/FLINK-33483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784007#comment-17784007 ]
Xin Chen edited comment on FLINK-33483 at 11/8/23 12:45 PM: ------------------------------------------------------------ But in another scenario in production practice, UNDEFINED also appears. The Jm log can be found in the file [^container_e15_1693914709123_8498_01_000001_8042] , but I have not fully reproduced this scene. Based on the key information in the log: {code:java} 15:00:57.657 State change: SUSPENDED Connection to ZooKeeper suspended, waiting for reconnection. 15:00:54.754 org.apache.flink.util.FlinkException: ResourceManager leader changed to new address null 15:00:54.759 Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) switched from state RUNNING to RESTARTING. 15:00:54.771 Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) switched from state RESTARTING to SUSPENDED. org.apache.flink.util.FlinkException: JobManager is no longer the leader. Unable to canonicalize address zookeeper:2181 because it's not resolvable. 15:00:55.694 closing socket connection and attempting reconnect 15:00:57.657 State change: RECONNECTED 15:00:57.739 Connection to ZooKeeper was reconnected. Leader retrieval can be restarted. 15:00:57.740 Connection to ZooKeeper was reconnected. Leader election can be restarted. 15:00:57.741 Job 281592085ed7f391ab59b83a53c40db3 was not finished by JobManager. 15:00:57.742 Shutting down cluster because job not finished 15:00:57.742 Shutting YarnJobClusterEntrypoint down with application status UNKNOWN. Diagnostics null. 15:00:57.767 Unregister application from the YARN Resource Manager with final status UNDEFINED. {code} >From the logs, it can be seen that there was a disconnection of zk for a few >seconds. During the disconnection period, rm(resourcemanager) was affected and >the Flink task was suspended, attempting to reconnect zk. The most important >thing is that after ZK reconnects, for some unknown reason, jm directly >determines that the task is in an UNKNOWN state: *Shutting >YarnJobClusterEntrypoint down with application status UNKNOWN. Diagnostics >null*. During this process, jm did not restart, which is different from >FLINK-12302. So how exactly is “UNKNOWN” identified here, also determined after reading from the RunningJobRegistry in zk? I have also tried many times and did not reproduce this scene. The reproduction log is attached. I think the reason for the difficulty in reproducing is : When I disconnect all zks, jm will quickly down and restart, and the log will shows an error. {code:java} org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not set: Ignoring message LocalFencedMessage(null, LocalRpcInvocation(deregisterApplication(ApplicationStatus, String))) sent to akka.tcp://flink@192.168.22.121:42347/user/rpc/resourcemanager_0 because the fencing token is null. at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:67) ~ at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~ at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~ at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~ at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~ at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~ at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~ at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~ at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~ at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~ at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~ at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~ at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~ at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~ at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~ at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~ at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~ {code} Here, I speculate that the disconnection of zk may have affected rm's leadership, leading to issues when jm unregisters to rm with ApplicationStatus: UNDEFINED. However, in actual production scenarios, jm did not fail and this has not been reproduced. was (Author: JIRAUSER298666): But in another scenario in production practice, UN also appears. The Jm log can be found in the file [^container_e15_1693914709123_8498_01_000001_8042] , but I have not fully reproduced this scene. Based on the key information in the log: {code:java} 15:00:57.657 State change: SUSPENDED Connection to ZooKeeper suspended, waiting for reconnection. 15:00:54.754 org.apache.flink.util.FlinkException: ResourceManager leader changed to new address null 15:00:54.759 Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) switched from state RUNNING to RESTARTING. 15:00:54.771 Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) switched from state RESTARTING to SUSPENDED. org.apache.flink.util.FlinkException: JobManager is no longer the leader. Unable to canonicalize address zookeeper:2181 because it's not resolvable. 15:00:55.694 closing socket connection and attempting reconnect 15:00:57.657 State change: RECONNECTED 15:00:57.739 Connection to ZooKeeper was reconnected. Leader retrieval can be restarted. 15:00:57.740 Connection to ZooKeeper was reconnected. Leader election can be restarted. 15:00:57.741 Job 281592085ed7f391ab59b83a53c40db3 was not finished by JobManager. 15:00:57.742 Shutting down cluster because job not finished 15:00:57.742 Shutting YarnJobClusterEntrypoint down with application status UNKNOWN. Diagnostics null. 15:00:57.767 Unregister application from the YARN Resource Manager with final status UNDEFINED. {code} >From the logs, it can be seen that there was a disconnection of zk for a few >seconds. During the disconnection period, rm(resourcemanager) was affected and >the Flink task was suspended, attempting to reconnect zk. The most important >thing is that after ZK reconnects, for some unknown reason, jm directly >determines that the task is in an UNKNOWN state: *Shutting >YarnJobClusterEntrypoint down with application status UNKNOWN. Diagnostics >null*. During this process, jm did not restart, which is different from >FLINK-12302. So how exactly is “UNKNOWN” identified here, also determined after reading from the RunningJobRegistry in zk? I have also tried many times and did not reproduce this scene. The reproduction log is attached. I think the reason for the difficulty in reproducing is : When I disconnect all zks, jm will quickly down and restart, and the log will shows an error. {code:java} org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not set: Ignoring message LocalFencedMessage(null, LocalRpcInvocation(deregisterApplication(ApplicationStatus, String))) sent to akka.tcp://flink@192.168.22.121:42347/user/rpc/resourcemanager_0 because the fencing token is null. at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:67) ~ at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~ at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~ at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~ at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~ at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~ at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~ at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~ at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~ at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~ at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~ at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~ at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~ at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~ at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~ at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~ at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~ {code} Here, I speculate that the disconnection of zk may have affected rm's leadership, leading to issues when jm unregisters to rm with ApplicationStatus: UNDEFINED. However, in actual production scenarios, jm did not fail and this has not been reproduced. > Why is “UNDEFINED” defined in the Flink task status? > ---------------------------------------------------- > > Key: FLINK-33483 > URL: https://issues.apache.org/jira/browse/FLINK-33483 > Project: Flink > Issue Type: Improvement > Components: Runtime / RPC, Runtime / Task > Affects Versions: 1.12.2 > Reporter: Xin Chen > Priority: Major > Attachments: container_e15_1693914709123_8498_01_000001_8042 > > > In the Flink on Yarn mode, if an unknown status appears in the Flink log, > jm(jobmanager) will report the task status as undefined. The Yarn page will > display the state as FINISHED, but the final status is *UNDEFINED*. In terms > of business, it is unknown whether the task has failed or succeeded, and > whether to retry. It has a certain impact. Why should we design UNDEFINED? > Usually, this situation occurs due to zk(zookeeper) disconnection or jm > abnormality, etc. Since the abnormality is present, why not use FAILED? > -- This message was sent by Atlassian Jira (v8.20.10#820010)