zecookiez commented on code in PR #50123:
URL: https://github.com/apache/spark/pull/50123#discussion_r1992793749


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +220,99 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case SnapshotUploaded(providerId, version, timestamp) =>
+      stateStoreSnapshotVersions.put(providerId, SnapshotUploadEvent(version, 
timestamp))
+      logDebug(s"Snapshot uploaded at ${providerId} with version ${version}")
+      // Report all stores that are behind in snapshot uploads
+      val (laggingStores, latestSnapshot) = findLaggingStores()
+      if (laggingStores.nonEmpty) {
+        logWarning(
+          log"StateStoreCoordinator Snapshot Lag - Number of state stores 
falling behind: " +
+          log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)} " +
+          log"(Latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, 
latestSnapshot)})"
+        )
+        laggingStores.foreach { storeProviderId =>
+          val logMessage = stateStoreSnapshotVersions.get(storeProviderId) 
match {
+            case Some(snapshotEvent) =>
+              val versionDelta = latestSnapshot.version - snapshotEvent.version
+              val timeDelta = latestSnapshot.timestamp - 
snapshotEvent.timestamp
+
+              log"StateStoreCoordinator Snapshot Lag - State store falling 
behind " +
+              log"${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} " +
+              log"(version delta: ${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, 
versionDelta)}, " +
+              log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, 
timeDelta)}ms)"
+            case None =>
+              log"StateStoreCoordinator Snapshot Lag - State store falling 
behind " +
+              log"${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} (never 
uploaded)"
+          }
+          logWarning(logMessage)
+        }
+      }
+      context.reply(true)
+
+    case GetLatestSnapshotVersion(providerId) =>
+      val version = stateStoreSnapshotVersions.get(providerId).map(_.version)
+      logDebug(s"Got latest snapshot version of the state store $providerId: 
$version")
+      context.reply(version)
+
+    case GetLaggingStores =>
+      val (laggingStores, _) = findLaggingStores()
+      logDebug(s"Got lagging state stores ${laggingStores
+        .map(
+          id =>
+            s"StateStoreId(operatorId=${id.storeId.operatorId}, " +
+            s"partitionId=${id.storeId.partitionId}, " +
+            s"storeName=${id.storeId.storeName})"
+        )
+        .mkString(", ")}")
+      context.reply(laggingStores)
+
     case StopCoordinator =>
       stop() // Stop before replying to ensure that endpoint name has been 
deregistered
       logInfo("StateStoreCoordinator stopped")
       context.reply(true)
   }
+
+  case class SnapshotUploadEvent(
+      version: Long,
+      timestamp: Long
+  ) extends Ordered[SnapshotUploadEvent] {
+    def isLagging(latest: SnapshotUploadEvent): Boolean = {
+      val versionDelta = latest.version - version
+      val timeDelta = latest.timestamp - timestamp
+      val minVersionDeltaForLogging =
+        
sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_MIN_SNAPSHOT_VERSION_DELTA_TO_LOG)
+      // Use 10 times the maintenance interval as the minimum time delta for 
logging
+      val minTimeDeltaForLogging = 10 * 
sqlConf.getConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL)
+
+      versionDelta >= minVersionDeltaForLogging ||
+        (version >= 0 && timeDelta > minTimeDeltaForLogging)
+    }
+
+    override def compare(that: SnapshotUploadEvent): Int = {
+      this.version.compare(that.version)
+    }
+
+    override def toString(): String = {
+      s"SnapshotUploadEvent(version=$version, timestamp=$timestamp)"
+    }
+  }
+
+  private def findLaggingStores(): (Seq[StateStoreProviderId], 
SnapshotUploadEvent) = {
+    if (instances.isEmpty) {
+      return (Seq.empty, SnapshotUploadEvent(-1, 0))
+    }
+    // Find the most updated instance to use as reference point
+    val latestSnapshot = instances
+      .map(
+        instance => stateStoreSnapshotVersions.getOrElse(instance._1, 
SnapshotUploadEvent(-1, 0))
+      ).max

Review Comment:
   Good point, I just switched it to retrieve the latest snapshot on a per-query basis instead. Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to