micheal-o commented on code in PR #50773:
URL: https://github.com/apache/spark/pull/50773#discussion_r2072005201


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -661,6 +661,55 @@ object StateStoreProvider extends Logging {
     }
   }
 
+  /**
+   * If file checksum is enabled, then when we delete store files using fm.delete,
+   * the corresponding checksum files are deleted too. But there are situations
+   * where we can end up with orphan checksum files (i.e. the main file no longer
+   * exists), so we need to delete them too:
+   *   1. The file was created while file checksum was enabled, but file checksum
+   *      was later disabled. In this case, fm.delete will not delete the checksum
+   *      file, since we only use the specific fm (checksum file manager) when
+   *      checksum is enabled.
+   *   2. We enable concurrent file deletion in the checksum file manager for
+   *      performance, so there is a slight chance that deleting the main file
+   *      succeeds but deleting the checksum file fails. Hence, we need to clean
+   *      it up ourselves. If we don't want this behavior, we can turn off
+   *      concurrent deletion for the checksum file manager.
+   */
+  private[streaming] def deleteOrphanChecksumFiles(
+      fm: CheckpointFileManager,
+      checksumFiles: Seq[ChecksumFile],
+      deletedStoreFiles: Seq[Path],
+      minVersionToRetain: Long,
+      fileChecksumEnabled: Boolean): Unit = {
+    // Use the file name instead, since the path format might differ,
+    // e.g. checksum files might have a scheme prefix while the deleted files might not.
+    val deletedStoreFilesSet = deletedStoreFiles.map(_.getName).toSet
+    val oldChecksumFiles = checksumFiles
+      .filter(f => f.baseName.split("_")(0).toLong < minVersionToRetain)
+
+    val orphanChecksumFiles = if (fileChecksumEnabled) {
+      oldChecksumFiles
+        // old checksum files that were not part of the deleted store files
+        .filterNot(f => deletedStoreFilesSet.contains(f.mainFilePath.getName))
+    } else {
+      // all old checksum files, in case file checksum was previously enabled
+      oldChecksumFiles
+    }
+
+    val (_, dur) = Utils.timeTakenMs {
+      orphanChecksumFiles.foreach { f =>
+        fm.delete(f.path)

Review Comment:
   done
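The orphan-selection rule in `deleteOrphanChecksumFiles` can be sketched in isolation. This is a minimal, hypothetical model, not the PR's code: `ChecksumFileInfo` and `OrphanChecksumSketch` are illustrative names, paths are plain strings instead of Hadoop `Path`s, and `baseName` is assumed to start with `<version>_`. It only selects candidates; the real method then calls `fm.delete` on each one.

```scala
// Hypothetical, simplified stand-in for the PR's ChecksumFile:
//   baseName     - assumed to start with "<version>_" (e.g. "12_abc")
//   mainFileName - name of the store file this checksum protects
//   path         - the checksum file's own path
final case class ChecksumFileInfo(baseName: String, mainFileName: String, path: String)

object OrphanChecksumSketch {
  /** Returns the checksum files that would be deleted as orphans. */
  def findOrphans(
      checksumFiles: Seq[ChecksumFileInfo],
      deletedStoreFileNames: Seq[String],
      minVersionToRetain: Long,
      fileChecksumEnabled: Boolean): Seq[ChecksumFileInfo] = {
    // Compare by file name only, mirroring the PR's scheme-prefix concern.
    val deletedSet = deletedStoreFileNames.toSet
    // Keep checksum files whose version (prefix before '_') is below the retention bound.
    val old = checksumFiles.filter(f => f.baseName.split("_")(0).toLong < minVersionToRetain)
    if (fileChecksumEnabled) {
      // Checksums of store files deleted in this pass are removed by fm.delete itself.
      old.filterNot(f => deletedSet.contains(f.mainFileName))
    } else {
      // Checksum disabled now: every old checksum file is a potential orphan.
      old
    }
  }
}
```

With checksum enabled, a store file deleted in the same pass is skipped because its checksum is handled by the checksum file manager; with it disabled, all old checksum files are candidates, covering the case where checksum was enabled earlier.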



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

