gnanda commented on code in PR #54529:
URL: https://github.com/apache/spark/pull/54529#discussion_r2873862825


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -2995,6 +2995,63 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
     }
   }
 
+  test("RocksDB: fileChecksumThreadPoolSize propagates to ChecksumCheckpointFileManager") {
+    withTempDir { dir =>
+      val sqlConf = SQLConf.get.clone()
+      sqlConf.setConfString(
+        SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key, "true")
+      sqlConf.setConfString(
+        SQLConf.STATE_STORE_ROCKSDB_FILE_CHECKSUM_THREAD_POOL_SIZE.key, "6")
+
+      val dbConf = RocksDBConf(StateStoreConf(sqlConf))
+      assert(dbConf.fileChecksumThreadPoolSize === 6)
+
+      val remoteDir = dir.getCanonicalPath
+      withDB(remoteDir, conf = dbConf) { db =>
+        // Access the fm (lazy val) on fileManager via PrivateMethodTester
+        val fileManagerMethod = PrivateMethod[CheckpointFileManager](Symbol("fm"))
+        val fm = db.fileManager invokePrivate fileManagerMethod()
+
+        // Verify it's a ChecksumCheckpointFileManager with the right thread count
+        assert(fm.isInstanceOf[ChecksumCheckpointFileManager])
+        assert(fm.asInstanceOf[ChecksumCheckpointFileManager].numThreads === 6)
+      }
+    }
+  }
+
+  test("RocksDB: fileChecksumThreadPoolSize of 1 is valid and propagates correctly") {
+    withTempDir { dir =>
+      val sqlConf = SQLConf.get.clone()
+      sqlConf.setConfString(
+        SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key, "true")
+      sqlConf.setConfString(
+        SQLConf.STATE_STORE_ROCKSDB_FILE_CHECKSUM_THREAD_POOL_SIZE.key, "1")

Review Comment:
   done



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -152,11 +152,10 @@ class RocksDB(
 
   private val workingDir = createTempDir("workingDir")
 
-  // We need 2 threads per fm caller to avoid blocking
-  // (one for main file and another for checksum file).
-  // Since this fm is used by both query task and maintenance thread,
-  // then we need 2 * 2 = 4 threads.
-  protected val fileChecksumThreadPoolSize: Option[Int] = Some(4)
+  // To avoid blocking, we need 2 threads per fm caller (one for main file, one for checksum file).
+  // Since this fm is used by both query task and maintenance thread, the recommended default is
+  // 2 * 2 = 4 threads. A value of 1 disables concurrency (sequential execution).
+  protected val fileChecksumThreadPoolSize: Option[Int] = Some(conf.fileChecksumThreadPoolSize)

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to