micheal-o commented on code in PR #50123:
URL: https://github.com/apache/spark/pull/50123#discussion_r2013323698
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -164,13 +289,157 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
       val storeIdsToRemove = instances.keys.filter(_.queryRunId == runId).toSeq
       instances --= storeIdsToRemove
+      // Also remove these instances from snapshot upload event tracking
+      stateStoreLatestUploadedSnapshot --= storeIdsToRemove
+      // Remove the corresponding run id entries for report time and starting time
+      lastFullSnapshotLagReportTimeMs -= runId
+      queryRunStartingPoint -= runId
       logDebug(s"Deactivating instances related to checkpoint location $runId: " +
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case ReportSnapshotUploaded(providerId, version, timestamp) =>
+      // Ignore this upload event if the registered latest version for the store is more recent,
+      // since it's possible that an older version gets uploaded after a new executor uploads for
+      // the same state store but with a newer snapshot.
+      logDebug(s"Snapshot version $version was uploaded for state store $providerId")
+      if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version >= version)) {
+        stateStoreLatestUploadedSnapshot.put(providerId, SnapshotUploadEvent(version, timestamp))
+      }
+      context.reply(true)
+
+    case LogLaggingStateStores(queryRunId, latestVersion) =>
+      val currentTimestamp = System.currentTimeMillis()
+      // Mark the query run's starting timestamp and latest version if the coordinator
+      // has never seen this query run before.
+      if (!queryRunStartingPoint.contains(queryRunId)) {
+        queryRunStartingPoint.put(queryRunId, QueryStartInfo(latestVersion, currentTimestamp))
+      } else if (shouldCoordinatorReportSnapshotLag(queryRunId, latestVersion, currentTimestamp)) {
+        // Only log lagging instances if the snapshot report upload is enabled,
+        // otherwise all instances will be considered lagging.
+        val laggingStores = findLaggingStores(queryRunId, latestVersion, currentTimestamp)
+        if (laggingStores.nonEmpty) {
+          logWarning(
+            log"StateStoreCoordinator Snapshot Lag Report for " +
+              log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+              log"Number of state stores falling behind: " +
+              log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)}"
+          )
+          // Report all stores that are behind in snapshot uploads.
+          // Only report the list of providers lagging behind if the last reported time
+          // is not recent for this query run. The lag report interval denotes the minimum
+          // time between these full reports.
+          val timeSinceLastReport =
+            currentTimestamp - lastFullSnapshotLagReportTimeMs.getOrElse(queryRunId, 0L)
+          if (timeSinceLastReport > coordinatorLagReportInterval) {
+            // Mark timestamp of the report and log the lagging instances
+            lastFullSnapshotLagReportTimeMs.put(queryRunId, currentTimestamp)
+            // Only report the stores that are lagging the most behind in snapshot uploads.
+            laggingStores
+              .sortBy(stateStoreLatestUploadedSnapshot.getOrElse(_, defaultSnapshotUploadEvent))
+              .take(sqlConf.stateStoreCoordinatorMaxLaggingStoresToReport)
+              .foreach { providerId =>
+                val baseLogMessage =
+                  log"StateStoreCoordinator Snapshot Lag Detected for " +
+                    log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+                    log"Store ID: ${MDC(LogKeys.STATE_STORE_ID, providerId.storeId)} " +
+                    log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}"
+
+                val logMessage = stateStoreLatestUploadedSnapshot.get(providerId) match {
+                  case Some(snapshotEvent) =>
+                    val versionDelta = latestVersion - Math.max(snapshotEvent.version, 0)
+                    val timeDelta = currentTimestamp - snapshotEvent.timestamp
+
+                    baseLogMessage + log", " +
+                      log"latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, snapshotEvent)}, " +
+                      log"version delta: " +
+                      log"${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, versionDelta)}, " +
+                      log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, timeDelta)}ms)"
+                  case None =>
+                    baseLogMessage + log", latest snapshot: no upload for query run)"
+                }
+                logWarning(logMessage)
+              }
+          }
+        }
+      }
+      context.reply(true)
+
+    case GetLatestSnapshotVersionForTesting(providerId) =>
+      val version = stateStoreLatestUploadedSnapshot.get(providerId).map(_.version)
+      logDebug(s"Got latest snapshot version of the state store $providerId: $version")
+      context.reply(version)
+
+    case GetLaggingStoresForTesting(queryRunId, latestVersion) =>
+      val currentTimestamp = System.currentTimeMillis()
+      // Only report if the corresponding run has all the necessary information to start reporting
+      if (shouldCoordinatorReportSnapshotLag(queryRunId, latestVersion, currentTimestamp)) {
+        val laggingStores = findLaggingStores(queryRunId, latestVersion, currentTimestamp)
+        logDebug(s"Got lagging state stores: ${laggingStores.mkString(", ")}")
+        context.reply(laggingStores)
+      } else {
+        context.reply(Seq.empty)
+      }
 
     case StopCoordinator =>
       stop() // Stop before replying to ensure that endpoint name has been deregistered
       logInfo("StateStoreCoordinator stopped")
       context.reply(true)
   }
+
+  private def findLaggingStores(
+      queryRunId: UUID,
+      referenceVersion: Long,
+      referenceTimestamp: Long): Seq[StateStoreProviderId] = {
+    // Determine alert thresholds from configurations for both time and version differences.
+    val snapshotVersionDeltaMultiplier =
+      sqlConf.stateStoreCoordinatorMultiplierForMinVersionDiffToLog
+    val maintenanceIntervalMultiplier = sqlConf.stateStoreCoordinatorMultiplierForMinTimeDiffToLog
+    val minDeltasForSnapshot = sqlConf.stateStoreMinDeltasForSnapshot
+    val maintenanceInterval = sqlConf.streamingMaintenanceInterval
+
+    // Use the configured multipliers to determine the proper alert thresholds
+    val minVersionDeltaForLogging = snapshotVersionDeltaMultiplier * minDeltasForSnapshot
+    val minTimeDeltaForLogging = maintenanceIntervalMultiplier * maintenanceInterval
+
+    // Look for active state store providers that are lagging behind in snapshot uploads
+    instances.keys.filter { storeProviderId =>
+      // Only consider providers that are part of this specific query run
+      val latestSnapshot = stateStoreLatestUploadedSnapshot.getOrElse(
+        storeProviderId,
+        defaultSnapshotUploadEvent
+      )
+      storeProviderId.queryRunId == queryRunId && (

Review Comment:
   why not lazily filter the keys with runId (e.g. `.view.filter`) instead of doing it here?
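   For illustration, a minimal sketch of the lazy-filter variant this hints at, reusing the names from the hunk above. The time-based half of the predicate is cut off in this quote, so the `referenceTimestamp` comparison below is an assumption rather than the PR's actual condition:

```scala
private def findLaggingStores(
    queryRunId: UUID,
    referenceVersion: Long,
    referenceTimestamp: Long): Seq[StateStoreProviderId] = {
  // Alert thresholds derived from the configured multipliers, as in the hunk above
  val minVersionDeltaForLogging =
    sqlConf.stateStoreCoordinatorMultiplierForMinVersionDiffToLog *
      sqlConf.stateStoreMinDeltasForSnapshot
  val minTimeDeltaForLogging =
    sqlConf.stateStoreCoordinatorMultiplierForMinTimeDiffToLog *
      sqlConf.streamingMaintenanceInterval

  instances.keys.view
    // Lazily restrict to this query run before evaluating the lag predicate
    .filter(_.queryRunId == queryRunId)
    .filter { storeProviderId =>
      val latestSnapshot =
        stateStoreLatestUploadedSnapshot.getOrElse(storeProviderId, defaultSnapshotUploadEvent)
      referenceVersion - Math.max(latestSnapshot.version, 0) > minVersionDeltaForLogging &&
        referenceTimestamp - latestSnapshot.timestamp > minTimeDeltaForLogging
    }
    .toSeq
}
```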
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala:
##########
@@ -283,6 +290,13 @@ abstract class ProgressContext(
     progressReporter.lastNoExecutionProgressEventTime = triggerClock.getTimeMillis()
     progressReporter.updateProgress(newProgress)
+    // Ask the state store coordinator to log all lagging state stores
+    if (progressReporter.coordinatorReportSnapshotUploadLag) {

Review Comment:
   nit: should this check for whether it is enabled be done inside the `logLaggingStateStores` func instead? (see the sketch further below)

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala:
##########
@@ -97,6 +97,12 @@ class StateStoreConf(
   val enableStateStoreCheckpointIds = StatefulOperatorStateInfo.enableStateStoreCheckpointIds(sqlConf)
 
+  /**
+   * Whether the coordinator is reporting state stores trailing behind in snapshot uploads.
+   */
+  val stateStoreCoordinatorReportSnapshotUploadLag: Boolean =

Review Comment:
   nit: you can do without the `stateStoreCoordinator` prefix for this val. Same for the RocksDBConf.

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -164,13 +289,157 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
       val storeIdsToRemove = instances.keys.filter(_.queryRunId == runId).toSeq
       instances --= storeIdsToRemove
+      // Also remove these instances from snapshot upload event tracking
+      stateStoreLatestUploadedSnapshot --= storeIdsToRemove
+      // Remove the corresponding run id entries for report time and starting time
+      lastFullSnapshotLagReportTimeMs -= runId
+      queryRunStartingPoint -= runId
       logDebug(s"Deactivating instances related to checkpoint location $runId: " +
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case ReportSnapshotUploaded(providerId, version, timestamp) =>
+      // Ignore this upload event if the registered latest version for the store is more recent,
+      // since it's possible that an older version gets uploaded after a new executor uploads for
+      // the same state store but with a newer snapshot.
+      logDebug(s"Snapshot version $version was uploaded for state store $providerId")
+      if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version >= version)) {
+        stateStoreLatestUploadedSnapshot.put(providerId, SnapshotUploadEvent(version, timestamp))
+      }
+      context.reply(true)
+
+    case LogLaggingStateStores(queryRunId, latestVersion) =>
+      val currentTimestamp = System.currentTimeMillis()
+      // Mark the query run's starting timestamp and latest version if the coordinator
+      // has never seen this query run before.
+      if (!queryRunStartingPoint.contains(queryRunId)) {
+        queryRunStartingPoint.put(queryRunId, QueryStartInfo(latestVersion, currentTimestamp))
+      } else if (shouldCoordinatorReportSnapshotLag(queryRunId, latestVersion, currentTimestamp)) {
+        // Only log lagging instances if the snapshot report upload is enabled,
+        // otherwise all instances will be considered lagging.
+        val laggingStores = findLaggingStores(queryRunId, latestVersion, currentTimestamp)
+        if (laggingStores.nonEmpty) {
+          logWarning(
+            log"StateStoreCoordinator Snapshot Lag Report for " +
+              log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+              log"Number of state stores falling behind: " +
+              log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)}"
+          )
+          // Report all stores that are behind in snapshot uploads.
+          // Only report the list of providers lagging behind if the last reported time
+          // is not recent for this query run. The lag report interval denotes the minimum
+          // time between these full reports.
+          val timeSinceLastReport =
+            currentTimestamp - lastFullSnapshotLagReportTimeMs.getOrElse(queryRunId, 0L)
+          if (timeSinceLastReport > coordinatorLagReportInterval) {
+            // Mark timestamp of the report and log the lagging instances
+            lastFullSnapshotLagReportTimeMs.put(queryRunId, currentTimestamp)
+            // Only report the stores that are lagging the most behind in snapshot uploads.
+            laggingStores
+              .sortBy(stateStoreLatestUploadedSnapshot.getOrElse(_, defaultSnapshotUploadEvent))
+              .take(sqlConf.stateStoreCoordinatorMaxLaggingStoresToReport)
+              .foreach { providerId =>
+                val baseLogMessage =
+                  log"StateStoreCoordinator Snapshot Lag Detected for " +
+                    log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+                    log"Store ID: ${MDC(LogKeys.STATE_STORE_ID, providerId.storeId)} " +
+                    log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}"
+
+                val logMessage = stateStoreLatestUploadedSnapshot.get(providerId) match {
+                  case Some(snapshotEvent) =>
+                    val versionDelta = latestVersion - Math.max(snapshotEvent.version, 0)

Review Comment:
   why do we need this max? We no longer put the default value in the map, right? (see the sketch below)

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -129,10 +198,66 @@ class StateStoreCoordinatorRef private(rpcEndpointRef: RpcEndpointRef) {
  * Class for coordinating instances of [[StateStore]]s loaded in executors across the cluster,
  * and get their locations for job scheduling.
  */
-private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
-  extends ThreadSafeRpcEndpoint with Logging {
+private class StateStoreCoordinator(
+    override val rpcEnv: RpcEnv,
+    val sqlConf: SQLConf)
+  extends ThreadSafeRpcEndpoint with Logging {
   private val instances = new mutable.HashMap[StateStoreProviderId, ExecutorCacheTaskLocation]
 
+  // Stores the latest snapshot upload event for a specific state store
+  private val stateStoreLatestUploadedSnapshot =
+    new mutable.HashMap[StateStoreProviderId, SnapshotUploadEvent]
+
+  // Default snapshot upload event to use when a provider has never uploaded a snapshot
+  private val defaultSnapshotUploadEvent = SnapshotUploadEvent(-1, 0)
+
+  // Stores the last timestamp in milliseconds for each queryRunId indicating when the
+  // coordinator did a report on instances lagging behind on snapshot uploads.
+  // The initial timestamp is defaulted to 0 milliseconds.
+  private val lastFullSnapshotLagReportTimeMs = new mutable.HashMap[UUID, Long]
+
+  // Stores the time and latest version of the query run's start.

Review Comment:
   nit: you mean the starting version, not the latest version?
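   On the `Math.max` question above, a sketch of how the `Some` branch could read without the clamp. It assumes the map is only ever populated by `ReportSnapshotUploaded` with a real upload, so the stored version is never the -1 default; the rest is copied from the hunk:

```scala
case Some(snapshotEvent) =>
  // snapshotEvent always comes from a real upload report, never the -1 default,
  // so its version is non-negative and no clamping is needed.
  val versionDelta = latestVersion - snapshotEvent.version
  val timeDelta = currentTimestamp - snapshotEvent.timestamp

  baseLogMessage + log", " +
    log"latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, snapshotEvent)}, " +
    log"version delta: ${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, versionDelta)}, " +
    log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, timeDelta)}ms)"
```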
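   And on the earlier `logLaggingStateStores` nit from the ProgressReporter hunk, one hypothetical shape of that change. It assumes the helper lives on `StateStoreCoordinatorRef`, can see the new report-lag conf (the `reportSnapshotUploadLag` accessor below is made up), and replies over `askSync` like the other coordinator messages; only `rpcEndpointRef` and `LogLaggingStateStores` appear in the quoted diff:

```scala
// Hypothetical: gate the RPC inside the helper so ProgressReporter can call it
// unconditionally instead of checking coordinatorReportSnapshotUploadLag first.
private[sql] def logLaggingStateStores(queryRunId: UUID, latestVersion: Long): Unit = {
  if (reportSnapshotUploadLag) {
    rpcEndpointRef.askSync[Boolean](LogLaggingStateStores(queryRunId, latestVersion))
  }
}
```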
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -164,13 +289,157 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
       val storeIdsToRemove = instances.keys.filter(_.queryRunId == runId).toSeq
       instances --= storeIdsToRemove
+      // Also remove these instances from snapshot upload event tracking
+      stateStoreLatestUploadedSnapshot --= storeIdsToRemove
+      // Remove the corresponding run id entries for report time and starting time
+      lastFullSnapshotLagReportTimeMs -= runId
+      queryRunStartingPoint -= runId
       logDebug(s"Deactivating instances related to checkpoint location $runId: " +
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case ReportSnapshotUploaded(providerId, version, timestamp) =>
+      // Ignore this upload event if the registered latest version for the store is more recent,
+      // since it's possible that an older version gets uploaded after a new executor uploads for
+      // the same state store but with a newer snapshot.
+      logDebug(s"Snapshot version $version was uploaded for state store $providerId")
+      if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version >= version)) {
+        stateStoreLatestUploadedSnapshot.put(providerId, SnapshotUploadEvent(version, timestamp))
+      }
+      context.reply(true)
+
+    case LogLaggingStateStores(queryRunId, latestVersion) =>
+      val currentTimestamp = System.currentTimeMillis()
+      // Mark the query run's starting timestamp and latest version if the coordinator
+      // has never seen this query run before.
+      if (!queryRunStartingPoint.contains(queryRunId)) {
+        queryRunStartingPoint.put(queryRunId, QueryStartInfo(latestVersion, currentTimestamp))
+      } else if (shouldCoordinatorReportSnapshotLag(queryRunId, latestVersion, currentTimestamp)) {
+        // Only log lagging instances if the snapshot report upload is enabled,
+        // otherwise all instances will be considered lagging.
+        val laggingStores = findLaggingStores(queryRunId, latestVersion, currentTimestamp)
+        if (laggingStores.nonEmpty) {
+          logWarning(
+            log"StateStoreCoordinator Snapshot Lag Report for " +
+              log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+              log"Number of state stores falling behind: " +
+              log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)}"
+          )
+          // Report all stores that are behind in snapshot uploads.
+          // Only report the list of providers lagging behind if the last reported time
+          // is not recent for this query run. The lag report interval denotes the minimum
+          // time between these full reports.
+          val timeSinceLastReport =
+            currentTimestamp - lastFullSnapshotLagReportTimeMs.getOrElse(queryRunId, 0L)
+          if (timeSinceLastReport > coordinatorLagReportInterval) {
+            // Mark timestamp of the report and log the lagging instances
+            lastFullSnapshotLagReportTimeMs.put(queryRunId, currentTimestamp)
+            // Only report the stores that are lagging the most behind in snapshot uploads.
+            laggingStores
+              .sortBy(stateStoreLatestUploadedSnapshot.getOrElse(_, defaultSnapshotUploadEvent))
+              .take(sqlConf.stateStoreCoordinatorMaxLaggingStoresToReport)
+              .foreach { providerId =>
+                val baseLogMessage =
+                  log"StateStoreCoordinator Snapshot Lag Detected for " +
+                    log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+                    log"Store ID: ${MDC(LogKeys.STATE_STORE_ID, providerId.storeId)} " +
+                    log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}"
+
+                val logMessage = stateStoreLatestUploadedSnapshot.get(providerId) match {
+                  case Some(snapshotEvent) =>
+                    val versionDelta = latestVersion - Math.max(snapshotEvent.version, 0)
+                    val timeDelta = currentTimestamp - snapshotEvent.timestamp
+
+                    baseLogMessage + log", " +
+                      log"latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, snapshotEvent)}, " +
+                      log"version delta: " +
+                      log"${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, versionDelta)}, " +
+                      log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, timeDelta)}ms)"
+                  case None =>
+                    baseLogMessage + log", latest snapshot: no upload for query run)"
+                }
+                logWarning(logMessage)
+              }
+          }
+        }
+      }
+      context.reply(true)
+
+    case GetLatestSnapshotVersionForTesting(providerId) =>
+      val version = stateStoreLatestUploadedSnapshot.get(providerId).map(_.version)
+      logDebug(s"Got latest snapshot version of the state store $providerId: $version")
+      context.reply(version)
+
+    case GetLaggingStoresForTesting(queryRunId, latestVersion) =>
+      val currentTimestamp = System.currentTimeMillis()
+      // Only report if the corresponding run has all the necessary information to start reporting
+      if (shouldCoordinatorReportSnapshotLag(queryRunId, latestVersion, currentTimestamp)) {
+        val laggingStores = findLaggingStores(queryRunId, latestVersion, currentTimestamp)
+        logDebug(s"Got lagging state stores: ${laggingStores.mkString(", ")}")
+        context.reply(laggingStores)
+      } else {
+        context.reply(Seq.empty)
+      }
 
     case StopCoordinator =>
       stop() // Stop before replying to ensure that endpoint name has been deregistered
       logInfo("StateStoreCoordinator stopped")
       context.reply(true)
   }
+
+  private def findLaggingStores(
+      queryRunId: UUID,
+      referenceVersion: Long,
+      referenceTimestamp: Long): Seq[StateStoreProviderId] = {
+    // Determine alert thresholds from configurations for both time and version differences.
+    val snapshotVersionDeltaMultiplier =
+      sqlConf.stateStoreCoordinatorMultiplierForMinVersionDiffToLog
+    val maintenanceIntervalMultiplier = sqlConf.stateStoreCoordinatorMultiplierForMinTimeDiffToLog
+    val minDeltasForSnapshot = sqlConf.stateStoreMinDeltasForSnapshot
+    val maintenanceInterval = sqlConf.streamingMaintenanceInterval
+
+    // Use the configured multipliers to determine the proper alert thresholds
+    val minVersionDeltaForLogging = snapshotVersionDeltaMultiplier * minDeltasForSnapshot
+    val minTimeDeltaForLogging = maintenanceIntervalMultiplier * maintenanceInterval
+
+    // Look for active state store providers that are lagging behind in snapshot uploads
+    instances.keys.filter { storeProviderId =>
+      // Only consider providers that are part of this specific query run
+      val latestSnapshot = stateStoreLatestUploadedSnapshot.getOrElse(
+        storeProviderId,
+        defaultSnapshotUploadEvent
+      )
+      storeProviderId.queryRunId == queryRunId && (
+        // Mark a state store as lagging if it's behind in both version and time.
+        // A state store is considered lagging if it's behind in both version and time according
+        // to the configured thresholds.
+        // Stores that didn't upload a snapshot will be treated as a store with a snapshot of
+        // version 0.
+        referenceVersion - Math.max(latestSnapshot.version, 0) > minVersionDeltaForLogging &&

Review Comment:
   BTW, you can make the default version 0 instead of -1; then you won't need this `Math.max`.
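   A minimal sketch of that suggestion, with the constructor arguments in the same order as the `SnapshotUploadEvent(-1, 0)` default shown earlier; it also answers the earlier `Math.max` question, since neither site would need the clamp:

```scala
// Treat "never uploaded" as a snapshot at version 0, so lag math can use the raw version.
private val defaultSnapshotUploadEvent = SnapshotUploadEvent(0, 0)

// The commented line in findLaggingStores then simplifies to:
//   referenceVersion - latestSnapshot.version > minVersionDeltaForLogging && ...
```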