Hi community,

We are currently using *Externalized Checkpoints* to guard against abrupt YARN
application failures, since they save a "_metadata" file within the checkpoint
folder, which is essential for the job's cold recovery.

By design in Flink, completed checkpoint paths look like
*hdfs:///flink-checkpoints/cfbb875eeabcd46baecf5124f0c33047/chk-1895*,
*hdfs:///flink-checkpoints/cfbb875eeabcd46baecf5124f0c33047/chk-1896*,
*hdfs:///flink-checkpoints/cfbb875eeabcd46baecf5124f0c33047/chk-1897* ...
and so on.

Originally, we deployed a tool that periodically queries the REST API of
all running Flink jobs to get the latest completed checkpoint paths and
saves them in a database for later use. However, the scan may run less
frequently than old checkpoints are deleted, so at recovery time the saved
directory might already have been deleted and replaced by newer ones.

We wonder whether we could simply list the parent checkpoint folder
(say *hdfs:///flink-checkpoints/cfbb875eeabcd46baecf5124f0c33047/*)
and choose the "chk-XXXX" directory with the highest XXXX number as the
checkpoint for job recovery.

But there is one concern to address: how can we make sure that the highest
"chk-XXXX" folder is indeed a complete one? After digging through the code
in the *FsCheckpointMetadataOutputStream* class, it seems there is a chance
that the file is only partially uploaded and that its cleanup then fails
due to exceptions or a sudden JVM crash.
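
To make the idea concrete, here is a minimal sketch of the directory-listing
approach. It compares the XXXX part numerically (so chk-1000 sorts after
chk-999) and skips any folder without a non-empty "_metadata" file. The
class and method names (LatestCheckpointFinder, findLatest) are made up for
illustration, and the sketch uses java.nio.file against a locally visible
path; on HDFS the same logic would have to go through the Hadoop FileSystem
API instead. Note that a non-empty "_metadata" file is only a heuristic: it
does not prove that the upload finished, which is exactly the concern above.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink.
public class LatestCheckpointFinder {
    private static final Pattern CHK_DIR = Pattern.compile("chk-(\\d+)");
    private static final String METADATA_FILE = "_metadata";

    /**
     * Returns the highest-numbered chk-N directory under jobDir that
     * contains a non-empty _metadata file, or empty if there is none.
     */
    public static Optional<Path> findLatest(Path jobDir) throws IOException {
        List<Path> candidates = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(jobDir, "chk-*")) {
            for (Path p : stream) {
                String name = p.getFileName().toString();
                if (Files.isDirectory(p) && CHK_DIR.matcher(name).matches()) {
                    candidates.add(p);
                }
            }
        }
        // Sort by numeric checkpoint id, newest first.
        candidates.sort((a, b) -> Long.compare(checkpointId(b), checkpointId(a)));
        for (Path dir : candidates) {
            Path metadata = dir.resolve(METADATA_FILE);
            // Heuristic only: presence and non-zero size, not integrity.
            if (Files.isRegularFile(metadata) && Files.size(metadata) > 0) {
                return Optional.of(dir);
            }
        }
        return Optional.empty();
    }

    private static long checkpointId(Path dir) {
        Matcher m = CHK_DIR.matcher(dir.getFileName().toString());
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a checkpoint directory: " + dir);
        }
        return Long.parseLong(m.group(1));
    }
}
```

With folders chk-9 and chk-10 each holding a "_metadata" file and chk-11
holding none, findLatest would return chk-10. A truncated but non-empty
"_metadata" file in chk-11 would still be picked, so this alone does not
settle the question.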

Another approach that we came up with is to write a callback that notifies
us once a checkpoint is completed, via CheckpointListeners like
*AcknowledgeOnCheckpoint*. However, it is also possible that the
notification message never reaches the server before the JVM crashes.

What do you think is the right and idiomatic way to get the path of the
last successfully completed externalized checkpoint, so that we can recover
from sudden JVM crashes? Thank you very much : )
