micheal-o commented on code in PR #50123:
URL: https://github.com/apache/spark/pull/50123#discussion_r2004839454


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2256,6 +2256,59 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val 
STATE_STORE_COORDINATOR_SNAPSHOT_DELTA_MULTIPLIER_FOR_MIN_VERSION_DELTA_TO_LOG =
+    
buildConf("spark.sql.streaming.stateStore.minSnapshotDeltaMultiplierForMinVersionDeltaToLog")

Review Comment:
   nit: the conf name is a bit confusing, and the doc message could be simplified as well



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2256,6 +2256,59 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val 
STATE_STORE_COORDINATOR_SNAPSHOT_DELTA_MULTIPLIER_FOR_MIN_VERSION_DELTA_TO_LOG =
+    
buildConf("spark.sql.streaming.stateStore.minSnapshotDeltaMultiplierForMinVersionDeltaToLog")
+      .internal()
+      .doc(
+        "This multiplier determines the minimum version threshold for logging 
warnings when a " +
+        "state store instance falls behind. The coordinator logs a warning if 
a state store's " +
+        "last uploaded snapshot's version lags behind the query's latest known 
version by " +
+        "this threshold. The threshold is calculated as the configured minimum 
number of deltas " +
+        "needed to create a snapshot, multiplied by this multiplier."
+      )
+      .version("4.1.0")
+      .intConf
+      .checkValue(k => k >= 1, "Must be greater than or equal to 1")
+      .createWithDefault(5)
+
+  val STATE_STORE_COORDINATOR_MAINTENANCE_MULTIPLIER_FOR_MIN_TIME_DELTA_TO_LOG 
=
+    
buildConf("spark.sql.streaming.stateStore.maintenanceMultiplierForMinTimeDeltaToLog")
+      .internal()
+      .doc(
+        "This multiplier determines the minimum time threshold for logging 
warnings when a " +
+        "state store instance falls behind. The coordinator logs a warning if 
a state store's " +
+        "last snapshot upload time lags behind the current time by this 
threshold. " +
+        "The threshold is calculated as the maintenance interval multiplied by 
this multiplier."
+      )
+      .version("4.1.0")
+      .intConf
+      .checkValue(k => k >= 1, "Must be greater than or equal to 1")
+      .createWithDefault(10)
+
+  val STATE_STORE_COORDINATOR_REPORT_UPLOAD_ENABLED =
+    buildConf("spark.sql.streaming.stateStore.coordinatorReportUpload.enabled")

Review Comment:
   nit: `coordinatorReportSnapshotUploadLag` ?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -966,3 +969,40 @@ class RocksDBStateStoreChangeDataReader(
     }
   }
 }
