update: Our current line of thinking is that this needs to be set in order
for the checkpoint to survive job manager failures (rough sketch below):

   - ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: Delete the
     checkpoint when the job is cancelled. The checkpoint state will only be
     available if the job fails.
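
To make that concrete, something like the sketch below is roughly what we
have in mind. This is only a sketch, not our actual job: it uses the Flink
1.15+ CheckpointConfig method names (older versions use
enableExternalizedCheckpoints instead), and the checkpoint interval and job
name are placeholders.

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainedCheckpointsJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Enable periodic checkpointing; the 60s interval is just an example.
        env.enableCheckpointing(60_000L);

        // Externalize checkpoints. With DELETE_ON_CANCELLATION the checkpoint
        // is deleted on an explicit cancel but retained when the job *fails*,
        // which is what a restarted JobManager would need to recover from.
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(
                ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);

        // ... define sources, transformations and sinks here, then:
        // env.execute("my-job");
    }
}

As far as we understand, the same retention can also be set cluster-wide via
the execution.checkpointing.externalized-checkpoint-retention option in the
Flink configuration instead of in code.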


On Fri, 4 Aug 2023 at 05:03, Filip Karnicki <filip.karni...@gmail.com>
wrote:

> Hi, we recently went live with a job on a shared cluster, which is managed
> with Yarn
>
> The job was started using
>
> flink run -s hdfs://PATH_TO_A_CHECKPOINT_FROM_A_PREVIOUS_RUN_HERE
>
>
> Everything worked fine for a few days, but then the job needed to be
> restored for whatever reason
>
> 2023-08-03 16:34:44,525 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -
> Terminating cluster entrypoint process YarnJobClusterEntrypoint with exit
> code 2.
>
> org.apache.flink.util.FlinkException: Cannot deregister application.
> Resource manager service is not available.
>
>
> It seems that while restoring (yarn 'attempt' 02), Flink used the original
> checkpoint we provided as the value of the -s parameter, and not the most
> recent checkpoint for that job. This caused a few days' worth of data to be
> reprocessed.
>
>
> 2023-08-03 16:34:55,259 INFO
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils []
> - Recovering checkpoints from
> ZooKeeperStateHandleStore{namespace='flink/application_zzzzzzzzzzzz/jobs/JOBID-WWWWWWWWWW/checkpoints'}.
>
> 2023-08-03 16:34:55,262 INFO
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils []
> - Found 0 checkpoints in
> ZooKeeperStateHandleStore{namespace='flink/application_zzzzzzzzzzzz/jobs/JOBID-WWWWWWWWWW/checkpoints'}.
>
> 2023-08-03 16:34:55,262 INFO
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils []
> - Trying to fetch 0 checkpoints from storage.
>
> 2023-08-03 16:34:55,262 INFO
> org.apache.flink.runtime.util.ZooKeeperUtils                 [] -
> Initialized DefaultCompletedCheckpointStore in
> 'ZooKeeperStateHandleStore{namespace='flink/application_zzzzzzzzzzzz/jobs/JOBID-WWWWWWWWWW/checkpoints'}'
> with /checkpoints.
>
> 2023-08-03 16:34:55,293 INFO
> org.apache.flink.runtime.jobmaster.JobMaster                 [] - Running
> initialization on master for yyyyyyyy (JOBID-WWWWWWWWWW).
>
> 2023-08-03 16:34:55,293 INFO
> org.apache.flink.runtime.jobmaster.JobMaster                 [] -
> Successfully ran initialization on master in 0 ms.
>
> 2023-08-03 16:34:55,313 INFO
> org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] -
> Built 1 pipelined regions in 0 ms
>
> 2023-08-03 16:34:55,347 INFO
> org.apache.flink.yarn.YarnResourceManagerDriver              [] - Recovered
> 0 containers from previous attempts ([]).
>
> 2023-08-03 16:34:55,347 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Recovered 0 workers from previous attempt.
>
> 2023-08-03 16:34:55,369 INFO
> org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using
> job/cluster config to configure application-defined state backend:
> EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null,
> enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=8,
> writeBatchSize=2097152}
>
> 2023-08-03 16:34:55,369 INFO
> org.apache.hadoop.conf.Configuration                         [] -
> resource-types.xml not found
>
> 2023-08-03 16:34:55,370 INFO
> org.apache.hadoop.yarn.util.resource.ResourceUtils           [] - Unable to
> find 'resource-types.xml'.
>
> 2023-08-03 16:34:55,371 INFO
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] -
> Using predefined options: DEFAULT.
>
> 2023-08-03 16:34:55,371 INFO
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] -
> Using application-defined options factory:
> DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=16}}.
>
> 2023-08-03 16:34:55,371 INFO
> org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using
> application-defined state backend: EmbeddedRocksDBStateBackend{,
> localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE,
> numberOfTransferThreads=8, writeBatchSize=2097152}
>
> 2023-08-03 16:34:55,371 INFO
> org.apache.flink.runtime.state.StateBackendLoader            [] - State
> backend loader loads the state backend as EmbeddedRocksDBStateBackend
>
> 2023-08-03 16:34:55,375 INFO
> org.apache.flink.runtime.jobmaster.JobMaster                 [] -
> Checkpoint storage is set to 'filesystem': (checkpoints
> "hdfs:/apps/xxxx/flink/checkpoints/yyyyyy-job")
>
> 2023-08-03 16:34:55,377 INFO
> org.apache.flink.runtime.externalresource.ExternalResourceUtils [] -
> Enabled external resources: []
>
> 2023-08-03 16:34:55,390 INFO
> org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] - Upper
> bound of the thread pool size is 500
>
> 2023-08-03 16:34:55,407 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - No
> checkpoint found during restore.
>
> 2023-08-03 16:34:55,408 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Starting
> job JOBID-WWWWWWWWWW from savepoint [PATH TO THE 3-DAY-OLD-CHECKPOINT WE
> USED TO LAUNCH WITH]
>
>
>
> I see some binary-looking HA files in HDFS that seem to have references to
> the correct, latest checkpoint rather than the initial one.
>
> Does anyone have an idea as to what could be causing the recovery to use
> the initial checkpoint?
>
> Many thanks
> Fil
>
