micheal-o commented on code in PR #50123:
URL: https://github.com/apache/spark/pull/50123#discussion_r2013323698


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -164,13 +289,157 @@ private class StateStoreCoordinator(override val rpcEnv: 
RpcEnv)
       val storeIdsToRemove =
         instances.keys.filter(_.queryRunId == runId).toSeq
       instances --= storeIdsToRemove
+      // Also remove these instances from snapshot upload event tracking
+      stateStoreLatestUploadedSnapshot --= storeIdsToRemove
+      // Remove the corresponding run id entries for report time and starting 
time
+      lastFullSnapshotLagReportTimeMs -= runId
+      queryRunStartingPoint -= runId
       logDebug(s"Deactivating instances related to checkpoint location $runId: 
" +
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case ReportSnapshotUploaded(providerId, version, timestamp) =>
+      // Ignore this upload event if the registered latest version for the 
store is more recent,
+      // since it's possible that an older version gets uploaded after a new 
executor uploads for
+      // the same state store but with a newer snapshot.
+      logDebug(s"Snapshot version $version was uploaded for state store 
$providerId")
+      if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version 
>= version)) {
+        stateStoreLatestUploadedSnapshot.put(providerId, 
SnapshotUploadEvent(version, timestamp))
+      }
+      context.reply(true)
+
+    case LogLaggingStateStores(queryRunId, latestVersion) =>
+      val currentTimestamp = System.currentTimeMillis()
+      // Mark the query run's starting timestamp and latest version if the 
coordinator
+      // has never seen this query run before.
+      if (!queryRunStartingPoint.contains(queryRunId)) {
+        queryRunStartingPoint.put(queryRunId, QueryStartInfo(latestVersion, 
currentTimestamp))
+      } else if (shouldCoordinatorReportSnapshotLag(queryRunId, latestVersion, 
currentTimestamp)) {
+        // Only log lagging instances if the snapshot report upload is enabled,
+        // otherwise all instances will be considered lagging.
+        val laggingStores = findLaggingStores(queryRunId, latestVersion, 
currentTimestamp)
+        if (laggingStores.nonEmpty) {
+          logWarning(
+            log"StateStoreCoordinator Snapshot Lag Report for " +
+            log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+            log"Number of state stores falling behind: " +
+            log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)}"
+          )
+          // Report all stores that are behind in snapshot uploads.
+          // Only report the list of providers lagging behind if the last 
reported time
+          // is not recent for this query run. The lag report interval denotes 
the minimum
+          // time between these full reports.
+          val timeSinceLastReport =
+            currentTimestamp - 
lastFullSnapshotLagReportTimeMs.getOrElse(queryRunId, 0L)
+          if (timeSinceLastReport > coordinatorLagReportInterval) {
+            // Mark timestamp of the report and log the lagging instances
+            lastFullSnapshotLagReportTimeMs.put(queryRunId, currentTimestamp)
+            // Only report the stores that are lagging the most behind in 
snapshot uploads.
+            laggingStores
+              .sortBy(stateStoreLatestUploadedSnapshot.getOrElse(_, 
defaultSnapshotUploadEvent))
+              .take(sqlConf.stateStoreCoordinatorMaxLaggingStoresToReport)
+              .foreach { providerId =>
+                val baseLogMessage =
+                  log"StateStoreCoordinator Snapshot Lag Detected for " +
+                  log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+                  log"Store ID: ${MDC(LogKeys.STATE_STORE_ID, 
providerId.storeId)} " +
+                  log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, 
latestVersion)}"
+
+                val logMessage = 
stateStoreLatestUploadedSnapshot.get(providerId) match {
+                  case Some(snapshotEvent) =>
+                    val versionDelta = latestVersion - 
Math.max(snapshotEvent.version, 0)
+                    val timeDelta = currentTimestamp - snapshotEvent.timestamp
+
+                    baseLogMessage + log", " +
+                    log"latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, 
snapshotEvent)}, " +
+                    log"version delta: " +
+                    log"${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, 
versionDelta)}, " +
+                    log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, 
timeDelta)}ms)"
+                  case None =>
+                    baseLogMessage + log", latest snapshot: no upload for 
query run)"
+                }
+                logWarning(logMessage)
+              }
+          }
+        }
+      }
+      context.reply(true)
+
+    case GetLatestSnapshotVersionForTesting(providerId) =>
+      val version = 
stateStoreLatestUploadedSnapshot.get(providerId).map(_.version)
+      logDebug(s"Got latest snapshot version of the state store $providerId: 
$version")
+      context.reply(version)
+
+    case GetLaggingStoresForTesting(queryRunId, latestVersion) =>
+      val currentTimestamp = System.currentTimeMillis()
+      // Only report if the corresponding run has all the necessary 
information to start reporting
+      if (shouldCoordinatorReportSnapshotLag(queryRunId, latestVersion, 
currentTimestamp)) {
+        val laggingStores = findLaggingStores(queryRunId, latestVersion, 
currentTimestamp)
+        logDebug(s"Got lagging state stores: ${laggingStores.mkString(", ")}")
+        context.reply(laggingStores)
+      } else {
+        context.reply(Seq.empty)
+      }
+
     case StopCoordinator =>
       stop() // Stop before replying to ensure that endpoint name has been 
deregistered
       logInfo("StateStoreCoordinator stopped")
       context.reply(true)
   }
