anishshri-db commented on code in PR #50595:
URL: https://github.com/apache/spark/pull/50595#discussion_r2060583685


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -1009,14 +1013,46 @@ object StateStore extends Logging {
 
       val otherProviderIds = loadedProviders.keys.filter(_ != 
storeProviderId).toSeq
       val providerIdsToUnload = reportActiveStoreInstance(storeProviderId, 
otherProviderIds)
-      providerIdsToUnload.foreach(unload(_))
+      val taskContextIdLogLine = Option(TaskContext.get()).map { tc =>
+        log"taskId=${MDC(LogKeys.TASK_ID, tc.taskAttemptId())}"
+      }.getOrElse(log"")
+
+      providerIdsToUnload.foreach(id => {
+        loadedProviders.remove(id).foreach( provider => {
+          // Trigger maintenance thread to immediately do maintenance on and 
close the provider.
+          // Doing maintenance first allows us to do maintenance for a 
constantly-moving state
+          // store.
+          logInfo(log"Task thread trigger maintenance to close " +

Review Comment:
   nit: `Submitted maintenance from task thread`



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -1157,6 +1161,81 @@ object StateStore extends Logging {
     }
   }
 
+  /**
+   * Submits maintenance work for a provider to the maintenance thread pool.
+   *
+   * @param id The StateStore provider ID to perform maintenance on
+   * @param provider The StateStore provider instance
+   * @param submittedFromTaskThread If true, provider was already removed from
+   *                                loadedProviders. If false, we must already
+   *                                have acquired the lock to process this 
partition.
+   */
+  private def submitMaintenanceWorkForProvider(
+      id: StateStoreProviderId,
+      provider: StateStoreProvider,
+      submittedFromTaskThread: Boolean = false): Unit = {
+    maintenanceThreadPool.execute(() => {
+      val startTime = System.currentTimeMillis()
+      if (submittedFromTaskThread) {
+        // If provider is already removed from loadedProviders (which can 
happen when a task thread
+        // triggers unloading of an old provider), we MUST process this 
partition to
+        // close it properly.
+        // We block here until we can acquire the lock for this partition, 
waiting for any
+        // possible ongoing maintenance on this partition to complete first.
+        awaitProcessThisPartition(id)
+      }
+      val awaitingPartitionDuration = System.currentTimeMillis() - startTime
+      try {
+        provider.doMaintenance()
+        // If submittedFromTaskThread is false, we don't need to verify
+        // with the coordinator as we know it definitely should be unloaded.
+        if (submittedFromTaskThread) {
+            unload(id, Some(provider))
+          } else if (!verifyIfStoreInstanceActive(id)) {

Review Comment:
   nit: the indentation seems off here?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -1095,6 +1131,14 @@ object StateStore extends Logging {
     }
   }
 
+  // Block until we can process this partition
+  private def awaitProcessThisPartition(id: StateStoreProviderId): Unit =
+    maintenanceThreadPoolLock.synchronized {
+      while (!processThisPartition(id)) {
+        maintenanceThreadPoolLock.wait()

Review Comment:
   Should we wait with some timeout here instead of waiting indefinitely? Also —
overall — should we give up waiting for the partition to process if it's not
making progress within some upper bound?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to