HeartSaVioR commented on code in PR #50123:
URL: https://github.com/apache/spark/pull/50123#discussion_r2034367383


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -593,7 +593,10 @@ trait StateStoreProvider {
   def supportedInstanceMetrics: Seq[StateStoreInstanceMetric] = Seq.empty
 }
 
-object StateStoreProvider {
+object StateStoreProvider extends Logging {
+
+  @GuardedBy("this")

Review Comment:
   We have the same in StateStore object - is there specific reason to have 
this in StateStoreProvider object as well?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -971,3 +973,33 @@ class RocksDBStateStoreChangeDataReader(
     }
   }
 }
+
+/**
+ * Class used to relay events reported from a RocksDB instance to the state 
store coordinator.
+ *
+ * We pass this into the RocksDB instance to report specific events like 
snapshot uploads.
+ * This should only be used to report back to the coordinator for metrics and 
monitoring purposes.
+ */
+private[state] case class RocksDBEventListener(queryRunId: String, 
stateStoreId: StateStoreId) {

Review Comment:
   nit: maybe `Router` or `Forwarder` could give the more context?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -652,6 +655,45 @@ object StateStoreProvider {
       }
     }
   }
+
+  /**
+   * Get the runId from the provided hadoopConf. If it is not found, generate 
a random UUID.
+   *
+   * @param hadoopConf Hadoop configuration used by the StateStore to save 
state data
+   */
+  private[state] def getRunId(hadoopConf: Configuration): String = {
+    val runId = hadoopConf.get(StreamExecution.RUN_ID_KEY)
+    if (runId != null) {
+      runId
+    } else {
+      assert(Utils.isTesting, "Failed to find query id/batch Id in task 
context")
+      UUID.randomUUID().toString
+    }
+  }
+
+  /**

Review Comment:
   Same here.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -119,6 +159,46 @@ class StateStoreCoordinatorRef private(rpcEndpointRef: 
RpcEndpointRef) {
     rpcEndpointRef.askSync[Boolean](DeactivateInstances(runId))
   }
 
+  /** Inform that an executor has uploaded a snapshot */
+  private[sql] def snapshotUploaded(
+      providerId: StateStoreProviderId,
+      version: Long,
+      timestamp: Long): Unit = {
+    rpcEndpointRef.askSync[Boolean](ReportSnapshotUploaded(providerId, 
version, timestamp))
+  }
+
+  /** Ask the coordinator to log all state store instances that are lagging 
behind in uploads */
+  private[sql] def logLaggingStateStores(
+      queryRunId: UUID,
+      latestVersion: Long,
+      isTerminatingTrigger: Boolean): Unit = {

Review Comment:
   same here



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -119,6 +159,46 @@ class StateStoreCoordinatorRef private(rpcEndpointRef: 
RpcEndpointRef) {
     rpcEndpointRef.askSync[Boolean](DeactivateInstances(runId))
   }
 
+  /** Inform that an executor has uploaded a snapshot */
+  private[sql] def snapshotUploaded(
+      providerId: StateStoreProviderId,
+      version: Long,
+      timestamp: Long): Unit = {

Review Comment:
   nit: different from the request's return type?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -523,15 +525,25 @@ private[sql] class RocksDBStateStoreProvider
   @volatile private var useColumnFamilies: Boolean = _
   @volatile private var stateStoreEncoding: String = _
   @volatile private var stateSchemaProvider: Option[StateSchemaProvider] = _
+  @volatile private var rocksDBEventListener: RocksDBEventListener = _
 
   private[sql] lazy val rocksDB = {
     val dfsRootDir = stateStoreId.storeCheckpointLocation().toString
     val storeIdStr = s"StateStoreId(opId=${stateStoreId.operatorId}," +
       s"partId=${stateStoreId.partitionId},name=${stateStoreId.storeName})"
     val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf)
     val localRootDir = Utils.createTempDir(Utils.getLocalDir(sparkConf), 
storeIdStr)
-    new RocksDB(dfsRootDir, RocksDBConf(storeConf), localRootDir, hadoopConf, 
storeIdStr,
-      useColumnFamilies, storeConf.enableStateStoreCheckpointIds, 
stateStoreId.partitionId)
+    new RocksDB(
+      dfsRootDir,
+      RocksDBConf(storeConf),
+      localRootDir,
+      hadoopConf,
+      storeIdStr,
+      useColumnFamilies,
+      storeConf.enableStateStoreCheckpointIds,
+      stateStoreId.partitionId,
+      Some(rocksDBEventListener)

Review Comment:
   nit: I'd safely use `Option` here, unless we intentionally allow having the 
value of `Some(null)`.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -164,13 +264,159 @@ private class StateStoreCoordinator(override val rpcEnv: 
RpcEnv)
       val storeIdsToRemove =
         instances.keys.filter(_.queryRunId == runId).toSeq
       instances --= storeIdsToRemove
+      // Also remove these instances from snapshot upload event tracking
+      stateStoreLatestUploadedSnapshot --= storeIdsToRemove
+      // Remove the corresponding run id entries for report time and starting 
time
+      lastFullSnapshotLagReportTimeMs -= runId
       logDebug(s"Deactivating instances related to checkpoint location $runId: 
" +
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case ReportSnapshotUploaded(providerId, version, timestamp) =>
+      // Ignore this upload event if the registered latest version for the 
store is more recent,
+      // since it's possible that an older version gets uploaded after a new 
executor uploads for
+      // the same state store but with a newer snapshot.
+      logDebug(s"Snapshot version $version was uploaded for state store 
$providerId")
+      if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version 
>= version)) {
+        stateStoreLatestUploadedSnapshot.put(providerId, 
SnapshotUploadEvent(version, timestamp))
+      }
+      context.reply(true)
+
+    case LogLaggingStateStores(queryRunId, latestVersion, 
isTerminatingTrigger) =>
+      val currentTimestamp = System.currentTimeMillis()
+      // Only log lagging instances if snapshot lag reporting and uploading is 
enabled,
+      // otherwise all instances will be considered lagging.
+      if (shouldCoordinatorReportSnapshotLag) {
+        val laggingStores =
+          findLaggingStores(queryRunId, latestVersion, currentTimestamp, 
isTerminatingTrigger)
+        if (laggingStores.nonEmpty) {
+          logWarning(
+            log"StateStoreCoordinator Snapshot Lag Report for " +
+            log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+            log"Number of state stores falling behind: " +
+            log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)}"
+          )
+          // Report all stores that are behind in snapshot uploads.
+          // Only report the list of providers lagging behind if the last 
reported time
+          // is not recent for this query run. The lag report interval denotes 
the minimum
+          // time between these full reports.
+          val timeSinceLastReport =
+            currentTimestamp - 
lastFullSnapshotLagReportTimeMs.getOrElse(queryRunId, 0L)
+          if (timeSinceLastReport > coordinatorLagReportInterval) {
+            // Mark timestamp of the report and log the lagging instances
+            lastFullSnapshotLagReportTimeMs.put(queryRunId, currentTimestamp)
+            // Only report the stores that are lagging the most behind in 
snapshot uploads.
+            laggingStores
+              .sortBy(stateStoreLatestUploadedSnapshot.getOrElse(_, 
defaultSnapshotUploadEvent))
+              .take(sqlConf.stateStoreCoordinatorMaxLaggingStoresToReport)
+              .foreach { providerId =>
+                val baseLogMessage =
+                  log"StateStoreCoordinator Snapshot Lag Detected for " +
+                  log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+                  log"Store ID: ${MDC(LogKeys.STATE_STORE_ID, 
providerId.storeId)} " +
+                  log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, 
latestVersion)}"
+
+                val logMessage = 
stateStoreLatestUploadedSnapshot.get(providerId) match {
+                  case Some(snapshotEvent) =>
+                    val versionDelta = latestVersion - snapshotEvent.version
+                    val timeDelta = currentTimestamp - snapshotEvent.timestamp
+
+                    baseLogMessage + log", " +
+                    log"latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, 
snapshotEvent)}, " +
+                    log"version delta: " +
+                    log"${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, 
versionDelta)}, " +
+                    log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, 
timeDelta)}ms)"
+                  case None =>
+                    baseLogMessage + log", latest snapshot: no upload for 
query run)"
+                }
+                logWarning(logMessage)
+              }
+          }
+        }
+      }
+      context.reply(true)
+
+    case GetLatestSnapshotVersionForTesting(providerId) =>
+      val version = 
stateStoreLatestUploadedSnapshot.get(providerId).map(_.version)
+      logDebug(s"Got latest snapshot version of the state store $providerId: 
$version")
+      context.reply(version)
+
+    case GetLaggingStoresForTesting(queryRunId, latestVersion, 
isTerminatingTrigger) =>
+      val currentTimestamp = System.currentTimeMillis()
+      // Only report if snapshot lag reporting is enabled
+      if (shouldCoordinatorReportSnapshotLag) {
+        val laggingStores =
+          findLaggingStores(queryRunId, latestVersion, currentTimestamp, 
isTerminatingTrigger)
+        logDebug(s"Got lagging state stores: ${laggingStores.mkString(", ")}")
+        context.reply(laggingStores)
+      } else {
+        context.reply(Seq.empty)
+      }
+
     case StopCoordinator =>
       stop() // Stop before replying to ensure that endpoint name has been 
deregistered
       logInfo("StateStoreCoordinator stopped")
       context.reply(true)
   }
