zecookiez commented on code in PR #50123:
URL: https://github.com/apache/spark/pull/50123#discussion_r2005250545


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +264,141 @@ private class StateStoreCoordinator(override val rpcEnv: 
RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case ReportSnapshotUploaded(providerId, version, timestamp) =>
+      // Ignore this upload event if the registered latest version for the 
provider is more recent,
+      // since it's possible that an older version gets uploaded after a new 
executor uploads for
+      // the same provider but with a newer snapshot.
+      logDebug(s"Snapshot version $version was uploaded for provider 
$providerId")
+      if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version 
>= version)) {
+        stateStoreLatestUploadedSnapshot.put(providerId, 
SnapshotUploadEvent(version, timestamp))
+      }
+      context.reply(true)
+
+    case ConstructLaggingInstanceReport(queryRunId, latestVersion, 
endOfBatchTimestamp) =>
+      // Only log lagging instances if the snapshot report upload is enabled,
+      // otherwise all instances will be considered lagging.
+      if 
(sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_REPORT_UPLOAD_ENABLED)) {
+        val laggingStores = findLaggingStores(queryRunId, latestVersion, 
endOfBatchTimestamp)
+        logWarning(
+          log"StateStoreCoordinator Snapshot Lag Report for " +
+          log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+          log"Number of state stores falling behind: " +
+          log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)}"
+        )
+        // Report all stores that are behind in snapshot uploads.
+        // Only report the full list of providers lagging behind if the last 
reported time
+        // is not recent. The lag report interval denotes the minimum time 
between these
+        // full reports.
+        val coordinatorLagReportInterval =
+          
sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL)
+        val currentTimestamp = System.currentTimeMillis()
+        if (laggingStores.nonEmpty &&
+          currentTimestamp - lastFullSnapshotLagReport > 
coordinatorLagReportInterval) {
+          // Mark timestamp of the full report and log the lagging instances
+          lastFullSnapshotLagReport = currentTimestamp
+          laggingStores.foreach { providerId =>
+            val logMessage = stateStoreLatestUploadedSnapshot.get(providerId) 
match {
+              case Some(snapshotEvent) =>
+                val versionDelta = latestVersion - snapshotEvent.version
+                val timeDelta = endOfBatchTimestamp - snapshotEvent.timestamp

Review Comment:
   Switched to using the current timestamp to make it less convoluted.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to