+
+  private def findLaggingStores(
+      queryRunId: UUID,
+      referenceVersion: Long,
+      referenceTimestamp: Long): Seq[StateStoreProviderId] = {
+    // Determine alert thresholds from configurations for both time and 
version differences.
+    val snapshotVersionDeltaMultiplier =
+      sqlConf.stateStoreCoordinatorMultiplierForMinVersionDiffToLog
+    val maintenanceIntervalMultiplier = 
sqlConf.stateStoreCoordinatorMultiplierForMinTimeDiffToLog
+    val minDeltasForSnapshot = sqlConf.stateStoreMinDeltasForSnapshot
+    val maintenanceInterval = sqlConf.streamingMaintenanceInterval
+
+    // Use the configured multipliers to determine the proper alert thresholds
+    val minVersionDeltaForLogging = snapshotVersionDeltaMultiplier * 
minDeltasForSnapshot
+    val minTimeDeltaForLogging = maintenanceIntervalMultiplier * 
maintenanceInterval
+
+    // Look for active state store providers that are lagging behind in 
snapshot uploads
+    instances.keys.filter { storeProviderId =>
+      // Only consider providers that are part of this specific query run
+      val latestSnapshot = stateStoreLatestUploadedSnapshot.getOrElse(
+        storeProviderId,
+        defaultSnapshotUploadEvent
+      )
+      storeProviderId.queryRunId == queryRunId && (

Review Comment:
   Why not lazily filter the keys by run ID first (e.g. `.view.filter`) instead 
of doing it here?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala:
##########
@@ -283,6 +290,13 @@ abstract class ProgressContext(
     progressReporter.lastNoExecutionProgressEventTime = 
triggerClock.getTimeMillis()
     progressReporter.updateProgress(newProgress)
 
+    // Ask the state store coordinator to log all lagging state stores
+    if (progressReporter.coordinatorReportSnapshotUploadLag) {

Review Comment:
   nit: should the check for whether this is enabled be done inside the 
`logLaggingStateStores` func?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala:
##########
@@ -97,6 +97,12 @@ class StateStoreConf(
   val enableStateStoreCheckpointIds =
     StatefulOperatorStateInfo.enableStateStoreCheckpointIds(sqlConf)
 
+  /**
+   * Whether the coordinator is reporting state stores trailing behind in 
snapshot uploads.
+   */
+  val stateStoreCoordinatorReportSnapshotUploadLag: Boolean =

Review Comment:
   nit: you can drop the `stateStoreCoordinator` prefix from this val. 
Same for the RocksDBConf.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -164,13 +289,157 @@ private class StateStoreCoordinator(override val rpcEnv: 
RpcEnv)
       val storeIdsToRemove =
         instances.keys.filter(_.queryRunId == runId).toSeq
       instances --= storeIdsToRemove
+      // Also remove these instances from snapshot upload event tracking
+      stateStoreLatestUploadedSnapshot --= storeIdsToRemove
+      // Remove the corresponding run id entries for report time and starting 
time
+      lastFullSnapshotLagReportTimeMs -= runId
+      queryRunStartingPoint -= runId
       logDebug(s"Deactivating instances related to checkpoint location $runId: 
" +
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case ReportSnapshotUploaded(providerId, version, timestamp) =>
+      // Ignore this upload event if the registered latest version for the 
store is more recent,
+      // since it's possible that an older version gets uploaded after a new 
executor uploads for
+      // the same state store but with a newer snapshot.
+      logDebug(s"Snapshot version $version was uploaded for state store 
$providerId")
+      if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version 
>= version)) {
+        stateStoreLatestUploadedSnapshot.put(providerId, 
SnapshotUploadEvent(version, timestamp))
+      }
+      context.reply(true)
+
+    case LogLaggingStateStores(queryRunId, latestVersion) =>
+      val currentTimestamp = System.currentTimeMillis()
+      // Mark the query run's starting timestamp and latest version if the 
coordinator
+      // has never seen this query run before.
+      if (!queryRunStartingPoint.contains(queryRunId)) {
+        queryRunStartingPoint.put(queryRunId, QueryStartInfo(latestVersion, 
currentTimestamp))
+      } else if (shouldCoordinatorReportSnapshotLag(queryRunId, latestVersion, 
currentTimestamp)) {
+        // Only log lagging instances if the snapshot report upload is enabled,
+        // otherwise all instances will be considered lagging.
+        val laggingStores = findLaggingStores(queryRunId, latestVersion, 
currentTimestamp)
+        if (laggingStores.nonEmpty) {
+          logWarning(
+            log"StateStoreCoordinator Snapshot Lag Report for " +
+            log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+            log"Number of state stores falling behind: " +
+            log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)}"
+          )
+          // Report all stores that are behind in snapshot uploads.
+          // Only report the list of providers lagging behind if the last 
reported time
+          // is not recent for this query run. The lag report interval denotes 
the minimum
+          // time between these full reports.
+          val timeSinceLastReport =
+            currentTimestamp - 
lastFullSnapshotLagReportTimeMs.getOrElse(queryRunId, 0L)
+          if (timeSinceLastReport > coordinatorLagReportInterval) {
+            // Mark timestamp of the report and log the lagging instances
+            lastFullSnapshotLagReportTimeMs.put(queryRunId, currentTimestamp)
+            // Only report the stores that are lagging the most behind in 
snapshot uploads.
+            laggingStores
+              .sortBy(stateStoreLatestUploadedSnapshot.getOrElse(_, 
defaultSnapshotUploadEvent))
+              .take(sqlConf.stateStoreCoordinatorMaxLaggingStoresToReport)
+              .foreach { providerId =>
+                val baseLogMessage =
+                  log"StateStoreCoordinator Snapshot Lag Detected for " +
+                  log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+                  log"Store ID: ${MDC(LogKeys.STATE_STORE_ID, 
providerId.storeId)} " +
+                  log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, 
latestVersion)}"
+
+                val logMessage = 
stateStoreLatestUploadedSnapshot.get(providerId) match {
+                  case Some(snapshotEvent) =>
+                    val versionDelta = latestVersion - 
Math.max(snapshotEvent.version, 0)

Review Comment:
   Why do we need this max? We are no longer setting the default values 
in the map, right?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -129,10 +198,66 @@ class StateStoreCoordinatorRef private(rpcEndpointRef: 
RpcEndpointRef) {
  * Class for coordinating instances of [[StateStore]]s loaded in executors 
across the cluster,
  * and get their locations for job scheduling.
  */
-private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
-    extends ThreadSafeRpcEndpoint with Logging {
+private class StateStoreCoordinator(
+    override val rpcEnv: RpcEnv,
+    val sqlConf: SQLConf)
+  extends ThreadSafeRpcEndpoint with Logging {
   private val instances = new mutable.HashMap[StateStoreProviderId, 
ExecutorCacheTaskLocation]
 
+  // Stores the latest snapshot upload event for a specific state store
+  private val stateStoreLatestUploadedSnapshot =
+    new mutable.HashMap[StateStoreProviderId, SnapshotUploadEvent]
+
+  // Default snapshot upload event to use when a provider has never uploaded a 
snapshot
+  private val defaultSnapshotUploadEvent = SnapshotUploadEvent(-1, 0)
+
+  // Stores the last timestamp in milliseconds for each queryRunId indicating 
when the
+  // coordinator did a report on instances lagging behind on snapshot uploads.
+  // The initial timestamp is defaulted to 0 milliseconds.
+  private val lastFullSnapshotLagReportTimeMs = new mutable.HashMap[UUID, Long]
+
+  // Stores the time and latest version of the query run's start.

Review Comment:
   nit: do you mean starting version, not latest version?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -164,13 +289,157 @@ private class StateStoreCoordinator(override val rpcEnv: 
RpcEnv)
       val storeIdsToRemove =
         instances.keys.filter(_.queryRunId == runId).toSeq
       instances --= storeIdsToRemove
+      // Also remove these instances from snapshot upload event tracking
+      stateStoreLatestUploadedSnapshot --= storeIdsToRemove
+      // Remove the corresponding run id entries for report time and starting 
time
+      lastFullSnapshotLagReportTimeMs -= runId
+      queryRunStartingPoint -= runId
       logDebug(s"Deactivating instances related to checkpoint location $runId: 
" +
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case ReportSnapshotUploaded(providerId, version, timestamp) =>
+      // Ignore this upload event if the registered latest version for the 
store is more recent,
+      // since it's possible that an older version gets uploaded after a new 
executor uploads for
+      // the same state store but with a newer snapshot.
+      logDebug(s"Snapshot version $version was uploaded for state store 
$providerId")
+      if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version 
>= version)) {
+        stateStoreLatestUploadedSnapshot.put(providerId, 
SnapshotUploadEvent(version, timestamp))
+      }
+      context.reply(true)
+
+    case LogLaggingStateStores(queryRunId, latestVersion) =>
+      val currentTimestamp = System.currentTimeMillis()
+      // Mark the query run's starting timestamp and latest version if the 
coordinator
+      // has never seen this query run before.
+      if (!queryRunStartingPoint.contains(queryRunId)) {
+        queryRunStartingPoint.put(queryRunId, QueryStartInfo(latestVersion, 
currentTimestamp))
+      } else if (shouldCoordinatorReportSnapshotLag(queryRunId, latestVersion, 
currentTimestamp)) {
+        // Only log lagging instances if the snapshot report upload is enabled,
+        // otherwise all instances will be considered lagging.
+        val laggingStores = findLaggingStores(queryRunId, latestVersion, 
currentTimestamp)
+        if (laggingStores.nonEmpty) {
+          logWarning(
+            log"StateStoreCoordinator Snapshot Lag Report for " +
+            log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+            log"Number of state stores falling behind: " +
+            log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)}"
+          )
+          // Report all stores that are behind in snapshot uploads.
+          // Only report the list of providers lagging behind if the last 
reported time
+          // is not recent for this query run. The lag report interval denotes 
the minimum
+          // time between these full reports.
+          val timeSinceLastReport =
+            currentTimestamp - 
lastFullSnapshotLagReportTimeMs.getOrElse(queryRunId, 0L)
+          if (timeSinceLastReport > coordinatorLagReportInterval) {
+            // Mark timestamp of the report and log the lagging instances
+            lastFullSnapshotLagReportTimeMs.put(queryRunId, currentTimestamp)
+            // Only report the stores that are lagging the most behind in 
snapshot uploads.
+            laggingStores
+              .sortBy(stateStoreLatestUploadedSnapshot.getOrElse(_, 
defaultSnapshotUploadEvent))
+              .take(sqlConf.stateStoreCoordinatorMaxLaggingStoresToReport)
+              .foreach { providerId =>
+                val baseLogMessage =
+                  log"StateStoreCoordinator Snapshot Lag Detected for " +
+                  log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+                  log"Store ID: ${MDC(LogKeys.STATE_STORE_ID, 
providerId.storeId)} " +
+                  log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, 
latestVersion)}"
+
+                val logMessage = 
stateStoreLatestUploadedSnapshot.get(providerId) match {
+                  case Some(snapshotEvent) =>
+                    val versionDelta = latestVersion - 
Math.max(snapshotEvent.version, 0)
+                    val timeDelta = currentTimestamp - snapshotEvent.timestamp
+
+                    baseLogMessage + log", " +
+                    log"latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, 
snapshotEvent)}, " +
+                    log"version delta: " +
+                    log"${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, 
versionDelta)}, " +
+                    log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, 
timeDelta)}ms)"
+                  case None =>
+                    baseLogMessage + log", latest snapshot: no upload for 
query run)"
+                }
+                logWarning(logMessage)
+              }
+          }
+        }
+      }
+      context.reply(true)
+
+    case GetLatestSnapshotVersionForTesting(providerId) =>
+      val version = 
stateStoreLatestUploadedSnapshot.get(providerId).map(_.version)
+      logDebug(s"Got latest snapshot version of the state store $providerId: 
$version")
+      context.reply(version)
+
+    case GetLaggingStoresForTesting(queryRunId, latestVersion) =>
+      val currentTimestamp = System.currentTimeMillis()
+      // Only report if the corresponding run has all the necessary 
information to start reporting
+      if (shouldCoordinatorReportSnapshotLag(queryRunId, latestVersion, 
currentTimestamp)) {
+        val laggingStores = findLaggingStores(queryRunId, latestVersion, 
currentTimestamp)
+        logDebug(s"Got lagging state stores: ${laggingStores.mkString(", ")}")
+        context.reply(laggingStores)
+      } else {
+        context.reply(Seq.empty)
+      }
+
     case StopCoordinator =>
       stop() // Stop before replying to ensure that endpoint name has been 
deregistered
       logInfo("StateStoreCoordinator stopped")
       context.reply(true)
   }
