Kimahriman commented on code in PR #50612:
URL: https://github.com/apache/spark/pull/50612#discussion_r2050810199


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala:
##########
@@ -223,6 +223,48 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest
     }
   }
 
+  testWithColumnFamilies("SPARK-51823: unload state stores on commit",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
+    withTempDir { dir =>
+      withSQLConf(
+        (SQLConf.STATE_STORE_PROVIDER_CLASS.key -> 
classOf[RocksDBStateStoreProvider].getName),
+        (SQLConf.CHECKPOINT_LOCATION.key -> dir.getCanonicalPath),
+        (SQLConf.SHUFFLE_PARTITIONS.key -> "1"),
+        (SQLConf.STATE_STORE_UNLOAD_ON_COMMIT.key -> "true")) {
+        val inputData = MemoryStream[Int]
+
+        val query = inputData.toDS().toDF("value")
+          .select($"value")
+          .groupBy($"value")
+          .agg(count("*"))
+          .writeStream
+          .format("console")
+          .outputMode("complete")
+          .start()
+        try {
+          inputData.addData(1, 2)
+          inputData.addData(2, 3)
+          query.processAllAvailable()
+
+          // StateStore should be unloaded, so its tmp dir shouldn't exist
+          for (file <- new File(Utils.getLocalDir(sparkConf)).listFiles()) {
+            assert(!file.getName().startsWith("StateStore"))

Review Comment:
   Fixed by adding an `afterEach` to clear the state store. The parent 
`StreamTest` only does an `afterAll` to clear the state store; I'm not sure 
whether that one should just be updated to `afterEach` instead.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala:
##########
@@ -223,6 +223,48 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest
     }
   }
 
+  testWithColumnFamilies("SPARK-51823: unload state stores on commit",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
+    withTempDir { dir =>
+      withSQLConf(
+        (SQLConf.STATE_STORE_PROVIDER_CLASS.key -> 
classOf[RocksDBStateStoreProvider].getName),
+        (SQLConf.CHECKPOINT_LOCATION.key -> dir.getCanonicalPath),
+        (SQLConf.SHUFFLE_PARTITIONS.key -> "1"),
+        (SQLConf.STATE_STORE_UNLOAD_ON_COMMIT.key -> "true")) {
+        val inputData = MemoryStream[Int]
+
+        val query = inputData.toDS().toDF("value")
+          .select($"value")
+          .groupBy($"value")
+          .agg(count("*"))
+          .writeStream
+          .format("console")
+          .outputMode("complete")
+          .start()
+        try {
+          inputData.addData(1, 2)
+          inputData.addData(2, 3)
+          query.processAllAvailable()
+
+          // StateStore should be unloaded, so its tmp dir shouldn't exist
+          for (file <- new File(Utils.getLocalDir(sparkConf)).listFiles()) {
+            assert(!file.getName().startsWith("StateStore"))

Review Comment:
   Fixed by adding an `afterEach` to clear the state store in this suite. The 
parent `StreamTest` only does an `afterAll` to clear the state store; I'm not 
sure whether that one should just be updated to `afterEach` instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to