gnanda commented on code in PR #54529:
URL: https://github.com/apache/spark/pull/54529#discussion_r2873862825
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -2995,6 +2995,63 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
}
}
+ test("RocksDB: fileChecksumThreadPoolSize propagates to ChecksumCheckpointFileManager") {
+ withTempDir { dir =>
+ val sqlConf = SQLConf.get.clone()
+ sqlConf.setConfString(
+ SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key, "true")
+ sqlConf.setConfString(
+ SQLConf.STATE_STORE_ROCKSDB_FILE_CHECKSUM_THREAD_POOL_SIZE.key, "6")
+
+ val dbConf = RocksDBConf(StateStoreConf(sqlConf))
+ assert(dbConf.fileChecksumThreadPoolSize === 6)
+
+ val remoteDir = dir.getCanonicalPath
+ withDB(remoteDir, conf = dbConf) { db =>
+ // Access the fm (lazy val) on fileManager via PrivateMethodTester
+ val fileManagerMethod = PrivateMethod[CheckpointFileManager](Symbol("fm"))
+ val fm = db.fileManager invokePrivate fileManagerMethod()
+
+ // Verify it's a ChecksumCheckpointFileManager with the right thread count
+ assert(fm.isInstanceOf[ChecksumCheckpointFileManager])
+ assert(fm.asInstanceOf[ChecksumCheckpointFileManager].numThreads === 6)
+ }
+ }
+ }
+
+ test("RocksDB: fileChecksumThreadPoolSize of 1 is valid and propagates correctly") {
+ withTempDir { dir =>
+ val sqlConf = SQLConf.get.clone()
+ sqlConf.setConfString(
+ SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key, "true")
+ sqlConf.setConfString(
+ SQLConf.STATE_STORE_ROCKSDB_FILE_CHECKSUM_THREAD_POOL_SIZE.key, "1")
Review Comment:
done
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -152,11 +152,10 @@ class RocksDB(
private val workingDir = createTempDir("workingDir")
- // We need 2 threads per fm caller to avoid blocking
- // (one for main file and another for checksum file).
- // Since this fm is used by both query task and maintenance thread,
- // then we need 2 * 2 = 4 threads.
- protected val fileChecksumThreadPoolSize: Option[Int] = Some(4)
+ // To avoid blocking, we need 2 threads per fm caller (one for main file, one for checksum file).
+ // Since this fm is used by both query task and maintenance thread, the recommended default is
+ // 2 * 2 = 4 threads. A value of 1 disables concurrency (sequential execution).
+ protected val fileChecksumThreadPoolSize: Option[Int] = Some(conf.fileChecksumThreadPoolSize)
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]