[ https://issues.apache.org/jira/browse/FLINK-33483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784007#comment-17784007 ]

Xin Chen edited comment on FLINK-33483 at 11/8/23 12:45 PM:
------------------------------------------------------------

But in another scenario from production practice, UNDEFINED also appears. The JM log is attached as [^container_e15_1693914709123_8498_01_000001_8042], but I have not fully reproduced this scenario. The key information from that log:

{code:java}
15:00:57.657 State change: SUSPENDED
Connection to ZooKeeper suspended, waiting for reconnection.

15:00:54.754          org.apache.flink.util.FlinkException: ResourceManager 
leader changed to new address null
15:00:54.759          Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) 
switched from state RUNNING to RESTARTING.
15:00:54.771          Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) 
switched from state RESTARTING to SUSPENDED.
org.apache.flink.util.FlinkException: JobManager is no longer the leader.
Unable to canonicalize address zookeeper:2181 because it's not resolvable.

15:00:55.694   closing socket connection and attempting reconnect
15:00:57.657   State change: RECONNECTED
15:00:57.739  Connection to ZooKeeper was reconnected. Leader retrieval can be 
restarted.  
15:00:57.740   Connection to ZooKeeper was reconnected. Leader election can be 
restarted.  

15:00:57.741 Job 281592085ed7f391ab59b83a53c40db3 was not finished by 
JobManager.
15:00:57.742 Shutting down cluster because job not finished
15:00:57.742 Shutting YarnJobClusterEntrypoint down with application status 
UNKNOWN. Diagnostics null. 
15:00:57.767 Unregister application from the YARN Resource Manager with final 
status UNDEFINED.
{code}

From the logs it can be seen that the connection to ZooKeeper was lost for a few seconds. During the disconnection the ResourceManager (RM) was affected and the Flink job was suspended while the process attempted to reconnect to ZooKeeper. Most importantly, after ZooKeeper reconnected, the JM for some unknown reason decided that the job was in an UNKNOWN state: *Shutting YarnJobClusterEntrypoint down with application status UNKNOWN. Diagnostics null*. During this process the JM did not restart, which is different from FLINK-12302.
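
For context on why "UNKNOWN" in the JM log turns into "UNDEFINED" on YARN: as far as I understand, Flink's internal ApplicationStatus has no UNDEFINED value at all; it is translated into YARN's FinalApplicationStatus when the application is deregistered, and both UNKNOWN and null fall through to UNDEFINED. The sketch below is paraphrased from memory of the flink-yarn translation (the class and method names here are mine, not the actual 1.12.2 source):

{code:java}
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;

// Sketch of the ApplicationStatus -> YARN FinalApplicationStatus translation performed
// when the cluster entrypoint deregisters the application (paraphrased, not exact source).
final class YarnStatusMappingSketch {

    static FinalApplicationStatus toYarnStatus(ApplicationStatus status) {
        if (status == null) {
            return FinalApplicationStatus.UNDEFINED;
        }
        switch (status) {
            case SUCCEEDED:
                return FinalApplicationStatus.SUCCEEDED;
            case FAILED:
                return FinalApplicationStatus.FAILED;
            case CANCELED:
                return FinalApplicationStatus.KILLED;
            default:
                // ApplicationStatus.UNKNOWN falls through to here, which is why YARN
                // reports "final status UNDEFINED" for this job.
                return FinalApplicationStatus.UNDEFINED;
        }
    }
}
{code}

So UNDEFINED on the YARN page is just the catch-all translation of ApplicationStatus.UNKNOWN; the real question is why the entrypoint chose UNKNOWN in the first place.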

So how exactly is "UNKNOWN" determined here? Is it also derived from reading the RunningJobsRegistry in ZooKeeper? (A sketch of that registry API follows the stack trace below.) I have tried many times but could not reproduce this scenario; the reproduction log is attached. I think the reason it is hard to reproduce is that when I disconnect all ZooKeeper nodes, the JM quickly goes down and restarts, and the log shows the following error:

{code:java}
org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token 
not set: Ignoring message LocalFencedMessage(null, 
LocalRpcInvocation(deregisterApplication(ApplicationStatus, String))) sent to 
akka.tcp://flink@192.168.22.121:42347/user/rpc/resourcemanager_0 because the 
fencing token is null.
        at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:67)
 ~
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
 ~
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~
        at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~
        at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~
        at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~
{code}
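
Regarding the RunningJobsRegistry question above: the 1.12 HA API does provide org.apache.flink.runtime.highavailability.RunningJobsRegistry (backed by ZooKeeper when ZK HA is enabled), which only records a coarse scheduling status per job. A minimal sketch of the kind of check I have in mind is below; the registry API is taken from the 1.12 interfaces, but the class name and the decision logic are my assumptions, not the actual shutdown code:

{code:java}
import java.io.IOException;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;

// Hypothetical sketch, NOT the actual shutdown path: if the ZooKeeper-backed registry
// does not report the job as DONE, the cluster cannot tell whether the job succeeded or
// failed and would have to fall back to ApplicationStatus.UNKNOWN.
final class RegistryCheckSketch {

    static ApplicationStatus statusAfterReconnect(HighAvailabilityServices haServices, JobID jobId)
            throws IOException {
        RunningJobsRegistry registry = haServices.getRunningJobsRegistry();
        JobSchedulingStatus schedulingStatus = registry.getJobSchedulingStatus(jobId);

        // DONE means the job reached a globally terminal state; anything else would
        // correspond to "the job was not finished", leaving the outcome unknown.
        return schedulingStatus == JobSchedulingStatus.DONE
                ? ApplicationStatus.SUCCEEDED // simplification: the real result would have to be looked up
                : ApplicationStatus.UNKNOWN;
    }
}
{code}
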
As for the FencingTokenException above: I speculate that the ZooKeeper disconnection may have affected the RM's leadership, so that the JM's attempt to deregister the application from the RM runs into trouble and the application ends up with the final status UNDEFINED. However, in the actual production scenario the JM did not go down, and I have not managed to reproduce that behavior.
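
To make that speculation concrete: the FencingTokenException suggests that the RM endpoint had no fencing token at that moment, i.e. it did not consider itself leader, so the fenced deregisterApplication RPC was simply rejected. Below is a simplified illustration of that guard (my own sketch, not the actual FencedAkkaRpcActor code):

{code:java}
import java.util.Objects;
import java.util.UUID;

// Simplified sketch of fenced-RPC handling (an assumption, not the real FencedAkkaRpcActor).
// While the ResourceManager holds no leadership, its fencing token is null, so any fenced
// message (e.g. deregisterApplication) is rejected instead of being executed.
final class FencedEndpointSketch {

    private volatile UUID currentFencingToken; // null while leadership is lost

    void handleFencedMessage(UUID messageToken, Runnable rpcInvocation) {
        if (currentFencingToken == null) {
            throw new IllegalStateException(
                    "Fencing token not set: ignoring message because the fencing token is null.");
        }
        if (!Objects.equals(currentFencingToken, messageToken)) {
            throw new IllegalStateException("Fencing token mismatch: ignoring message.");
        }
        rpcInvocation.run(); // only runs while this endpoint believes it is the leader
    }

    void grantLeadership(UUID newToken) {
        currentFencingToken = newToken;
    }

    void revokeLeadership() {
        currentFencingToken = null;
    }
}
{code}

If something similar happened in production, it would explain why the application falls back to the catch-all UNDEFINED final status rather than FAILED.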



> Why is “UNDEFINED” defined in the Flink task status?
> ----------------------------------------------------
>
>                 Key: FLINK-33483
>                 URL: https://issues.apache.org/jira/browse/FLINK-33483
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / RPC, Runtime / Task
>    Affects Versions: 1.12.2
>            Reporter: Xin Chen
>            Priority: Major
>         Attachments: container_e15_1693914709123_8498_01_000001_8042
>
>
> In Flink on YARN mode, if an unknown status appears in the Flink log, the JM (JobManager) 
> reports the application status as undefined. The YARN page then shows the state as FINISHED 
> but the final status as *UNDEFINED*. From a business point of view it is unclear whether the 
> job failed or succeeded and whether it should be retried, which has a real impact. Why was 
> UNDEFINED designed in at all? Usually this situation is caused by a ZK (ZooKeeper) 
> disconnection, a JM failure, or similar. Since something clearly went wrong, why not use FAILED?
>  



