Kimahriman commented on code in PR #50612:
URL: https://github.com/apache/spark/pull/50612#discussion_r2050810199
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala:
##########
@@ -223,6 +223,48 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest
     }
   }

+  testWithColumnFamilies("SPARK-51823: unload state stores on commit",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    withTempDir { dir =>
+      withSQLConf(
+        (SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName),
+        (SQLConf.CHECKPOINT_LOCATION.key -> dir.getCanonicalPath),
+        (SQLConf.SHUFFLE_PARTITIONS.key -> "1"),
+        (SQLConf.STATE_STORE_UNLOAD_ON_COMMIT.key -> "true")) {
+        val inputData = MemoryStream[Int]
+
+        val query = inputData.toDS().toDF("value")
+          .select($"value")
+          .groupBy($"value")
+          .agg(count("*"))
+          .writeStream
+          .format("console")
+          .outputMode("complete")
+          .start()
+        try {
+          inputData.addData(1, 2)
+          inputData.addData(2, 3)
+          query.processAllAvailable()
+
+          // StateStore should be unloaded, so its tmp dir shouldn't exist
+          for (file <- new File(Utils.getLocalDir(sparkConf)).listFiles()) {
+            assert(!file.getName().startsWith("StateStore"))

Review Comment:
   Fixed by adding an `afterEach` to clear the state store in this suite. The parent `StreamTest` only does an `afterAll` to clear the state store; not sure if that one should just be updated to `afterEach` instead.
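   As a minimal sketch, the fix described above could look roughly like this (assuming the suite mixes in ScalaTest's `BeforeAndAfterEach`; the exact cleanup in the merged patch may differ):

   ```scala
   import org.scalatest.BeforeAndAfterEach

   import org.apache.spark.sql.execution.streaming.state.StateStore
   import org.apache.spark.sql.streaming.StreamTest

   class RocksDBStateStoreIntegrationSuite extends StreamTest with BeforeAndAfterEach {

     override def afterEach(): Unit = {
       try {
         // Unload all loaded state store providers and stop the maintenance
         // task, so no StateStore* temp directories leak into the next test.
         StateStore.stop()
       } finally {
         super.afterEach()
       }
     }
   }
   ```

   Running the cleanup per-test rather than once per suite keeps the new assertion honest: the "no files starting with `StateStore`" check would otherwise be affected by providers left loaded by earlier tests in the same suite.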