micheal-o commented on code in PR #54529:
URL: https://github.com/apache/spark/pull/54529#discussion_r2875610836


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -3695,6 +3684,18 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE =
+    buildConf("spark.sql.streaming.stateStore.fileChecksumThreadPoolSize")
+      .internal()
+      .doc("Number of threads used to compute file checksums concurrently when 
uploading " +
+        "state store checkpoints (e.g. main file and checksum file). " +

Review Comment:
   nit: Number of threads used to read/write files and their corresponding 
checksum files concurrently



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/ChecksumCheckpointFileManager.scala:
##########
@@ -150,14 +153,33 @@ class ChecksumCheckpointFileManager(
     val numThreads: Int,
     val skipCreationIfFileMissingChecksum: Boolean)
   extends CheckpointFileManager with Logging {
-  assert(numThreads % 2 == 0, "numThreads must be a multiple of 2, we need 1 
for the main file" +
-    "and another for the checksum file")
+  assert(numThreads >= 0, "numThreads must be a non-negative integer")
 
   import ChecksumCheckpointFileManager._
 
-  // This allows us to concurrently read/write the main file and checksum file
-  private val threadPool = ExecutionContext.fromExecutorService(
-    ThreadUtils.newDaemonFixedThreadPool(numThreads, 
s"${this.getClass.getSimpleName}-Thread"))
+  // Thread pool for concurrent execution, or None when numThreads == 0 
(sequential/inline mode).
+  private val threadPool: Option[ExecutionContextExecutorService] =

Review Comment:
   nit: rename to `threadPoolOpt` to make it obvious



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/ChecksumCheckpointFileManager.scala:
##########
@@ -150,14 +153,33 @@ class ChecksumCheckpointFileManager(
     val numThreads: Int,
     val skipCreationIfFileMissingChecksum: Boolean)
   extends CheckpointFileManager with Logging {
-  assert(numThreads % 2 == 0, "numThreads must be a multiple of 2, we need 1 
for the main file" +
-    "and another for the checksum file")
+  assert(numThreads >= 0, "numThreads must be a non-negative integer")
 
   import ChecksumCheckpointFileManager._
 
-  // This allows us to concurrently read/write the main file and checksum file
-  private val threadPool = ExecutionContext.fromExecutorService(
-    ThreadUtils.newDaemonFixedThreadPool(numThreads, 
s"${this.getClass.getSimpleName}-Thread"))
+  // Thread pool for concurrent execution, or None when numThreads == 0 
(sequential/inline mode).

Review Comment:
   please lets also retain the original comment here



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/ChecksumCheckpointFileManager.scala:
##########
@@ -150,14 +153,33 @@ class ChecksumCheckpointFileManager(
     val numThreads: Int,
     val skipCreationIfFileMissingChecksum: Boolean)
   extends CheckpointFileManager with Logging {
-  assert(numThreads % 2 == 0, "numThreads must be a multiple of 2, we need 1 
for the main file" +
-    "and another for the checksum file")
+  assert(numThreads >= 0, "numThreads must be a non-negative integer")
 
   import ChecksumCheckpointFileManager._
 
-  // This allows us to concurrently read/write the main file and checksum file
-  private val threadPool = ExecutionContext.fromExecutorService(
-    ThreadUtils.newDaemonFixedThreadPool(numThreads, 
s"${this.getClass.getSimpleName}-Thread"))
+  // Thread pool for concurrent execution, or None when numThreads == 0 
(sequential/inline mode).
+  private val threadPool: Option[ExecutionContextExecutorService] =
+    if (numThreads == 0) None
+    else Some(ExecutionContext.fromExecutorService(
+      ThreadUtils.newDaemonFixedThreadPool(numThreads, 
s"${this.getClass.getSimpleName}-Thread")))
+
+  // ExecutionContext to pass to stream classes: uses the thread pool or a 
caller-runs context.
+  private val streamContext: ExecutionContext = threadPool.getOrElse(new 
ExecutionContext {
+    override def execute(runnable: Runnable): Unit = runnable.run()

Review Comment:
   nit: comment: `This will execute the runnable synchronously`



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/ChecksumCheckpointFileManager.scala:
##########
@@ -150,14 +153,33 @@ class ChecksumCheckpointFileManager(
     val numThreads: Int,
     val skipCreationIfFileMissingChecksum: Boolean)
   extends CheckpointFileManager with Logging {
-  assert(numThreads % 2 == 0, "numThreads must be a multiple of 2, we need 1 
for the main file" +
-    "and another for the checksum file")
+  assert(numThreads >= 0, "numThreads must be a non-negative integer")
 
   import ChecksumCheckpointFileManager._
 
-  // This allows us to concurrently read/write the main file and checksum file
-  private val threadPool = ExecutionContext.fromExecutorService(
-    ThreadUtils.newDaemonFixedThreadPool(numThreads, 
s"${this.getClass.getSimpleName}-Thread"))
+  // Thread pool for concurrent execution, or None when numThreads == 0 
(sequential/inline mode).
+  private val threadPool: Option[ExecutionContextExecutorService] =
+    if (numThreads == 0) None
+    else Some(ExecutionContext.fromExecutorService(
+      ThreadUtils.newDaemonFixedThreadPool(numThreads, 
s"${this.getClass.getSimpleName}-Thread")))
+
+  // ExecutionContext to pass to stream classes: uses the thread pool or a 
caller-runs context.

Review Comment:
   nit: you mean input/outStream? Lets make it clear



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/ChecksumCheckpointFileManager.scala:
##########
@@ -127,12 +128,14 @@ case class ChecksumFile(path: Path) {
  *                              orphan checksum files. If using this, it is 
your responsibility
  *                              to clean up the potential orphan checksum 
files.
  * @param numThreads This is the number of threads to use for the thread pool, 
for reading/writing
- *                   files. To avoid blocking, if the file manager instance is 
being used by a
- *                   single thread, then you can set this to 2 (one thread for 
main file, another
- *                   for checksum file).
- *                   If file manager is shared by multiple threads, you can 
set it to
- *                   number of threads using file manager * 2.
- *                   Setting this differently can lead to file operation being 
blocked waiting for
+ *                   files. Must be a non-negative integer.
+ *                   Setting this to 0 disables the thread pool and runs all 
operations
+ *                   sequentially on the calling thread (no concurrency).
+ *                   To avoid blocking with a single concurrent caller, set 
this to 2 (one thread

Review Comment:
   > To avoid blocking
   For this remaining statements, lets use the original statement. This new one 
is not very clear.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/ChecksumCheckpointFileManager.scala:
##########
@@ -150,14 +153,33 @@ class ChecksumCheckpointFileManager(
     val numThreads: Int,
     val skipCreationIfFileMissingChecksum: Boolean)
   extends CheckpointFileManager with Logging {
-  assert(numThreads % 2 == 0, "numThreads must be a multiple of 2, we need 1 
for the main file" +
-    "and another for the checksum file")
+  assert(numThreads >= 0, "numThreads must be a non-negative integer")
 
   import ChecksumCheckpointFileManager._
 
-  // This allows us to concurrently read/write the main file and checksum file
-  private val threadPool = ExecutionContext.fromExecutorService(
-    ThreadUtils.newDaemonFixedThreadPool(numThreads, 
s"${this.getClass.getSimpleName}-Thread"))
+  // Thread pool for concurrent execution, or None when numThreads == 0 
(sequential/inline mode).
+  private val threadPool: Option[ExecutionContextExecutorService] =
+    if (numThreads == 0) None
+    else Some(ExecutionContext.fromExecutorService(
+      ThreadUtils.newDaemonFixedThreadPool(numThreads, 
s"${this.getClass.getSimpleName}-Thread")))
+
+  // ExecutionContext to pass to stream classes: uses the thread pool or a 
caller-runs context.
+  private val streamContext: ExecutionContext = threadPool.getOrElse(new 
ExecutionContext {

Review Comment:
   why is this named `streamContext`?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/ChecksumCheckpointFileManager.scala:
##########
@@ -150,14 +153,33 @@ class ChecksumCheckpointFileManager(
     val numThreads: Int,
     val skipCreationIfFileMissingChecksum: Boolean)
   extends CheckpointFileManager with Logging {
-  assert(numThreads % 2 == 0, "numThreads must be a multiple of 2, we need 1 
for the main file" +
-    "and another for the checksum file")
+  assert(numThreads >= 0, "numThreads must be a non-negative integer")
 
   import ChecksumCheckpointFileManager._
 
-  // This allows us to concurrently read/write the main file and checksum file
-  private val threadPool = ExecutionContext.fromExecutorService(
-    ThreadUtils.newDaemonFixedThreadPool(numThreads, 
s"${this.getClass.getSimpleName}-Thread"))
+  // Thread pool for concurrent execution, or None when numThreads == 0 
(sequential/inline mode).
+  private val threadPool: Option[ExecutionContextExecutorService] =
+    if (numThreads == 0) None
+    else Some(ExecutionContext.fromExecutorService(
+      ThreadUtils.newDaemonFixedThreadPool(numThreads, 
s"${this.getClass.getSimpleName}-Thread")))
+
+  // ExecutionContext to pass to stream classes: uses the thread pool or a 
caller-runs context.
+  private val streamContext: ExecutionContext = threadPool.getOrElse(new 
ExecutionContext {
+    override def execute(runnable: Runnable): Unit = runnable.run()
+    override def reportFailure(cause: Throwable): Unit = throw cause
+  })
+
+  /**
+   * Schedules a computation on the thread pool, or runs it directly on the 
calling thread
+   * if numThreads == 0 (sequential mode).
+   */
+  private def scheduleOrRun[T](f: => T): Future[T] = threadPool match {
+    case None =>
+      try Future.successful(f)

Review Comment:
   nit: `try {}`



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/ChecksumCheckpointFileManager.scala:
##########
@@ -150,14 +153,33 @@ class ChecksumCheckpointFileManager(
     val numThreads: Int,
     val skipCreationIfFileMissingChecksum: Boolean)
   extends CheckpointFileManager with Logging {
-  assert(numThreads % 2 == 0, "numThreads must be a multiple of 2, we need 1 
for the main file" +
-    "and another for the checksum file")
+  assert(numThreads >= 0, "numThreads must be a non-negative integer")
 
   import ChecksumCheckpointFileManager._
 
-  // This allows us to concurrently read/write the main file and checksum file
-  private val threadPool = ExecutionContext.fromExecutorService(
-    ThreadUtils.newDaemonFixedThreadPool(numThreads, 
s"${this.getClass.getSimpleName}-Thread"))
+  // Thread pool for concurrent execution, or None when numThreads == 0 
(sequential/inline mode).
+  private val threadPool: Option[ExecutionContextExecutorService] =
+    if (numThreads == 0) None
+    else Some(ExecutionContext.fromExecutorService(
+      ThreadUtils.newDaemonFixedThreadPool(numThreads, 
s"${this.getClass.getSimpleName}-Thread")))
+
+  // ExecutionContext to pass to stream classes: uses the thread pool or a 
caller-runs context.
+  private val streamContext: ExecutionContext = threadPool.getOrElse(new 
ExecutionContext {
+    override def execute(runnable: Runnable): Unit = runnable.run()
+    override def reportFailure(cause: Throwable): Unit = throw cause
+  })
+
+  /**
+   * Schedules a computation on the thread pool, or runs it directly on the 
calling thread
+   * if numThreads == 0 (sequential mode).
+   */
+  private def scheduleOrRun[T](f: => T): Future[T] = threadPool match {

Review Comment:
   Why have `scheduleOrRun` and the synchronous Exec Context above? Why not 
only use the sync Exec context. Not sure I get why we need both



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##########
@@ -1547,6 +1547,35 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     }
   }
 
+  test("fileChecksumThreadPoolSize propagates to 
ChecksumCheckpointFileManager") {
+    Seq(0, 1, 6).foreach { numThreads =>
+      withTempDir { dir =>
+        val sqlConf = SQLConf.get.clone()
+        
sqlConf.setConfString(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key, 
"true")
+        sqlConf.setConfString(
+          SQLConf.STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE.key, 
numThreads.toString)
+
+        val provider = newStoreProvider(
+          opId = Random.nextInt(), partition = 0, dir = dir.getCanonicalPath,
+          sqlConfOpt = Some(sqlConf))
+        val fileManagerMethod = 
PrivateMethod[CheckpointFileManager](Symbol("fm"))
+        val fm = provider invokePrivate fileManagerMethod()
+
+        assert(fm.isInstanceOf[ChecksumCheckpointFileManager])
+        assert(fm.asInstanceOf[ChecksumCheckpointFileManager].numThreads === 
numThreads)
+        provider.close()
+      }
+    }
+  }

Review Comment:
   Lets also add test case with threadpool disabled to make sure using the 
state store works i.e. load, write, get, commit. Reload.. etc. You can also do 
concurrent store.commit and maintenance, to make sure they both finish



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManagerSuite.scala:
##########
@@ -250,6 +250,30 @@ abstract class ChecksumCheckpointFileManagerSuite extends 
CheckpointFileManagerT
       checksumFmWithoutFallback.close()
     }
   }
+
+  test("numThreads = 0 disables thread pool") {
+    withTempHadoopPath { basePath =>
+      val fm = new ChecksumCheckpointFileManager(
+        createNoChecksumManager(basePath),
+        allowConcurrentDelete = true,
+        numThreads = 0,
+        skipCreationIfFileMissingChecksum = false)
+      fm.close()

Review Comment:
   This isn't testing much. We should write and read a file to confirm that 
main and checksum files are written and read



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -533,11 +533,10 @@ private[sql] class HDFSBackedStateStoreProvider extends 
StateStoreProvider with
         mgr,
         // Allowing this for perf, since we do orphan checksum file cleanup in 
maintenance anyway
         allowConcurrentDelete = true,
-        // We need 2 threads per fm caller to avoid blocking
-        // (one for main file and another for checksum file).
-        // Since this fm is used by both query task and maintenance thread,
-        // then we need 2 * 2 = 4 threads.
-        numThreads = 4,
+        // To avoid blocking, we need 2 threads per fm caller (one for main 
file, one for checksum
+        // file). Since this fm is used by both query task and maintenance 
thread, the recommended
+        // default is 2 * 2 = 4 threads. A value of 0 disables the thread pool 
(sequential mode).
+        numThreads = storeConf.fileChecksumThreadPoolSize,

Review Comment:
   lets also do the logging for this too right



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##########
@@ -1547,6 +1547,35 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     }
   }
 
+  test("fileChecksumThreadPoolSize propagates to 
ChecksumCheckpointFileManager") {

Review Comment:
   The test cases are currently under StateStoreSuite, if you look at the class 
def, it is for HDFS store only. This should be under `StateStoreSuiteBase` so 
that is runs for rocksdb too



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to