liviazhu-db commented on code in PR #50595:
URL: https://github.com/apache/spark/pull/50595#discussion_r2065151814


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -1030,14 +1052,41 @@ object StateStore extends Logging {
     }
   }
 
-  /** Unload a state store provider */
-  def unload(storeProviderId: StateStoreProviderId): Unit = loadedProviders.synchronized {
-    loadedProviders.remove(storeProviderId).foreach(_.close())
+  /**
+   * Unload a state store provider.
+   * If alreadyRemovedProvider is None, the provider will be removed from
+   * loadedProviders and closed.
+   * If alreadyRemovedProvider is Some, the passed-in provider will be closed.
+   * WARNING: CAN ONLY BE CALLED FROM THE MAINTENANCE THREAD!
+   */
+  def removeFromLoadedProvidersAndClose(
+      storeProviderId: StateStoreProviderId,
+      alreadyRemovedProvider: Option[StateStoreProvider] = None
+  ): Unit = {
+    // Get the provider to close - either the one passed in or one we remove from loadedProviders
+    val providerToClose = alreadyRemovedProvider.orElse {
+      loadedProviders.synchronized {
+        loadedProviders.remove(storeProviderId)
+      }
+    }
+
+    // Close the provider if we found one
+    providerToClose.foreach(_.close())
+  }
+
+  private def closeProvider(
+      storeProviderId: StateStoreProviderId,
+      removedProvider: Option[StateStoreProvider]): Unit = {
+    loadedProviders.synchronized {
+      assert(!loadedProviders.contains(storeProviderId))

Review Comment:
   I don't think you can assert this here. What could happen is that a task 
removes the provider from loadedProviders, but by the time this code runs, 
another task has reloaded a new provider (with the same providerId) into 
loadedProviders. Why do we need this function at all?
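   
   To make the race concrete, here is a minimal standalone sketch of the 
interleaving (the `ConcurrentHashMap` stand-in and the object name are 
illustrative, not the PR's actual code):
   
   ```scala
   import java.util.concurrent.ConcurrentHashMap
   
   // Hypothetical repro of the interleaving; `loadedProviders` here is a
   // simplified stand-in for StateStore's real map.
   object AssertRaceSketch {
     def main(args: Array[String]): Unit = {
       val loadedProviders = new ConcurrentHashMap[String, AnyRef]()
       val id = "providerId-partition-0"
   
       loadedProviders.put(id, new Object) // provider loaded by an earlier task
   
       // Maintenance thread: removes the provider, intending to close it next.
       val removed = Option(loadedProviders.remove(id))
       assert(removed.isDefined)
   
       // Before closeProvider runs, another task reloads the same providerId.
       loadedProviders.put(id, new Object)
   
       // The assert in closeProvider would now fire even though nothing is
       // wrong: the map legitimately holds a *new* provider for the same id.
       assert(!loadedProviders.containsKey(id)) // throws AssertionError
     }
   }
   ```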



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -1173,6 +1208,94 @@ object StateStore extends Logging {
     }
   }
 
+  /**
+   * Submits maintenance work for a provider to the maintenance thread pool.
+   *
+   * @param id The StateStore provider ID to perform maintenance on
+   * @param provider The StateStore provider instance
+   * @param alreadyRemovedFromLoadedProviders If true, provider was already removed from
+   *                                loadedProviders. If false, we must already
+   *                                have acquired the lock to process this partition.
+   */
+  private def submitMaintenanceWorkForProvider(
+      id: StateStoreProviderId,
+      provider: StateStoreProvider,
+      storeConf: StateStoreConf,
+      alreadyRemovedFromLoadedProviders: Boolean = false): Unit = {
+    maintenanceThreadPool.execute(() => {
+      val startTime = System.currentTimeMillis()
+      val canProcessThisPartition = if (alreadyRemovedFromLoadedProviders) {

Review Comment:
   Ah, so we actually need to differentiate providers submitted from the task 
thread, from the queue, and from loadedProviders:
   * Providers submitted from the task thread need to call 
`awaitProcessThisPartition` and then `closeProvider(id, Some(provider))`.
   * Providers submitted from the queue CANNOT call `awaitProcessThisPartition` 
and need to call `closeProvider(id, Some(provider))` directly.
   * Providers submitted from loadedProviders do not call 
`awaitProcessThisPartition` and need to check `!verifyIfStoreInstanceActive(id)` 
before calling `removeFromLoadedProvidersAndClose(id)`.
   
   To do this, maybe we can pass in an enum instead of the boolean 
`alreadyRemovedFromLoadedProviders`? See the sketch below.
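   
   Something along these lines, perhaps (the case-object names and the stubs 
are mine, purely so the sketch compiles standalone; the branch bodies follow 
the three bullets above):
   
   ```scala
   object MaintenanceOriginSketch {
     // Stub types and helpers so this compiles on its own; in the real file
     // these are StateStore's existing private members.
     type StateStoreProviderId = String
     type StateStoreProvider = AnyRef
     def awaitProcessThisPartition(id: StateStoreProviderId): Boolean = true
     def verifyIfStoreInstanceActive(id: StateStoreProviderId): Boolean = false
     def closeProvider(id: StateStoreProviderId, p: Option[StateStoreProvider]): Unit = ()
     def removeFromLoadedProvidersAndClose(id: StateStoreProviderId): Unit = ()
   
     // The enum replacing the boolean flag.
     sealed trait MaintenanceOrigin
     case object FromTaskThread extends MaintenanceOrigin
     case object FromUnloadQueue extends MaintenanceOrigin
     case object FromLoadedProviders extends MaintenanceOrigin
   
     def runMaintenance(
         id: StateStoreProviderId,
         provider: StateStoreProvider,
         origin: MaintenanceOrigin): Unit = origin match {
       case FromTaskThread =>
         // Task-thread submissions must first acquire the partition lock.
         if (awaitProcessThisPartition(id)) closeProvider(id, Some(provider))
       case FromUnloadQueue =>
         // Queue submissions already hold the lock and must not re-await it.
         closeProvider(id, Some(provider))
       case FromLoadedProviders =>
         // Periodic pass: unload only if this instance is no longer active here.
         if (!verifyIfStoreInstanceActive(id)) removeFromLoadedProvidersAndClose(id)
     }
   }
   ```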



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -843,6 +847,9 @@ object StateStore extends Logging {
 
   private val maintenanceThreadPoolLock = new Object
 
+  private val partitionsForMaintenance =

Review Comment:
   Can you rename this? Technically all partitions are for maintenance :D Maybe 
something like `unloadedProvidersToClose`?
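   
   For illustration, the renamed field might look like this (the element type 
is a guess, since the initializer is cut off in the hunk above):
   
   ```scala
   import java.util.concurrent.ConcurrentLinkedQueue
   
   object RenameSketch {
     // Stub aliases so this compiles standalone; the real types live in the
     // state store package.
     type StateStoreProviderId = String
     type StateStoreProvider = AnyRef
   
     // Hypothetical shape after the rename; the queue element type is assumed.
     private val unloadedProvidersToClose =
       new ConcurrentLinkedQueue[(StateStoreProviderId, StateStoreProvider)]()
   }
   ```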



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

