update: Our current line of thinking is that we need this to be set in order for the checkpoint to live across job manager failures:

- ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: Delete the checkpoint when the job is cancelled. The checkpoint state will only be available if the job fails.
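For anyone finding this later: the setting in question is the ExternalizedCheckpointCleanup mode on CheckpointConfig. A minimal sketch of setting it in job code (the class name, the 60s interval, and the choice of RETAIN_ON_CANCELLATION are our assumptions for the example, not taken from the job above):

    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointRetentionSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Checkpoint every 60 seconds (interval is an assumption for this sketch).
            env.enableCheckpointing(60_000L);

            // RETAIN_ON_CANCELLATION keeps the externalized checkpoint even when the
            // job is cancelled; DELETE_ON_CANCELLATION (the mode quoted above) keeps
            // it only after a failure.
            env.getCheckpointConfig().setExternalizedCheckpointCleanup(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

            // Placeholder pipeline so the sketch actually runs.
            env.fromElements(1, 2, 3).print();

            env.execute("checkpoint-retention-sketch");
        }
    }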
On Fri, 4 Aug 2023 at 05:03, Filip Karnicki <filip.karni...@gmail.com> wrote:
> Hi, we recently went live with a job on a shared cluster, which is managed
> with Yarn
>
> The job was started using
>
> flink run -s hdfs://PATH_TO_A_CHECKPOINT_FROM_A_PREVIOUS_RUN_HERE
>
> Everything worked fine for a few days, but then the job needed to be
> restored for whatever reason
>
> 2023-08-03 16:34:44,525 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Terminating cluster entrypoint process YarnJobClusterEntrypoint with exit code 2.
> org.apache.flink.util.FlinkException: Cannot deregister application. Resource manager service is not available.
>
> It seems that while restoring (yarn 'attempt' 02), Flink used the original
> checkpoint we provided as the value of the -s parameter, and not the most
> recent checkpoint for that job. This caused a few days worth of data to be
> reprocessed.
>
> 2023-08-03 16:34:55,259 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Recovering checkpoints from ZooKeeperStateHandleStore{namespace='flink/application_zzzzzzzzzzzz/jobs/JOBID-WWWWWWWWWW/checkpoints'}.
> 2023-08-03 16:34:55,262 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Found 0 checkpoints in ZooKeeperStateHandleStore{namespace='flink/application_zzzzzzzzzzzz/jobs/JOBID-WWWWWWWWWW/checkpoints'}.
> 2023-08-03 16:34:55,262 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Trying to fetch 0 checkpoints from storage.
> 2023-08-03 16:34:55,262 INFO org.apache.flink.runtime.util.ZooKeeperUtils [] - Initialized DefaultCompletedCheckpointStore in 'ZooKeeperStateHandleStore{namespace='flink/application_zzzzzzzzzzzz/jobs/JOBID-WWWWWWWWWW/checkpoints'}' with /checkpoints.
> 2023-08-03 16:34:55,293 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Running initialization on master for yyyyyyyy (JOBID-WWWWWWWWWW).
> 2023-08-03 16:34:55,293 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Successfully ran initialization on master in 0 ms.
> 2023-08-03 16:34:55,313 INFO org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 pipelined regions in 0 ms
> 2023-08-03 16:34:55,347 INFO org.apache.flink.yarn.YarnResourceManagerDriver [] - Recovered 0 containers from previous attempts ([]).
> 2023-08-03 16:34:55,347 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Recovered 0 workers from previous attempt.
> 2023-08-03 16:34:55,369 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using job/cluster config to configure application-defined state backend: EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=8, writeBatchSize=2097152}
> 2023-08-03 16:34:55,369 INFO org.apache.hadoop.conf.Configuration [] - resource-types.xml not found
> 2023-08-03 16:34:55,370 INFO org.apache.hadoop.yarn.util.resource.ResourceUtils [] - Unable to find 'resource-types.xml'.
> 2023-08-03 16:34:55,371 INFO org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using predefined options: DEFAULT.
> 2023-08-03 16:34:55,371 INFO org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using application-defined options factory: DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=16}}.
> 2023-08-03 16:34:55,371 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using application-defined state backend: EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=8, writeBatchSize=2097152}
> 2023-08-03 16:34:55,371 INFO org.apache.flink.runtime.state.StateBackendLoader [] - State backend loader loads the state backend as EmbeddedRocksDBStateBackend
> 2023-08-03 16:34:55,375 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Checkpoint storage is set to 'filesystem': (checkpoints "hdfs:/apps/xxxx/flink/checkpoints/yyyyyy-job")
> 2023-08-03 16:34:55,377 INFO org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - Enabled external resources: []
> 2023-08-03 16:34:55,390 INFO org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] - Upper bound of the thread pool size is 500
> 2023-08-03 16:34:55,407 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No checkpoint found during restore.
> 2023-08-03 16:34:55,408 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Starting job JOBID-WWWWWWWWWW from savepoint [PATH TO THE 3-DAY-OLD-CHECKPOINT WE USED TO LAUNCH WITH]
>
> I see some binary-looking HA files in HDFS that seem to have references to
> the correct, latest checkpoint rather than the initial one.
>
> Does anyone have an idea as to what could be causing the recovery to use
> the initial checkpoint?
>
> Many thanks
> Fil
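(One more note for completeness: the same retention behaviour can also be set cluster-wide in flink-conf.yaml rather than in job code. A sketch of the relevant entries, assuming ZooKeeper HA as in the logs above; hosts, paths, and the retained-checkpoint count are placeholders, not values from the cluster in question:)

    # Retain externalized checkpoints when the job is cancelled, not only when it fails.
    execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION

    # Number of completed checkpoints kept in the checkpoint store (example value).
    state.checkpoints.num-retained: 3

    # ZooKeeper HA, so a recovered JobManager can find the latest completed checkpoint.
    high-availability: zookeeper
    high-availability.zookeeper.quorum: zk-host-1:2181,zk-host-2:2181
    high-availability.storageDir: hdfs:///flink/ha/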