Hi, we recently went live with a job on a shared cluster managed with YARN.
The job was started with:

flink run -s hdfs://PATH_TO_A_CHECKPOINT_FROM_A_PREVIOUS_RUN_HERE

Everything worked fine for a few days, but then the job needed to be restored for whatever reason:

2023-08-03 16:34:44,525 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Terminating cluster entrypoint process YarnJobClusterEntrypoint with exit code 2.
org.apache.flink.util.FlinkException: Cannot deregister application. Resource manager service is not available.

It seems that while restoring (YARN attempt 02), Flink used the original checkpoint we provided as the value of the -s parameter, and not the most recent checkpoint for that job. This caused a few days' worth of data to be reprocessed. (Our HA-related settings are sketched at the end of this mail.)

Relevant logs from the restored attempt:

2023-08-03 16:34:55,259 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Recovering checkpoints from ZooKeeperStateHandleStore{namespace='flink/application_zzzzzzzzzzzz/jobs/JOBID-WWWWWWWWWW/checkpoints'}.
2023-08-03 16:34:55,262 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Found 0 checkpoints in ZooKeeperStateHandleStore{namespace='flink/application_zzzzzzzzzzzz/jobs/JOBID-WWWWWWWWWW/checkpoints'}.
2023-08-03 16:34:55,262 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Trying to fetch 0 checkpoints from storage.
2023-08-03 16:34:55,262 INFO org.apache.flink.runtime.util.ZooKeeperUtils [] - Initialized DefaultCompletedCheckpointStore in 'ZooKeeperStateHandleStore{namespace='flink/application_zzzzzzzzzzzz/jobs/JOBID-WWWWWWWWWW/checkpoints'}' with /checkpoints.
2023-08-03 16:34:55,293 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Running initialization on master for yyyyyyyy (JOBID-WWWWWWWWWW).
2023-08-03 16:34:55,293 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Successfully ran initialization on master in 0 ms.
2023-08-03 16:34:55,313 INFO org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 pipelined regions in 0 ms
2023-08-03 16:34:55,347 INFO org.apache.flink.yarn.YarnResourceManagerDriver [] - Recovered 0 containers from previous attempts ([]).
2023-08-03 16:34:55,347 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Recovered 0 workers from previous attempt.
2023-08-03 16:34:55,369 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using job/cluster config to configure application-defined state backend: EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=8, writeBatchSize=2097152}
2023-08-03 16:34:55,369 INFO org.apache.hadoop.conf.Configuration [] - resource-types.xml not found
2023-08-03 16:34:55,370 INFO org.apache.hadoop.yarn.util.resource.ResourceUtils [] - Unable to find 'resource-types.xml'.
2023-08-03 16:34:55,371 INFO org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using predefined options: DEFAULT.
2023-08-03 16:34:55,371 INFO org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using application-defined options factory: DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=16}}.
2023-08-03 16:34:55,371 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using application-defined state backend: EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=8, writeBatchSize=2097152}
2023-08-03 16:34:55,371 INFO org.apache.flink.runtime.state.StateBackendLoader [] - State backend loader loads the state backend as EmbeddedRocksDBStateBackend
2023-08-03 16:34:55,375 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Checkpoint storage is set to 'filesystem': (checkpoints "hdfs:/apps/xxxx/flink/checkpoints/yyyyyy-job")
2023-08-03 16:34:55,377 INFO org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - Enabled external resources: []
2023-08-03 16:34:55,390 INFO org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] - Upper bound of the thread pool size is 500
2023-08-03 16:34:55,407 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No checkpoint found during restore.
2023-08-03 16:34:55,408 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Starting job JOBID-WWWWWWWWWW from savepoint [PATH TO THE 3-DAY-OLD-CHECKPOINT WE USED TO LAUNCH WITH]

I see some binary-looking HA files in HDFS that seem to reference the correct, latest checkpoint rather than the initial one. Does anyone have an idea as to what could be causing the recovery to use the initial checkpoint?

Many thanks,
Fil
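
P.S. For context, these are roughly the HA-related settings we run with on this cluster; the ZooKeeper hosts, HDFS path and attempt count below are placeholders rather than our exact values:

# flink-conf.yaml (relevant excerpt, values are placeholders)
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: hdfs:///apps/xxxx/flink/ha
yarn.application-attempts: 2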