micheal-o commented on code in PR #50773:
URL: https://github.com/apache/spark/pull/50773#discussion_r2072032193
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##########
@@ -1459,6 +1472,193 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
     }
   }
 
+  testWithAllCodec("file checksum can be enabled and disabled for the same checkpoint") {
+    _ =>
+    val storeId = StateStoreId(newDir(), 0L, 1)
+    var version = 0L
+
+    // Commit to the store with file checksum enabled
+    withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> true.toString) {
+      tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
+        val store = provider.getStore(version)
+        put(store, "1", 11, 100)
+        put(store, "2", 22, 200)
+        version = store.commit()
+      }
+    }
+
+    // Reload the store and commit without file checksum
+    withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> false.toString) {
+      tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
+        assert(version == 1)
+        val store = provider.getStore(version)
+        assert(get(store, "1", 11) === Some(100))
+        assert(get(store, "2", 22) === Some(200))
+
+        put(store, "3", 33, 300)
+        put(store, "4", 44, 400)
+        version = store.commit()
+      }
+    }
+
+    // Reload the store and commit with file checksum re-enabled
+    withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> true.toString) {
+      tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
+        assert(version == 2)
+        val store = provider.getStore(version)
+        assert(get(store, "1", 11) === Some(100))
+        assert(get(store, "2", 22) === Some(200))
+        assert(get(store, "3", 33) === Some(300))
+        assert(get(store, "4", 44) === Some(400))
+
+        put(store, "5", 55, 500)
+        version = store.commit()
+      }
+    }
+  }
+
+  test("checksum files are also cleaned up during maintenance") {
+    val storeId = StateStoreId(newDir(), 0L, 1)
+    val numBatches = 6
+    val minDeltas = 2
+    // Adding 1 to ensure a snapshot is uploaded.
+    // A snapshot upload might happen at minDeltas or minDeltas + 1, depending on the provider.
+    val maintFrequency = minDeltas + 1
+    var version = 0L
+
+    withSQLConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> minDeltas.toString,
+      SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1",
+      // So that RocksDB will also generate changelog files
+      RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" ->
+        true.toString) {
+
+      tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
+        (version + 1 to numBatches).foreach { i =>
+          version = putAndCommitStore(
+            provider, loadVersion = i - 1, doMaintenance = i % maintFrequency == 0)
+        }
+
+        // The expected counts differ per provider because HDFS and RocksDB
+        // use different logic to detect old files.
+        provider match {
+          case _: HDFSBackedStateStoreProvider =>
+            // For the HDFS state store, files left:
+            // 3.delta to 6.delta (+ checksum files)
+            // 3.snapshot (+ checksum file), 6.snapshot (+ checksum file)
+            verifyChecksumFiles(storeId.storeCheckpointLocation().toString,
+              expectedNumFiles = 12, expectedNumChecksumFiles = 6)
+          case _ =>
+            // For the RocksDB state store, files left:
+            // 6.changelog (+ checksum file), 6.zip (+ checksum file)
+            verifyChecksumFiles(storeId.storeCheckpointLocation().toString,
+              expectedNumFiles = 4, expectedNumChecksumFiles = 2)
+        }
+      }
+
+      // Turn off file checksum and verify that the previously created checksum
+      // files are deleted by maintenance.
+      withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> false.toString) {
+        tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
+          (version + 1 to version + numBatches).foreach { i =>
+            version = putAndCommitStore(
+              provider, loadVersion = i - 1, doMaintenance = i % maintFrequency == 0)
+          }
+
+          // Now verify that no checksum files are left. Again, the counts differ
+          // because HDFS and RocksDB detect old files differently.
+          provider match {
+            case _: HDFSBackedStateStoreProvider =>
+              // For the HDFS state store, files left:
+              // 6.delta, 9.delta to 12.delta
+              // 9.snapshot, 12.snapshot
+              verifyChecksumFiles(storeId.storeCheckpointLocation().toString,
+                expectedNumFiles = 7, expectedNumChecksumFiles = 0)
+            case _ =>
+              // For the RocksDB state store, files left:
+              // 12.changelog, 12.zip
+              verifyChecksumFiles(storeId.storeCheckpointLocation().toString,
+                expectedNumFiles = 2, expectedNumChecksumFiles = 0)
+          }
+        }
+      }
+    }
+  }
+
+  testWithAllCodec("overwrite state file without overwriting checksum file") { _ =>
+    val storeId = StateStoreId(newDir(), 0L, 1)
+    val numBatches = 3
+    val minDeltas = 2
+
+    withSQLConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> minDeltas.toString,
+      // So that RocksDB will also generate changelog files
+      RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" ->
+        true.toString) {
+
+      // First run with file checksum enabled; it generates both state and checksum
+      // files. Then turn off file checksum and regenerate only the state files.
+      Seq(true, false).foreach { fileChecksumEnabled =>
+        withSQLConf(
+          SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> fileChecksumEnabled.toString) {
+          tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
+            (1 to numBatches).foreach { i =>
+              putAndCommitStore(
+                provider, loadVersion = i - 1, doMaintenance = false)
+            }
+
+            // This should only create a snapshot and not delete anything.
+            provider.doMaintenance()
+
+            // The number of files should be the same:
+            // 3 delta/changelog files and 1 snapshot, each with a checksum file.
+            verifyChecksumFiles(storeId.storeCheckpointLocation().toString,
+              expectedNumFiles = 8, expectedNumChecksumFiles = 4)
+          }
+        }
+      }
+
+      withSQLConf(
+        SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> true.toString) {
+        // Now try to load the store with checksum enabled. It should verify the
+        // overwritten state files against the checksum files.
+        (1 to numBatches).foreach { i =>
+          tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider =>
+            // Loading from DFS should succeed.
+            val store = provider.getStore(i)
+            store.abort()
+          }
+        }
+      }
+    }
+  }
+
+  private def verifyChecksumFiles(

Review Comment:
   Yeah, this covers all the file types we are enabling checksums for. It runs for both the HDFS and RocksDB state stores with all the different compression codecs, and for RocksDB it also runs with the Avro encoding.
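
For readers following the thread: only the call sites of verifyChecksumFiles appear in this diff, so here is a minimal sketch of what a helper with that signature could look like. The body below is illustrative, not the PR's actual implementation; in particular, the ".crc" suffix and the flat directory listing are assumptions.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical sketch; the real helper in StateStoreSuite may differ.
// Assumes checksum files sit next to state files with a ".crc" suffix.
private def verifyChecksumFiles(
    checkpointDir: String,
    expectedNumFiles: Int,
    expectedNumChecksumFiles: Int): Unit = {
  val dir = new Path(checkpointDir)
  val fs = dir.getFileSystem(new Configuration())
  // Count every file directly under the store's checkpoint location.
  val allFiles = fs.listStatus(dir).filter(_.isFile).map(_.getPath.getName)
  val checksumFiles = allFiles.filter(_.endsWith(".crc")) // assumed suffix
  assert(allFiles.length == expectedNumFiles,
    s"expected $expectedNumFiles files, found ${allFiles.length}")
  assert(checksumFiles.length == expectedNumChecksumFiles,
    s"expected $expectedNumChecksumFiles checksum files, found ${checksumFiles.length}")
}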
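The codec coverage mentioned above comes from the testWithAllCodec wrapper. Its definition is not part of this diff; as a hedged sketch, such a wrapper can register one test per compression codec, roughly like this (the codec list and conf key below are assumptions for illustration):

// Hedged sketch of a testWithAllCodec-style wrapper; the real one in
// StateStoreSuiteBase may differ.
def testWithAllCodec(name: String)(func: String => Unit): Unit = {
  Seq("lz4", "lzf", "snappy", "zstd").foreach { codec =>
    test(s"$name (codec = $codec)") {
      // Run the test body once per compression codec.
      withSQLConf(SQLConf.STATE_STORE_COMPRESSION_CODEC.key -> codec) {
        func(codec)
      }
    }
  }
}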
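Similarly, putAndCommitStore is referenced throughout the tests but defined elsewhere in the suite. A plausible shape, under the assumption that it loads the given version, writes one row, commits, and optionally runs maintenance:

// Hypothetical sketch of the putAndCommitStore helper used above; the
// actual implementation lives elsewhere in the suite.
private def putAndCommitStore(
    provider: StateStoreProvider,
    loadVersion: Long,
    doMaintenance: Boolean): Long = {
  val store = provider.getStore(loadVersion)
  // Write one row keyed by the version so every commit produces a new file.
  put(store, loadVersion.toString, loadVersion.toInt, loadVersion.toInt)
  val newVersion = store.commit()
  if (doMaintenance) {
    // Triggers snapshotting and cleanup of old state/checksum files.
    provider.doMaintenance()
  }
  newVersion
}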