+
+/**
+ * Object used to relay events reported from a RocksDB instance to the state 
store coordinator.

Review Comment:
   nit: use `Class` instead of `Object` to avoid confusion with the Scala 
`object` type



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2256,6 +2256,59 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val 
STATE_STORE_COORDINATOR_SNAPSHOT_DELTA_MULTIPLIER_FOR_MIN_VERSION_DELTA_TO_LOG =
+    
buildConf("spark.sql.streaming.stateStore.minSnapshotDeltaMultiplierForMinVersionDeltaToLog")
+      .internal()
+      .doc(
+        "This multiplier determines the minimum version threshold for logging 
warnings when a " +
+        "state store instance falls behind. The coordinator logs a warning if 
a state store's " +
+        "last uploaded snapshot's version lags behind the query's latest known 
version by " +
+        "this threshold. The threshold is calculated as the configured minimum 
number of deltas " +
+        "needed to create a snapshot, multiplied by this multiplier."
+      )
+      .version("4.1.0")
+      .intConf
+      .checkValue(k => k >= 1, "Must be greater than or equal to 1")
+      .createWithDefault(5)
+
+  val STATE_STORE_COORDINATOR_MAINTENANCE_MULTIPLIER_FOR_MIN_TIME_DELTA_TO_LOG 
=
+    
buildConf("spark.sql.streaming.stateStore.maintenanceMultiplierForMinTimeDeltaToLog")

Review Comment:
   ditto



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -966,3 +969,40 @@ class RocksDBStateStoreChangeDataReader(
     }
   }
 }
+
+/**
+ * Object used to relay events reported from a RocksDB instance to the state 
store coordinator.
+ *
+ * We pass this into the RocksDB instance to report specific events like 
snapshot uploads.
+ * This should only be used to report back to the coordinator for metrics and 
monitoring purposes.
+ */
+private[state] case class RocksDBEventListener(
+    queryRunId: String,
+    stateStoreId: StateStoreId,
+    storeConf: StateStoreConf) {
+
+  /** ID of the state store provider managing the RocksDB instance */
+  private val stateStoreProviderId: StateStoreProviderId =
+    StateStoreProviderId(stateStoreId, UUID.fromString(queryRunId))
+
+  /** Whether the event listener should relay these messages to the state 
store coordinator */
+  private val coordinatorReportUploadEnabled: Boolean =
+    storeConf.stateStoreCoordinatorReportUploadEnabled
+
+  /**
+   * Callback function from RocksDB to report events to the coordinator.
+   * Additional information such as the state store ID and the query run ID are
+   * attached here to report back to the coordinator.
+   *
+   * @param version The snapshot version that was just uploaded from RocksDB
+   */
+  def reportSnapshotUploaded(version: Long): Unit = {
+    // Only report to the coordinator if this is enabled, as sometimes we do 
not need
+    // to track for lagging instances.
+    // Also ignore message if we are missing the provider ID from lack of 
initialization.

Review Comment:
   I don't see the code for what this comment says



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -129,10 +210,25 @@ class StateStoreCoordinatorRef private(rpcEndpointRef: 
RpcEndpointRef) {
  * Class for coordinating instances of [[StateStore]]s loaded in executors 
across the cluster,
  * and get their locations for job scheduling.
  */
-private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
-    extends ThreadSafeRpcEndpoint with Logging {
+private class StateStoreCoordinator(
+    override val rpcEnv: RpcEnv,
+    val sqlConf: SQLConf)
+  extends ThreadSafeRpcEndpoint
+  with Logging {
   private val instances = new mutable.HashMap[StateStoreProviderId, 
ExecutorCacheTaskLocation]
 
+  // Stores the latest snapshot upload event for a specific state store 
provider instance
+  private val stateStoreLatestUploadedSnapshot =
+    new mutable.HashMap[StateStoreProviderId, SnapshotUploadEvent]
+
+  // Default snapshot upload event to use when a provider has never uploaded a 
snapshot
+  private val defaultSnapshotUploadEvent = SnapshotUploadEvent(-1, 0)
+
+  // Stores the last timestamp in milliseconds where the coordinator did a 
full report on
+  // instances lagging behind on snapshot uploads. The initial timestamp is 
defaulted to
+  // 0 milliseconds.
+  private var lastFullSnapshotLagReport = 0L

Review Comment:
   nit: `lastFullSnapshotLagReportTimeMs` ?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +264,141 @@ private class StateStoreCoordinator(override val rpcEnv: 
RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case ReportSnapshotUploaded(providerId, version, timestamp) =>
+      // Ignore this upload event if the registered latest version for the 
provider is more recent,
+      // since it's possible that an older version gets uploaded after a new 
executor uploads for
+      // the same provider but with a newer snapshot.
+      logDebug(s"Snapshot version $version was uploaded for provider 
$providerId")
+      if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version 
>= version)) {
+        stateStoreLatestUploadedSnapshot.put(providerId, 
SnapshotUploadEvent(version, timestamp))
+      }
+      context.reply(true)
+
+    case ConstructLaggingInstanceReport(queryRunId, latestVersion, 
endOfBatchTimestamp) =>
+      // Only log lagging instances if the snapshot report upload is enabled,
+      // otherwise all instances will be considered lagging.
+      if 
(sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_REPORT_UPLOAD_ENABLED)) {
+        val laggingStores = findLaggingStores(queryRunId, latestVersion, 
endOfBatchTimestamp)
+        logWarning(
+          log"StateStoreCoordinator Snapshot Lag Report for " +
+          log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+          log"Number of state stores falling behind: " +
+          log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)}"
+        )
+        // Report all stores that are behind in snapshot uploads.
+        // Only report the full list of providers lagging behind if the last 
reported time
+        // is not recent. The lag report interval denotes the minimum time 
between these
+        // full reports.
+        val coordinatorLagReportInterval =
+          
sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL)
+        val currentTimestamp = System.currentTimeMillis()
+        if (laggingStores.nonEmpty &&
+          currentTimestamp - lastFullSnapshotLagReport > 
coordinatorLagReportInterval) {
+          // Mark timestamp of the full report and log the lagging instances
+          lastFullSnapshotLagReport = currentTimestamp
+          laggingStores.foreach { providerId =>
+            val logMessage = stateStoreLatestUploadedSnapshot.get(providerId) 
match {
+              case Some(snapshotEvent) =>
+                val versionDelta = latestVersion - snapshotEvent.version
+                val timeDelta = endOfBatchTimestamp - snapshotEvent.timestamp
+
+                log"StateStoreCoordinator Snapshot Lag Detected for " +
+                log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+                log"Provider: ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, 
providerId)} " +
+                log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}, 
" +
+                log"latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, 
snapshotEvent)}, " +
+                log"version delta: ${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, 
versionDelta)}, " +
+                log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, 
timeDelta)}ms)"
+              case None =>
+                log"StateStoreCoordinator Snapshot Lag Detected for " +
+                log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+                log"Provider: ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, 
providerId)} " +
+                log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}, 
" +
+                log"latest snapshot: never uploaded)"
+            }
+            logWarning(logMessage)
+          }
+        } else if (laggingStores.nonEmpty) {
+          logInfo(log"StateStoreCoordinator Snapshot Lag Report - last full 
report was too recent")
+        }
+      }
+      context.reply(true)
+
+    case GetLatestSnapshotVersionForTesting(providerId) =>
+      val version = 
stateStoreLatestUploadedSnapshot.get(providerId).map(_.version)
+      logDebug(s"Got latest snapshot version of the state store $providerId: 
$version")
+      context.reply(version)
+
+    case GetLaggingStoresForTesting(queryRunId, latestVersion, timestamp) =>
+      val laggingStores = findLaggingStores(queryRunId, latestVersion, 
timestamp)
+      logDebug(s"Got lagging state stores: ${laggingStores.mkString(", ")}")
+      context.reply(laggingStores)
+
     case StopCoordinator =>
       stop() // Stop before replying to ensure that endpoint name has been 
deregistered
       logInfo("StateStoreCoordinator stopped")
       context.reply(true)
   }
