Hello,

I'm studying the metadata checkpoint implementation in Spark Streaming and I was wondering about the purpose of the so-called "backup files":
CheckpointWriter snippet:

> // We will do checkpoint when generating a batch and completing a batch. When the processing
> // time of a batch is greater than the batch interval, checkpointing for completing an old
> // batch may run after checkpointing of a new batch. If this happens, checkpoint of an old
> // batch actually has the latest information, so we want to recovery from it. Therefore, we
> // also use the latest checkpoint time as the file name, so that we can recover from the
> // latest checkpoint file.
> //
> // Note: there is only one thread writing the checkpoint files, so we don't need to worry
> // about thread-safety.
> val checkpointFile = Checkpoint.checkpointFile(checkpointDir, latestCheckpointTime)
> val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, latestCheckpointTime)
>
> // ... some lines further
> // If the checkpoint file exists, back it up
> // If the backup exists as well, just delete it, otherwise rename will fail
> if (fs.exists(checkpointFile)) {
>   fs.delete(backupFile, true) // just in case it exists
>   if (!fs.rename(checkpointFile, backupFile)) {
>     logWarning(s"Could not rename $checkpointFile to $backupFile")
>   }
> }

What is the role of this *backupFile*? I understand that it's created when a checkpoint file for the given timestamp already exists, but how can that situation arise in the first place? Is it a protection against different Spark applications checkpointing to the same directory? Or is it meant for the case described in the comment above (an old batch whose checkpoint completes after a newer batch has already checkpointed)?

Best regards,
Bartosz.
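P.S. To make concrete which step I mean, here is a minimal standalone sketch of the rename-to-backup dance, written against the Hadoop local FileSystem. The directory, file names, and the ".bk" suffix are my own assumptions for illustration, not necessarily what Checkpoint.checkpointFile / Checkpoint.checkpointBackupFile actually produce; the two consecutive writes are only meant to mimic the "generating a batch" and "completing a batch" checkpoints sharing the same latestCheckpointTime, as the quoted comment describes:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object BackupRenameSketch {
  def main(args: Array[String]): Unit = {
    val fs  = FileSystem.getLocal(new Configuration())
    val dir = new Path("/tmp/checkpoint-sketch")   // hypothetical checkpoint dir
    fs.mkdirs(dir)

    // Names chosen to resemble (not reproduce) Checkpoint.checkpointFile
    // and Checkpoint.checkpointBackupFile for latestCheckpointTime = 1000 ms.
    val checkpointFile = new Path(dir, "checkpoint-1000")
    val backupFile     = new Path(dir, "checkpoint-1000.bk")

    def write(contents: String): Unit = {
      // The same rename-to-backup guard as in the quoted CheckpointWriter code.
      if (fs.exists(checkpointFile)) {
        fs.delete(backupFile, true) // rename fails if the destination exists
        if (!fs.rename(checkpointFile, backupFile)) {
          println(s"Could not rename $checkpointFile to $backupFile")
        }
      }
      val out = fs.create(checkpointFile, true)
      out.writeUTF(contents)
      out.close()
    }

    // First write: checkpoint triggered when the batch is generated.
    write("state written when generating batch 1000")
    // Second write, same file name: checkpoint triggered when the batch
    // completes. checkpointFile now exists, so the first write survives
    // as the backup instead of being silently overwritten.
    write("state written when completing batch 1000")

    // Expect both checkpoint-1000 and checkpoint-1000.bk to be listed.
    println(fs.listStatus(dir).map(_.getPath.getName).sorted.mkString(", "))
    fs.close()
  }
}

Running this leaves both checkpoint-1000 and checkpoint-1000.bk in the directory, which is exactly the situation I'm asking about.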