zecookiez commented on code in PR #50123:
URL: https://github.com/apache/spark/pull/50123#discussion_r1979750719


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +204,85 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case SnapshotUploaded(providerId, version, timestamp) =>
+      stateStoreSnapshotVersions.put(providerId, SnapshotUploadEvent(version, timestamp))
+      logDebug(s"Snapshot uploaded at ${providerId} with version ${version}")
+      // Report all stores that are behind in snapshot uploads
+      val (laggingStores, latestSnapshot) = findLaggingStores()
+      if (laggingStores.nonEmpty) {
+        logWarning(s"Number of state stores falling behind: ${laggingStores.size}")
+        laggingStores.foreach { storeProviderId =>
+          val snapshotEvent =
+            stateStoreSnapshotVersions.getOrElse(storeProviderId, SnapshotUploadEvent(-1, 0))
+          logWarning(
+            s"State store falling behind $storeProviderId " +
+            s"(current: $snapshotEvent, latest: $latestSnapshot)"

Review Comment:
   Good idea, added `StateStoreCoordinator Snapshot Lag` as a prefix for now. Thanks!
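
   For illustration only, here is a minimal sketch of how the prefixed warnings could be assembled. It is not the code in the PR: `SnapshotLagLogging`, `lagWarnings`, the `String` store IDs, and the ` - ` separator after the prefix are all assumptions made for the example, and `SnapshotUploadEvent` is a simplified stand-in for the type in the diff.

```scala
// Simplified stand-in for the event type in the diff (assumption: two Long fields).
final case class SnapshotUploadEvent(version: Long, timestamp: Long)

// Hypothetical helper, not part of StateStoreCoordinator.
object SnapshotLagLogging {
  // Prefix named in the review comment.
  val LogPrefix = "StateStoreCoordinator Snapshot Lag"

  // Builds the warning lines for lagging stores instead of logging them directly,
  // so the message format is easy to inspect. Store IDs are plain strings here.
  def lagWarnings(
      laggingStores: Seq[String],
      snapshots: Map[String, SnapshotUploadEvent],
      latestSnapshot: SnapshotUploadEvent): Seq[String] = {
    if (laggingStores.isEmpty) {
      Seq.empty
    } else {
      // The " - " separator is an assumption; the comment only names the prefix.
      val summary =
        s"$LogPrefix - Number of state stores falling behind: ${laggingStores.size}"
      val details = laggingStores.map { storeProviderId =>
        // Same fallback as the diff: a store with no recorded upload gets (-1, 0).
        val snapshotEvent =
          snapshots.getOrElse(storeProviderId, SnapshotUploadEvent(-1, 0))
        s"$LogPrefix - State store falling behind $storeProviderId " +
          s"(current: $snapshotEvent, latest: $latestSnapshot)"
      }
      summary +: details
    }
  }
}
```

   Giving every line the same prefix means a single search string finds both the summary count and the per-store details in the driver logs.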



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

