liviazhu-db commented on code in PR #50595: URL: https://github.com/apache/spark/pull/50595#discussion_r2059228277
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -1157,6 +1159,83 @@ object StateStore extends Logging {
     }
   }
 
+  /**
+   * Submits maintenance work for a provider to the maintenance thread pool.
+   *
+   * @param id The StateStore provider ID to perform maintenance on
+   * @param provider The StateStore provider instance
+   * @param alreadyRemovedFromLoadedProviders If true, provider was already removed from
+   *                                          loadedProviders. If false, we must already
+   *                                          have acquired the lock to process this partition.
+   */
+  private def submitMaintenanceWorkForProvider(
+      id: StateStoreProviderId,
+      provider: StateStoreProvider,
+      alreadyRemovedFromLoadedProviders: Boolean = false): Unit = {
+    maintenanceThreadPool.execute(() => {
+      val startTime = System.currentTimeMillis()
+      if (alreadyRemovedFromLoadedProviders) {
+        // If provider is already removed from loadedProviders (which can happen when a task thread
+        // triggers unloading of an old provider), we MUST process this partition to
+        // close it properly.
+        // We block here until we can acquire the lock for this partition, waiting for any
+        // possible ongoing maintenance on this partition to complete first.
+        awaitProcessThisPartition(id)
+      }
+      val awaitingPartitionDuration = System.currentTimeMillis() - startTime
+      try {
+        provider.doMaintenance()
+        // If shouldRemoveFromLoadedProviders is false, we don't need to verify

Review Comment:
Oops, this comment needs to be updated: it still refers to `shouldRemoveFromLoadedProviders`, which is not the name of the parameter here.

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -1157,6 +1159,83 @@ object StateStore extends Logging {
     }
   }
 
+  /**
+   * Submits maintenance work for a provider to the maintenance thread pool.
+   *
+   * @param id The StateStore provider ID to perform maintenance on
+   * @param provider The StateStore provider instance
+   * @param alreadyRemovedFromLoadedProviders If true, provider was already removed from
+   *                                          loadedProviders. If false, we must already
+   *                                          have acquired the lock to process this partition.
+   */
+  private def submitMaintenanceWorkForProvider(
+      id: StateStoreProviderId,
+      provider: StateStoreProvider,
+      alreadyRemovedFromLoadedProviders: Boolean = false): Unit = {
+    maintenanceThreadPool.execute(() => {
+      val startTime = System.currentTimeMillis()
+      if (alreadyRemovedFromLoadedProviders) {
+        // If provider is already removed from loadedProviders (which can happen when a task thread
+        // triggers unloading of an old provider), we MUST process this partition to
+        // close it properly.
+        // We block here until we can acquire the lock for this partition, waiting for any
+        // possible ongoing maintenance on this partition to complete first.
+        awaitProcessThisPartition(id)
+      }
+      val awaitingPartitionDuration = System.currentTimeMillis() - startTime
+      try {
+        provider.doMaintenance()
+        // If shouldRemoveFromLoadedProviders is false, we don't need to verify
+        // with the coordinator as we know it definitely should be unloaded.
+        if (alreadyRemovedFromLoadedProviders || !verifyIfStoreInstanceActive(id)) {

Review Comment:
I think the way this `if` block is structured makes the logic kind of hard to follow. Can we instead do something like

```
if (alreadyRemovedFromLoadedProviders) {
  unload(id, Some(provider))
} else if (!verifyIfStoreInstanceActive(id)) {
  unload(id)
}
```

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -1157,6 +1158,83 @@ object StateStore extends Logging {
     }
   }
 
+  /**
+   * Submits maintenance work for a provider to the maintenance thread pool.
+   *
+   * @param id The StateStore provider ID to perform maintenance on
+   * @param provider The StateStore provider instance
+   * @param alreadyRemovedFromLoadedProviders If true, provider was already removed from
+   *                                          loadedProviders. If false, we must already
+   *                                          have acquired the lock to process this partition.
+   */
+  private def submitMaintenanceWorkForProvider(
+      id: StateStoreProviderId,
+      provider: StateStoreProvider,
+      alreadyRemovedFromLoadedProviders: Boolean = false): Unit = {

Review Comment:
I wonder if we can rename `alreadyRemovedFromLoadedProviders`, since it covers more than the fact that the provider is already removed: per the Scaladoc, it also determines whether the caller already holds the lock for this partition. Maybe we can just rename it to `submittedFromTaskThread` or something?
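
To illustrate the `if` / `else if` split suggested for the unload decision above, here is a minimal, self-contained sketch. It is only a model of the branch structure, not Spark code: `ProviderId`, `Provider`, `Action`, `KeepLoaded`, `Unload`, and `decide` are hypothetical stand-ins, and the `isActive` function is an assumed placeholder for the coordinator check done by `verifyIfStoreInstanceActive`.

```scala
// Hypothetical model of the suggested branch structure. None of these types
// are Spark's real classes; they exist only to make the sketch self-contained.
object MaintenanceBranchSketch {
  final case class ProviderId(name: String)
  final case class Provider(label: String)

  // What the maintenance task decides to do once maintenance has run.
  sealed trait Action
  case object KeepLoaded extends Action
  final case class Unload(id: ProviderId, provider: Option[Provider]) extends Action

  def decide(
      id: ProviderId,
      provider: Provider,
      alreadyRemovedFromLoadedProviders: Boolean,
      isActive: ProviderId => Boolean): Action = {
    if (alreadyRemovedFromLoadedProviders) {
      // The provider is no longer in the loaded map, so the instance is passed
      // explicitly and the coordinator check (isActive) is never invoked.
      Unload(id, Some(provider))
    } else if (!isActive(id)) {
      // Still in the loaded map but no longer active: unload by id alone.
      Unload(id, None)
    } else {
      KeepLoaded
    }
  }
}
```

Splitting the two cases makes it explicit that each branch unloads with different arguments, and that the coordinator is only consulted when the provider has not already been removed.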