Hi, we recently went live with a job on a shared cluster managed with YARN.

The job was started using:

flink run -s hdfs://PATH_TO_A_CHECKPOINT_FROM_A_PREVIOUS_RUN_HERE
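
For context, the cluster runs with ZooKeeper HA (that is where the
ZooKeeperStateHandleStore entries in the logs below come from). The relevant
flink-conf.yaml settings look roughly like the sketch below; the quorum and
the HA storage path are placeholders, not our real values:

high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.zookeeper.path.root: /flink
high-availability.storageDir: hdfs:///apps/xxxx/flink/ha
state.checkpoints.dir: hdfs:///apps/xxxx/flink/checkpoints/yyyyyy-job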


Everything worked fine for a few days, but then, for whatever reason, the job
had to be recovered:

2023-08-03 16:34:44,525 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -
Terminating cluster entrypoint process YarnJobClusterEntrypoint with exit
code 2.

org.apache.flink.util.FlinkException: Cannot deregister application.
Resource manager service is not available.


It seems that while recovering (YARN attempt 02), Flink restored from the
original checkpoint we provided via the -s parameter, and not from the most
recent checkpoint for that job. This caused a few days' worth of data to be
reprocessed.


2023-08-03 16:34:55,259 INFO
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils []
- Recovering checkpoints from
ZooKeeperStateHandleStore{namespace='flink/application_zzzzzzzzzzzz/jobs/JOBID-WWWWWWWWWW/checkpoints'}.

2023-08-03 16:34:55,262 INFO
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils []
- Found 0 checkpoints in
ZooKeeperStateHandleStore{namespace='flink/application_zzzzzzzzzzzz/jobs/JOBID-WWWWWWWWWW/checkpoints'}.

2023-08-03 16:34:55,262 INFO
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils []
- Trying to fetch 0 checkpoints from storage.

2023-08-03 16:34:55,262 INFO
org.apache.flink.runtime.util.ZooKeeperUtils                 [] -
Initialized DefaultCompletedCheckpointStore in
'ZooKeeperStateHandleStore{namespace='flink/application_zzzzzzzzzzzz/jobs/JOBID-WWWWWWWWWW/checkpoints'}'
with /checkpoints.

2023-08-03 16:34:55,293 INFO
org.apache.flink.runtime.jobmaster.JobMaster                 [] - Running
initialization on master for yyyyyyyy (JOBID-WWWWWWWWWW).

2023-08-03 16:34:55,293 INFO
org.apache.flink.runtime.jobmaster.JobMaster                 [] -
Successfully ran initialization on master in 0 ms.

2023-08-03 16:34:55,313 INFO
org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] -
Built 1 pipelined regions in 0 ms

2023-08-03 16:34:55,347 INFO
org.apache.flink.yarn.YarnResourceManagerDriver              [] - Recovered
0 containers from previous attempts ([]).

2023-08-03 16:34:55,347 INFO
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
Recovered 0 workers from previous attempt.

2023-08-03 16:34:55,369 INFO
org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using
job/cluster config to configure application-defined state backend:
EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null,
enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=8,
writeBatchSize=2097152}

2023-08-03 16:34:55,369 INFO
org.apache.hadoop.conf.Configuration                         [] -
resource-types.xml not found

2023-08-03 16:34:55,370 INFO
org.apache.hadoop.yarn.util.resource.ResourceUtils           [] - Unable to
find 'resource-types.xml'.

2023-08-03 16:34:55,371 INFO
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] -
Using predefined options: DEFAULT.

2023-08-03 16:34:55,371 INFO
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] -
Using application-defined options factory:
DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=16}}.

2023-08-03 16:34:55,371 INFO
org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using
application-defined state backend: EmbeddedRocksDBStateBackend{,
localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE,
numberOfTransferThreads=8, writeBatchSize=2097152}

2023-08-03 16:34:55,371 INFO
org.apache.flink.runtime.state.StateBackendLoader            [] - State
backend loader loads the state backend as EmbeddedRocksDBStateBackend

2023-08-03 16:34:55,375 INFO
org.apache.flink.runtime.jobmaster.JobMaster                 [] -
Checkpoint storage is set to 'filesystem': (checkpoints
"hdfs:/apps/xxxx/flink/checkpoints/yyyyyy-job")

2023-08-03 16:34:55,377 INFO
org.apache.flink.runtime.externalresource.ExternalResourceUtils [] -
Enabled external resources: []

2023-08-03 16:34:55,390 INFO
org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] - Upper
bound of the thread pool size is 500

2023-08-03 16:34:55,407 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - No
checkpoint found during restore.

2023-08-03 16:34:55,408 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Starting
job JOBID-WWWWWWWWWW from savepoint [PATH TO THE 3-DAY-OLD-CHECKPOINT WE
USED TO LAUNCH WITH]
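
The "Found 0 checkpoints" lines above are what puzzle me, since the job had
been checkpointing for days before this. If it helps with diagnosis, the
ZooKeeper node and the checkpoint directory can be inspected with something
like the following (the ZooKeeper host is a placeholder):

zkCli.sh -server zk1:2181 ls /flink/application_zzzzzzzzzzzz/jobs/JOBID-WWWWWWWWWW/checkpoints
hdfs dfs -ls hdfs:///apps/xxxx/flink/checkpoints/yyyyyy-job

The latter should show the retained chk-* directories from the last
successful run, which would be much newer than the savepoint we launched
from.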



I see some binary-looking HA files in HDFS that seem to have references to
the correct, latest checkpoint rather than the initial one.
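
In case it's relevant, those files can be dumped with something like this
(the HA storage directory below is a placeholder for our actual
high-availability.storageDir, and the file name is illustrative):

hdfs dfs -ls hdfs:///apps/xxxx/flink/ha/application_zzzzzzzzzzzz/
hdfs dfs -cat hdfs:///apps/xxxx/flink/ha/application_zzzzzzzzzzzz/completedCheckpointXXXXXXXX | strings | grep 'chk-'

The checkpoint paths appear to be stored as plain strings inside those files,
which is why the references to the newer chk-* directories are visible.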

Does anyone have an idea what could be causing the recovery to use the
initial checkpoint rather than the latest one?

Many thanks
Fil