+
+  private def findLaggingStores(
+      queryRunId: UUID,
+      referenceVersion: Long,
+      referenceTimestamp: Long,
+      isTerminatingTrigger: Boolean): Seq[StateStoreProviderId] = {
+    // Determine alert thresholds from configurations for both time and 
version differences.
+    val snapshotVersionDeltaMultiplier =
+      sqlConf.stateStoreCoordinatorMultiplierForMinVersionDiffToLog
+    val maintenanceIntervalMultiplier = 
sqlConf.stateStoreCoordinatorMultiplierForMinTimeDiffToLog
+    val minDeltasForSnapshot = sqlConf.stateStoreMinDeltasForSnapshot
+    val maintenanceInterval = sqlConf.streamingMaintenanceInterval
+
+    // Use the configured multipliers to determine the proper alert thresholds
+    val minVersionDeltaForLogging = snapshotVersionDeltaMultiplier * 
minDeltasForSnapshot

Review Comment:
   nit: Probably good to mention the relevant config name explicitly in the doc 
of the config. Same for below.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -55,6 +56,39 @@ private case class GetLocation(storeId: StateStoreProviderId)
 private case class DeactivateInstances(runId: UUID)
   extends StateStoreCoordinatorMessage
 
+/**
+ * This message is used to report a state store has just finished uploading a 
snapshot,
+ * along with the timestamp in milliseconds and the snapshot version.
+ */
+private case class ReportSnapshotUploaded(
+    providerId: StateStoreProviderId,
+    version: Long,
+    timestamp: Long)

Review Comment:
   > I figured this would be helpful in distinguishing between:
   > 
   > * reporting the snapshot version just uploaded vs
   > * reporting the snapshot version just loaded but was uploaded at a 
previous unknown time
   
   How do we distinguish both? Do we put some marker value for the latter?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to