zecookiez commented on code in PR #50123:
URL: https://github.com/apache/spark/pull/50123#discussion_r1992490417


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +220,99 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case SnapshotUploaded(providerId, version, timestamp) =>
+      stateStoreSnapshotVersions.put(providerId, SnapshotUploadEvent(version, timestamp))
+      logDebug(s"Snapshot uploaded at ${providerId} with version ${version}")
+      // Report all stores that are behind in snapshot uploads
+      val (laggingStores, latestSnapshot) = findLaggingStores()
+      if (laggingStores.nonEmpty) {
+        logWarning(
+          log"StateStoreCoordinator Snapshot Lag - Number of state stores falling behind: " +
+          log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)} " +
+          log"(Latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, latestSnapshot)})"
+        )
+        laggingStores.foreach { storeProviderId =>
+          val logMessage = stateStoreSnapshotVersions.get(storeProviderId) match {
+            case Some(snapshotEvent) =>
+              val versionDelta = latestSnapshot.version - snapshotEvent.version
+              val timeDelta = latestSnapshot.timestamp - snapshotEvent.timestamp
+
+              log"StateStoreCoordinator Snapshot Lag - State store falling behind " +
+              log"${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} " +
+              log"(version delta: ${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, versionDelta)}, " +
+              log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, timeDelta)}ms)"
+            case None =>
+              log"StateStoreCoordinator Snapshot Lag - State store falling behind " +

Review Comment:
   I may be wrong, but unless changelog checkpointing can be turned off during a query, this situation shouldn't occur.
   
   All instances should report a version of -1 if upload reports are disabled, and since no upload reports are sent in that case, the coordinator won't be looking for lagging stores either.
   
   I have a test for this [here](https://github.com/apache/spark/pull/50123/files#diff-7c577967a171f51523afddd7a5eca49806432fae2da8dc35c114d9699e6e3e40R160-R202), but if there's a specific scenario that could trigger this lagging report, I can add it 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

