micheal-o commented on code in PR #50595:
URL: https://github.com/apache/spark/pull/50595#discussion_r2058996598


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -1009,14 +1013,45 @@ object StateStore extends Logging {
 
       val otherProviderIds = loadedProviders.keys.filter(_ != 
storeProviderId).toSeq
       val providerIdsToUnload = reportActiveStoreInstance(storeProviderId, 
otherProviderIds)
-      providerIdsToUnload.foreach(unload(_))
+      providerIdsToUnload.foreach(id => {
+        loadedProviders.remove(id).foreach( provider => {
+          // Trigger maintenance thread to immediately do maintenance on and 
close the provider.
+          // Doing maintenance first allows us to do maintenance for a 
constantly-moving state
+          // store.
+          logInfo(log"Task thread trigger maintenance on " +
+            log"provider=${MDC(LogKeys.STATE_STORE_PROVIDER, id)}")
+          doMaintenanceOnProvider(id, provider, 
alreadyRemovedFromLoadedProviders = true)
+        })
+      })
       provider
     }
   }
 
-  /** Unload a state store provider */
-  def unload(storeProviderId: StateStoreProviderId): Unit = 
loadedProviders.synchronized {
-    loadedProviders.remove(storeProviderId).foreach(_.close())
+  /**
+   * Unload a state store provider.
+   * If alreadyRemovedFromLoadedProviders is None, provider will be
+   * removed from loadedProviders and closed.
+   * If alreadyRemovedFromLoadedProviders is Some, provider will be closed
+   * using passed in provider.
+   * WARNING: CAN ONLY BE CALLED FROM MAINTENANCE THREAD!
+   */
+  def unload(storeProviderId: StateStoreProviderId,
+             alreadyRemovedStoreFromLoadedProviders: 
Option[StateStoreProvider] = None): Unit = {
+    var toCloseProviders: List[StateStoreProvider] = Nil

Review Comment:
   Why is this a list? We will only be closing one provider, right?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -1157,6 +1160,69 @@ object StateStore extends Logging {
     }
   }
 
+  private def doMaintenanceOnProvider(id: StateStoreProviderId, provider: 
StateStoreProvider,
+    alreadyRemovedFromLoadedProviders: Boolean = false): Unit = {
+    maintenanceThreadPool.execute(() => {
+      val startTime = System.currentTimeMillis()
+      if (alreadyRemovedFromLoadedProviders) {
+        // If provider is already removed from loadedProviders, we MUST process
+        // this partition to close it, so we block until we can.
+        awaitProcessThisPartition(id)

Review Comment:
   Also add a comment explaining when the provider would have already been removed.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -1157,6 +1160,69 @@ object StateStore extends Logging {
     }
   }
 
+  private def doMaintenanceOnProvider(id: StateStoreProviderId, provider: 
StateStoreProvider,
+    alreadyRemovedFromLoadedProviders: Boolean = false): Unit = {
+    maintenanceThreadPool.execute(() => {
+      val startTime = System.currentTimeMillis()
+      if (alreadyRemovedFromLoadedProviders) {
+        // If provider is already removed from loadedProviders, we MUST process
+        // this partition to close it, so we block until we can.
+        awaitProcessThisPartition(id)
+      }
+      val awaitingPartitionDuration = System.currentTimeMillis() - startTime
+      try {
+        provider.doMaintenance()
+        // If shouldRemoveFromLoadedProviders is false, we don't need to verify
+        // with the coordinator as we know it definitely should be unloaded.
+        if (alreadyRemovedFromLoadedProviders || 
!verifyIfStoreInstanceActive(id)) {
+          if (alreadyRemovedFromLoadedProviders) {
+            unload(id, Some(provider))
+          } else {
+            unload(id)
+          }
+          logInfo(log"Unloaded ${MDC(LogKeys.STATE_STORE_PROVIDER, provider)}")
+        }
+      } catch {
+        case NonFatal(e) =>
+          logWarning(log"Error managing ${MDC(LogKeys.STATE_STORE_PROVIDER, 
provider)}, " +

Review Comment:
   Nit: replace the word "managing"?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -1009,14 +1013,45 @@ object StateStore extends Logging {
 
       val otherProviderIds = loadedProviders.keys.filter(_ != 
storeProviderId).toSeq
       val providerIdsToUnload = reportActiveStoreInstance(storeProviderId, 
otherProviderIds)
-      providerIdsToUnload.foreach(unload(_))
+      providerIdsToUnload.foreach(id => {
+        loadedProviders.remove(id).foreach( provider => {
+          // Trigger maintenance thread to immediately do maintenance on and 
close the provider.
+          // Doing maintenance first allows us to do maintenance for a 
constantly-moving state
+          // store.
+          logInfo(log"Task thread trigger maintenance on " +

Review Comment:
   Include `trigger maintenance to close provider` in the log message.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -1157,6 +1160,69 @@ object StateStore extends Logging {
     }
   }
 
+  private def doMaintenanceOnProvider(id: StateStoreProviderId, provider: 
StateStoreProvider,
+    alreadyRemovedFromLoadedProviders: Boolean = false): Unit = {
+    maintenanceThreadPool.execute(() => {
+      val startTime = System.currentTimeMillis()
+      if (alreadyRemovedFromLoadedProviders) {
+        // If provider is already removed from loadedProviders, we MUST process
+        // this partition to close it, so we block until we can.
+        awaitProcessThisPartition(id)

Review Comment:
   Let's make it clear that we are waiting for any possible ongoing maintenance 
on this partition to finish.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -1009,14 +1013,45 @@ object StateStore extends Logging {
 
       val otherProviderIds = loadedProviders.keys.filter(_ != 
storeProviderId).toSeq
       val providerIdsToUnload = reportActiveStoreInstance(storeProviderId, 
otherProviderIds)
-      providerIdsToUnload.foreach(unload(_))
+      providerIdsToUnload.foreach(id => {
+        loadedProviders.remove(id).foreach( provider => {
+          // Trigger maintenance thread to immediately do maintenance on and 
close the provider.
+          // Doing maintenance first allows us to do maintenance for a 
constantly-moving state
+          // store.
+          logInfo(log"Task thread trigger maintenance on " +

Review Comment:
   +1, that would help with investigation



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -1009,14 +1013,45 @@ object StateStore extends Logging {
 
       val otherProviderIds = loadedProviders.keys.filter(_ != 
storeProviderId).toSeq
       val providerIdsToUnload = reportActiveStoreInstance(storeProviderId, 
otherProviderIds)
-      providerIdsToUnload.foreach(unload(_))
+      providerIdsToUnload.foreach(id => {
+        loadedProviders.remove(id).foreach( provider => {
+          // Trigger maintenance thread to immediately do maintenance on and 
close the provider.
+          // Doing maintenance first allows us to do maintenance for a 
constantly-moving state
+          // store.
+          logInfo(log"Task thread trigger maintenance on " +
+            log"provider=${MDC(LogKeys.STATE_STORE_PROVIDER, id)}")
+          doMaintenanceOnProvider(id, provider, 
alreadyRemovedFromLoadedProviders = true)

Review Comment:
   Let's rename this to `submitMaintenanceWorkForProvider`, to make it clear 
that this call isn't blocking.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to