anishshri-db commented on code in PR #50595: URL: https://github.com/apache/spark/pull/50595#discussion_r2067214707
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala: ########## @@ -1173,6 +1207,105 @@ object StateStore extends Logging { } } + /** + * Submits maintenance work for a provider to the maintenance thread pool. + * + * @param id The StateStore provider ID to perform maintenance on + * @param provider The StateStore provider instance + */ + private def submitMaintenanceWorkForProvider( + id: StateStoreProviderId, + provider: StateStoreProvider, + storeConf: StateStoreConf, + source: MaintenanceTaskType = FromLoadedProviders): Unit = { + maintenanceThreadPool.execute(() => { + val startTime = System.currentTimeMillis() + // Determine if we can process this partition based on the source + val canProcessThisPartition = source match { + case FromTaskThread => + // Provider from task thread needs to wait for lock + val timeoutMs = storeConf.stateStoreMaintenanceProcessingTimeout * 1000 + val ableToProcessNow = awaitProcessThisPartition(id, timeoutMs) + if (!ableToProcessNow) { + // Add to queue for later processing if we can't process now + unloadedProvidersToClose.add((id, provider)) + } + ableToProcessNow + + case FromMaintenanceQueue => + // Provider from queue can be processed immediately + // (we've already removed it from loadedProviders) + true + + case FromLoadedProviders => Review Comment: +1 - let's comment this case too -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org