+
+  case class SnapshotUploadEvent(
+      version: Long,
+      timestamp: Long
+  ) extends Ordered[SnapshotUploadEvent] {
+
+    def isLagging(latestVersion: Long, latestTimestamp: Long): Boolean = {
+      val versionDelta = latestVersion - version
+      val timeDelta = latestTimestamp - timestamp
+
+      // Determine alert thresholds from configurations for both time and 
version differences.
+      val snapshotVersionDeltaMultiplier = sqlConf.getConf(
+        
SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_DELTA_MULTIPLIER_FOR_MIN_VERSION_DELTA_TO_LOG)
+      val maintenanceIntervalMultiplier = sqlConf.getConf(
+        
SQLConf.STATE_STORE_COORDINATOR_MAINTENANCE_MULTIPLIER_FOR_MIN_TIME_DELTA_TO_LOG)
+      val minDeltasForSnapshot = 
sqlConf.getConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
+      val maintenanceInterval = 
sqlConf.getConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL)
+
+      // Use the configured multipliers to determine the proper alert 
thresholds
+      val minVersionDeltaForLogging = snapshotVersionDeltaMultiplier * 
minDeltasForSnapshot
+      val minTimeDeltaForLogging = maintenanceIntervalMultiplier * 
maintenanceInterval
+
+      // Mark a state store as lagging if it is behind in both version and 
time.
+      // In the case that a snapshot was never uploaded, we treat version -1 
as the preceding
+      // version of 0, and only rely on the version delta condition.
+      // Time requirement will be automatically satisfied as the initial 
timestamp is 0.

Review Comment:
   This condition only makes sense for the normal case.
   
   For a new run, however, it will produce false noise. E.g. the query previously 
ran up to version 50 and uploaded a snapshot. When a new run is started, this 
code will think a snapshot has never been uploaded and wrongly mark the store as 
lagging. We need to handle that; otherwise every new run of a query will report 
all of its state stores as lagging, which is wrong and will cause noise in 
logs/dashboards/alerts.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2256,6 +2256,59 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val 
STATE_STORE_COORDINATOR_SNAPSHOT_DELTA_MULTIPLIER_FOR_MIN_VERSION_DELTA_TO_LOG =
+    
buildConf("spark.sql.streaming.stateStore.minSnapshotDeltaMultiplierForMinVersionDeltaToLog")
+      .internal()
+      .doc(
+        "This multiplier determines the minimum version threshold for logging 
warnings when a " +
+        "state store instance falls behind. The coordinator logs a warning if 
a state store's " +
+        "last uploaded snapshot's version lags behind the query's latest known 
version by " +
+        "this threshold. The threshold is calculated as the configured minimum 
number of deltas " +
+        "needed to create a snapshot, multiplied by this multiplier."
+      )
+      .version("4.1.0")
+      .intConf
+      .checkValue(k => k >= 1, "Must be greater than or equal to 1")
+      .createWithDefault(5)
+
+  val STATE_STORE_COORDINATOR_MAINTENANCE_MULTIPLIER_FOR_MIN_TIME_DELTA_TO_LOG 
=
+    
buildConf("spark.sql.streaming.stateStore.maintenanceMultiplierForMinTimeDeltaToLog")
+      .internal()
+      .doc(
+        "This multiplier determines the minimum time threshold for logging 
warnings when a " +
+        "state store instance falls behind. The coordinator logs a warning if 
a state store's " +
+        "last snapshot upload time lags behind the current time by this 
threshold. " +
+        "The threshold is calculated as the maintenance interval multiplied by 
this multiplier."
+      )
+      .version("4.1.0")
+      .intConf
+      .checkValue(k => k >= 1, "Must be greater than or equal to 1")
+      .createWithDefault(10)
+
+  val STATE_STORE_COORDINATOR_REPORT_UPLOAD_ENABLED =
+    buildConf("spark.sql.streaming.stateStore.coordinatorReportUpload.enabled")
+      .internal()
+      .doc(
+        "If enabled, state store instances will send a message to the state 
store " +

Review Comment:
   nit: this doc explains implementation details rather than the functionality. 
E.g.
   
   `When enabled, the state store coordinator will report state stores whose 
snapshots haven't been uploaded for some time. See other conf xyz that 
controls...`. It doesn't have to be my exact wording.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +264,141 @@ private class StateStoreCoordinator(override val rpcEnv: 
RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case ReportSnapshotUploaded(providerId, version, timestamp) =>
+      // Ignore this upload event if the registered latest version for the 
provider is more recent,
+      // since it's possible that an older version gets uploaded after a new 
executor uploads for
+      // the same provider but with a newer snapshot.
+      logDebug(s"Snapshot version $version was uploaded for provider 
$providerId")
+      if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version 
>= version)) {
+        stateStoreLatestUploadedSnapshot.put(providerId, 
SnapshotUploadEvent(version, timestamp))
+      }
+      context.reply(true)
+
+    case ConstructLaggingInstanceReport(queryRunId, latestVersion, 
endOfBatchTimestamp) =>
+      // Only log lagging instances if the snapshot report upload is enabled,
+      // otherwise all instances will be considered lagging.
+      if 
(sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_REPORT_UPLOAD_ENABLED)) {
+        val laggingStores = findLaggingStores(queryRunId, latestVersion, 
endOfBatchTimestamp)
+        logWarning(
+          log"StateStoreCoordinator Snapshot Lag Report for " +
+          log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+          log"Number of state stores falling behind: " +
+          log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)}"
+        )
+        // Report all stores that are behind in snapshot uploads.
+        // Only report the full list of providers lagging behind if the last 
reported time
+        // is not recent. The lag report interval denotes the minimum time 
between these
+        // full reports.
+        val coordinatorLagReportInterval =
+          
sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL)
+        val currentTimestamp = System.currentTimeMillis()
+        if (laggingStores.nonEmpty &&
+          currentTimestamp - lastFullSnapshotLagReport > 
coordinatorLagReportInterval) {
+          // Mark timestamp of the full report and log the lagging instances
+          lastFullSnapshotLagReport = currentTimestamp
+          laggingStores.foreach { providerId =>
+            val logMessage = stateStoreLatestUploadedSnapshot.get(providerId) 
match {
+              case Some(snapshotEvent) =>
+                val versionDelta = latestVersion - snapshotEvent.version
+                val timeDelta = endOfBatchTimestamp - snapshotEvent.timestamp
+
+                log"StateStoreCoordinator Snapshot Lag Detected for " +
+                log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+                log"Provider: ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, 
providerId)} " +
+                log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}, 
" +
+                log"latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, 
snapshotEvent)}, " +
+                log"version delta: ${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, 
versionDelta)}, " +
+                log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, 
timeDelta)}ms)"
+              case None =>
+                log"StateStoreCoordinator Snapshot Lag Detected for " +
+                log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+                log"Provider: ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, 
providerId)} " +
+                log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}, 
" +
+                log"latest snapshot: never uploaded)"
+            }
+            logWarning(logMessage)
+          }
+        } else if (laggingStores.nonEmpty) {
+          logInfo(log"StateStoreCoordinator Snapshot Lag Report - last full 
report was too recent")
+        }
+      }
+      context.reply(true)
+
+    case GetLatestSnapshotVersionForTesting(providerId) =>
+      val version = 
stateStoreLatestUploadedSnapshot.get(providerId).map(_.version)
+      logDebug(s"Got latest snapshot version of the state store $providerId: 
$version")
+      context.reply(version)
+
+    case GetLaggingStoresForTesting(queryRunId, latestVersion, timestamp) =>
+      val laggingStores = findLaggingStores(queryRunId, latestVersion, 
timestamp)
+      logDebug(s"Got lagging state stores: ${laggingStores.mkString(", ")}")
+      context.reply(laggingStores)
+
     case StopCoordinator =>
       stop() // Stop before replying to ensure that endpoint name has been 
deregistered
       logInfo("StateStoreCoordinator stopped")
       context.reply(true)
   }
