anishshri-db commented on code in PR #50595: URL: https://github.com/apache/spark/pull/50595#discussion_r2067256561
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala: ########## @@ -1111,60 +1159,57 @@ object StateStore extends Logging { } } + // Wait until this partition can be processed + private def awaitProcessThisPartition( + id: StateStoreProviderId, + timeoutMs: Long + ): Boolean = maintenanceThreadPoolLock synchronized { + val endTime = System.currentTimeMillis() + timeoutMs + + // If immediate processing fails, wait with timeout + var canProcessThisPartition = processThisPartition(id) + while (!canProcessThisPartition && System.currentTimeMillis() < endTime) { + canProcessThisPartition = processThisPartition(id) + maintenanceThreadPoolLock.wait(timeoutMs) + } + + canProcessThisPartition + } + + private def doMaintenance(): Unit = doMaintenance(StateStoreConf.empty) + /** * Execute background maintenance task in all the loaded store providers if they are still * the active instances according to the coordinator. */ - private def doMaintenance(): Unit = { + private def doMaintenance(storeConf: StateStoreConf): Unit = { logDebug("Doing maintenance") if (SparkEnv.get == null) { throw new IllegalStateException("SparkEnv not active, cannot do maintenance on StateStores") } + + // Providers that couldn't be processed now and need to be added back to the queue + val providersToRequeue = new ArrayBuffer[(StateStoreProviderId, StateStoreProvider)]() + + while (!unloadedProvidersToClose.isEmpty) { Review Comment: Access to `unloadedProvidersToClose` is not truly thread-safe here. Can we add a separate lock to guard access? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org