anishshri-db commented on code in PR #50773:
URL: https://github.com/apache/spark/pull/50773#discussion_r2072053570


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##########
@@ -1459,6 +1472,193 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
     }
   }
 
+  testWithAllCodec("file checksum can be enabled and disabled for the same checkpoint") {
+    _ =>
+      val storeId = StateStoreId(newDir(), 0L, 1)
+      var version = 0L
+
+      // Commit to store using file checksum
+      withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> true.toString) {
+        tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
+          val store = provider.getStore(version)
+          put(store, "1", 11, 100)
+          put(store, "2", 22, 200)
+          version = store.commit()
+        }
+      }
+
+      // Reload the store and commit without file checksum
+      withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> false.toString) {
+        tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
+          assert(version == 1)
+          val store = provider.getStore(version)
+          assert(get(store, "1", 11) === Some(100))
+          assert(get(store, "2", 22) === Some(200))
+
+          put(store, "3", 33, 300)
+          put(store, "4", 44, 400)
+          version = store.commit()
+        }
+      }
+
+      // Reload the store and commit with file checksum
+      withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> true.toString) {
+        tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
+          assert(version == 2)
+          val store = provider.getStore(version)
+          assert(get(store, "1", 11) === Some(100))
+          assert(get(store, "2", 22) === Some(200))
+          assert(get(store, "3", 33) === Some(300))
+          assert(get(store, "4", 44) === Some(400))
+
+          put(store, "5", 55, 500)
+          version = store.commit()
+        }
+      }
+  }
+
+  test("checksum files are also cleaned up during maintenance") {
+    val storeId = StateStoreId(newDir(), 0L, 1)
+    val numBatches = 6
+    val minDeltas = 2
+    // Adding 1 to ensure snapshot is uploaded.
+    // Snapshot upload might happen at minDeltas or minDeltas + 1, depending on the provider
+    val maintFrequency = minDeltas + 1
+    var version = 0L
+
+    withSQLConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> minDeltas.toString,
+      SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1",
+      // So that RocksDB will also generate changelog files
+      RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" ->
+        true.toString) {
+
+      tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
+        (version + 1 to numBatches).foreach { i =>
+          version = putAndCommitStore(
+            provider, loadVersion = i - 1, doMaintenance = i % maintFrequency == 0)
+        }
+
+        // The expectations differ because the HDFS and RocksDB providers use different logic to detect old files
+        provider match {
+          case _: HDFSBackedStateStoreProvider =>
+            // For HDFS State store, files left:
+            // 3.delta to 6.delta (+ checksum file)
+            // 3.snapshot (+ checksum file), 6.snapshot (+ checksum file)
+            verifyChecksumFiles(storeId.storeCheckpointLocation().toString,
+              expectedNumFiles = 12, expectedNumChecksumFiles = 6)
+          case _ =>
+            // For RocksDB State store, files left:
+            // 6.changelog (+ checksum file), 6.zip (+ checksum file)
+            verifyChecksumFiles(storeId.storeCheckpointLocation().toString,
+              expectedNumFiles = 4, expectedNumChecksumFiles = 2)
+        }
+      }
+
+      // Turn off file checksum and verify that the previously created checksum files
+      // will be deleted by maintenance
+      withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> false.toString) {

Review Comment:
   Yea - but to be safe, we usually also confirm against released versions. So you can generate the golden files for that version and run your tests against that path (for an example of how this dir is used, see https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala#L1188).
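
   For reference, a backward-compat check along those lines could look roughly like the sketch below. This is only an illustration: the resource directory name, the assumed committed version, and the expected key/values are hypothetical placeholders for whatever golden files actually get generated with the released version and checked in.

   ```scala
   import java.io.File
   import org.apache.commons.io.FileUtils

   // Sketch only: read a checkpoint generated once with a released Spark
   // version and committed under test resources. The resource dir name,
   // stored key/values, and version number below are all assumptions.
   test("can read a checkpoint written by a released Spark version") {
     withTempDir { tempDir =>
       // Hypothetical golden-file location, e.g. generated with Spark 4.0.0
       val golden = new File(getClass.getResource(
         "/structured-streaming/checkpoint-version-4.0.0-state-checksum").toURI)
       // Copy into a scratch dir so the checked-in golden files are never mutated
       FileUtils.copyDirectory(golden, tempDir)

       val storeId = StateStoreId(tempDir.getAbsolutePath, 0L, 1)
       tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
         // Assume the generator job committed exactly one version
         val store = provider.getStore(1L)
         assert(get(store, "1", 11) === Some(100))
         store.abort()
       }
     }
   }
   ```

   Checking in golden files produced by the released version keeps the compatibility check independent of the code under test, so a regression in the new writer can't silently mask a regression in the reader.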