+
+  case class SnapshotUploadEvent(
+      version: Long,
+      timestamp: Long
+  ) extends Ordered[SnapshotUploadEvent] {
+
+    def isLagging(latestVersion: Long, latestTimestamp: Long): Boolean = {
+      val versionDelta = latestVersion - version
+      val timeDelta = latestTimestamp - timestamp
+
+      // Determine alert thresholds from configurations for both time and 
version differences.
+      val snapshotVersionDeltaMultiplier = sqlConf.getConf(
+        
SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_DELTA_MULTIPLIER_FOR_MIN_VERSION_DELTA_TO_LOG)
+      val maintenanceIntervalMultiplier = sqlConf.getConf(
+        
SQLConf.STATE_STORE_COORDINATOR_MAINTENANCE_MULTIPLIER_FOR_MIN_TIME_DELTA_TO_LOG)
+      val minDeltasForSnapshot = 
sqlConf.getConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
+      val maintenanceInterval = 
sqlConf.getConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL)
+
+      // Use the configured multipliers to determine the proper alert 
thresholds
+      val minVersionDeltaForLogging = snapshotVersionDeltaMultiplier * 
minDeltasForSnapshot
+      val minTimeDeltaForLogging = maintenanceIntervalMultiplier * 
maintenanceInterval
+
+      // Mark a state store as lagging if it is behind in both version and 
time.
+      // In the case that a snapshot was never uploaded, we treat version -1 
as the preceding
+      // version of 0, and only rely on the version delta condition.
+      // Time requirement will be automatically satisfied as the initial 
timestamp is 0.
+      versionDelta >= minVersionDeltaForLogging && timeDelta > 
minTimeDeltaForLogging
+    }
+
+    override def compare(otherEvent: SnapshotUploadEvent): Int = {
+      // Compare by version first, then by timestamp as tiebreaker
+      val versionCompare = this.version.compare(otherEvent.version)
+      if (versionCompare == 0) {
+        this.timestamp.compare(otherEvent.timestamp)
+      } else {
+        versionCompare
+      }
+    }
+
+    override def toString(): String = {
+      s"SnapshotUploadEvent(version=$version, timestamp=$timestamp)"
+    }
+  }
+
+  private def findLaggingStores(
+      queryRunId: UUID,
+      referenceVersion: Long,
+      referenceTimestamp: Long): Seq[StateStoreProviderId] = {
+    // Do not report any instance as lagging if the snapshot report upload is 
disabled,
+    // since it will treat all active instances as stores that have never 
uploaded.
+    if 
(!sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_REPORT_UPLOAD_ENABLED)) {

Review Comment:
   Why check this conf again? It is already checked before calling this func, 
right? And this is a private func that is only called from one place in the 
coordinator.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -55,6 +56,45 @@ private case class GetLocation(storeId: StateStoreProviderId)
 private case class DeactivateInstances(runId: UUID)
   extends StateStoreCoordinatorMessage
 
+/**
+ * This message is used to report a state store instance has just finished 
uploading a snapshot,
+ * along with the timestamp in milliseconds and the snapshot version.
+ */
+private case class ReportSnapshotUploaded(
+    storeId: StateStoreProviderId,
+    version: Long,
+    timestamp: Long)
+  extends StateStoreCoordinatorMessage
+
+/**
+ * This message is used for the coordinator to look for all state stores that 
are lagging behind
+ * in snapshot uploads. The coordinator will then log a warning message for 
each lagging instance.
+ */
+private case class ConstructLaggingInstanceReport(

Review Comment:
   nit: `LogLaggingStateStores`?
   
   "Instance" is not applicable here, since it refers to store instances, i.e. a 
single store can have multiple instances across executors.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2256,6 +2256,59 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val 
STATE_STORE_COORDINATOR_SNAPSHOT_DELTA_MULTIPLIER_FOR_MIN_VERSION_DELTA_TO_LOG =
+    
buildConf("spark.sql.streaming.stateStore.minSnapshotDeltaMultiplierForMinVersionDeltaToLog")
+      .internal()
+      .doc(
+        "This multiplier determines the minimum version threshold for logging 
warnings when a " +
+        "state store instance falls behind. The coordinator logs a warning if 
a state store's " +
+        "last uploaded snapshot's version lags behind the query's latest known 
version by " +
+        "this threshold. The threshold is calculated as the configured minimum 
number of deltas " +
+        "needed to create a snapshot, multiplied by this multiplier."
+      )
+      .version("4.1.0")
+      .intConf
+      .checkValue(k => k >= 1, "Must be greater than or equal to 1")
+      .createWithDefault(5)
+
+  val STATE_STORE_COORDINATOR_MAINTENANCE_MULTIPLIER_FOR_MIN_TIME_DELTA_TO_LOG 
=
+    
buildConf("spark.sql.streaming.stateStore.maintenanceMultiplierForMinTimeDeltaToLog")
+      .internal()
+      .doc(
+        "This multiplier determines the minimum time threshold for logging 
warnings when a " +
+        "state store instance falls behind. The coordinator logs a warning if 
a state store's " +
+        "last snapshot upload time lags behind the current time by this 
threshold. " +
+        "The threshold is calculated as the maintenance interval multiplied by 
this multiplier."
+      )
+      .version("4.1.0")
+      .intConf
+      .checkValue(k => k >= 1, "Must be greater than or equal to 1")
+      .createWithDefault(10)
+
+  val STATE_STORE_COORDINATOR_REPORT_UPLOAD_ENABLED =
+    buildConf("spark.sql.streaming.stateStore.coordinatorReportUpload.enabled")
+      .internal()
+      .doc(
+        "If enabled, state store instances will send a message to the state 
store " +
+        "coordinator whenever they complete a snapshot upload."
+      )
+      .version("4.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  val STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL =
+    buildConf("spark.sql.streaming.stateStore.snapshotLagReportInterval")
+      .internal()
+      .doc(
+        "The minimum amount of time between the state store coordinator's full 
report on " +

Review Comment:
   ditto



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala:
##########
@@ -283,6 +287,14 @@ abstract class ProgressContext(
     progressReporter.lastNoExecutionProgressEventTime = 
triggerClock.getTimeMillis()
     progressReporter.updateProgress(newProgress)
 
+    // Ask the state store coordinator to look for any lagging instances and 
report them.
+    progressReporter.stateStoreCoordinator
+      .constructLaggingInstanceReport(
+        lastExecution.runId,
+        lastEpochId,

Review Comment:
   FYI, this is a bug: lastEpochId (aka batchId) != latest store version. The 
latest store version = batchId + 1, because when batch x commits, it produces 
store version x + 1. You can confirm this.
   
   Your test is not catching this because it only verifies >= 0, and because 
there is no test for the case where changelog is disabled.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -38,6 +38,7 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.util.{NonFateSharingCache, Utils}
 
+

Review Comment:
   nit: why?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -1472,6 +1475,8 @@ class RocksDB(
         log"Current lineage: ${MDC(LogKeys.LINEAGE, lineageManager)}")
       // Compare and update with the version that was just uploaded.
       lastUploadedSnapshotVersion.updateAndGet(v => Math.max(snapshot.version, 
v))
+      // Report snapshot upload event to the coordinator.
+      eventListener.foreach(_.reportSnapshotUploaded(snapshot.version))

Review Comment:
   So we are reporting this even if changelog checkpointing is not being used? 
Is there a need for this if we are uploading a snapshot for every batch (i.e. no 
changelog)?
   
   Or is it just to keep the coordinator-side detection logic simple, since this 
is specific to RocksDB? Just asking. If we are doing this for now to avoid 
overcomplicating things, I'm fine with just adding a comment that it can be 
improved.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -55,6 +56,45 @@ private case class GetLocation(storeId: StateStoreProviderId)
 private case class DeactivateInstances(runId: UUID)
   extends StateStoreCoordinatorMessage
 
+/**
+ * This message is used to report a state store instance has just finished 
uploading a snapshot,
+ * along with the timestamp in milliseconds and the snapshot version.
+ */
+private case class ReportSnapshotUploaded(
+    storeId: StateStoreProviderId,
+    version: Long,
+    timestamp: Long)
+  extends StateStoreCoordinatorMessage
+
+/**
+ * This message is used for the coordinator to look for all state stores that 
are lagging behind
+ * in snapshot uploads. The coordinator will then log a warning message for 
each lagging instance.
+ */
+private case class ConstructLaggingInstanceReport(
+    queryRunId: UUID,
+    latestVersion: Long,
+    timestamp: Long)
+  extends StateStoreCoordinatorMessage
+
+/**
+ * Message used for testing.
+ * This message is used to retrieve the latest snapshot version reported for 
upload from a
+ * specific state store instance.
+ */
+private case class GetLatestSnapshotVersionForTesting(storeId: 
StateStoreProviderId)
+  extends StateStoreCoordinatorMessage
+
+/**
+ * Message used for testing.
+ * This message is used to retrieve the all active state store instance 
falling behind in

Review Comment:
   nit: remove "the"



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +264,141 @@ private class StateStoreCoordinator(override val rpcEnv: 
RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case ReportSnapshotUploaded(providerId, version, timestamp) =>
+      // Ignore this upload event if the registered latest version for the 
provider is more recent,
+      // since it's possible that an older version gets uploaded after a new 
executor uploads for
+      // the same provider but with a newer snapshot.
+      logDebug(s"Snapshot version $version was uploaded for provider 
$providerId")
+      if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version 
>= version)) {
+        stateStoreLatestUploadedSnapshot.put(providerId, 
SnapshotUploadEvent(version, timestamp))
+      }
+      context.reply(true)
+
+    case ConstructLaggingInstanceReport(queryRunId, latestVersion, 
endOfBatchTimestamp) =>
+      // Only log lagging instances if the snapshot report upload is enabled,
+      // otherwise all instances will be considered lagging.
+      if 
(sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_REPORT_UPLOAD_ENABLED)) {
+        val laggingStores = findLaggingStores(queryRunId, latestVersion, 
endOfBatchTimestamp)
+        logWarning(
+          log"StateStoreCoordinator Snapshot Lag Report for " +
+          log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+          log"Number of state stores falling behind: " +
+          log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)}"
+        )
+        // Report all stores that are behind in snapshot uploads.
+        // Only report the full list of providers lagging behind if the last 
reported time
+        // is not recent. The lag report interval denotes the minimum time 
between these
+        // full reports.
+        val coordinatorLagReportInterval =
+          
sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL)
+        val currentTimestamp = System.currentTimeMillis()
+        if (laggingStores.nonEmpty &&
+          currentTimestamp - lastFullSnapshotLagReport > 
coordinatorLagReportInterval) {
+          // Mark timestamp of the full report and log the lagging instances
+          lastFullSnapshotLagReport = currentTimestamp
+          laggingStores.foreach { providerId =>
+            val logMessage = stateStoreLatestUploadedSnapshot.get(providerId) 
match {
+              case Some(snapshotEvent) =>
+                val versionDelta = latestVersion - snapshotEvent.version
+                val timeDelta = endOfBatchTimestamp - snapshotEvent.timestamp

Review Comment:
   why are you using `endOfBatchTimestamp`? Why not current timestamp?
   
   It is possible the event was sent after endOfBatch but before we called this 
function. Hence timeDelta will be negative.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala:
##########
@@ -97,6 +97,13 @@ class StateStoreConf(
   val enableStateStoreCheckpointIds =
     StatefulOperatorStateInfo.enableStateStoreCheckpointIds(sqlConf)
 
+  /**
+   * Whether to report snapshot uploaded messages from the internal RocksDB 
instance
+   * to the state store coordinator.
+   */

Review Comment:
   wrong comment. This conf isn't about internal rocksdb



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +264,141 @@ private class StateStoreCoordinator(override val rpcEnv: 
RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case ReportSnapshotUploaded(providerId, version, timestamp) =>
+      // Ignore this upload event if the registered latest version for the 
provider is more recent,
+      // since it's possible that an older version gets uploaded after a new 
executor uploads for
+      // the same provider but with a newer snapshot.
+      logDebug(s"Snapshot version $version was uploaded for provider 
$providerId")
+      if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version 
>= version)) {
+        stateStoreLatestUploadedSnapshot.put(providerId, 
SnapshotUploadEvent(version, timestamp))
+      }
+      context.reply(true)
+
+    case ConstructLaggingInstanceReport(queryRunId, latestVersion, 
endOfBatchTimestamp) =>
+      // Only log lagging instances if the snapshot report upload is enabled,
+      // otherwise all instances will be considered lagging.
+      if 
(sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_REPORT_UPLOAD_ENABLED)) {
+        val laggingStores = findLaggingStores(queryRunId, latestVersion, 
endOfBatchTimestamp)
+        logWarning(
+          log"StateStoreCoordinator Snapshot Lag Report for " +
+          log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+          log"Number of state stores falling behind: " +
+          log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)}"
+        )
+        // Report all stores that are behind in snapshot uploads.
+        // Only report the full list of providers lagging behind if the last 
reported time
+        // is not recent. The lag report interval denotes the minimum time 
between these
+        // full reports.
+        val coordinatorLagReportInterval =
+          
sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL)
+        val currentTimestamp = System.currentTimeMillis()
+        if (laggingStores.nonEmpty &&
+          currentTimestamp - lastFullSnapshotLagReport > 
coordinatorLagReportInterval) {
+          // Mark timestamp of the full report and log the lagging instances
+          lastFullSnapshotLagReport = currentTimestamp
+          laggingStores.foreach { providerId =>
+            val logMessage = stateStoreLatestUploadedSnapshot.get(providerId) 
match {
+              case Some(snapshotEvent) =>
+                val versionDelta = latestVersion - snapshotEvent.version
+                val timeDelta = endOfBatchTimestamp - snapshotEvent.timestamp
+
+                log"StateStoreCoordinator Snapshot Lag Detected for " +
+                log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+                log"Provider: ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, 
providerId)} " +
+                log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}, 
" +
+                log"latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, 
snapshotEvent)}, " +
+                log"version delta: ${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, 
versionDelta)}, " +
+                log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, 
timeDelta)}ms)"
+              case None =>
+                log"StateStoreCoordinator Snapshot Lag Detected for " +
+                log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+                log"Provider: ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, 
providerId)} " +
+                log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}, 
" +
+                log"latest snapshot: never uploaded)"
+            }
+            logWarning(logMessage)
+          }
+        } else if (laggingStores.nonEmpty) {
+          logInfo(log"StateStoreCoordinator Snapshot Lag Report - last full 
report was too recent")
+        }
+      }
+      context.reply(true)
+
+    case GetLatestSnapshotVersionForTesting(providerId) =>
+      val version = 
stateStoreLatestUploadedSnapshot.get(providerId).map(_.version)
+      logDebug(s"Got latest snapshot version of the state store $providerId: 
$version")
+      context.reply(version)
+
+    case GetLaggingStoresForTesting(queryRunId, latestVersion, timestamp) =>
+      val laggingStores = findLaggingStores(queryRunId, latestVersion, 
timestamp)
+      logDebug(s"Got lagging state stores: ${laggingStores.mkString(", ")}")
+      context.reply(laggingStores)
+
     case StopCoordinator =>
       stop() // Stop before replying to ensure that endpoint name has been 
deregistered
       logInfo("StateStoreCoordinator stopped")
       context.reply(true)
   }
