ericm-db commented on code in PR #50123: URL: https://github.com/apache/spark/pull/50123#discussion_r2010663948
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala: ########## @@ -164,13 +252,167 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv) val storeIdsToRemove = instances.keys.filter(_.queryRunId == runId).toSeq instances --= storeIdsToRemove + // Also remove these instances from snapshot upload event tracking + stateStoreLatestUploadedSnapshot --= storeIdsToRemove logDebug(s"Deactivating instances related to checkpoint location $runId: " + storeIdsToRemove.mkString(", ")) context.reply(true) + case ReportSnapshotUploaded(providerId, version, timestamp) => + // Ignore this upload event if the registered latest version for the store is more recent, + // since it's possible that an older version gets uploaded after a new executor uploads for + // the same state store but with a newer snapshot. + logDebug(s"Snapshot version $version was uploaded for state store $providerId") + if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version >= version)) { + stateStoreLatestUploadedSnapshot.put(providerId, SnapshotUploadEvent(version, timestamp)) + } + context.reply(true) + + case LogLaggingStateStores(queryRunId, latestVersion) => + val currentTimestamp = System.currentTimeMillis() + // Mark the query run's starting timestamp and latest version if the coordinator + // has never seen this query run before. + if (!queryRunStartingPoint.contains(queryRunId)) { + queryRunStartingPoint.put(queryRunId, (currentTimestamp, latestVersion)) + } + // Only log lagging instances if the snapshot report upload is enabled, + // otherwise all instances will be considered lagging. 
+ val laggingStores = findLaggingStores(queryRunId, latestVersion, currentTimestamp) + if (laggingStores.nonEmpty) { + logWarning( + log"StateStoreCoordinator Snapshot Lag Report for " + + log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " + + log"Number of state stores falling behind: " + + log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)}" + ) + // Report all stores that are behind in snapshot uploads. + // Only report the list of providers lagging behind if the last reported time + // is not recent for this query run. The lag report interval denotes the minimum + // time between these full reports. + val coordinatorLagReportInterval = + sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL) Review Comment: define `def coordinatorLagReportInterval` outside of this ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala: ########## @@ -164,13 +252,167 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv) val storeIdsToRemove = instances.keys.filter(_.queryRunId == runId).toSeq instances --= storeIdsToRemove + // Also remove these instances from snapshot upload event tracking + stateStoreLatestUploadedSnapshot --= storeIdsToRemove logDebug(s"Deactivating instances related to checkpoint location $runId: " + storeIdsToRemove.mkString(", ")) context.reply(true) + case ReportSnapshotUploaded(providerId, version, timestamp) => + // Ignore this upload event if the registered latest version for the store is more recent, + // since it's possible that an older version gets uploaded after a new executor uploads for + // the same state store but with a newer snapshot. 
+ logDebug(s"Snapshot version $version was uploaded for state store $providerId") + if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version >= version)) { + stateStoreLatestUploadedSnapshot.put(providerId, SnapshotUploadEvent(version, timestamp)) + } + context.reply(true) + + case LogLaggingStateStores(queryRunId, latestVersion) => + val currentTimestamp = System.currentTimeMillis() + // Mark the query run's starting timestamp and latest version if the coordinator + // has never seen this query run before. + if (!queryRunStartingPoint.contains(queryRunId)) { + queryRunStartingPoint.put(queryRunId, (currentTimestamp, latestVersion)) + } + // Only log lagging instances if the snapshot report upload is enabled, + // otherwise all instances will be considered lagging. + val laggingStores = findLaggingStores(queryRunId, latestVersion, currentTimestamp) + if (laggingStores.nonEmpty) { + logWarning( + log"StateStoreCoordinator Snapshot Lag Report for " + + log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " + + log"Number of state stores falling behind: " + + log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)}" + ) + // Report all stores that are behind in snapshot uploads. + // Only report the list of providers lagging behind if the last reported time + // is not recent for this query run. The lag report interval denotes the minimum + // time between these full reports. + val coordinatorLagReportInterval = + sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL) + val timeSinceLastReport = + currentTimestamp - lastFullSnapshotLagReportTimeMs.getOrElse(queryRunId, 0L) + if (timeSinceLastReport > coordinatorLagReportInterval) { + // Mark timestamp of the report and log the lagging instances + lastFullSnapshotLagReportTimeMs.put(queryRunId, currentTimestamp) + // Only report the stores that are lagging the most behind in snapshot uploads. 
+ laggingStores + .sortBy(stateStoreLatestUploadedSnapshot.getOrElse(_, defaultSnapshotUploadEvent)) + .take(sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_MAX_LAGGING_STORES_TO_REPORT)) + .foreach { providerId => + val logMessage = stateStoreLatestUploadedSnapshot.get(providerId) match { + case Some(snapshotEvent) => + val versionDelta = latestVersion - Math.max(snapshotEvent.version, 0) + val timeDelta = currentTimestamp - snapshotEvent.timestamp + + log"StateStoreCoordinator Snapshot Lag Detected for " + + log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " + + log"Store ID: ${MDC(LogKeys.STATE_STORE_ID, providerId.storeId)} " + + log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}, " + + log"latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, snapshotEvent)}, " + + log"version delta: " + + log"${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, versionDelta)}, " + + log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, timeDelta)}ms)" + case None => + log"StateStoreCoordinator Snapshot Lag Detected for " + + log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " + + log"Store ID: ${MDC(LogKeys.STATE_STORE_ID, providerId.storeId)} " + + log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}, " + + log"latest snapshot: no upload for query run)" + } + logWarning(logMessage) + } + } + } + context.reply(true) + + case GetLatestSnapshotVersionForTesting(providerId) => + val version = stateStoreLatestUploadedSnapshot.get(providerId).map(_.version) + logDebug(s"Got latest snapshot version of the state store $providerId: $version") + context.reply(version) + + case GetLaggingStoresForTesting(queryRunId, latestVersion) => + val currentTimestamp = System.currentTimeMillis() + val laggingStores = findLaggingStores(queryRunId, latestVersion, currentTimestamp) + logDebug(s"Got lagging state stores: ${laggingStores.mkString(", ")}") + context.reply(laggingStores) + case StopCoordinator => stop() // Stop before replying to ensure that endpoint name has 
been deregistered logInfo("StateStoreCoordinator stopped") context.reply(true) } + + private def findLaggingStores( + queryRunId: UUID, + referenceVersion: Long, + referenceTimestamp: Long): Seq[StateStoreProviderId] = { + // Do not report any instance as lagging if report snapshot upload is disabled. + if (!sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG)) { + return Seq.empty + } + + // Determine alert thresholds from configurations for both time and version differences. + val snapshotVersionDeltaMultiplier = + sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_VERSION_DIFF_TO_LOG) + val maintenanceIntervalMultiplier = + sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_TIME_DIFF_TO_LOG) + val minDeltasForSnapshot = sqlConf.getConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT) + val maintenanceInterval = sqlConf.getConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL) Review Comment: Define these in SQLConf.scala? ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala: ########## @@ -164,13 +252,167 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv) val storeIdsToRemove = instances.keys.filter(_.queryRunId == runId).toSeq instances --= storeIdsToRemove + // Also remove these instances from snapshot upload event tracking + stateStoreLatestUploadedSnapshot --= storeIdsToRemove logDebug(s"Deactivating instances related to checkpoint location $runId: " + storeIdsToRemove.mkString(", ")) context.reply(true) + case ReportSnapshotUploaded(providerId, version, timestamp) => + // Ignore this upload event if the registered latest version for the store is more recent, + // since it's possible that an older version gets uploaded after a new executor uploads for + // the same state store but with a newer snapshot. 
+ logDebug(s"Snapshot version $version was uploaded for state store $providerId") + if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version >= version)) { + stateStoreLatestUploadedSnapshot.put(providerId, SnapshotUploadEvent(version, timestamp)) + } + context.reply(true) + + case LogLaggingStateStores(queryRunId, latestVersion) => + val currentTimestamp = System.currentTimeMillis() + // Mark the query run's starting timestamp and latest version if the coordinator + // has never seen this query run before. + if (!queryRunStartingPoint.contains(queryRunId)) { + queryRunStartingPoint.put(queryRunId, (currentTimestamp, latestVersion)) + } + // Only log lagging instances if the snapshot report upload is enabled, Review Comment: add else case ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala: ########## @@ -164,13 +252,167 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv) val storeIdsToRemove = instances.keys.filter(_.queryRunId == runId).toSeq instances --= storeIdsToRemove + // Also remove these instances from snapshot upload event tracking + stateStoreLatestUploadedSnapshot --= storeIdsToRemove logDebug(s"Deactivating instances related to checkpoint location $runId: " + storeIdsToRemove.mkString(", ")) context.reply(true) + case ReportSnapshotUploaded(providerId, version, timestamp) => + // Ignore this upload event if the registered latest version for the store is more recent, + // since it's possible that an older version gets uploaded after a new executor uploads for + // the same state store but with a newer snapshot. 
+ logDebug(s"Snapshot version $version was uploaded for state store $providerId") + if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version >= version)) { + stateStoreLatestUploadedSnapshot.put(providerId, SnapshotUploadEvent(version, timestamp)) + } + context.reply(true) + + case LogLaggingStateStores(queryRunId, latestVersion) => + val currentTimestamp = System.currentTimeMillis() + // Mark the query run's starting timestamp and latest version if the coordinator + // has never seen this query run before. + if (!queryRunStartingPoint.contains(queryRunId)) { + queryRunStartingPoint.put(queryRunId, (currentTimestamp, latestVersion)) + } + // Only log lagging instances if the snapshot report upload is enabled, + // otherwise all instances will be considered lagging. + val laggingStores = findLaggingStores(queryRunId, latestVersion, currentTimestamp) + if (laggingStores.nonEmpty) { + logWarning( + log"StateStoreCoordinator Snapshot Lag Report for " + + log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " + + log"Number of state stores falling behind: " + + log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)}" + ) + // Report all stores that are behind in snapshot uploads. + // Only report the list of providers lagging behind if the last reported time + // is not recent for this query run. The lag report interval denotes the minimum + // time between these full reports. + val coordinatorLagReportInterval = + sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL) + val timeSinceLastReport = + currentTimestamp - lastFullSnapshotLagReportTimeMs.getOrElse(queryRunId, 0L) + if (timeSinceLastReport > coordinatorLagReportInterval) { + // Mark timestamp of the report and log the lagging instances + lastFullSnapshotLagReportTimeMs.put(queryRunId, currentTimestamp) + // Only report the stores that are lagging the most behind in snapshot uploads. 
+ laggingStores + .sortBy(stateStoreLatestUploadedSnapshot.getOrElse(_, defaultSnapshotUploadEvent)) + .take(sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_MAX_LAGGING_STORES_TO_REPORT)) + .foreach { providerId => + val logMessage = stateStoreLatestUploadedSnapshot.get(providerId) match { + case Some(snapshotEvent) => + val versionDelta = latestVersion - Math.max(snapshotEvent.version, 0) + val timeDelta = currentTimestamp - snapshotEvent.timestamp + + log"StateStoreCoordinator Snapshot Lag Detected for " + + log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " + + log"Store ID: ${MDC(LogKeys.STATE_STORE_ID, providerId.storeId)} " + + log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}, " + + log"latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, snapshotEvent)}, " + Review Comment: Can we extract these out above the case match ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala: ########## @@ -129,10 +198,29 @@ class StateStoreCoordinatorRef private(rpcEndpointRef: RpcEndpointRef) { * Class for coordinating instances of [[StateStore]]s loaded in executors across the cluster, * and get their locations for job scheduling. 
*/ -private class StateStoreCoordinator(override val rpcEnv: RpcEnv) - extends ThreadSafeRpcEndpoint with Logging { +private class StateStoreCoordinator( + override val rpcEnv: RpcEnv, + val sqlConf: SQLConf) + extends ThreadSafeRpcEndpoint with Logging { private val instances = new mutable.HashMap[StateStoreProviderId, ExecutorCacheTaskLocation] + // Stores the latest snapshot upload event for a specific state store + private val stateStoreLatestUploadedSnapshot = + new mutable.HashMap[StateStoreProviderId, SnapshotUploadEvent] + + // Default snapshot upload event to use when a provider has never uploaded a snapshot + private val defaultSnapshotUploadEvent = SnapshotUploadEvent(-1, 0) + + // Stores the last timestamp in milliseconds for each queryRunId indicating when the + // coordinator did a report on instances lagging behind on snapshot uploads. + // The initial timestamp is defaulted to 0 milliseconds. + private val lastFullSnapshotLagReportTimeMs = new mutable.HashMap[UUID, Long] + + // Stores the time and latest version of the query run's start. + // Queries that started recently should not have their state stores reported as lagging + // since we may not have all the information yet. + private val queryRunStartingPoint = new mutable.HashMap[UUID, (Long, Long)] Review Comment: Check whether we can remove the timestamp from this map. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org