zecookiez commented on code in PR #50123:
URL: https://github.com/apache/spark/pull/50123#discussion_r2012620201


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +247,149 @@ private class StateStoreCoordinator(override val rpcEnv: 
RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case ReportSnapshotUploaded(storeId, version, timestamp) =>
+      // Ignore this upload event if the registered latest version for the 
store is more recent,
+      // since it's possible that an older version gets uploaded after a new 
executor uploads for
+      // the same state store but with a newer snapshot.
+      logDebug(s"Snapshot version $version was uploaded for state store 
$storeId")
+      if (!stateStoreLatestUploadedSnapshot.get(storeId).exists(_.version >= 
version)) {
+        stateStoreLatestUploadedSnapshot.put(storeId, 
SnapshotUploadEvent(version, timestamp))
+      }
+      context.reply(true)
+
+    case LogLaggingStateStores(queryRunId, latestVersion) =>
+      // Only log lagging instances if the snapshot report upload is enabled,
+      // otherwise all instances will be considered lagging.
+      val currentTimestamp = System.currentTimeMillis()
+      val laggingStores = findLaggingStores(queryRunId, latestVersion, 
currentTimestamp)
+      if (laggingStores.nonEmpty) {
+        logWarning(
+          log"StateStoreCoordinator Snapshot Lag Report for " +
+          log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+          log"Number of state stores falling behind: " +
+          log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)}"
+        )
+        // Report all stores that are behind in snapshot uploads.
+        // Only report the full list of providers lagging behind if the last 
reported time
+        // is not recent. The lag report interval denotes the minimum time 
between these
+        // full reports.
+        val coordinatorLagReportInterval =
+          
sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL)
+        if (laggingStores.nonEmpty &&
+          currentTimestamp - lastFullSnapshotLagReportTimeMs > 
coordinatorLagReportInterval) {
+          // Mark timestamp of the full report and log the lagging instances
+          lastFullSnapshotLagReportTimeMs = currentTimestamp
+          // Only report the stores that are lagging the most behind in 
snapshot uploads.
+          laggingStores
+            .sortBy(stateStoreLatestUploadedSnapshot.getOrElse(_, 
defaultSnapshotUploadEvent))
+            
.take(sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_MAX_LAGGING_STORES_TO_REPORT))
+            .foreach { storeId =>
+              val logMessage = stateStoreLatestUploadedSnapshot.get(storeId) 
match {
+                case Some(snapshotEvent) =>
+                  val versionDelta = latestVersion - 
Math.max(snapshotEvent.version, 0)
+                  val timeDelta = currentTimestamp - snapshotEvent.timestamp
+
+                  log"StateStoreCoordinator Snapshot Lag Detected for " +
+                  log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+                  log"Store ID: ${MDC(LogKeys.STATE_STORE_ID, storeId)} " +
+                  log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, 
latestVersion)}, " +
+                  log"latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, 
snapshotEvent)}, " +
+                  log"version delta: " +
+                  log"${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, 
versionDelta)}, " +
+                  log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, 
timeDelta)}ms)"
+                case None =>
+                  log"StateStoreCoordinator Snapshot Lag Detected for " +
+                  log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+                  log"Store ID: ${MDC(LogKeys.STATE_STORE_ID, storeId)} " +
+                  log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, 
latestVersion)}, " +
+                  log"latest snapshot: no upload for query run)"
+              }
+              logWarning(logMessage)
+            }
+        }
+      }
+      context.reply(true)
+
+    case GetLatestSnapshotVersionForTesting(storeId) =>
+      val version = 
stateStoreLatestUploadedSnapshot.get(storeId).map(_.version)
+      logDebug(s"Got latest snapshot version of the state store $storeId: 
$version")
+      context.reply(version)
+
+    case GetLaggingStoresForTesting(queryRunId, latestVersion) =>
+      val currentTimestamp = System.currentTimeMillis()
+      val laggingStores = findLaggingStores(queryRunId, latestVersion, 
currentTimestamp)
+      logDebug(s"Got lagging state stores: ${laggingStores.mkString(", ")}")
+      context.reply(laggingStores)
+
     case StopCoordinator =>
       stop() // Stop before replying to ensure that endpoint name has been 
deregistered
       logInfo("StateStoreCoordinator stopped")
       context.reply(true)
   }
+
+  case class SnapshotUploadEvent(
+      version: Long,
+      timestamp: Long
+  ) extends Ordered[SnapshotUploadEvent] {
+
+    def isLagging(latestVersion: Long, latestTimestamp: Long): Boolean = {
+      // Use version 0 for stores that have not uploaded a snapshot version 
for this run.
+      val versionDelta = latestVersion - Math.max(version, 0)
+      val timeDelta = latestTimestamp - timestamp
+
+      // Determine alert thresholds from configurations for both time and 
version differences.
+      val snapshotVersionDeltaMultiplier = sqlConf.getConf(
+        SQLConf.STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_VERSION_DIFF_TO_LOG)
+      val maintenanceIntervalMultiplier = sqlConf.getConf(
+        SQLConf.STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_TIME_DIFF_TO_LOG)
+      val minDeltasForSnapshot = 
sqlConf.getConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
+      val maintenanceInterval = 
sqlConf.getConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL)
+
+      // Use the configured multipliers to determine the proper alert 
thresholds
+      val minVersionDeltaForLogging = snapshotVersionDeltaMultiplier * 
minDeltasForSnapshot
+      val minTimeDeltaForLogging = maintenanceIntervalMultiplier * 
maintenanceInterval
+
+      // Mark a state store as lagging if it is behind in both version and 
time.
+      // For stores that have never uploaded a snapshot, the time requirement 
will
+      // be automatically satisfied as the initial timestamp is 0.
+      versionDelta > minVersionDeltaForLogging && timeDelta > 
minTimeDeltaForLogging
+    }
+
+    override def compare(otherEvent: SnapshotUploadEvent): Int = {
+      // Compare by version first, then by timestamp as tiebreaker
+      val versionCompare = this.version.compare(otherEvent.version)
+      if (versionCompare == 0) {
+        this.timestamp.compare(otherEvent.timestamp)
+      } else {
+        versionCompare
+      }
+    }
+
+    override def toString(): String = {
+      s"SnapshotUploadEvent(version=$version, timestamp=$timestamp)"
+    }
+  }
+
+  private def findLaggingStores(
+      queryRunId: UUID,
+      referenceVersion: Long,
+      referenceTimestamp: Long): Seq[StateStoreId] = {
+    // Do not report any instance as lagging if report snapshot upload is 
disabled.
+    if 
(!sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG)) {
+      return Seq.empty
+    }
+    // Look for state stores that are lagging behind in snapshot uploads
+    instances.keys
+      .filter { storeProviderId =>
+        // Only consider active providers that are part of this specific query 
run,
+        // but look through all state stores under this store ID, as it's 
possible that
+        // the same query re-runs with a new run ID but has already uploaded 
some snapshots.

Review Comment:
   Cleared up the rest of the confusion offline: the coordinator will not retain 
information from previous query runs, and this has now been addressed. Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to