Job is "FAILED" state and hence Flink HA Removed the job graph from
zookeeper along with the state.
One thing is we can't completely  rely on Flink HA for state restoring.
It will only until Job hasn't FAILED
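
You can see why from Flink's own JobStatus enum: FAILED is a globally
terminal state, and that is what triggers the HA cleanup. A minimal check
against the public API (org.apache.flink.api.common.JobStatus):

    import org.apache.flink.api.common.JobStatus;

    // FAILED is globally terminal, so ZooKeeper HA removes the job graph
    // and checkpoint metadata once the job reaches it.
    public class TerminalStateCheck {
        public static void main(String[] args) {
            System.out.println(JobStatus.FAILED.isGloballyTerminalState());     // true
            System.out.println(JobStatus.RESTARTING.isGloballyTerminalState()); // false
        }
    }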
If you want to recover the job even after a failure, you should do the
following:
a) Use retained (externalized) checkpoints, so that Flink HA won't delete
the checkpoints of FAILED jobs. For example, if you configure the number of
retained checkpoints to 3, the 3 most recent checkpoints are kept at any
time, so you can restore the state from one of them (a config sketch
follows this list).
b) How to restore the state:
  1) Manually: run the Flink CLI and restore the state from one of the
retained checkpoints (see the CLI example below).
  2) Automated: build your own service that keeps storing the checkpoint
file paths in ZooKeeper or another persistent store, so that whenever the
job fails, your service can detect it and automatically restore from the
retained checkpoints (a Curator-based sketch follows the CLI example).
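
For (a), here is a minimal sketch of the relevant configuration; the
interval, directory, and bucket name are placeholders to replace with your
own. In flink-conf.yaml:

    state.checkpoints.num-retained: 3
    state.checkpoints.dir: s3://<your-bucket>/checkpoints

and in the job itself (Flink 1.9 DataStream API), enable externalized
checkpoints so they are retained when the job terminates:

    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000); // example: checkpoint every minute
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);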
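
For (b.1), the restore is a single CLI command; the checkpoint path below
is the one from your logs, and the jar name is a placeholder:

    flink run -s s3://s3_bucket/sessionization_test/checkpoints/00000000000000000000000000000000/chk-11 \
        -d your-job.jar

The -s flag tells Flink to restore from the given savepoint or retained
checkpoint; add -n (--allowNonRestoredState) if some state in the
checkpoint no longer maps to any operator in the job.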
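
For (b.2), a rough sketch of such a service using Apache Curator. The class
name, znode path, and how you detect new checkpoints (polling the REST API,
listing the checkpoint directory, etc.) are all assumptions to adapt:

    import java.nio.charset.StandardCharsets;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;

    // Hypothetical sketch: keep the latest retained-checkpoint path under a
    // znode that Flink HA never cleans up, so a supervisor can resubmit the
    // job with "flink run -s <path>" after a FAILED shutdown.
    public class CheckpointPathRecorder {
        private final CuratorFramework client;
        private final String znode; // e.g. "/my-app/latest-checkpoint"

        public CheckpointPathRecorder(String zkQuorum, String znode) {
            this.client = CuratorFrameworkFactory.newClient(
                    zkQuorum, new ExponentialBackoffRetry(1000, 3));
            this.znode = znode;
            client.start();
        }

        // Call whenever your monitoring sees a newly completed checkpoint.
        public void record(String checkpointPath) throws Exception {
            byte[] data = checkpointPath.getBytes(StandardCharsets.UTF_8);
            if (client.checkExists().forPath(znode) == null) {
                client.create().creatingParentsIfNeeded().forPath(znode, data);
            } else {
                client.setData().forPath(znode, data);
            }
        }

        // On failure detection, read the path back and pass it to "flink run -s".
        public String latest() throws Exception {
            return new String(client.getData().forPath(znode), StandardCharsets.UTF_8);
        }
    }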

Hope this helps

Regards
Bhaskar



On Sun, Jun 7, 2020 at 4:23 AM Teng Fei Liao <tengfl...@gmail.com> wrote:

> It seems like the JobManager is treating this as a job failure.
> A FAILED JobStatus is a globally terminal state, so everything gets deleted
> by the ZooKeeper HA services.
>
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/JobStatus.java#L39
>
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L263
>
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java#L108
> .
>
> On Sat, Jun 6, 2020 at 4:38 PM Kathula, Sandeep
> <sandeep_kath...@intuit.com.invalid> wrote:
>
> > Hi,
> >     We are running Flink 1.9 in K8S. We used
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/jobmanager_high_availability.html
> >   to set up high availability. We have a single master. We set the max
> > number of retries for a task to 2. After the task fails twice, the job
> > manager fails; this is expected. But it is also removing the checkpoint
> > from ZooKeeper, so on restart the job does not consume from the previous
> > checkpoint and we are losing data.
> >
> > Logs:
> >
> > 2020/06/06 19:39:07.759 INFO  o.a.f.r.c.CheckpointCoordinator - Stopping
> > checkpoint coordinator for job 00000000000000000000000000000000.
> > 2020/06/06 19:39:07.759 INFO  o.a.f.r.c.ZooKeeperCompletedCheckpointStore
> > - Shutting down
> > 2020/06/06 19:39:07.823 INFO  o.a.f.r.z.ZooKeeperStateHandleStore -
> > Removing
> > /flink/sessionization_test4/checkpoints/00000000000000000000000000000000
> > from ZooKeeper
> > 2020/06/06 19:39:07.823 INFO  o.a.f.r.c.CompletedCheckpoint - Checkpoint
> > with ID 11 at
> >
> 's3://s3_bucket/sessionization_test/checkpoints/00000000000000000000000000000000/chk-11'
> > not discarded.
> > 2020/06/06 19:39:07.829 INFO  o.a.f.r.c.ZooKeeperCheckpointIDCounter -
> > Shutting down.
> > 2020/06/06 19:39:07.829 INFO  o.a.f.r.c.ZooKeeperCheckpointIDCounter -
> > Removing /checkpoint-counter/00000000000000000000000000000000 from
> ZooKeeper
> > 2020/06/06 19:39:07.852 INFO  o.a.f.r.dispatcher.MiniDispatcher - Job
> > 00000000000000000000000000000000 reached globally terminal state FAILED.
> > 2020/06/06 19:39:07.852 INFO  o.a.f.runtime.jobmaster.JobMaster -
> Stopping
> > the JobMaster for job
> >
> sppstandardresourcemanager-flink-0606193838-6d7dae7e(00000000000000000000000000000000).
> > 2020/06/06 19:39:07.854 INFO  o.a.f.r.entrypoint.ClusterEntrypoint -
> > Shutting StandaloneJobClusterEntryPoint down with application status
> > FAILED. Diagnostics null.
> > 2020/06/06 19:39:07.854 INFO  o.a.f.r.j.MiniDispatcherRestEndpoint -
> > Shutting down rest endpoint.
> > 2020/06/06 19:39:07.856 INFO  o.a.f.r.l.ZooKeeperLeaderRetrievalService -
> > Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
> > 2020/06/06 19:39:07.859 INFO  o.a.f.r.j.slotpool.SlotPoolImpl -
> Suspending
> > SlotPool.
> > 2020/06/06 19:39:07.859 INFO  o.a.f.runtime.jobmaster.JobMaster - Close
> > ResourceManager connection d28e9b9e1fc1ba78c2ed010070518057: JobManager
> is
> > shutting down..
> > 2020/06/06 19:39:07.859 INFO  o.a.f.r.j.slotpool.SlotPoolImpl - Stopping
> > SlotPool.
> > 2020/06/06 19:39:07.859 INFO  o.a.f.r.r.StandaloneResourceManager -
> > Disconnect job manager afae482ff82bdb26fe275174c14d4...@akka.tcp<mailto:
> > afae482ff82bdb26fe275174c14d4...@akka.tcp>://flink@flink-job-cluster
> :6123/user/jobmanager_0
> > for job 00000000000000000000000000000000 from the resource manager.
> > 2020/06/06 19:39:07.860 INFO  o.a.f.r.l.ZooKeeperLeaderElectionService -
> > Stopping ZooKeeperLeaderElectionService
> >
> ZooKeeperLeaderElectionService{leaderPath='/leader/00000000000000000000000000000000/job_manager_lock'}.
> > 2020/06/06 19:39:07.868 INFO  o.a.f.r.j.MiniDispatcherRestEndpoint -
> > Removing cache directory
> > /tmp/flink-web-ef940924-348b-461c-ab53-255a914ed43a/flink-web-ui
> > 2020/06/06 19:39:07.870 INFO  o.a.f.r.l.ZooKeeperLeaderElectionService -
> > Stopping ZooKeeperLeaderElectionService
> > ZooKeeperLeaderElectionService{leaderPath='/leader/rest_server_lock'}.
> > 2020/06/06 19:39:07.870 INFO  o.a.f.r.j.MiniDispatcherRestEndpoint - Shut
> > down complete.
> > 2020/06/06 19:39:07.870 INFO  o.a.f.r.r.StandaloneResourceManager - Shut
> > down cluster because application is in FAILED, diagnostics null.
> > 2020/06/06 19:39:07.870 INFO  o.a.f.r.l.ZooKeeperLeaderRetrievalService -
> > Stopping ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
> > 2020/06/06 19:39:07.871 INFO  o.a.f.r.l.ZooKeeperLeaderRetrievalService -
> > Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
> > 2020/06/06 19:39:07.871 INFO  o.a.f.r.dispatcher.MiniDispatcher -
> Stopping
> > dispatcher akka.tcp://flink@flink-job-cluster:6123/user/dispatcher.
> > 2020/06/06 19:39:07.871 INFO  o.a.f.r.dispatcher.MiniDispatcher -
> Stopping
> > all currently running jobs of dispatcher
> akka.tcp://flink@flink-job-cluster
> > :6123/user/dispatcher.
> > 2020/06/06 19:39:07.871 INFO  o.a.f.r.r.s.SlotManagerImpl - Closing the
> > SlotManager.
> > 2020/06/06 19:39:07.871 INFO  o.a.f.r.r.s.SlotManagerImpl - Suspending
> the
> > SlotManager.
> > 2020/06/06 19:39:07.871 INFO  o.a.f.r.l.ZooKeeperLeaderElectionService -
> > Stopping ZooKeeperLeaderElectionService
> >
> ZooKeeperLeaderElectionService{leaderPath='/leader/resource_manager_lock'}.
> > 2020/06/06 19:39:07.871 INFO  o.a.f.r.l.ZooKeeperLeaderRetrievalService -
> > Stopping ZooKeeperLeaderRetrievalService
> > /leader/00000000000000000000000000000000/job_manager_lock.
> > 2020/06/06 19:39:07.974 INFO  o.a.f.r.r.h.l.b.StackTraceSampleCoordinator
> > - Shutting down stack trace sample coordinator.
> > 2020/06/06 19:39:07.975 INFO  o.a.f.r.l.ZooKeeperLeaderElectionService -
> > Stopping ZooKeeperLeaderElectionService
> > ZooKeeperLeaderElectionService{leaderPath='/leader/dispatcher_lock'}.
> > 2020/06/06 19:39:07.975 INFO  o.a.f.r.dispatcher.MiniDispatcher - Stopped
> > dispatcher akka.tcp://flink@flink-job-cluster:6123/user/dispatcher.
> > 2020/06/06 19:39:07.975 INFO  o.a.flink.runtime.blob.BlobServer - Stopped
> > BLOB server at 0.0.0.0:6124
> > 2020/06/06 19:39:07.975 INFO  o.a.f.r.h.z.ZooKeeperHaServices - Close and
> > clean up all data for ZooKeeperHaServices.
> > 2020/06/06 19:39:08.085 INFO  o.a.f.s.c.o.a.c.f.i.CuratorFrameworkImpl -
> > backgroundOperationsLoop exiting
> > 2020/06/06 19:39:08.090 INFO  o.a.f.s.z.o.a.zookeeper.ClientCnxn -
> > EventThread shut down for session: 0x17282452e8c0823
> > 2020/06/06 19:39:08.090 INFO  o.a.f.s.z.o.a.zookeeper.ZooKeeper -
> Session:
> > 0x17282452e8c0823 closed
> > 2020/06/06 19:39:08.091 INFO  o.a.f.r.rpc.akka.AkkaRpcService - Stopping
> > Akka RPC service.
> > 2020/06/06 19:39:08.093 INFO  o.a.f.r.rpc.akka.AkkaRpcService - Stopping
> > Akka RPC service.
> > 2020/06/06 19:39:08.096 INFO
> > a.r.RemoteActorRefProvider$RemotingTerminator - Shutting down remote
> daemon.
> > 2020/06/06 19:39:08.097 INFO
> > a.r.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut down;
> > proceeding with flushing remote transports.
> > 2020/06/06 19:39:08.099 INFO
> > a.r.RemoteActorRefProvider$RemotingTerminator - Shutting down remote
> daemon.
> > 2020/06/06 19:39:08.099 INFO
> > a.r.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut down;
> > proceeding with flushing remote transports.
> > 2020/06/06 19:39:08.108 INFO
> > a.r.RemoteActorRefProvider$RemotingTerminator - Remoting shut down.
> > 2020/06/06 19:39:08.114 INFO
> > a.r.RemoteActorRefProvider$RemotingTerminator - Remoting shut down.
> > 2020/06/06 19:39:08.123 INFO  o.a.f.r.rpc.akka.AkkaRpcService - Stopped
> > Akka RPC service.
> > 2020/06/06 19:39:08.124 INFO  o.a.f.r.entrypoint.ClusterEntrypoint -
> > Terminating cluster entrypoint process StandaloneJobClusterEntryPoint
> with
> > exit code 1443.
> >
> > Can you please help?
> >
> > Thanks
> > Sandeep Kathula
> >
> >
>
