anishshri-db commented on code in PR #50595: URL: https://github.com/apache/spark/pull/50595#discussion_r2067405164
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala: ########## @@ -1111,60 +1162,73 @@ object StateStore extends Logging { } } + // Wait until this partition can be processed + private def awaitProcessThisPartition( + id: StateStoreProviderId, + timeoutMs: Long): Boolean = maintenanceThreadPoolLock synchronized { + val startTime = System.currentTimeMillis() + val endTime = startTime + timeoutMs + + // If immediate processing fails, wait with timeout + var canProcessThisPartition = processThisPartition(id) + while (!canProcessThisPartition && System.currentTimeMillis() < endTime) { + canProcessThisPartition = processThisPartition(id) + maintenanceThreadPoolLock.wait(timeoutMs) + } + val elapsedTime = System.currentTimeMillis() - startTime + logInfo(log"Waited for ${MDC(LogKeys.TOTAL_TIME, elapsedTime)} ms to be able to process " + + log"maintenance for partition ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, id)}") + canProcessThisPartition + } + + private def doMaintenance(): Unit = doMaintenance(StateStoreConf.empty) + /** * Execute background maintenance task in all the loaded store providers if they are still * the active instances according to the coordinator. */ - private def doMaintenance(): Unit = { + private def doMaintenance(storeConf: StateStoreConf): Unit = { logDebug("Doing maintenance") if (SparkEnv.get == null) { throw new IllegalStateException("SparkEnv not active, cannot do maintenance on StateStores") } + + // Providers that couldn't be processed now and need to be added back to the queue + val providersToRequeue = ArrayBuffer.empty[(StateStoreProviderId, StateStoreProvider)] + + // Create a temporary list and drain the concurrent queue into it under a lock + val tempList = providersQueueLock synchronized { + val items = ArrayBuffer.empty[(StateStoreProviderId, StateStoreProvider)] + while (!unloadedProvidersToClose.isEmpty) { Review Comment: Can we just use `toArray` directly ? 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org