Hi, Fil

If we don't specify the ExternalizedCheckpointCleanup, the default
checkpoint retention strategy is to never retain checkpoints after the job
terminates, which is what causes your issue. So I think you should configure
ExternalizedCheckpointCleanup to `RETAIN_ON_CANCELLATION`.
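
For example, a minimal sketch against the DataStream API (the checkpoint
interval below is only illustrative, not a recommendation):

    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000L); // illustrative interval
    // Retain the externalized checkpoint when the job is cancelled, so it
    // stays available for restores after job manager failures/restarts.
    env.getCheckpointConfig().setExternalizedCheckpointCleanup(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

If you prefer configuration over code, the equivalent option on recent Flink
versions should be
`execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION`.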

Best,
Ron

Filip Karnicki <filip.karni...@gmail.com> 于2023年8月4日周五 16:24写道:

> update: Our current line of thinking is that we need this to be set in
> order for the checkpoint to live across job manager failures
>
>    -
>
>    ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: Delete the
>    checkpoint when the job is cancelled. The checkpoint state will only be
>    available if the job fails.
>
>
> On Fri, 4 Aug 2023 at 05:03, Filip Karnicki <filip.karni...@gmail.com>
> wrote:
>
>> Hi, we recently went live with a job on a shared cluster, which is
>> managed with Yarn
>>
>> The job was started using
>>
>> flink run -s hdfs://PATH_TO_A_CHECKPOINT_FROM_A_PREVIOUS_RUN_HERE
>>
>>
>> Everything worked fine for a few days, but then the job needed to be
>> restored for whatever reason
>>
>> 2023-08-03 16:34:44,525 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -
>> Terminating cluster entrypoint process YarnJobClusterEntrypoint with exit
>> code 2.
>>
>> org.apache.flink.util.FlinkException: Cannot deregister application.
>> Resource manager service is not available.
>>
>>
>> It seems that while restoring (yarn 'attempt' 02), Flink used the
>> original checkpoint we provided as the value of the -s parameter, and not
>> the most recent checkpoint for that job. This caused a few days worth of
>> data to be reprocessed.
>>
>>
>> 2023-08-03 16:34:55,259 INFO
>> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils []
>> - Recovering checkpoints from
>> ZooKeeperStateHandleStore{namespace='flink/application_zzzzzzzzzzzz/jobs/JOBID-WWWWWWWWWW/checkpoints'}.
>>
>> 2023-08-03 16:34:55,262 INFO
>> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils []
>> - Found 0 checkpoints in
>> ZooKeeperStateHandleStore{namespace='flink/application_zzzzzzzzzzzz/jobs/JOBID-WWWWWWWWWW/checkpoints'}.
>>
>> 2023-08-03 16:34:55,262 INFO
>> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils []
>> - Trying to fetch 0 checkpoints from storage.
>>
>> 2023-08-03 16:34:55,262 INFO
>> org.apache.flink.runtime.util.ZooKeeperUtils                 [] -
>> Initialized DefaultCompletedCheckpointStore in
>> 'ZooKeeperStateHandleStore{namespace='flink/application_zzzzzzzzzzzz/jobs/JOBID-WWWWWWWWWW/checkpoints'}'
>> with /checkpoints.
>>
>> 2023-08-03 16:34:55,293 INFO
>> org.apache.flink.runtime.jobmaster.JobMaster                 [] - Running
>> initialization on master for yyyyyyyy (JOBID-WWWWWWWWWW).
>>
>> 2023-08-03 16:34:55,293 INFO
>> org.apache.flink.runtime.jobmaster.JobMaster                 [] -
>> Successfully ran initialization on master in 0 ms.
>>
>> 2023-08-03 16:34:55,313 INFO
>> org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] -
>> Built 1 pipelined regions in 0 ms
>>
>> 2023-08-03 16:34:55,347 INFO
>> org.apache.flink.yarn.YarnResourceManagerDriver              [] - Recovered
>> 0 containers from previous attempts ([]).
>>
>> 2023-08-03 16:34:55,347 INFO
>> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
>> Recovered 0 workers from previous attempt.
>>
>> 2023-08-03 16:34:55,369 INFO
>> org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using
>> job/cluster config to configure application-defined state backend:
>> EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null,
>> enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=8,
>> writeBatchSize=2097152}
>>
>> 2023-08-03 16:34:55,369 INFO
>> org.apache.hadoop.conf.Configuration                         [] -
>> resource-types.xml not found
>>
>> 2023-08-03 16:34:55,370 INFO
>> org.apache.hadoop.yarn.util.resource.ResourceUtils           [] - Unable to
>> find 'resource-types.xml'.
>>
>> 2023-08-03 16:34:55,371 INFO
>> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] -
>> Using predefined options: DEFAULT.
>>
>> 2023-08-03 16:34:55,371 INFO
>> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] -
>> Using application-defined options factory:
>> DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=16}}.
>>
>> 2023-08-03 16:34:55,371 INFO
>> org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using
>> application-defined state backend: EmbeddedRocksDBStateBackend{,
>> localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE,
>> numberOfTransferThreads=8, writeBatchSize=2097152}
>>
>> 2023-08-03 16:34:55,371 INFO
>> org.apache.flink.runtime.state.StateBackendLoader            [] - State
>> backend loader loads the state backend as EmbeddedRocksDBStateBackend
>>
>> 2023-08-03 16:34:55,375 INFO
>> org.apache.flink.runtime.jobmaster.JobMaster                 [] -
>> Checkpoint storage is set to 'filesystem': (checkpoints
>> "hdfs:/apps/xxxx/flink/checkpoints/yyyyyy-job")
>>
>> 2023-08-03 16:34:55,377 INFO
>> org.apache.flink.runtime.externalresource.ExternalResourceUtils [] -
>> Enabled external resources: []
>>
>> 2023-08-03 16:34:55,390 INFO
>> org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] - Upper
>> bound of the thread pool size is 500
>>
>> 2023-08-03 16:34:55,407 INFO
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - No
>> checkpoint found during restore.
>>
>> 2023-08-03 16:34:55,408 INFO
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Starting
>> job JOBID-WWWWWWWWWW from savepoint [PATH TO THE 3-DAY-OLD-CHECKPOINT WE
>> USED TO LAUNCH WITH]
>>
>>
>>
>> I see some binary-looking HA files in HDFS that seem to have references
>> to the correct, latest checkpoint rather than the initial one.
>>
>> Does anyone have an idea as to what could be causing the recovery to use
>> the initial checkpoint?
>>
>> Many thanks
>> Fil
>>
>
