liviazhu-db commented on code in PR #50595:
URL: https://github.com/apache/spark/pull/50595#discussion_r2055081591


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -1009,14 +1013,45 @@ object StateStore extends Logging {
 
       val otherProviderIds = loadedProviders.keys.filter(_ != storeProviderId).toSeq
       val providerIdsToUnload = reportActiveStoreInstance(storeProviderId, otherProviderIds)
-      providerIdsToUnload.foreach(unload(_))
+      providerIdsToUnload.foreach(id => {
+        loadedProviders.remove(id).foreach( provider => {
+          // Trigger maintenance thread to immediately do maintenance on and close the provider.
+          // Doing maintenance first allows us to do maintenance for a constantly-moving state
+          // store.
+          logInfo(log"Task thread trigger maintenance on " +

Review Comment:
   Can you add more info to this log line, such as the task ID? Also mention that the provider was removed from loadedProviders.
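   A rough sketch of the extra detail requested, written as a plain string builder. `UnloadLog.message` and its parameters are hypothetical; real Spark code would go through its `log"..."` interpolator (with `MDC` fields) and would read the task ID from the running task (for example via `TaskContext.get()`) instead of taking it as a parameter.

```scala
// Hypothetical helper for illustration only; not part of Spark.
object UnloadLog {
  // taskId is None when the caller is not running inside a task.
  def message(taskId: Option[Long], providerId: String): String = {
    val task = taskId.map(_.toString).getOrElse("<none>")
    s"Task thread (task $task) removed provider $providerId from loadedProviders " +
      "and triggered maintenance on it before closing"
  }
}
```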



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -1009,14 +1013,45 @@ object StateStore extends Logging {
 
       val otherProviderIds = loadedProviders.keys.filter(_ != storeProviderId).toSeq
       val providerIdsToUnload = reportActiveStoreInstance(storeProviderId, otherProviderIds)
-      providerIdsToUnload.foreach(unload(_))
+      providerIdsToUnload.foreach(id => {
+        loadedProviders.remove(id).foreach( provider => {

Review Comment:
   Already in the synchronized block on line 922.
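   Assuming the comment means that `loadedProviders.remove(id)` already runs while holding the lock taken by the enclosing `synchronized` block, so it needs no extra locking of its own, a minimal model of that shape follows. `Provider`, `Registry`, `getOrLoad`, and `maintenanceQueue` are hypothetical stand-ins, not Spark's classes; the queue only approximates handing removed providers to the maintenance thread.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

// Hypothetical stand-in for a state store provider; not Spark's StateStoreProvider.
final class Provider(val id: Int)

// Hypothetical registry; every access to `loadedProviders` is guarded by `this`.
object Registry {
  private val loadedProviders = mutable.HashMap.empty[Int, Provider]
  // Removed providers handed off to a (hypothetical) maintenance thread.
  val maintenanceQueue = new LinkedBlockingQueue[Provider]()

  def getOrLoad(id: Int, toUnload: Seq[Int]): Provider = synchronized {
    val provider = loadedProviders.getOrElseUpdate(id, new Provider(id))
    // Still inside this synchronized block, so remove() needs no additional lock.
    toUnload.foreach { other =>
      loadedProviders.remove(other).foreach(p => maintenanceQueue.put(p))
    }
    provider
  }

  def isLoaded(id: Int): Boolean = synchronized { loadedProviders.contains(id) }
}
```

   In this model the maintenance thread would drain `maintenanceQueue`, run maintenance, and close each provider outside the lock, so the task thread does not block on that work.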



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -1157,6 +1160,69 @@ object StateStore extends Logging {
     }
   }
 
+  private def doMaintenanceOnProvider(id: StateStoreProviderId, provider: StateStoreProvider,

Review Comment:
   Can you add a comment saying that if alreadyRemovedFromLoadedProviders is false, we must already have the lock to process this partition?
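   One way to state the requested precondition, sketched with a hypothetical per-partition claim set standing in for whatever "lock to process this partition" the PR uses. `Maintenance`, `tryClaim`, `release`, `isClaimed`, and `maintained` are illustrative names only, and the sketch assumes the `true` branch needs no claim check.

```scala
import scala.collection.mutable

// Hypothetical model of per-partition maintenance claims; not Spark's API.
object Maintenance {
  // Partitions currently claimed for maintenance; guarded by `this`.
  private val claimed = mutable.HashSet.empty[Int]
  // Records which partitions were maintained, for illustration.
  val maintained = mutable.ArrayBuffer.empty[Int]

  /** Claims `id`; returns false if someone else already holds the claim. */
  def tryClaim(id: Int): Boolean = synchronized { claimed.add(id) }

  def release(id: Int): Unit = synchronized { claimed.remove(id) }

  def isClaimed(id: Int): Boolean = synchronized { claimed.contains(id) }

  /**
   * Runs maintenance for partition `id`.
   *
   * If `alreadyRemovedFromLoadedProviders` is false, the caller must already
   * hold the claim ("lock") for this partition; this method does not take it.
   */
  def doMaintenanceOnProvider(id: Int, alreadyRemovedFromLoadedProviders: Boolean): Unit = {
    if (!alreadyRemovedFromLoadedProviders) {
      require(isClaimed(id), s"caller must hold the maintenance claim for partition $id")
    }
    maintained.synchronized { maintained += id }
  }
}
```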



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


For additional commands, e-mail: reviews-h...@spark.apache.org
