Hi, Fil

If we don't specify the ExternalizedCheckpointCleanup, the default retention strategy is to not retain checkpoints after the job terminates, which is what causes your issue. So I think you should configure the ExternalizedCheckpointCleanup to `RETAIN_ON_CANCELLATION`.
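
With the DataStream API that would look roughly like the snippet below (a minimal sketch, not your actual job code; the setter assumes Flink 1.15+, on older versions the equivalent call is enableExternalizedCheckpoints, and the interval is only an example value):

    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Checkpoint periodically (60s here is just an example interval).
    env.enableCheckpointing(60_000L);

    // Keep the externalized checkpoint when the job is cancelled
    // (with DELETE_ON_CANCELLATION it is only kept if the job fails).
    env.getCheckpointConfig().setExternalizedCheckpointCleanup(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

If you prefer to keep this out of the job code, the same thing can be set in flink-conf.yaml via execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION (please double-check the exact key name for your Flink version).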
Best,
Ron

Filip Karnicki <filip.karni...@gmail.com> wrote on Fri, 4 Aug 2023 at 16:24:

> update: Our current line of thinking is that we need this to be set in order
> for the checkpoint to live across job manager failures
>
> - ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: Delete the
>   checkpoint when the job is cancelled. The checkpoint state will only be
>   available if the job fails.
>
> On Fri, 4 Aug 2023 at 05:03, Filip Karnicki <filip.karni...@gmail.com> wrote:
>
>> Hi, we recently went live with a job on a shared cluster, which is
>> managed with Yarn.
>>
>> The job was started using
>>
>> flink run -s hdfs://PATH_TO_A_CHECKPOINT_FROM_A_PREVIOUS_RUN_HERE
>>
>> Everything worked fine for a few days, but then the job needed to be
>> restored for whatever reason:
>>
>> 2023-08-03 16:34:44,525 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Terminating cluster entrypoint process YarnJobClusterEntrypoint with exit code 2.
>> org.apache.flink.util.FlinkException: Cannot deregister application. Resource manager service is not available.
>>
>> It seems that while restoring (yarn 'attempt' 02), Flink used the
>> original checkpoint we provided as the value of the -s parameter, and not
>> the most recent checkpoint for that job. This caused a few days' worth of
>> data to be reprocessed.
>>
>> 2023-08-03 16:34:55,259 INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Recovering checkpoints from ZooKeeperStateHandleStore{namespace='flink/application_zzzzzzzzzzzz/jobs/JOBID-WWWWWWWWWW/checkpoints'}.
>> 2023-08-03 16:34:55,262 INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Found 0 checkpoints in ZooKeeperStateHandleStore{namespace='flink/application_zzzzzzzzzzzz/jobs/JOBID-WWWWWWWWWW/checkpoints'}.
>> 2023-08-03 16:34:55,262 INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Trying to fetch 0 checkpoints from storage.
>> 2023-08-03 16:34:55,262 INFO  org.apache.flink.runtime.util.ZooKeeperUtils [] - Initialized DefaultCompletedCheckpointStore in 'ZooKeeperStateHandleStore{namespace='flink/application_zzzzzzzzzzzz/jobs/JOBID-WWWWWWWWWW/checkpoints'}' with /checkpoints.
>> 2023-08-03 16:34:55,293 INFO  org.apache.flink.runtime.jobmaster.JobMaster [] - Running initialization on master for yyyyyyyy (JOBID-WWWWWWWWWW).
>> 2023-08-03 16:34:55,293 INFO  org.apache.flink.runtime.jobmaster.JobMaster [] - Successfully ran initialization on master in 0 ms.
>> 2023-08-03 16:34:55,313 INFO  org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 pipelined regions in 0 ms
>> 2023-08-03 16:34:55,347 INFO  org.apache.flink.yarn.YarnResourceManagerDriver [] - Recovered 0 containers from previous attempts ([]).
>> 2023-08-03 16:34:55,347 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Recovered 0 workers from previous attempt.
>> 2023-08-03 16:34:55,369 INFO  org.apache.flink.runtime.jobmaster.JobMaster [] - Using job/cluster config to configure application-defined state backend: EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=8, writeBatchSize=2097152}
>> 2023-08-03 16:34:55,369 INFO  org.apache.hadoop.conf.Configuration [] - resource-types.xml not found
>> 2023-08-03 16:34:55,370 INFO  org.apache.hadoop.yarn.util.resource.ResourceUtils [] - Unable to find 'resource-types.xml'.
>> 2023-08-03 16:34:55,371 INFO  org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using predefined options: DEFAULT.
>> 2023-08-03 16:34:55,371 INFO  org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using application-defined options factory: DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=16}}.
>> 2023-08-03 16:34:55,371 INFO  org.apache.flink.runtime.jobmaster.JobMaster [] - Using application-defined state backend: EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=8, writeBatchSize=2097152}
>> 2023-08-03 16:34:55,371 INFO  org.apache.flink.runtime.state.StateBackendLoader [] - State backend loader loads the state backend as EmbeddedRocksDBStateBackend
>> 2023-08-03 16:34:55,375 INFO  org.apache.flink.runtime.jobmaster.JobMaster [] - Checkpoint storage is set to 'filesystem': (checkpoints "hdfs:/apps/xxxx/flink/checkpoints/yyyyyy-job")
>> 2023-08-03 16:34:55,377 INFO  org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - Enabled external resources: []
>> 2023-08-03 16:34:55,390 INFO  org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] - Upper bound of the thread pool size is 500
>> 2023-08-03 16:34:55,407 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No checkpoint found during restore.
>> 2023-08-03 16:34:55,408 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Starting job JOBID-WWWWWWWWWW from savepoint [PATH TO THE 3-DAY-OLD-CHECKPOINT WE USED TO LAUNCH WITH]
>>
>> I see some binary-looking HA files in HDFS that seem to have references
>> to the correct, latest checkpoint rather than the initial one.
>>
>> Does anyone have an idea as to what could be causing the recovery to use
>> the initial checkpoint?
>>
>> Many thanks
>> Fil
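
PS: for completeness, a rough sketch of how the setup visible in your JobManager log (incremental RocksDB state backend plus filesystem checkpoint storage) maps to the DataStream API, with the retention setting from above added. The path and interval below are placeholders, not your real configuration:

    import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // RocksDB with incremental checkpoints, matching enableIncrementalCheckpointing=TRUE in the log.
    env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

    // Filesystem checkpoint storage; replace the placeholder with your real HDFS checkpoint directory.
    env.getCheckpointConfig().setCheckpointStorage("hdfs:///path/to/flink/checkpoints/your-job");

    // Periodic checkpoints (example interval) that are retained on cancellation,
    // so a restart is not forced back to the checkpoint passed to -s.
    env.enableCheckpointing(60_000L);
    env.getCheckpointConfig().setExternalizedCheckpointCleanup(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);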