anishshri-db commented on code in PR #50595:
URL: https://github.com/apache/spark/pull/50595#discussion_r2067256561


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -1111,60 +1159,57 @@ object StateStore extends Logging {
     }
   }
 
+  // Wait until this partition can be processed
+  private def awaitProcessThisPartition(
+      id: StateStoreProviderId,
+      timeoutMs: Long
+  ): Boolean = maintenanceThreadPoolLock synchronized  {
+    val endTime = System.currentTimeMillis() + timeoutMs
+
+    // If immediate processing fails, wait with timeout
+    var canProcessThisPartition = processThisPartition(id)
+    while (!canProcessThisPartition && System.currentTimeMillis() < endTime) {
+      canProcessThisPartition = processThisPartition(id)
+      maintenanceThreadPoolLock.wait(timeoutMs)
+    }
+
+    canProcessThisPartition
+  }
+
+  private def doMaintenance(): Unit = doMaintenance(StateStoreConf.empty)
+
   /**
    * Execute background maintenance task in all the loaded store providers if 
they are still
    * the active instances according to the coordinator.
    */
-  private def doMaintenance(): Unit = {
+  private def doMaintenance(storeConf: StateStoreConf): Unit = {
     logDebug("Doing maintenance")
     if (SparkEnv.get == null) {
       throw new IllegalStateException("SparkEnv not active, cannot do 
maintenance on StateStores")
     }
+
+    // Providers that couldn't be processed now and need to be added back to 
the queue
+    val providersToRequeue = new ArrayBuffer[(StateStoreProviderId, 
StateStoreProvider)]()
+
+    while (!unloadedProvidersToClose.isEmpty) {

Review Comment:
   Access to `unloadedProvidersToClose` is not truly thread-safe here. Can we 
add a separate lock to guard access?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to