anishshri-db commented on code in PR #50773:
URL: https://github.com/apache/spark/pull/50773#discussion_r2072053570
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##########
@@ -1459,6 +1472,193 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
     }
   }
 
+  testWithAllCodec("file checksum can be enabled and disabled for the same checkpoint") {
+    _ =>
+    val storeId = StateStoreId(newDir(), 0L, 1)
+    var version = 0L
+
+    // Commit to the store with file checksum enabled
+    withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> true.toString) {
+      tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
+        val store = provider.getStore(version)
+        put(store, "1", 11, 100)
+        put(store, "2", 22, 200)
+        version = store.commit()
+      }
+    }
+
+    // Reload the store and commit without file checksum
+    withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> false.toString) {
+      tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
+        assert(version == 1)
+        val store = provider.getStore(version)
+        assert(get(store, "1", 11) === Some(100))
+        assert(get(store, "2", 22) === Some(200))
+
+        put(store, "3", 33, 300)
+        put(store, "4", 44, 400)
+        version = store.commit()
+      }
+    }
+
+    // Reload the store and commit with file checksum re-enabled
+    withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> true.toString) {
+      tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
+        assert(version == 2)
+        val store = provider.getStore(version)
+        assert(get(store, "1", 11) === Some(100))
+        assert(get(store, "2", 22) === Some(200))
+        assert(get(store, "3", 33) === Some(300))
+        assert(get(store, "4", 44) === Some(400))
+
+        put(store, "5", 55, 500)
+        version = store.commit()
+      }
+    }
+  }
+
+  test("checksum files are also cleaned up during maintenance") {
+    val storeId = StateStoreId(newDir(), 0L, 1)
+    val numBatches = 6
+    val minDeltas = 2
+    // Adding 1 to ensure a snapshot is uploaded.
+    // Snapshot upload might happen at minDeltas or minDeltas + 1, depending on the provider
+    val maintFrequency = minDeltas + 1
+    var version = 0L
+
+    withSQLConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> minDeltas.toString,
+      SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1",
+      // So that RocksDB will also generate changelog files
+      RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" ->
+        true.toString) {
+
+      tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
+        (version + 1 to numBatches).foreach { i =>
+          version = putAndCommitStore(
+            provider, loadVersion = i - 1, doMaintenance = i % maintFrequency == 0)
+        }
+
+        // The expected counts differ because HDFS and RocksDB use different logic
+        // to detect old files
+        provider match {
+          case _: HDFSBackedStateStoreProvider =>
+            // For the HDFS-backed state store, files left:
+            // 3.delta to 6.delta (+ checksum file)
+            // 3.snapshot (+ checksum file), 6.snapshot (+ checksum file)
+            verifyChecksumFiles(storeId.storeCheckpointLocation().toString,
+              expectedNumFiles = 12, expectedNumChecksumFiles = 6)
+          case _ =>
+            // For the RocksDB state store, files left:
+            // 6.changelog (+ checksum file), 6.zip (+ checksum file)
+            verifyChecksumFiles(storeId.storeCheckpointLocation().toString,
+              expectedNumFiles = 4, expectedNumChecksumFiles = 2)
+        }
+      }
+
+      // Turn off file checksum, and verify that the previously created checksum files
+      // will be deleted by maintenance
+      withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> false.toString) {

Review Comment:
Yeah - but to be safe, we usually also confirm against released versions.
So you can generate the golden files for that version and run your tests against that path (for example usage of this dir, see https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala#L1188).
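For illustration, a minimal sketch of what such a golden-file test could look like, loosely following the StateDataSourceReadSuite pattern linked above. The resource directory name (`checkpoint-version-4.0.0-state-checksum`), the released-version tag, and the expected key/value pairs are all hypothetical, not part of this PR; the real golden files would be generated once by a job running against the released Spark version and then committed under the suite's test resources:

```scala
import java.io.File

import org.apache.commons.io.FileUtils

// Sketch only: load a state checkpoint produced by a released Spark version
// (committed as golden files under test resources) and verify the current
// code can still read it with checksum verification enabled.
test("checksum checkpoint generated by a released version can still be read") {
  // Hypothetical resource dir, checked in after being generated once with
  // the released Spark version.
  val resourceUri = this.getClass.getResource(
    "/structured-streaming/checkpoint-version-4.0.0-state-checksum/").toURI

  withTempDir { tmpDir =>
    // Copy to a temp dir so the checked-in golden files are never mutated.
    FileUtils.copyDirectory(new File(resourceUri), tmpDir)

    withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> true.toString) {
      val storeId = StateStoreId(new File(tmpDir, "state").getAbsolutePath, 0L, 1)
      tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
        val store = provider.getStore(1)
        // The expected pairs must match whatever the generator job wrote.
        assert(get(store, "1", 11) === Some(100))
        store.abort()
      }
    }
  }
}
```

That way, a checksum-format change that breaks compatibility with already-released checkpoints shows up as a test failure here rather than in production.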