zecookiez commented on code in PR #50123: URL: https://github.com/apache/spark/pull/50123#discussion_r2012620201
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala: ########## @@ -168,9 +247,149 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv) storeIdsToRemove.mkString(", ")) context.reply(true) + case ReportSnapshotUploaded(storeId, version, timestamp) => + // Ignore this upload event if the registered latest version for the store is more recent, + // since it's possible that an older version gets uploaded after a new executor uploads for + // the same state store but with a newer snapshot. + logDebug(s"Snapshot version $version was uploaded for state store $storeId") + if (!stateStoreLatestUploadedSnapshot.get(storeId).exists(_.version >= version)) { + stateStoreLatestUploadedSnapshot.put(storeId, SnapshotUploadEvent(version, timestamp)) + } + context.reply(true) + + case LogLaggingStateStores(queryRunId, latestVersion) => + // Only log lagging instances if the snapshot report upload is enabled, + // otherwise all instances will be considered lagging. + val currentTimestamp = System.currentTimeMillis() + val laggingStores = findLaggingStores(queryRunId, latestVersion, currentTimestamp) + if (laggingStores.nonEmpty) { + logWarning( + log"StateStoreCoordinator Snapshot Lag Report for " + + log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " + + log"Number of state stores falling behind: " + + log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)}" + ) + // Report all stores that are behind in snapshot uploads. + // Only report the full list of providers lagging behind if the last reported time + // is not recent. The lag report interval denotes the minimum time between these + // full reports. 
+ val coordinatorLagReportInterval = + sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL) + if (laggingStores.nonEmpty && + currentTimestamp - lastFullSnapshotLagReportTimeMs > coordinatorLagReportInterval) { + // Mark timestamp of the full report and log the lagging instances + lastFullSnapshotLagReportTimeMs = currentTimestamp + // Only report the stores that are lagging the most behind in snapshot uploads. + laggingStores + .sortBy(stateStoreLatestUploadedSnapshot.getOrElse(_, defaultSnapshotUploadEvent)) + .take(sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_MAX_LAGGING_STORES_TO_REPORT)) + .foreach { storeId => + val logMessage = stateStoreLatestUploadedSnapshot.get(storeId) match { + case Some(snapshotEvent) => + val versionDelta = latestVersion - Math.max(snapshotEvent.version, 0) + val timeDelta = currentTimestamp - snapshotEvent.timestamp + + log"StateStoreCoordinator Snapshot Lag Detected for " + + log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " + + log"Store ID: ${MDC(LogKeys.STATE_STORE_ID, storeId)} " + + log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}, " + + log"latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, snapshotEvent)}, " + + log"version delta: " + + log"${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, versionDelta)}, " + + log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, timeDelta)}ms)" + case None => + log"StateStoreCoordinator Snapshot Lag Detected for " + + log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " + + log"Store ID: ${MDC(LogKeys.STATE_STORE_ID, storeId)} " + + log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}, " + + log"latest snapshot: no upload for query run)" + } + logWarning(logMessage) + } + } + } + context.reply(true) + + case GetLatestSnapshotVersionForTesting(storeId) => + val version = stateStoreLatestUploadedSnapshot.get(storeId).map(_.version) + logDebug(s"Got latest snapshot version of the state store $storeId: $version") + 
context.reply(version) + + case GetLaggingStoresForTesting(queryRunId, latestVersion) => + val currentTimestamp = System.currentTimeMillis() + val laggingStores = findLaggingStores(queryRunId, latestVersion, currentTimestamp) + logDebug(s"Got lagging state stores: ${laggingStores.mkString(", ")}") + context.reply(laggingStores) + case StopCoordinator => stop() // Stop before replying to ensure that endpoint name has been deregistered logInfo("StateStoreCoordinator stopped") context.reply(true) } + + case class SnapshotUploadEvent( + version: Long, + timestamp: Long + ) extends Ordered[SnapshotUploadEvent] { + + def isLagging(latestVersion: Long, latestTimestamp: Long): Boolean = { + // Use version 0 for stores that have not uploaded a snapshot version for this run. + val versionDelta = latestVersion - Math.max(version, 0) + val timeDelta = latestTimestamp - timestamp + + // Determine alert thresholds from configurations for both time and version differences. + val snapshotVersionDeltaMultiplier = sqlConf.getConf( + SQLConf.STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_VERSION_DIFF_TO_LOG) + val maintenanceIntervalMultiplier = sqlConf.getConf( + SQLConf.STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_TIME_DIFF_TO_LOG) + val minDeltasForSnapshot = sqlConf.getConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT) + val maintenanceInterval = sqlConf.getConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL) + + // Use the configured multipliers to determine the proper alert thresholds + val minVersionDeltaForLogging = snapshotVersionDeltaMultiplier * minDeltasForSnapshot + val minTimeDeltaForLogging = maintenanceIntervalMultiplier * maintenanceInterval + + // Mark a state store as lagging if it is behind in both version and time. + // For stores that have never uploaded a snapshot, the time requirement will + // be automatically satisfied as the initial timestamp is 0. 
+ versionDelta > minVersionDeltaForLogging && timeDelta > minTimeDeltaForLogging + } + + override def compare(otherEvent: SnapshotUploadEvent): Int = { + // Compare by version first, then by timestamp as tiebreaker + val versionCompare = this.version.compare(otherEvent.version) + if (versionCompare == 0) { + this.timestamp.compare(otherEvent.timestamp) + } else { + versionCompare + } + } + + override def toString(): String = { + s"SnapshotUploadEvent(version=$version, timestamp=$timestamp)" + } + } + + private def findLaggingStores( + queryRunId: UUID, + referenceVersion: Long, + referenceTimestamp: Long): Seq[StateStoreId] = { + // Do not report any instance as lagging if report snapshot upload is disabled. + if (!sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG)) { + return Seq.empty + } + // Look for state stores that are lagging behind in snapshot uploads + instances.keys + .filter { storeProviderId => + // Only consider active providers that are part of this specific query run, + // but look through all state stores under this store ID, as it's possible that + // the same query re-runs with a new run ID but has already uploaded some snapshots. Review Comment: Cleared up the rest of the confusion offline: the coordinator will not retain its previous information between query runs, and this has now been addressed. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org