Hi Kevin,

I think you may not have set the high-availability configuration in your native K8s session. Currently we only support ZooKeeper HA, so you need to add the following configuration. Once HA is configured, the running jobs, checkpoints, and other metadata can be persisted, so when the JobManager fails over, all the jobs can be recovered. I have tested that this works properly.
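For a native K8s session, the same options can also be passed on the command line when the session cluster is started, instead of editing flink-conf.yaml. A minimal sketch (the cluster-id, ZooKeeper address, and storage path below are placeholders for your environment):

```shell
# Sketch: start a Flink native K8s session with ZooKeeper HA enabled.
# kubernetes.cluster-id "my-session" and the quorum/storageDir values
# are placeholders; replace them with your own.
./bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=my-session \
  -Dhigh-availability=zookeeper \
  -Dhigh-availability.zookeeper.quorum=zk-client:2181 \
  -Dhigh-availability.storageDir=hdfs:///flink/recovery
```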
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-client:2181
high-availability.storageDir: hdfs:///flink/recovery

I know you may not have a ZooKeeper cluster. You could use a ZooKeeper K8s operator[1] to deploy a new one. Moreover, since ZooKeeper is not very convenient to use for HA, native K8s HA support[2] is planned, and we are trying to finish it in the next major release cycle (1.12).

[1]. https://github.com/pravega/zookeeper-operator
[2]. https://issues.apache.org/jira/browse/FLINK-12884

Best,
Yang

Bohinski, Kevin <kevin_bohin...@comcast.com> wrote on Fri, Aug 7, 2020 at 11:40 PM:
> Hi all,
>
> In our 1.11.1 native k8s session, after we submit a job it will run
> successfully for a few hours, then fail when the jobmanager pod restarts.
>
> Relevant logs after the restart are attached below. Any suggestions?
>
> Best,
> kevin
>
> 2020-08-06 21:50:24,425 INFO  org.apache.flink.kubernetes.KubernetesResourceManager [] - Recovered 32 pods from previous attempts, current attempt id is 2.
> 2020-08-06 21:50:24,610 DEBUG org.apache.flink.kubernetes.kubeclient.resources.KubernetesPodsWatcher [] - Received ADDED event for pod REDACTED-flink-session-taskmanager-1-16, details: PodStatus(conditions=[PodCondition(lastProbeTime=null, lastTransitionTime=2020-08-06T18:48:33Z, message=null, reason=null, status=True, type=Initialized, additionalProperties={}), PodCondition(lastProbeTime=null, lastTransitionTime=2020-08-06T18:48:37Z, message=null, reason=null, status=True, type=Ready, additionalProperties={}), PodCondition(lastProbeTime=null, lastTransitionTime=2020-08-06T18:48:37Z, message=null, reason=null, status=True, type=ContainersReady, additionalProperties={}), PodCondition(lastProbeTime=null, lastTransitionTime=2020-08-06T18:48:33Z, message=null, reason=null, status=True, type=PodScheduled, additionalProperties={})], containerStatuses=[ContainerStatus(containerID=docker://REDACTED, image=REDACTED/flink:1.11.1-scala_2.11-s3-0, imageID=docker-pullable://REDACTED/flink@sha256:REDACTED, lastState=ContainerState(running=null, terminated=null, waiting=null, additionalProperties={}), name=flink-task-manager, ready=true, restartCount=0, started=true, state=ContainerState(running=ContainerStateRunning(startedAt=2020-08-06T18:48:35Z, additionalProperties={}), terminated=null, waiting=null, additionalProperties={}), additionalProperties={})], ephemeralContainerStatuses=[], hostIP=REDACTED, initContainerStatuses=[], message=null, nominatedNodeName=null, phase=Running, podIP=REDACTED, podIPs=[PodIP(ip=REDACTED, additionalProperties={})], qosClass=Guaranteed, reason=null, startTime=2020-08-06T18:48:33Z, additionalProperties={})
>
> 2020-08-06 21:50:24,613 DEBUG org.apache.flink.kubernetes.KubernetesResourceManager [] - Ignore TaskManager pod that is already added: REDACTED-flink-session-taskmanager-1-16
>
> 2020-08-06 21:50:24,615 INFO  org.apache.flink.kubernetes.KubernetesResourceManager [] - Received 0 new TaskManager pods. Remaining pending pod requests: 0
>
> 2020-08-06 21:50:24,631 DEBUG org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore [] - Could not load archived execution graph for job id 8c76f37962afd87783c65c95387fb828.
> java.util.concurrent.ExecutionException: java.io.FileNotFoundException: Could not find file for archived execution graph 8c76f37962afd87783c65c95387fb828. This indicates that the file either has been deleted or never written.
>     at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:137) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2348) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2320) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3941) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4824) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore.get(FileArchivedExecutionGraphStore.java:143) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$requestJob$20(Dispatcher.java:554) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884) ~[?:1.8.0_262]
>     at java.util.concurrent.CompletableFuture.uniExceptionallyStage(CompletableFuture.java:898) ~[?:1.8.0_262]
>     at java.util.concurrent.CompletableFuture.exceptionally(CompletableFuture.java:2209) ~[?:1.8.0_262]
>     at org.apache.flink.runtime.dispatcher.Dispatcher.requestJob(Dispatcher.java:552) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_262]
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_262]
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_262]
>     at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_262]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.11.1.jar:1.11.1]
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.11.1.jar:1.11.1]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.11.1.jar:1.11.1]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.11.1.jar:1.11.1]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.11.1.jar:1.11.1]
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.11.1.jar:1.11.1]
> Caused by: java.io.FileNotFoundException: Could not find file for archived execution graph 8c76f37962afd87783c65c95387fb828. This indicates that the file either has been deleted or never written.
>     at org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore.loadExecutionGraph(FileArchivedExecutionGraphStore.java:239) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore.access$000(FileArchivedExecutionGraphStore.java:63) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore$1.load(FileArchivedExecutionGraphStore.java:118) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore$1.load(FileArchivedExecutionGraphStore.java:115) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     ... 37 more
>
> 2020-08-06 21:50:55,824 DEBUG org.apache.flink.kubernetes.KubernetesResourceManager [] - Ignoring outdated TaskExecutorGateway connection for REDACTED-flink-session-taskmanager-1-16.
>
> 2020-08-06 21:50:56,519 INFO  org.apache.flink.kubernetes.KubernetesResourceManager [] - Registering TaskManager with ResourceID REDACTED-flink-session-taskmanager-1-16 (akka.tcp://flink@REDACTED:6122/user/rpc/taskmanager_0) at ResourceManager
>
> 2020-08-06 21:50:56,613 DEBUG org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl [] - Registering TaskManager REDACTED-flink-session-taskmanager-1-16 under 89c651cee3d3f4f92b1c9472a9ec4507 at the SlotManager.
>
> 2020-08-06 21:50:55,823 DEBUG org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Try to connect to remote RPC endpoint with address akka.tcp://flink@REDACTED:6122/user/rpc/taskmanager_0. Returning a org.apache.flink.runtime.taskexecutor.TaskExecutorGateway gateway.
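P.S. Regarding the ZooKeeper operator in [1]: once the Pravega zookeeper-operator is installed in your cluster, you request a ZooKeeper ensemble with a ZookeeperCluster resource. A minimal sketch (the name "zk" and the replica count are just example choices):

```yaml
# Minimal ZookeeperCluster manifest for the Pravega zookeeper-operator.
# metadata.name "zk" and replicas: 3 are example values, not requirements.
apiVersion: zookeeper.pravega.io/v1beta1
kind: ZookeeperCluster
metadata:
  name: zk
spec:
  replicas: 3
```

The operator then exposes a client service for the ensemble (here "zk-client" on port 2181), which is the address you put into high-availability.zookeeper.quorum.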