+
+  case class SnapshotUploadEvent(
+      version: Long,
+      timestamp: Long
+  ) extends Ordered[SnapshotUploadEvent] {
+
+    def isLagging(latestVersion: Long, latestTimestamp: Long): Boolean = {
+      val versionDelta = latestVersion - version

Review Comment:
   I think I mentioned this in my last review. For the default event, the 
version is -1, right? Hence there is a bug here for that case.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +264,141 @@ private class StateStoreCoordinator(override val rpcEnv: 
RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case ReportSnapshotUploaded(providerId, version, timestamp) =>
+      // Ignore this upload event if the registered latest version for the 
provider is more recent,
+      // since it's possible that an older version gets uploaded after a new 
executor uploads for
+      // the same provider but with a newer snapshot.
+      logDebug(s"Snapshot version $version was uploaded for provider 
$providerId")
+      if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version 
>= version)) {
+        stateStoreLatestUploadedSnapshot.put(providerId, 
SnapshotUploadEvent(version, timestamp))
+      }
+      context.reply(true)
+
+    case ConstructLaggingInstanceReport(queryRunId, latestVersion, 
endOfBatchTimestamp) =>
+      // Only log lagging instances if the snapshot report upload is enabled,
+      // otherwise all instances will be considered lagging.
+      if 
(sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_REPORT_UPLOAD_ENABLED)) {
+        val laggingStores = findLaggingStores(queryRunId, latestVersion, 
endOfBatchTimestamp)
+        logWarning(
+          log"StateStoreCoordinator Snapshot Lag Report for " +
+          log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+          log"Number of state stores falling behind: " +
+          log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)}"
+        )
+        // Report all stores that are behind in snapshot uploads.
+        // Only report the full list of providers lagging behind if the last 
reported time
+        // is not recent. The lag report interval denotes the minimum time 
between these
+        // full reports.
+        val coordinatorLagReportInterval =
+          
sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL)
+        val currentTimestamp = System.currentTimeMillis()
+        if (laggingStores.nonEmpty &&
+          currentTimestamp - lastFullSnapshotLagReport > 
coordinatorLagReportInterval) {
+          // Mark timestamp of the full report and log the lagging instances
+          lastFullSnapshotLagReport = currentTimestamp
+          laggingStores.foreach { providerId =>

Review Comment:
   Should we limit the number of stores we log here? To avoid a situation where 
a customer has many partitions e.g. 500 partitions and half of them are 
lagging. This would mean we would write 250 messages in the log here. 



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +264,141 @@ private class StateStoreCoordinator(override val rpcEnv: 
RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case ReportSnapshotUploaded(providerId, version, timestamp) =>
+      // Ignore this upload event if the registered latest version for the 
provider is more recent,
+      // since it's possible that an older version gets uploaded after a new 
executor uploads for
+      // the same provider but with a newer snapshot.
+      logDebug(s"Snapshot version $version was uploaded for provider 
$providerId")
+      if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version 
>= version)) {
+        stateStoreLatestUploadedSnapshot.put(providerId, 
SnapshotUploadEvent(version, timestamp))
+      }
+      context.reply(true)
+
+    case ConstructLaggingInstanceReport(queryRunId, latestVersion, 
endOfBatchTimestamp) =>
+      // Only log lagging instances if the snapshot report upload is enabled,
+      // otherwise all instances will be considered lagging.
+      if 
(sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_REPORT_UPLOAD_ENABLED)) {
+        val laggingStores = findLaggingStores(queryRunId, latestVersion, 
endOfBatchTimestamp)
+        logWarning(
+          log"StateStoreCoordinator Snapshot Lag Report for " +
+          log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+          log"Number of state stores falling behind: " +
+          log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)}"
+        )
+        // Report all stores that are behind in snapshot uploads.
+        // Only report the full list of providers lagging behind if the last 
reported time
+        // is not recent. The lag report interval denotes the minimum time 
between these
+        // full reports.
+        val coordinatorLagReportInterval =
+          
sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL)
+        val currentTimestamp = System.currentTimeMillis()
+        if (laggingStores.nonEmpty &&
+          currentTimestamp - lastFullSnapshotLagReport > 
coordinatorLagReportInterval) {
+          // Mark timestamp of the full report and log the lagging instances
+          lastFullSnapshotLagReport = currentTimestamp
+          laggingStores.foreach { providerId =>
+            val logMessage = stateStoreLatestUploadedSnapshot.get(providerId) 
match {
+              case Some(snapshotEvent) =>
+                val versionDelta = latestVersion - snapshotEvent.version
+                val timeDelta = endOfBatchTimestamp - snapshotEvent.timestamp
+
+                log"StateStoreCoordinator Snapshot Lag Detected for " +
+                log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+                log"Provider: ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, 
providerId)} " +
+                log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}, 
" +
+                log"latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, 
snapshotEvent)}, " +
+                log"version delta: ${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, 
versionDelta)}, " +
+                log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, 
timeDelta)}ms)"
+              case None =>
+                log"StateStoreCoordinator Snapshot Lag Detected for " +
+                log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+                log"Provider: ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, 
providerId)} " +
+                log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}, 
" +
+                log"latest snapshot: never uploaded)"
+            }
+            logWarning(logMessage)
+          }
+        } else if (laggingStores.nonEmpty) {
+          logInfo(log"StateStoreCoordinator Snapshot Lag Report - last full 
report was too recent")

Review Comment:
   Do we need this log? We already logged a warning that x number of lags were 
found. So if we don't log the full report that implies time hasn't been 
reached. Hence no need for this additional one.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +264,141 @@ private class StateStoreCoordinator(override val rpcEnv: 
RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case ReportSnapshotUploaded(providerId, version, timestamp) =>
+      // Ignore this upload event if the registered latest version for the 
provider is more recent,
+      // since it's possible that an older version gets uploaded after a new 
executor uploads for
+      // the same provider but with a newer snapshot.
+      logDebug(s"Snapshot version $version was uploaded for provider 
$providerId")
+      if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version 
>= version)) {
+        stateStoreLatestUploadedSnapshot.put(providerId, 
SnapshotUploadEvent(version, timestamp))
+      }
+      context.reply(true)
+
+    case ConstructLaggingInstanceReport(queryRunId, latestVersion, 
endOfBatchTimestamp) =>
+      // Only log lagging instances if the snapshot report upload is enabled,
+      // otherwise all instances will be considered lagging.
+      if 
(sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_REPORT_UPLOAD_ENABLED)) {
+        val laggingStores = findLaggingStores(queryRunId, latestVersion, 
endOfBatchTimestamp)
+        logWarning(
+          log"StateStoreCoordinator Snapshot Lag Report for " +
+          log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+          log"Number of state stores falling behind: " +
+          log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)}"
+        )
+        // Report all stores that are behind in snapshot uploads.
+        // Only report the full list of providers lagging behind if the last 
reported time
+        // is not recent. The lag report interval denotes the minimum time 
between these
+        // full reports.
+        val coordinatorLagReportInterval =
+          
sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL)
+        val currentTimestamp = System.currentTimeMillis()
+        if (laggingStores.nonEmpty &&
+          currentTimestamp - lastFullSnapshotLagReport > 
coordinatorLagReportInterval) {
+          // Mark timestamp of the full report and log the lagging instances
+          lastFullSnapshotLagReport = currentTimestamp
+          laggingStores.foreach { providerId =>
+            val logMessage = stateStoreLatestUploadedSnapshot.get(providerId) 
match {
+              case Some(snapshotEvent) =>
+                val versionDelta = latestVersion - snapshotEvent.version
+                val timeDelta = endOfBatchTimestamp - snapshotEvent.timestamp
+
+                log"StateStoreCoordinator Snapshot Lag Detected for " +
+                log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+                log"Provider: ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, 
providerId)} " +
+                log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}, 
" +
+                log"latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, 
snapshotEvent)}, " +
+                log"version delta: ${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, 
versionDelta)}, " +
+                log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, 
timeDelta)}ms)"
+              case None =>
+                log"StateStoreCoordinator Snapshot Lag Detected for " +
+                log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+                log"Provider: ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, 
providerId)} " +
+                log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}, 
" +
+                log"latest snapshot: never uploaded)"

