After checking the log I found the root cause is zk client timeout on TM: ``` 2021-01-25 14:01:49,600 WARN org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 40020ms for sessionid 0x404f9ca531a5d6f 2021-01-25 14:01:49,610 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 40020ms for sessionid 0x404f9ca531a5d6f, closing socket connection and attempting reconnect 2021-01-25 14:01:49,711 INFO org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager - State change: SUSPENDED 2021-01-25 14:01:49,711 WARN org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper. 2021-01-25 14:01:49,712 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - JobManager for job 27ac39342913d29baac4cde13062c4a4 with leader id b5af099c17a05fcf15e7bbfcb74e49ea lost leadership. 2021-01-25 14:01:49,712 WARN org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper. 2021-01-25 14:01:49,712 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager connection for job 27ac39342913d29baac4cde13062c4a4. 2021-01-25 14:01:49,712 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Sink: USER_EVENTS-spo_derived_event-SINK-SINKS.kafka (156/360) (d5b5887e639874cb70d7fef939b957b7). 2021-01-25 14:01:49,712 WARN org.apache.flink.runtime.taskmanager.Task - Sink: USER_EVENTS-spo_derived_event-SINK-SINKS.kafka (156/360) (d5b5887e639874cb70d7fef939b957b7) switched from RUNNING to FAILED. org.apache.flink.util.FlinkException: JobManager responsible for 27ac39342913d29baac4cde13062c4a4 lost the leadership. ```
I checked that TM gc log, no gc issues. it also shows client timeout in zookeeper server log. How frequently the zk client sync with server side in flink? The log says client doesn't heartbeat to server for 40s. Any help? thanks! Best Lu On Thu, Dec 17, 2020 at 6:10 PM Xintong Song <tonysong...@gmail.com> wrote: > I'm not aware of any significant changes to the HA components between > 1.9/1.11. > Would you mind sharing the complete jobmanager/taskmanager logs? > > Thank you~ > > Xintong Song > > > > On Fri, Dec 18, 2020 at 8:53 AM Lu Niu <qqib...@gmail.com> wrote: > >> Hi, Xintong >> >> Thanks for replying and your suggestion. I did check the ZK side but >> there is nothing interesting. The error message actually shows that only >> one TM thought JM lost leadership while others ran fine. Also, this >> happened only after we migrated from 1.9 to 1.11. Is it possible this is >> introduced by 1.11? >> >> Best >> Lu >> >> On Wed, Dec 16, 2020 at 5:56 PM Xintong Song <tonysong...@gmail.com> >> wrote: >> >>> Hi Lu, >>> >>> I assume you are using ZooKeeper as the HA service? >>> >>> A common cause of unexpected leadership lost is the instability of HA >>> service. E.g., if ZK does not receive heartbeat from Flink RM for a >>> certain period of time, it will revoke the leadership and notify >>> other components. You can look into the ZooKeeper logs checking why RM's >>> leadership is revoked. >>> >>> Thank you~ >>> >>> Xintong Song >>> >>> >>> >>> On Thu, Dec 17, 2020 at 8:42 AM Lu Niu <qqib...@gmail.com> wrote: >>> >>>> Hi, Flink users >>>> >>>> Recently we migrated to flink 1.11 and see exceptions like: >>>> ``` >>>> 2020-12-15 12:41:01,199 INFO >>>> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: >>>> USER_MATERIALIZED_EVENT_SIGNAL-user_context-event -> >>>> USER_MATERIALIZED_EVENT_SIGNAL-user_context-event-as_nrtgtuple (21/60) >>>> (711d1d319691a4b80e30fe6ab7dfab5b) switched from RUNNING to FAILED on >>>> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@50abf386. >>>> java.lang.Exception: Job leader for job id >>>> 47b1531f79ffe3b86bc5910f6071e40c lost leadership. >>>> at >>>> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$2(TaskExecutor.java:1852) >>>> ~[nrtg-1.11_deploy.jar:?] >>>> at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_212-ga] >>>> at >>>> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$3(TaskExecutor.java:1851) >>>> ~[nrtg-1.11_deploy.jar:?] >>>> at >>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) >>>> ~[nrtg-1.11_deploy.jar:?] >>>> at >>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) >>>> ~[nrtg-1.11_deploy.jar:?] >>>> at >>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) >>>> ~[nrtg-1.11_deploy.jar:?] >>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) >>>> [nrtg-1.11_deploy.jar:?] >>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) >>>> [nrtg-1.11_deploy.jar:?] >>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) >>>> [nrtg-1.11_deploy.jar:?] >>>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) >>>> [nrtg-1.11_deploy.jar:?] >>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) >>>> [nrtg-1.11_deploy.jar:?] >>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) >>>> [nrtg-1.11_deploy.jar:?] >>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) >>>> [nrtg-1.11_deploy.jar:?] >>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:539) >>>> [nrtg-1.11_deploy.jar:?] >>>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:227) >>>> [nrtg-1.11_deploy.jar:?] >>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:612) >>>> [nrtg-1.11_deploy.jar:?] >>>> at akka.actor.ActorCell.invoke(ActorCell.scala:581) >>>> [nrtg-1.11_deploy.jar:?] >>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268) >>>> [nrtg-1.11_deploy.jar:?] >>>> at akka.dispatch.Mailbox.run(Mailbox.scala:229) [nrtg-1.11_deploy.jar:?] >>>> ``` >>>> >>>> ``` >>>> 2020-12-15 01:01:39,531 INFO >>>> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - >>>> USER_MATERIALIZED_EVENT_SIGNAL.user_context.SINK-stream_joiner -> >>>> USER_MATERIALIZED_EVENT_SIGNAL-user_context-SINK-SINKS.realpin (260/360) >>>> (0c1f4495088ec9452c597f46a88a2c8e) switched from RUNNING to FAILED on >>>> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@2362b2fd. >>>> org.apache.flink.util.FlinkException: ResourceManager leader changed to >>>> new address null >>>> at >>>> org.apache.flink.runtime.taskexecutor.TaskExecutor.notifyOfNewResourceManagerLeader(TaskExecutor.java:1093) >>>> ~[nrtg-1.11_deploy.jar:?] >>>> at >>>> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$800(TaskExecutor.java:173) >>>> ~[nrtg-1.11_deploy.jar:?] >>>> at >>>> org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerLeaderListener.lambda$notifyLeaderAddress$0(TaskExecutor.java:1816) >>>> ~[nrtg-1.11_deploy.jar:?] >>>> at >>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) >>>> ~[nrtg-1.11_deploy.jar:?] >>>> at >>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) >>>> ~[nrtg-1.11_deploy.jar:?] >>>> at >>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) >>>> ~[nrtg-1.11_deploy.jar:?] >>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) >>>> [nrtg-1.11_deploy.jar:?] >>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) >>>> [nrtg-1.11_deploy.jar:?] >>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) >>>> [nrtg-1.11_deploy.jar:?] >>>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) >>>> [nrtg-1.11_deploy.jar:?] >>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) >>>> [nrtg-1.11_deploy.jar:?] >>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) >>>> [nrtg-1.11_deploy.jar:?] >>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) >>>> [nrtg-1.11_deploy.jar:?] >>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:539) >>>> [nrtg-1.11_deploy.jar:?] >>>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:227) >>>> [nrtg-1.11_deploy.jar:?] >>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:612) >>>> [nrtg-1.11_deploy.jar:?] >>>> at akka.actor.ActorCell.invoke(ActorCell.scala:581) >>>> [nrtg-1.11_deploy.jar:?] >>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268) >>>> [nrtg-1.11_deploy.jar:?] >>>> at akka.dispatch.Mailbox.run(Mailbox.scala:229) [nrtg-1.11_deploy.jar:?] >>>> ``` >>>> >>>> This happens a few times per week. It seems like one Task Manager >>>> wrongly thought JobMananger is lost and triggers a full restart of the >>>> whole job. Does anyone know how to resolve such errors? Thanks! >>>> >>>> Best >>>> Lu >>>> >>>