liviazhu-db commented on code in PR #50595: URL: https://github.com/apache/spark/pull/50595#discussion_r2065151814
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala: ##########
@@ -1030,14 +1052,41 @@ object StateStore extends Logging {
     }
   }

-  /** Unload a state store provider */
-  def unload(storeProviderId: StateStoreProviderId): Unit = loadedProviders.synchronized {
-    loadedProviders.remove(storeProviderId).foreach(_.close())
+  /**
+   * Unload a state store provider.
+   * If alreadyRemovedFromLoadedProviders is None, provider will be
+   * removed from loadedProviders and closed.
+   * If alreadyRemovedFromLoadedProviders is Some, provider will be closed
+   * using passed in provider.
+   * WARNING: CAN ONLY BE CALLED FROM MAINTENANCE THREAD!
+   */
+  def removeFromLoadedProvidersAndClose(
+      storeProviderId: StateStoreProviderId,
+      alreadyRemovedProvider: Option[StateStoreProvider] = None
+  ): Unit = {
+    // Get the provider to close - either the one passed in or one we remove from loadedProviders
+    val providerToClose = alreadyRemovedProvider.orElse {
+      loadedProviders.synchronized {
+        loadedProviders.remove(storeProviderId)
+      }
+    }
+
+    // Close the provider if we found one
+    providerToClose.foreach(_.close())
+  }
+
+  private def closeProvider(
+      storeProviderId: StateStoreProviderId,
+      removedProvider: Option[StateStoreProvider]): Unit = {
+    loadedProviders.synchronized {
+      assert(!loadedProviders.contains(storeProviderId))

Review Comment:
   I don't think you can assert this here. What could happen is that the task removes the provider from loadedProviders, but by the time this code runs another task reloads a new provider (same providerId) to loadedProviders.

   Why do we have this fn?

########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala: ##########
@@ -1173,6 +1208,94 @@ object StateStore extends Logging {
     }
   }

+  /**
+   * Submits maintenance work for a provider to the maintenance thread pool.
+   *
+   * @param id The StateStore provider ID to perform maintenance on
+   * @param provider The StateStore provider instance
+   * @param alreadyRemovedFromLoadedProviders If true, provider was already removed from
+   *                                          loadedProviders. If false, we must already
+   *                                          have acquired the lock to process this partition.
+   */
+  private def submitMaintenanceWorkForProvider(
+      id: StateStoreProviderId,
+      provider: StateStoreProvider,
+      storeConf: StateStoreConf,
+      alreadyRemovedFromLoadedProviders: Boolean = false): Unit = {
+    maintenanceThreadPool.execute(() => {
+      val startTime = System.currentTimeMillis()
+      val canProcessThisPartition = if (alreadyRemovedFromLoadedProviders) {

Review Comment:
   Ah, so we actually need to differentiate providers submitted from the task thread, from the queue, and from loadedProviders.
   * Providers submitted from the task thread need to call `awaitProcessThisPartition` and `closeProvider(id, Some(provider))`.
   * Providers submitted from the queue CANNOT call `awaitProcessThisPartition` and need to call `closeProvider(id, Some(provider))`.
   * Providers submitted from loadedProviders do not call `awaitProcessThisPartition` and need to check `!verifyIfStoreInstanceActive(id)` before `removeFromLoadedProvidersAndClose(id)`.

   To do this, maybe we can pass in an enum instead of the boolean `alreadyRemovedFromLoadedProviders`?

########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala: ##########
@@ -843,6 +847,9 @@ object StateStore extends Logging {

   private val maintenanceThreadPoolLock = new Object

+  private val partitionsForMaintenance =

Review Comment:
   Can you rename this? Technically all partitions are for maintenance :D Maybe something like `unloadedProvidersToClose`?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
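The interleaving described in the first review comment (a task unloads the provider, then another task reloads the same providerId before `closeProvider` runs its assert) can be replayed sequentially in a small sketch. Everything here is a stand-in: `loadedProviders` is a plain map, not the real StateStore field.

```scala
import scala.collection.mutable

object AssertRaceSketch {
  // Stand-in for StateStore.loadedProviders (providerId -> provider).
  val loadedProviders = mutable.Map[String, String]()

  // Replays the racy interleaving as sequential steps for illustration.
  def replay(): Boolean = {
    loadedProviders("StateStoreProviderId-0") = "providerA"
    loadedProviders.remove("StateStoreProviderId-0")        // task thread unloads the provider
    loadedProviders("StateStoreProviderId-0") = "providerB" // another task reloads the same id
    // At this point, closeProvider's `assert(!loadedProviders.contains(id))`
    // would trip, because the map contains a fresh provider under the same id.
    loadedProviders.contains("StateStoreProviderId-0")
  }
}
```

Because nothing serializes the removal and the re-insertion, the assert only holds under a lucky schedule, which is the reviewer's point.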
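The enum-based dispatch proposed in the second review comment could look roughly like the sketch below. All names here (`MaintenanceSource`, the stubbed helpers, the string results) are illustrative assumptions standing in for the real StateStore internals, not the PR's actual code.

```scala
object MaintenanceDispatchSketch {
  // Hypothetical enum replacing the boolean `alreadyRemovedFromLoadedProviders`.
  sealed trait MaintenanceSource
  case object FromTaskThread extends MaintenanceSource      // task already removed the provider
  case object FromUnloadQueue extends MaintenanceSource     // partition lock already held
  case object FromLoadedProviders extends MaintenanceSource // regular periodic maintenance

  // Stubs standing in for the real StateStore helpers discussed in the review.
  def awaitProcessThisPartition(id: String): Boolean = true
  def verifyIfStoreInstanceActive(id: String): Boolean = false
  def removeFromLoadedProvidersAndClose(id: String, removed: Option[String] = None): Unit = ()

  /** Picks the lock/close behavior based on who submitted the maintenance work. */
  def runMaintenance(id: String, provider: String, source: MaintenanceSource): String =
    source match {
      case FromTaskThread =>
        // Task thread path: must wait its turn on the partition before closing.
        if (awaitProcessThisPartition(id)) {
          removeFromLoadedProvidersAndClose(id, Some(provider))
          "closed-after-await"
        } else "skipped"
      case FromUnloadQueue =>
        // Queue path: lock is already held, so awaiting again would be wrong.
        removeFromLoadedProvidersAndClose(id, Some(provider))
        "closed-no-await"
      case FromLoadedProviders =>
        // Periodic path: only unload if the store instance is no longer active here.
        if (!verifyIfStoreInstanceActive(id)) {
          removeFromLoadedProvidersAndClose(id)
          "unloaded-inactive"
        } else "kept-active"
    }
}
```

A sealed trait keeps the match exhaustive, so adding a fourth submission source later would be a compile-time error rather than a silently mishandled boolean.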