Review Comment:
   Let's use this instead: `no upload for query run`?
   
   It is not necessarily true that it has never been uploaded. It could have 
been uploaded in a previous run, but not yet in this new run. So let's not say never.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala:
##########
@@ -155,16 +156,327 @@ class StateStoreCoordinatorSuite extends SparkFunSuite 
with SharedSparkContext {
       StateStore.stop()
     }
   }
+

Review Comment:
   also add test for when changelog is disabled.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +264,141 @@ private class StateStoreCoordinator(override val rpcEnv: 
RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case ReportSnapshotUploaded(providerId, version, timestamp) =>
+      // Ignore this upload event if the registered latest version for the 
provider is more recent,
+      // since it's possible that an older version gets uploaded after a new 
executor uploads for
+      // the same provider but with a newer snapshot.
+      logDebug(s"Snapshot version $version was uploaded for provider 
$providerId")
+      if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version 
>= version)) {
+        stateStoreLatestUploadedSnapshot.put(providerId, 
SnapshotUploadEvent(version, timestamp))
+      }
+      context.reply(true)
+
+    case ConstructLaggingInstanceReport(queryRunId, latestVersion, 
endOfBatchTimestamp) =>
+      // Only log lagging instances if the snapshot report upload is enabled,
+      // otherwise all instances will be considered lagging.
+      if 
(sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_REPORT_UPLOAD_ENABLED)) {
+        val laggingStores = findLaggingStores(queryRunId, latestVersion, 
endOfBatchTimestamp)
+        logWarning(
+          log"StateStoreCoordinator Snapshot Lag Report for " +
+          log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+          log"Number of state stores falling behind: " +
+          log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)}"
+        )
+        // Report all stores that are behind in snapshot uploads.
+        // Only report the full list of providers lagging behind if the last 
reported time
+        // is not recent. The lag report interval denotes the minimum time 
between these
+        // full reports.
+        val coordinatorLagReportInterval =
+          
sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL)
+        val currentTimestamp = System.currentTimeMillis()
+        if (laggingStores.nonEmpty &&
+          currentTimestamp - lastFullSnapshotLagReport > 
coordinatorLagReportInterval) {
+          // Mark timestamp of the full report and log the lagging instances
+          lastFullSnapshotLagReport = currentTimestamp
+          laggingStores.foreach { providerId =>
+            val logMessage = stateStoreLatestUploadedSnapshot.get(providerId) 
match {
+              case Some(snapshotEvent) =>
+                val versionDelta = latestVersion - snapshotEvent.version
+                val timeDelta = endOfBatchTimestamp - snapshotEvent.timestamp
+
+                log"StateStoreCoordinator Snapshot Lag Detected for " +
+                log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+                log"Provider: ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, 
providerId)} " +
+                log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}, 
" +
+                log"latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, 
snapshotEvent)}, " +
+                log"version delta: ${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, 
versionDelta)}, " +
+                log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, 
timeDelta)}ms)"
+              case None =>
+                log"StateStoreCoordinator Snapshot Lag Detected for " +
+                log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+                log"Provider: ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, 
providerId)} " +
+                log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}, 
" +
+                log"latest snapshot: never uploaded)"
+            }
+            logWarning(logMessage)
+          }
+        } else if (laggingStores.nonEmpty) {
+          logInfo(log"StateStoreCoordinator Snapshot Lag Report - last full 
report was too recent")
+        }
+      }
+      context.reply(true)
+
+    case GetLatestSnapshotVersionForTesting(providerId) =>
+      val version = 
stateStoreLatestUploadedSnapshot.get(providerId).map(_.version)
+      logDebug(s"Got latest snapshot version of the state store $providerId: 
$version")
+      context.reply(version)
+
+    case GetLaggingStoresForTesting(queryRunId, latestVersion, timestamp) =>
+      val laggingStores = findLaggingStores(queryRunId, latestVersion, 
timestamp)
+      logDebug(s"Got lagging state stores: ${laggingStores.mkString(", ")}")
+      context.reply(laggingStores)
+
     case StopCoordinator =>
       stop() // Stop before replying to ensure that endpoint name has been 
deregistered
       logInfo("StateStoreCoordinator stopped")
       context.reply(true)
   }
