micheal-o commented on code in PR #50773: URL: https://github.com/apache/spark/pull/50773#discussion_r2072005201
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala: ########## @@ -661,6 +661,55 @@ object StateStoreProvider extends Logging { } } + /** + * If file checksum is enabled, then when we delete store files, using fm.delete, + * then the corresponding checksum file would be deleted too. But there are situations + * where we can have orphan checksum files (i.e. the main file no longer exist), + * so we need to delete them too: + * 1. If the file was created when file checksum was enabled, but then file checksum + * was later disabled. In this case, fm.delete will not delete the checksum file. + * Since we use a specific fm (checksum file manager) when checksum is enabled. + * 2. We enable concurrent file deletion for the checksum file manager, + * for perf improvement, hence there is a slight chance deleting the main file was + * successful but the checksum file deletion failed. Hence, we need to clean it up by + * ourselves. If we don't want this behavior we can turn off concurrent deletion + * for the checksum file manager + * */ + private[streaming] def deleteOrphanChecksumFiles( + fm: CheckpointFileManager, + checksumFiles: Seq[ChecksumFile], + deletedStoreFiles: Seq[Path], + minVersionToRetain: Long, + fileChecksumEnabled: Boolean): Unit = { + // Use file name instead since path format might be different + // i.e. checksum files might have scheme prefix and the deleted files might not. + val deletedStoreFilesSet = deletedStoreFiles.map(_.getName).toSet + val oldChecksumFiles = checksumFiles + .filter(f => f.baseName.split("_")(0).toLong < minVersionToRetain) + + val orphanChecksumFiles = if (fileChecksumEnabled) { + oldChecksumFiles + // old checksum files and was not part of the store files deleted + .filterNot(f => deletedStoreFilesSet.contains(f.mainFilePath.getName)) + } else { + // all old checksum files, in case file checksum was previously enabled + oldChecksumFiles + } + + val (_, dur) = Utils.timeTakenMs { + orphanChecksumFiles.foreach { f => + fm.delete(f.path) Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org