micheal-o commented on code in PR #50123:
URL: https://github.com/apache/spark/pull/50123#discussion_r1990598816


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -38,9 +38,20 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.util.{NonFateSharingCache, Utils}
 
+/**
+ * Trait representing events reported from a RocksDB instance.
+ *
+ * The internal RocksDB instance can use a provider with a `RocksDBEventListener` reference to

Review Comment:
   > The internal RocksDB instance can use a provider with a `RocksDBEventListener` reference
   
   This statement is a bit confusing. Should we just say "We pass this into the internal RocksDB instance to report specific events..."?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -65,6 +65,7 @@ case object StoreTaskCompletionListener extends RocksDBOpType("store_task_comple
 * @param localRootDir Root directory in local disk that is used for working and checkpointing dirs
 * @param hadoopConf   Hadoop configuration for talking to the remote file system
 * @param loggingId    Id that will be prepended in logs for isolating concurrent RocksDBs
+ * @param providerListener A reference to the state store provider for event callback reporting

Review Comment:
   nit: confusing comment? I mean the "A reference to the state store provider" part



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -119,6 +139,31 @@ class StateStoreCoordinatorRef private(rpcEndpointRef: RpcEndpointRef) {
     rpcEndpointRef.askSync[Boolean](DeactivateInstances(runId))
   }
 
+  /** Inform that an executor has uploaded a snapshot */
+  private[sql] def snapshotUploaded(
+      storeProviderId: StateStoreProviderId,
+      version: Long,
+      timestamp: Long): Unit = {
+    rpcEndpointRef.askSync[Boolean](SnapshotUploaded(storeProviderId, version, timestamp))
+  }
+
+  /**
+   * Endpoint used for testing.
+   * Get the latest snapshot version uploaded for a state store.
+   */
+  private[sql] def getLatestSnapshotVersion(

Review Comment:
   Add ForTesting suffix?
   
   Also should be `private[state]` since it is only for testing?
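   
   Something like this (sketch; the name is just a suggestion):
   
   ```scala
   /** For testing only: get the latest snapshot version uploaded for a state store. */
   private[state] def getLatestSnapshotVersionForTesting(
       stateStoreProviderId: StateStoreProviderId): Option[Long] = {
     rpcEndpointRef.askSync[Option[Long]](GetLatestSnapshotVersion(stateStoreProviderId))
   }
   ```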



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -66,9 +86,9 @@ object StateStoreCoordinatorRef extends Logging {
   /**
    * Create a reference to a [[StateStoreCoordinator]]
    */
-  def forDriver(env: SparkEnv): StateStoreCoordinatorRef = synchronized {
+  def forDriver(env: SparkEnv, conf: SQLConf): StateStoreCoordinatorRef = synchronized {
     try {
-      val coordinator = new StateStoreCoordinator(env.rpcEnv)
+      val coordinator = new StateStoreCoordinator(env.rpcEnv, conf)

Review Comment:
   You are passing in the current session's conf, but if I remember correctly, we only have one coordinator across all sessions. I don't think we create a coordinator per session. Can you confirm?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +220,99 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case SnapshotUploaded(providerId, version, timestamp) =>
+      stateStoreSnapshotVersions.put(providerId, SnapshotUploadEvent(version, timestamp))
+      logDebug(s"Snapshot uploaded at ${providerId} with version ${version}")

Review Comment:
   nit: better to say: snapshot version 'foo' was uploaded for provider 'bar'



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +220,99 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case SnapshotUploaded(providerId, version, timestamp) =>
+      stateStoreSnapshotVersions.put(providerId, SnapshotUploadEvent(version, timestamp))
+      logDebug(s"Snapshot uploaded at ${providerId} with version ${version}")
+      // Report all stores that are behind in snapshot uploads
+      val (laggingStores, latestSnapshot) = findLaggingStores()
+      if (laggingStores.nonEmpty) {
+        logWarning(
+          log"StateStoreCoordinator Snapshot Lag - Number of state stores falling behind: " +
+          log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)} " +
+          log"(Latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, latestSnapshot)})"
+        )
+        laggingStores.foreach { storeProviderId =>
+          val logMessage = stateStoreSnapshotVersions.get(storeProviderId) match {
+            case Some(snapshotEvent) =>
+              val versionDelta = latestSnapshot.version - snapshotEvent.version
+              val timeDelta = latestSnapshot.timestamp - snapshotEvent.timestamp
+
+              log"StateStoreCoordinator Snapshot Lag - State store falling behind " +
+              log"${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} " +
+              log"(version delta: ${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, versionDelta)}, " +
+              log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, timeDelta)}ms)"
+            case None =>
+              log"StateStoreCoordinator Snapshot Lag - State store falling behind " +

Review Comment:
   If changelog checkpointing is off, and hence there are no snapshot upload reports, it seems this will report those stores as lagging? That is incorrect.
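   
   E.g., we could gate the whole lag check on changelog checkpointing being enabled (untested sketch; the exact conf lookup here is an assumption):
   
   ```scala
   // Only look for lagging stores when changelog checkpointing is on; otherwise
   // no snapshot upload reports are expected and every store would look lagging.
   val changelogEnabled = sqlConf.getConfString(
     "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled",
     "false").toBoolean
   if (changelogEnabled) {
     val (laggingStores, latestSnapshot) = findLaggingStores()
     // ... log the lag warnings as before
   }
   ```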



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -55,6 +56,25 @@ private case class GetLocation(storeId: StateStoreProviderId)
 private case class DeactivateInstances(runId: UUID)
   extends StateStoreCoordinatorMessage
 
+private case class SnapshotUploaded(storeId: StateStoreProviderId, version: Long, timestamp: Long)

Review Comment:
   Also rename to `ReportSnapshotUploaded`?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -55,6 +56,25 @@ private case class GetLocation(storeId: StateStoreProviderId)
 private case class DeactivateInstances(runId: UUID)
   extends StateStoreCoordinatorMessage
 
+private case class SnapshotUploaded(storeId: StateStoreProviderId, version: Long, timestamp: Long)
+  extends StateStoreCoordinatorMessage
+
+/**
+ * Message used for testing.
+ * This message is used to retrieve the latest snapshot version reported for upload from a
+ * specific state store instance.
+ */
+private case class GetLatestSnapshotVersion(storeId: StateStoreProviderId)
+  extends StateStoreCoordinatorMessage
+
+/**
+ * Message used for testing.
+ * This message is used to retrieve all active state store instances falling behind in
+ * snapshot uploads, whether through version or time criteria.
+ */
+private case class GetLaggingStores()

Review Comment:
   nit: add `ForTesting` suffix to the message name, so it is super obvious it is only for testing



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -38,9 +38,20 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.util.{NonFateSharingCache, Utils}
 
+/**
+ * Trait representing events reported from a RocksDB instance.
+ *
+ * The internal RocksDB instance can use a provider with a `RocksDBEventListener` reference to
+ * report specific events like snapshot uploads. This should only be used to report back to the
+ * coordinator for metrics and monitoring purposes.
+ */
+trait RocksDBEventListener {
+  def reportSnapshotUploaded(version: Long): Unit
+}
+
 private[sql] class RocksDBStateStoreProvider
   extends StateStoreProvider with Logging with Closeable
-  with SupportsFineGrainedReplay {
+  with SupportsFineGrainedReplay with RocksDBEventListener {

Review Comment:
   I would rather implement a separate class instead of adding this to the provider.
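   
   E.g., a small forwarder class so the provider itself does not implement the listener trait (sketch; the class name is hypothetical):
   
   ```scala
   /** Forwards snapshot upload events from the RocksDB instance to the coordinator. */
   class RocksDBEventForwarder(providerId: StateStoreProviderId) extends RocksDBEventListener {
     override def reportSnapshotUploaded(version: Long): Unit = {
       StateStore.reportSnapshotUploaded(providerId, version)
     }
   }
   ```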



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -129,10 +174,17 @@ class StateStoreCoordinatorRef private(rpcEndpointRef: RpcEndpointRef) {
 * Class for coordinating instances of [[StateStore]]s loaded in executors across the cluster,
 * and get their locations for job scheduling.
  */
-private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
-    extends ThreadSafeRpcEndpoint with Logging {
+private class StateStoreCoordinator(
+    override val rpcEnv: RpcEnv,
+    val sqlConf: SQLConf)
+  extends ThreadSafeRpcEndpoint
+  with Logging {
   private val instances = new mutable.HashMap[StateStoreProviderId, ExecutorCacheTaskLocation]
 
+  // Stores the latest snapshot version of a specific state store provider instance
+  private val stateStoreSnapshotVersions =

Review Comment:
   nit: should this be `stateStoreLatestUploadedSnapshotVersion` or something more clear?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -55,6 +56,25 @@ private case class GetLocation(storeId: StateStoreProviderId)
 private case class DeactivateInstances(runId: UUID)
   extends StateStoreCoordinatorMessage
 
+private case class SnapshotUploaded(storeId: StateStoreProviderId, version: 
Long, timestamp: Long)

Review Comment:
   nit: add comment



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -1033,6 +1033,12 @@ object StateStore extends Logging {
     }
   }
 
+  def reportSnapshotUploaded(storeProviderId: StateStoreProviderId, snapshotVersion: Long): Unit = {

Review Comment:
   nit: make private and only accessible within state package?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2236,6 +2236,19 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val STATE_STORE_COORDINATOR_MIN_SNAPSHOT_VERSION_DELTA_TO_LOG =
+    buildConf("spark.sql.streaming.stateStore.minSnapshotVersionDeltaToLog")
+      .internal()
+      .doc(
+        "Minimum number of versions between the most recent uploaded snapshot 
version of a " +
+        "single state store instance and the most recent version across all 
state store " +
+        "instances to log a warning message."
+      )
+      .version("4.0.0")
+      .intConf
+      .checkValue(k => k >= 0, "Must be greater than or equal to 0")

Review Comment:
   what happens if equal to 0? Should we be able to turn this feature off?
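   
   If 0 is supposed to mean "off", we could short-circuit the check (sketch):
   
   ```scala
   // In isLagging (sketch): with a threshold of 0, versionDelta >= 0 is always true and
   // every store would be flagged; treat 0 as "lag reporting disabled" instead.
   if (minVersionDeltaForLogging <= 0) return false
   ```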



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -119,6 +139,31 @@ class StateStoreCoordinatorRef private(rpcEndpointRef: RpcEndpointRef) {
     rpcEndpointRef.askSync[Boolean](DeactivateInstances(runId))
   }
 
+  /** Inform that an executor has uploaded a snapshot */
+  private[sql] def snapshotUploaded(
+      storeProviderId: StateStoreProviderId,
+      version: Long,
+      timestamp: Long): Unit = {
+    rpcEndpointRef.askSync[Boolean](SnapshotUploaded(storeProviderId, version, timestamp))
+  }
+
+  /**
+   * Endpoint used for testing.
+   * Get the latest snapshot version uploaded for a state store.
+   */
+  private[sql] def getLatestSnapshotVersion(
+      stateStoreProviderId: StateStoreProviderId): Option[Long] = {
+    rpcEndpointRef.askSync[Option[Long]](GetLatestSnapshotVersion(stateStoreProviderId))
+  }
+
+  /**
+   * Endpoint used for testing.
+   * Get the state store instances that are falling behind in snapshot uploads.
+   */
+  private[sql] def getLaggingStores(): Seq[StateStoreProviderId] = {

Review Comment:
   ditto



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +220,99 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case SnapshotUploaded(providerId, version, timestamp) =>
+      stateStoreSnapshotVersions.put(providerId, SnapshotUploadEvent(version, timestamp))

Review Comment:
   I think we should only update if the reported version is higher than the currently recorded version, to avoid a situation where the provider was moved to another executor but the previous executor's maintenance thread uploaded an old version after the new executor uploaded a newer one. The chances of this happening are low but not zero, so let's handle it and add a comment.
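   
   Something like (sketch):
   
   ```scala
   case SnapshotUploaded(providerId, version, timestamp) =>
     // Only move the recorded version forward. A provider may migrate to a new executor
     // while the old executor's maintenance thread is still running, so a stale (older)
     // upload report can arrive after a newer one; ignore it in that case.
     val current = stateStoreSnapshotVersions.get(providerId)
     if (current.forall(_.version < version)) {
       stateStoreSnapshotVersions.put(providerId, SnapshotUploadEvent(version, timestamp))
     }
   ```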



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +220,99 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case SnapshotUploaded(providerId, version, timestamp) =>
+      stateStoreSnapshotVersions.put(providerId, SnapshotUploadEvent(version, timestamp))
+      logDebug(s"Snapshot uploaded at ${providerId} with version ${version}")
+      // Report all stores that are behind in snapshot uploads
+      val (laggingStores, latestSnapshot) = findLaggingStores()
+      if (laggingStores.nonEmpty) {
+        logWarning(
+          log"StateStoreCoordinator Snapshot Lag - Number of state stores falling behind: " +
+          log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)} " +
+          log"(Latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, latestSnapshot)})"
+        )
+        laggingStores.foreach { storeProviderId =>
+          val logMessage = stateStoreSnapshotVersions.get(storeProviderId) match {
+            case Some(snapshotEvent) =>
+              val versionDelta = latestSnapshot.version - snapshotEvent.version
+              val timeDelta = latestSnapshot.timestamp - snapshotEvent.timestamp
+
+              log"StateStoreCoordinator Snapshot Lag - State store falling behind " +
+              log"${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} " +
+              log"(version delta: ${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, versionDelta)}, " +
+              log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, timeDelta)}ms)"
+            case None =>
+              log"StateStoreCoordinator Snapshot Lag - State store falling behind " +
+              log"${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} (never uploaded)"
+          }
+          logWarning(logMessage)
+        }
+      }
+      context.reply(true)
+
+    case GetLatestSnapshotVersion(providerId) =>
+      val version = stateStoreSnapshotVersions.get(providerId).map(_.version)
+      logDebug(s"Got latest snapshot version of the state store $providerId: $version")
+      context.reply(version)
+
+    case GetLaggingStores =>
+      val (laggingStores, _) = findLaggingStores()
+      logDebug(s"Got lagging state stores ${laggingStores
+        .map(
+          id =>

Review Comment:
   id.storeId already implements `toString`
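   
   I.e., this could just be (sketch):
   
   ```scala
   logDebug(s"Got lagging state stores: ${laggingStores.map(_.storeId).mkString(", ")}")
   ```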



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +220,99 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case SnapshotUploaded(providerId, version, timestamp) =>
+      stateStoreSnapshotVersions.put(providerId, SnapshotUploadEvent(version, timestamp))
+      logDebug(s"Snapshot uploaded at ${providerId} with version ${version}")
+      // Report all stores that are behind in snapshot uploads
+      val (laggingStores, latestSnapshot) = findLaggingStores()
+      if (laggingStores.nonEmpty) {
+        logWarning(
+          log"StateStoreCoordinator Snapshot Lag - Number of state stores falling behind: " +
+          log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)} " +
+          log"(Latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, latestSnapshot)})"
+        )
+        laggingStores.foreach { storeProviderId =>
+          val logMessage = stateStoreSnapshotVersions.get(storeProviderId) match {
+            case Some(snapshotEvent) =>
+              val versionDelta = latestSnapshot.version - snapshotEvent.version
+              val timeDelta = latestSnapshot.timestamp - snapshotEvent.timestamp
+
+              log"StateStoreCoordinator Snapshot Lag - State store falling behind " +
+              log"${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} " +
+              log"(version delta: ${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, versionDelta)}, " +
+              log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, timeDelta)}ms)"
+            case None =>
+              log"StateStoreCoordinator Snapshot Lag - State store falling behind " +
+              log"${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} (never uploaded)"
+          }
+          logWarning(logMessage)
+        }
+      }
+      context.reply(true)
+
+    case GetLatestSnapshotVersion(providerId) =>
+      val version = stateStoreSnapshotVersions.get(providerId).map(_.version)
+      logDebug(s"Got latest snapshot version of the state store $providerId: $version")
+      context.reply(version)
+
+    case GetLaggingStores =>
+      val (laggingStores, _) = findLaggingStores()
+      logDebug(s"Got lagging state stores ${laggingStores
+        .map(
+          id =>
+            s"StateStoreId(operatorId=${id.storeId.operatorId}, " +
+            s"partitionId=${id.storeId.partitionId}, " +
+            s"storeName=${id.storeId.storeName})"
+        )
+        .mkString(", ")}")
+      context.reply(laggingStores)
+
     case StopCoordinator =>
       stop() // Stop before replying to ensure that endpoint name has been deregistered
       logInfo("StateStoreCoordinator stopped")
       context.reply(true)
   }
+
+  case class SnapshotUploadEvent(
+      version: Long,
+      timestamp: Long
+  ) extends Ordered[SnapshotUploadEvent] {
+    def isLagging(latest: SnapshotUploadEvent): Boolean = {
+      val versionDelta = latest.version - version
+      val timeDelta = latest.timestamp - timestamp
+      val minVersionDeltaForLogging =
+        sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_MIN_SNAPSHOT_VERSION_DELTA_TO_LOG)
+      // Use 10 times the maintenance interval as the minimum time delta for logging
+      val minTimeDeltaForLogging = 10 * sqlConf.getConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL)

Review Comment:
   why hard code 10? should this be a conf or something?
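   
   E.g., something like this next to the existing conf (sketch; the conf name and default are placeholders):
   
   ```scala
   val STATE_STORE_COORDINATOR_SNAPSHOT_LAG_MAINTENANCE_MULTIPLIER =
     buildConf("spark.sql.streaming.stateStore.snapshotLagMaintenanceMultiplier")
       .internal()
       .doc("Multiplier applied to the maintenance interval to derive the minimum time " +
         "delta before a state store is reported as lagging in snapshot uploads.")
       .version("4.0.0")
       .intConf
       .checkValue(k => k > 0, "Must be greater than 0")
       .createWithDefault(10)
   ```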



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +220,99 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case SnapshotUploaded(providerId, version, timestamp) =>
+      stateStoreSnapshotVersions.put(providerId, SnapshotUploadEvent(version, timestamp))
+      logDebug(s"Snapshot uploaded at ${providerId} with version ${version}")
+      // Report all stores that are behind in snapshot uploads
+      val (laggingStores, latestSnapshot) = findLaggingStores()
+      if (laggingStores.nonEmpty) {
+        logWarning(
+          log"StateStoreCoordinator Snapshot Lag - Number of state stores falling behind: " +
+          log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)} " +
+          log"(Latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, latestSnapshot)})"
+        )
+        laggingStores.foreach { storeProviderId =>
+          val logMessage = stateStoreSnapshotVersions.get(storeProviderId) match {
+            case Some(snapshotEvent) =>
+              val versionDelta = latestSnapshot.version - snapshotEvent.version
+              val timeDelta = latestSnapshot.timestamp - snapshotEvent.timestamp
+
+              log"StateStoreCoordinator Snapshot Lag - State store falling behind " +
+              log"${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} " +
+              log"(version delta: ${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, versionDelta)}, " +
+              log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, timeDelta)}ms)"
+            case None =>
+              log"StateStoreCoordinator Snapshot Lag - State store falling behind " +
+              log"${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} (never uploaded)"
+          }
+          logWarning(logMessage)
+        }
+      }
+      context.reply(true)
+
+    case GetLatestSnapshotVersion(providerId) =>
+      val version = stateStoreSnapshotVersions.get(providerId).map(_.version)
+      logDebug(s"Got latest snapshot version of the state store $providerId: $version")
+      context.reply(version)
+
+    case GetLaggingStores =>
+      val (laggingStores, _) = findLaggingStores()
+      logDebug(s"Got lagging state stores ${laggingStores
+        .map(
+          id =>
+            s"StateStoreId(operatorId=${id.storeId.operatorId}, " +
+            s"partitionId=${id.storeId.partitionId}, " +
+            s"storeName=${id.storeId.storeName})"
+        )
+        .mkString(", ")}")
+      context.reply(laggingStores)
+
     case StopCoordinator =>
       stop() // Stop before replying to ensure that endpoint name has been deregistered
       logInfo("StateStoreCoordinator stopped")
       context.reply(true)
   }
+
+  case class SnapshotUploadEvent(
+      version: Long,
+      timestamp: Long
+  ) extends Ordered[SnapshotUploadEvent] {
+    def isLagging(latest: SnapshotUploadEvent): Boolean = {
+      val versionDelta = latest.version - version

Review Comment:
   What if this is called with version -1?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +220,99 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case SnapshotUploaded(providerId, version, timestamp) =>
+      stateStoreSnapshotVersions.put(providerId, SnapshotUploadEvent(version, timestamp))
+      logDebug(s"Snapshot uploaded at ${providerId} with version ${version}")
+      // Report all stores that are behind in snapshot uploads
+      val (laggingStores, latestSnapshot) = findLaggingStores()
+      if (laggingStores.nonEmpty) {
+        logWarning(
+          log"StateStoreCoordinator Snapshot Lag - Number of state stores falling behind: " +

Review Comment:
   nit: `StateStoreCoordinator Snapshot Lag Detected` ?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +220,99 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case SnapshotUploaded(providerId, version, timestamp) =>
+      stateStoreSnapshotVersions.put(providerId, SnapshotUploadEvent(version, timestamp))
+      logDebug(s"Snapshot uploaded at ${providerId} with version ${version}")
+      // Report all stores that are behind in snapshot uploads
+      val (laggingStores, latestSnapshot) = findLaggingStores()

Review Comment:
   Should we do this on an interval instead of every time? E.g., in a situation where we have, let's say, 200 providers and 5 are lagging, currently every time there is an upload for the 195 other providers we will log. This may become noisy. Let's do it every x secs or mins to avoid noise?
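   
   Something along these lines (sketch; the field and interval conf are placeholders):
   
   ```scala
   // Throttle lag reports: scan and warn at most once per reporting interval,
   // instead of on every SnapshotUploaded message.
   private var lastLagReportTimeMs = 0L

   private def maybeReportLaggingStores(): Unit = {
     val now = System.currentTimeMillis()
     if (now - lastLagReportTimeMs >= lagReportIntervalMs) { // e.g. a new conf
       lastLagReportTimeMs = now
       val (laggingStores, latestSnapshot) = findLaggingStores()
       // ... log the warnings as before
     }
   }
   ```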



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +220,99 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case SnapshotUploaded(providerId, version, timestamp) =>
+      stateStoreSnapshotVersions.put(providerId, SnapshotUploadEvent(version, timestamp))
+      logDebug(s"Snapshot uploaded at ${providerId} with version ${version}")
+      // Report all stores that are behind in snapshot uploads
+      val (laggingStores, latestSnapshot) = findLaggingStores()
+      if (laggingStores.nonEmpty) {
+        logWarning(
+          log"StateStoreCoordinator Snapshot Lag - Number of state stores falling behind: " +
+          log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)} " +
+          log"(Latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, latestSnapshot)})"
+        )
+        laggingStores.foreach { storeProviderId =>
+          val logMessage = stateStoreSnapshotVersions.get(storeProviderId) match {
+            case Some(snapshotEvent) =>
+              val versionDelta = latestSnapshot.version - snapshotEvent.version
+              val timeDelta = latestSnapshot.timestamp - snapshotEvent.timestamp
+
+              log"StateStoreCoordinator Snapshot Lag - State store falling behind " +
+              log"${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} " +
+              log"(version delta: ${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, versionDelta)}, " +
+              log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, timeDelta)}ms)"
+            case None =>
+              log"StateStoreCoordinator Snapshot Lag - State store falling behind " +
+              log"${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} (never uploaded)"
+          }
+          logWarning(logMessage)
+        }
+      }
+      context.reply(true)
+
+    case GetLatestSnapshotVersion(providerId) =>
+      val version = stateStoreSnapshotVersions.get(providerId).map(_.version)
+      logDebug(s"Got latest snapshot version of the state store $providerId: $version")
+      context.reply(version)
+
+    case GetLaggingStores =>
+      val (laggingStores, _) = findLaggingStores()
+      logDebug(s"Got lagging state stores ${laggingStores
+        .map(
+          id =>
+            s"StateStoreId(operatorId=${id.storeId.operatorId}, " +
+            s"partitionId=${id.storeId.partitionId}, " +
+            s"storeName=${id.storeId.storeName})"
+        )
+        .mkString(", ")}")
+      context.reply(laggingStores)
+
     case StopCoordinator =>
       stop() // Stop before replying to ensure that endpoint name has been deregistered
       logInfo("StateStoreCoordinator stopped")
       context.reply(true)
   }
+
+  case class SnapshotUploadEvent(
+      version: Long,
+      timestamp: Long
+  ) extends Ordered[SnapshotUploadEvent] {
+    def isLagging(latest: SnapshotUploadEvent): Boolean = {
+      val versionDelta = latest.version - version
+      val timeDelta = latest.timestamp - timestamp
+      val minVersionDeltaForLogging =
+        sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_MIN_SNAPSHOT_VERSION_DELTA_TO_LOG)
+      // Use 10 times the maintenance interval as the minimum time delta for logging
+      val minTimeDeltaForLogging = 10 * sqlConf.getConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL)
+
+      versionDelta >= minVersionDeltaForLogging ||
+        (version >= 0 && timeDelta > minTimeDeltaForLogging)
+    }
+
+    override def compare(that: SnapshotUploadEvent): Int = {
+      this.version.compare(that.version)
+    }
+
+    override def toString(): String = {
+      s"SnapshotUploadEvent(version=$version, timestamp=$timestamp)"
+    }
+  }
+
+  private def findLaggingStores(): (Seq[StateStoreProviderId], SnapshotUploadEvent) = {
+    if (instances.isEmpty) {
+      return (Seq.empty, SnapshotUploadEvent(-1, 0))
+    }
+    // Find the most updated instance to use as reference point
+    val latestSnapshot = instances
+      .map(
+        instance => stateStoreSnapshotVersions.getOrElse(instance._1, SnapshotUploadEvent(-1, 0))
+      ).max

Review Comment:
   This is incorrect, right? It is checking all providers, even providers that belong to other queries. E.g., if query a has 100 batches but query b has only 2 batches, this will say query b's providers are lagging, which isn't correct.
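   
   I.e., the reference snapshot should probably be computed per query run, e.g. (sketch):
   
   ```scala
   // Group stores by query run before picking the reference point, so a short-running
   // query is never compared against the latest snapshot of a long-running one.
   instances.keys.groupBy(_.queryRunId).foreach { case (runId, storeIds) =>
     val latestSnapshot = storeIds
       .map(stateStoreSnapshotVersions.getOrElse(_, SnapshotUploadEvent(-1, 0)))
       .max
     // ... find lagging stores within this query run only
   }
   ```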



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +220,99 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case SnapshotUploaded(providerId, version, timestamp) =>
+      stateStoreSnapshotVersions.put(providerId, SnapshotUploadEvent(version, timestamp))
+      logDebug(s"Snapshot uploaded at ${providerId} with version ${version}")
+      // Report all stores that are behind in snapshot uploads
+      val (laggingStores, latestSnapshot) = findLaggingStores()
+      if (laggingStores.nonEmpty) {
+        logWarning(
+          log"StateStoreCoordinator Snapshot Lag - Number of state stores falling behind: " +
+          log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)} " +
+          log"(Latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, latestSnapshot)})"
+        )
+        laggingStores.foreach { storeProviderId =>
+          val logMessage = stateStoreSnapshotVersions.get(storeProviderId) match {
+            case Some(snapshotEvent) =>
+              val versionDelta = latestSnapshot.version - snapshotEvent.version
+              val timeDelta = latestSnapshot.timestamp - snapshotEvent.timestamp
+
+              log"StateStoreCoordinator Snapshot Lag - State store falling behind " +
+              log"${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} " +
+              log"(version delta: ${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, versionDelta)}, " +
+              log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, timeDelta)}ms)"
+            case None =>
+              log"StateStoreCoordinator Snapshot Lag - State store falling behind " +
+              log"${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} (never uploaded)"
+          }
+          logWarning(logMessage)
+        }
+      }
+      context.reply(true)
+
+    case GetLatestSnapshotVersion(providerId) =>
+      val version = stateStoreSnapshotVersions.get(providerId).map(_.version)
+      logDebug(s"Got latest snapshot version of the state store $providerId: $version")
+      context.reply(version)
+
+    case GetLaggingStores =>
+      val (laggingStores, _) = findLaggingStores()
+      logDebug(s"Got lagging state stores ${laggingStores
+        .map(
+          id =>
+            s"StateStoreId(operatorId=${id.storeId.operatorId}, " +
+            s"partitionId=${id.storeId.partitionId}, " +
+            s"storeName=${id.storeId.storeName})"
+        )
+        .mkString(", ")}")
+      context.reply(laggingStores)
+
     case StopCoordinator =>
       stop() // Stop before replying to ensure that endpoint name has been deregistered
       logInfo("StateStoreCoordinator stopped")
       context.reply(true)
   }
+
+  case class SnapshotUploadEvent(
+      version: Long,
+      timestamp: Long
+  ) extends Ordered[SnapshotUploadEvent] {
+    def isLagging(latest: SnapshotUploadEvent): Boolean = {
+      val versionDelta = latest.version - version
+      val timeDelta = latest.timestamp - timestamp
+      val minVersionDeltaForLogging =
+        sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_MIN_SNAPSHOT_VERSION_DELTA_TO_LOG)
+      // Use 10 times the maintenance interval as the minimum time delta for logging
+      val minTimeDeltaForLogging = 10 * sqlConf.getConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL)
+
+      versionDelta >= minVersionDeltaForLogging ||
+        (version >= 0 && timeDelta > minTimeDeltaForLogging)
+    }
+
+    override def compare(that: SnapshotUploadEvent): Int = {
+      this.version.compare(that.version)
+    }
+
+    override def toString(): String = {
+      s"SnapshotUploadEvent(version=$version, timestamp=$timestamp)"
+    }
+  }
+
+  private def findLaggingStores(): (Seq[StateStoreProviderId], SnapshotUploadEvent) = {
+    if (instances.isEmpty) {
+      return (Seq.empty, SnapshotUploadEvent(-1, 0))

Review Comment:
   nit: should we define `SnapshotUploadEvent(-1, 0)` somewhere?
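   
   E.g. (sketch):
   
   ```scala
   // Sentinel meaning "no snapshot has ever been uploaded for this store"
   private val NoSnapshotUploaded = SnapshotUploadEvent(version = -1, timestamp = 0)
   ```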



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