+
+  private def findLaggingStores(
+      queryRunId: UUID,
+      referenceVersion: Long,
+      referenceTimestamp: Long): Seq[StateStoreProviderId] = {
+    // Determine alert thresholds from configurations for both time and 
version differences.
+    val snapshotVersionDeltaMultiplier =
+      sqlConf.stateStoreCoordinatorMultiplierForMinVersionDiffToLog
+    val maintenanceIntervalMultiplier = 
sqlConf.stateStoreCoordinatorMultiplierForMinTimeDiffToLog
+    val minDeltasForSnapshot = sqlConf.stateStoreMinDeltasForSnapshot
+    val maintenanceInterval = sqlConf.streamingMaintenanceInterval
+
+    // Use the configured multipliers to determine the proper alert thresholds
+    val minVersionDeltaForLogging = snapshotVersionDeltaMultiplier * 
minDeltasForSnapshot
+    val minTimeDeltaForLogging = maintenanceIntervalMultiplier * 
maintenanceInterval
+
+    // Look for active state store providers that are lagging behind in 
snapshot uploads
+    instances.keys.filter { storeProviderId =>
+      // Only consider providers that are part of this specific query run
+      val latestSnapshot = stateStoreLatestUploadedSnapshot.getOrElse(
+        storeProviderId,
+        defaultSnapshotUploadEvent
+      )
+      storeProviderId.queryRunId == queryRunId && (
+        // Mark a state store as lagging if it's behind in both version and 
time.
+        // A state store is considered lagging if it's behind in both version 
and time according
+        // to the configured thresholds.
+        // Stores that didn't upload a snapshot will be treated as a store 
with a snapshot of
+        // version 0.
+        referenceVersion - Math.max(latestSnapshot.version, 0) > 
minVersionDeltaForLogging &&

Review Comment:
   BTW, you can make the default version 0 instead of -1. Then you won't 
need this `Math.max`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to