+
+  case class SnapshotUploadEvent(
+      version: Long,
+      timestamp: Long
+  ) extends Ordered[SnapshotUploadEvent] {
+
+    def isLagging(latestVersion: Long, latestTimestamp: Long): Boolean = {
+      val versionDelta = latestVersion - version
+      val timeDelta = latestTimestamp - timestamp
+
+      // Determine alert thresholds from configurations for both time and 
version differences.
+      val snapshotVersionDeltaMultiplier = sqlConf.getConf(
+        
SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_DELTA_MULTIPLIER_FOR_MIN_VERSION_DELTA_TO_LOG)
+      val maintenanceIntervalMultiplier = sqlConf.getConf(
+        
SQLConf.STATE_STORE_COORDINATOR_MAINTENANCE_MULTIPLIER_FOR_MIN_TIME_DELTA_TO_LOG)
+      val minDeltasForSnapshot = 
sqlConf.getConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
+      val maintenanceInterval = 
sqlConf.getConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL)
+
+      // Use the configured multipliers to determine the proper alert 
thresholds
+      val minVersionDeltaForLogging = snapshotVersionDeltaMultiplier * 
minDeltasForSnapshot
+      val minTimeDeltaForLogging = maintenanceIntervalMultiplier * 
maintenanceInterval
+
+      // Mark a state store as lagging if it is behind in both version and 
time.
+      // In the case that a snapshot was never uploaded, we treat version -1 
as the preceding
+      // version of 0, and only rely on the version delta condition.
+      // Time requirement will be automatically satisfied as the initial 
timestamp is 0.
+      versionDelta >= minVersionDeltaForLogging && timeDelta > 
minTimeDeltaForLogging
+    }
+
+    override def compare(otherEvent: SnapshotUploadEvent): Int = {
+      // Compare by version first, then by timestamp as tiebreaker
+      val versionCompare = this.version.compare(otherEvent.version)
+      if (versionCompare == 0) {
+        this.timestamp.compare(otherEvent.timestamp)
+      } else {
+        versionCompare
+      }
+    }
+
+    override def toString(): String = {
+      s"SnapshotUploadEvent(version=$version, timestamp=$timestamp)"
+    }
+  }
+
+  private def findLaggingStores(
+      queryRunId: UUID,
+      referenceVersion: Long,
+      referenceTimestamp: Long): Seq[StateStoreProviderId] = {
+    // Do not report any instance as lagging if the snapshot report upload is 
disabled,
+    // since it will treat all active instances as stores that have never 
uploaded.
+    if 
(!sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_REPORT_UPLOAD_ENABLED)) {
+      return Seq.empty
+    }
+    // Look for instances that are lagging behind in snapshot uploads
+    instances.keys.filter { storeProviderId =>

Review Comment:
   This PR only reports upload to coordinator for rocksdb state store. Since 
you are not reporting for hdfs, this code will say all hdfs state stores are 
lagging. Is there a reason why you're not reporting for HDFS state store too?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala:
##########
@@ -155,16 +156,327 @@ class StateStoreCoordinatorSuite extends SparkFunSuite 
with SharedSparkContext {
       StateStore.stop()
     }
   }
+
+  test(
+    "SPARK-51358: Snapshot uploads in RocksDB are not reported if changelog " +
+    "checkpointing is disabled"
+  ) {
+    withCoordinatorAndSQLConf(

Review Comment:
   fyi, your tests are not exercising the construct lag rpc code path because 
you are not reducing the `snapshotLagReportInterval` (default is 5 mins) in 
your test



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to