HeartSaVioR commented on code in PR #50344: URL: https://github.com/apache/spark/pull/50344#discussion_r2015537644
########## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala: ########## @@ -1112,4 +1113,32 @@ class RocksDBStateStoreCheckpointFormatV2Suite extends StreamTest ) } } + + test("checkpointFormatVersion2 racing commits don't return incorrect checkpointInfo") { + val sqlConf = new SQLConf() + sqlConf.setConf(SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION, 2) + + withTempDir { checkpointDir => + val provider = new CkptIdCollectingStateStoreProviderWrapper() + provider.init( + StateStoreId(checkpointDir.toString, 0, 0), + StateStoreTestsHelper.keySchema, + StateStoreTestsHelper.valueSchema, + PrefixKeyScanStateEncoderSpec(StateStoreTestsHelper.keySchema, 1), + useColumnFamilies = false, + new StateStoreConf(sqlConf), + new Configuration + ) + + val store1 = provider.getStore(0) + val store1NewVersion = store1.commit() + val store2 = provider.getStore(1) + val store2NewVersion = store2.commit() + val store1CheckpointInfo = store1.getStateStoreCheckpointInfo() Review Comment: So without the fix, store1CheckpointInfo and store2CheckpointInfo are the same which is not expected. Looks OK to me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org