HeartSaVioR commented on code in PR #50344:
URL: https://github.com/apache/spark/pull/50344#discussion_r2015537644


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala:
##########
@@ -1112,4 +1113,32 @@ class RocksDBStateStoreCheckpointFormatV2Suite extends StreamTest
       )
     }
   }
+
+  test("checkpointFormatVersion2 racing commits don't return incorrect checkpointInfo") {
+    val sqlConf = new SQLConf()
+    sqlConf.setConf(SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION, 2)
+
+    withTempDir { checkpointDir =>
+      val provider = new CkptIdCollectingStateStoreProviderWrapper()
+      provider.init(
+        StateStoreId(checkpointDir.toString, 0, 0),
+        StateStoreTestsHelper.keySchema,
+        StateStoreTestsHelper.valueSchema,
+        PrefixKeyScanStateEncoderSpec(StateStoreTestsHelper.keySchema, 1),
+        useColumnFamilies = false,
+        new StateStoreConf(sqlConf),
+        new Configuration
+      )
+
+      val store1 = provider.getStore(0)
+      val store1NewVersion = store1.commit()
+      val store2 = provider.getStore(1)
+      val store2NewVersion = store2.commit()
+      val store1CheckpointInfo = store1.getStateStoreCheckpointInfo()

Review Comment:
   So without the fix, store1CheckpointInfo and store2CheckpointInfo come back
   identical, which is not the expected behavior. Looks OK to me.
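
   To illustrate the symptom, here is a minimal toy sketch. It assumes the
   wrong result comes from every store reading one provider-wide "last commit"
   slot; that cause is an assumption for illustration and is not confirmed by
   this PR. `CheckpointInfo`, `SharedSlotProvider`, `PerStoreProvider`, and
   `RacingCommitsSketch` are hypothetical names and are not Spark's API.

```scala
// Toy model only: every class and method here is hypothetical, not Spark's API.
final case class CheckpointInfo(version: Long, checkpointId: String)

// Assumed buggy shape: all stores read one provider-wide "last commit" slot.
class SharedSlotProvider {
  private var lastCommit: Option[CheckpointInfo] = None

  class Store(loadedVersion: Long) {
    def commit(): Long = {
      val newVersion = loadedVersion + 1
      lastCommit = Some(CheckpointInfo(newVersion, s"ckpt-$newVersion"))
      newVersion
    }

    // Returns whatever was committed last, by any store of this provider.
    def checkpointInfo: CheckpointInfo =
      lastCommit.getOrElse(throw new IllegalStateException("no commit yet"))
  }

  def getStore(version: Long): Store = new Store(version)
}

// Assumed fixed shape: each store keeps the info produced by its own commit.
class PerStoreProvider {
  class Store(loadedVersion: Long) {
    private var committed: Option[CheckpointInfo] = None

    def commit(): Long = {
      val newVersion = loadedVersion + 1
      committed = Some(CheckpointInfo(newVersion, s"ckpt-$newVersion"))
      newVersion
    }

    def checkpointInfo: CheckpointInfo =
      committed.getOrElse(throw new IllegalStateException("not committed"))
  }

  def getStore(version: Long): Store = new Store(version)
}

object RacingCommitsSketch {
  def main(args: Array[String]): Unit = {
    // Same call order as the test: commit store1, commit store2, then ask store1.
    val buggy = new SharedSlotProvider
    val b1 = buggy.getStore(0); b1.commit()
    val b2 = buggy.getStore(1); b2.commit()
    // Symptom: store1 reports the info written by store2's commit.
    assert(b1.checkpointInfo == b2.checkpointInfo)

    val fixed = new PerStoreProvider
    val f1 = fixed.getStore(0); f1.commit()
    val f2 = fixed.getStore(1); f2.commit()
    // Each store reports the info for its own commit.
    assert(f1.checkpointInfo != f2.checkpointInfo)
    assert(f1.checkpointInfo.version == 1L && f2.checkpointInfo.version == 2L)
  }
}
```

   In the toy model, asking store1 for its info only after store2 has committed
   is what exposes the shared slot, which mirrors the call order in the test.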



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

