micheal-o commented on code in PR #50123:
URL: https://github.com/apache/spark/pull/50123#discussion_r2004839454
########## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ##########
@@ -2256,6 +2256,59 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)

+  val STATE_STORE_COORDINATOR_SNAPSHOT_DELTA_MULTIPLIER_FOR_MIN_VERSION_DELTA_TO_LOG =
+    buildConf("spark.sql.streaming.stateStore.minSnapshotDeltaMultiplierForMinVersionDeltaToLog")

Review Comment:
   nit: the conf name is a bit confusing, and even the doc message could be simplified

########## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ##########
@@ -2256,6 +2256,59 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)

+  val STATE_STORE_COORDINATOR_SNAPSHOT_DELTA_MULTIPLIER_FOR_MIN_VERSION_DELTA_TO_LOG =
+    buildConf("spark.sql.streaming.stateStore.minSnapshotDeltaMultiplierForMinVersionDeltaToLog")
+      .internal()
+      .doc(
+        "This multiplier determines the minimum version threshold for logging warnings when a " +
+        "state store instance falls behind. The coordinator logs a warning if a state store's " +
+        "last uploaded snapshot's version lags behind the query's latest known version by " +
+        "this threshold. The threshold is calculated as the configured minimum number of deltas " +
+        "needed to create a snapshot, multiplied by this multiplier."
+      )
+      .version("4.1.0")
+      .intConf
+      .checkValue(k => k >= 1, "Must be greater than or equal to 1")
+      .createWithDefault(5)
+
+  val STATE_STORE_COORDINATOR_MAINTENANCE_MULTIPLIER_FOR_MIN_TIME_DELTA_TO_LOG =
+    buildConf("spark.sql.streaming.stateStore.maintenanceMultiplierForMinTimeDeltaToLog")
+      .internal()
+      .doc(
+        "This multiplier determines the minimum time threshold for logging warnings when a " +
+        "state store instance falls behind. The coordinator logs a warning if a state store's " +
+        "last snapshot upload time lags behind the current time by this threshold. " +
+        "The threshold is calculated as the maintenance interval multiplied by this multiplier."
+      )
+      .version("4.1.0")
+      .intConf
+      .checkValue(k => k >= 1, "Must be greater than or equal to 1")
+      .createWithDefault(10)
+
+  val STATE_STORE_COORDINATOR_REPORT_UPLOAD_ENABLED =
+    buildConf("spark.sql.streaming.stateStore.coordinatorReportUpload.enabled")

Review Comment:
   nit: `coordinatorReportSnapshotUploadLag` ?

########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala: ##########
@@ -966,3 +969,40 @@ class RocksDBStateStoreChangeDataReader(
     }
   }
 }
+
+/**
+ * Object used to relay events reported from a RocksDB instance to the state store coordinator.

Review Comment:
   nit: use `Class` instead of `Object` to avoid confusion with the Scala `object` type
+ ) + .version("4.1.0") + .intConf + .checkValue(k => k >= 1, "Must be greater than or equal to 1") + .createWithDefault(5) + + val STATE_STORE_COORDINATOR_MAINTENANCE_MULTIPLIER_FOR_MIN_TIME_DELTA_TO_LOG = + buildConf("spark.sql.streaming.stateStore.maintenanceMultiplierForMinTimeDeltaToLog") Review Comment: ditto ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala: ########## @@ -966,3 +969,40 @@ class RocksDBStateStoreChangeDataReader( } } } + +/** + * Object used to relay events reported from a RocksDB instance to the state store coordinator. + * + * We pass this into the RocksDB instance to report specific events like snapshot uploads. + * This should only be used to report back to the coordinator for metrics and monitoring purposes. + */ +private[state] case class RocksDBEventListener( + queryRunId: String, + stateStoreId: StateStoreId, + storeConf: StateStoreConf) { + + /** ID of the state store provider managing the RocksDB instance */ + private val stateStoreProviderId: StateStoreProviderId = + StateStoreProviderId(stateStoreId, UUID.fromString(queryRunId)) + + /** Whether the event listener should relay these messages to the state store coordinator */ + private val coordinatorReportUploadEnabled: Boolean = + storeConf.stateStoreCoordinatorReportUploadEnabled + + /** + * Callback function from RocksDB to report events to the coordinator. + * Additional information such as the state store ID and the query run ID are + * attached here to report back to the coordinator. + * + * @param version The snapshot version that was just uploaded from RocksDB + */ + def reportSnapshotUploaded(version: Long): Unit = { + // Only report to the coordinator if this is enabled, as sometimes we do not need + // to track for lagging instances. + // Also ignore message if we are missing the provider ID from lack of initialization. Review Comment: i don't see code for what this comment says ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala: ########## @@ -129,10 +210,25 @@ class StateStoreCoordinatorRef private(rpcEndpointRef: RpcEndpointRef) { * Class for coordinating instances of [[StateStore]]s loaded in executors across the cluster, * and get their locations for job scheduling. */ -private class StateStoreCoordinator(override val rpcEnv: RpcEnv) - extends ThreadSafeRpcEndpoint with Logging { +private class StateStoreCoordinator( + override val rpcEnv: RpcEnv, + val sqlConf: SQLConf) + extends ThreadSafeRpcEndpoint + with Logging { private val instances = new mutable.HashMap[StateStoreProviderId, ExecutorCacheTaskLocation] + // Stores the latest snapshot upload event for a specific state store provider instance + private val stateStoreLatestUploadedSnapshot = + new mutable.HashMap[StateStoreProviderId, SnapshotUploadEvent] + + // Default snapshot upload event to use when a provider has never uploaded a snapshot + private val defaultSnapshotUploadEvent = SnapshotUploadEvent(-1, 0) + + // Stores the last timestamp in milliseconds where the coordinator did a full report on + // instances lagging behind on snapshot uploads. The initial timestamp is defaulted to + // 0 milliseconds. + private var lastFullSnapshotLagReport = 0L Review Comment: nit: `lastFullSnapshotLagReportTimeMs` ? 
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala: ##########
@@ -168,9 +264,141 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)

+    case ReportSnapshotUploaded(providerId, version, timestamp) =>
+      // Ignore this upload event if the registered latest version for the provider is more recent,
+      // since it's possible that an older version gets uploaded after a new executor uploads for
+      // the same provider but with a newer snapshot.
+      logDebug(s"Snapshot version $version was uploaded for provider $providerId")
+      if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version >= version)) {
+        stateStoreLatestUploadedSnapshot.put(providerId, SnapshotUploadEvent(version, timestamp))
+      }
+      context.reply(true)
+
+    case ConstructLaggingInstanceReport(queryRunId, latestVersion, endOfBatchTimestamp) =>
+      // Only log lagging instances if the snapshot report upload is enabled,
+      // otherwise all instances will be considered lagging.
+      if (sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_REPORT_UPLOAD_ENABLED)) {
+        val laggingStores = findLaggingStores(queryRunId, latestVersion, endOfBatchTimestamp)
+        logWarning(
+          log"StateStoreCoordinator Snapshot Lag Report for " +
+          log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+          log"Number of state stores falling behind: " +
+          log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)}"
+        )
+        // Report all stores that are behind in snapshot uploads.
+        // Only report the full list of providers lagging behind if the last reported time
+        // is not recent. The lag report interval denotes the minimum time between these
+        // full reports.
+        val coordinatorLagReportInterval =
+          sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL)
+        val currentTimestamp = System.currentTimeMillis()
+        if (laggingStores.nonEmpty &&
+          currentTimestamp - lastFullSnapshotLagReport > coordinatorLagReportInterval) {
+          // Mark timestamp of the full report and log the lagging instances
+          lastFullSnapshotLagReport = currentTimestamp
+          laggingStores.foreach { providerId =>
+            val logMessage = stateStoreLatestUploadedSnapshot.get(providerId) match {
+              case Some(snapshotEvent) =>
+                val versionDelta = latestVersion - snapshotEvent.version
+                val timeDelta = endOfBatchTimestamp - snapshotEvent.timestamp
+
+                log"StateStoreCoordinator Snapshot Lag Detected for " +
+                log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+                log"Provider: ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} " +
+                log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}, " +
+                log"latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, snapshotEvent)}, " +
+                log"version delta: ${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, versionDelta)}, " +
+                log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, timeDelta)}ms)"
+              case None =>
+                log"StateStoreCoordinator Snapshot Lag Detected for " +
+                log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+                log"Provider: ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} " +
+                log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}, " +
+                log"latest snapshot: never uploaded)"
+            }
+            logWarning(logMessage)
+          }
+        } else if (laggingStores.nonEmpty) {
+          logInfo(log"StateStoreCoordinator Snapshot Lag Report - last full report was too recent")
+        }
+      }
+      context.reply(true)
+
+    case GetLatestSnapshotVersionForTesting(providerId) =>
+      val version = stateStoreLatestUploadedSnapshot.get(providerId).map(_.version)
+ logDebug(s"Got latest snapshot version of the state store $providerId: $version") + context.reply(version) + + case GetLaggingStoresForTesting(queryRunId, latestVersion, timestamp) => + val laggingStores = findLaggingStores(queryRunId, latestVersion, timestamp) + logDebug(s"Got lagging state stores: ${laggingStores.mkString(", ")}") + context.reply(laggingStores) + case StopCoordinator => stop() // Stop before replying to ensure that endpoint name has been deregistered logInfo("StateStoreCoordinator stopped") context.reply(true) } + + case class SnapshotUploadEvent( + version: Long, + timestamp: Long + ) extends Ordered[SnapshotUploadEvent] { + + def isLagging(latestVersion: Long, latestTimestamp: Long): Boolean = { + val versionDelta = latestVersion - version + val timeDelta = latestTimestamp - timestamp + + // Determine alert thresholds from configurations for both time and version differences. + val snapshotVersionDeltaMultiplier = sqlConf.getConf( + SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_DELTA_MULTIPLIER_FOR_MIN_VERSION_DELTA_TO_LOG) + val maintenanceIntervalMultiplier = sqlConf.getConf( + SQLConf.STATE_STORE_COORDINATOR_MAINTENANCE_MULTIPLIER_FOR_MIN_TIME_DELTA_TO_LOG) + val minDeltasForSnapshot = sqlConf.getConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT) + val maintenanceInterval = sqlConf.getConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL) + + // Use the configured multipliers to determine the proper alert thresholds + val minVersionDeltaForLogging = snapshotVersionDeltaMultiplier * minDeltasForSnapshot + val minTimeDeltaForLogging = maintenanceIntervalMultiplier * maintenanceInterval + + // Mark a state store as lagging if it is behind in both version and time. + // In the case that a snapshot was never uploaded, we treat version -1 as the preceding + // version of 0, and only rely on the version delta condition. + // Time requirement will be automatically satisfied as the initial timestamp is 0. Review Comment: This condition only makes sense for normal case. But for a new run this will make a false noise. e.g. the query previously ran up to version 50 and uploaded snapshot. And now a new run is started, this code will think a snapshot has never been uploaded and wrongly mark it as lagging. We need to handle that otherwise every new run of a query will say all its state stores are lagging which is wrong. And will cause noise in logs/dashboard/alert. ########## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ########## @@ -2256,6 +2256,59 @@ object SQLConf { .booleanConf .createWithDefault(true) + val STATE_STORE_COORDINATOR_SNAPSHOT_DELTA_MULTIPLIER_FOR_MIN_VERSION_DELTA_TO_LOG = + buildConf("spark.sql.streaming.stateStore.minSnapshotDeltaMultiplierForMinVersionDeltaToLog") + .internal() + .doc( + "This multiplier determines the minimum version threshold for logging warnings when a " + + "state store instance falls behind. The coordinator logs a warning if a state store's " + + "last uploaded snapshot's version lags behind the query's latest known version by " + + "this threshold. The threshold is calculated as the configured minimum number of deltas " + + "needed to create a snapshot, multiplied by this multiplier." 
+ ) + .version("4.1.0") + .intConf + .checkValue(k => k >= 1, "Must be greater than or equal to 1") + .createWithDefault(5) + + val STATE_STORE_COORDINATOR_MAINTENANCE_MULTIPLIER_FOR_MIN_TIME_DELTA_TO_LOG = + buildConf("spark.sql.streaming.stateStore.maintenanceMultiplierForMinTimeDeltaToLog") + .internal() + .doc( + "This multiplier determines the minimum time threshold for logging warnings when a " + + "state store instance falls behind. The coordinator logs a warning if a state store's " + + "last snapshot upload time lags behind the current time by this threshold. " + + "The threshold is calculated as the maintenance interval multiplied by this multiplier." + ) + .version("4.1.0") + .intConf + .checkValue(k => k >= 1, "Must be greater than or equal to 1") + .createWithDefault(10) + + val STATE_STORE_COORDINATOR_REPORT_UPLOAD_ENABLED = + buildConf("spark.sql.streaming.stateStore.coordinatorReportUpload.enabled") + .internal() + .doc( + "If enabled, state store instances will send a message to the state store " + Review Comment: nit: this doc is not really explaining the functionality but impl details. e.g. `When enabled, the state store coordinator will report state stores whose snapshot haven't been uploaded for some time. See other conf xyz that controls...`. Doesn't have to be my exact wording ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala: ########## @@ -168,9 +264,141 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv) storeIdsToRemove.mkString(", ")) context.reply(true) + case ReportSnapshotUploaded(providerId, version, timestamp) => + // Ignore this upload event if the registered latest version for the provider is more recent, + // since it's possible that an older version gets uploaded after a new executor uploads for + // the same provider but with a newer snapshot. + logDebug(s"Snapshot version $version was uploaded for provider $providerId") + if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version >= version)) { + stateStoreLatestUploadedSnapshot.put(providerId, SnapshotUploadEvent(version, timestamp)) + } + context.reply(true) + + case ConstructLaggingInstanceReport(queryRunId, latestVersion, endOfBatchTimestamp) => + // Only log lagging instances if the snapshot report upload is enabled, + // otherwise all instances will be considered lagging. + if (sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_REPORT_UPLOAD_ENABLED)) { + val laggingStores = findLaggingStores(queryRunId, latestVersion, endOfBatchTimestamp) + logWarning( + log"StateStoreCoordinator Snapshot Lag Report for " + + log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " + + log"Number of state stores falling behind: " + + log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)}" + ) + // Report all stores that are behind in snapshot uploads. + // Only report the full list of providers lagging behind if the last reported time + // is not recent. The lag report interval denotes the minimum time between these + // full reports. 
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala: ##########
@@ -168,9 +264,141 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)

+    case ReportSnapshotUploaded(providerId, version, timestamp) =>
+      // Ignore this upload event if the registered latest version for the provider is more recent,
+      // since it's possible that an older version gets uploaded after a new executor uploads for
+      // the same provider but with a newer snapshot.
+      logDebug(s"Snapshot version $version was uploaded for provider $providerId")
+      if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version >= version)) {
+        stateStoreLatestUploadedSnapshot.put(providerId, SnapshotUploadEvent(version, timestamp))
+      }
+      context.reply(true)
+
+    case ConstructLaggingInstanceReport(queryRunId, latestVersion, endOfBatchTimestamp) =>
+      // Only log lagging instances if the snapshot report upload is enabled,
+      // otherwise all instances will be considered lagging.
+      if (sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_REPORT_UPLOAD_ENABLED)) {
+        val laggingStores = findLaggingStores(queryRunId, latestVersion, endOfBatchTimestamp)
+        logWarning(
+          log"StateStoreCoordinator Snapshot Lag Report for " +
+          log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+          log"Number of state stores falling behind: " +
+          log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)}"
+        )
+        // Report all stores that are behind in snapshot uploads.
+        // Only report the full list of providers lagging behind if the last reported time
+        // is not recent. The lag report interval denotes the minimum time between these
+        // full reports.
+        val coordinatorLagReportInterval =
+          sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL)
+        val currentTimestamp = System.currentTimeMillis()
+        if (laggingStores.nonEmpty &&
+          currentTimestamp - lastFullSnapshotLagReport > coordinatorLagReportInterval) {
+          // Mark timestamp of the full report and log the lagging instances
+          lastFullSnapshotLagReport = currentTimestamp
+          laggingStores.foreach { providerId =>
+            val logMessage = stateStoreLatestUploadedSnapshot.get(providerId) match {
+              case Some(snapshotEvent) =>
+                val versionDelta = latestVersion - snapshotEvent.version
+                val timeDelta = endOfBatchTimestamp - snapshotEvent.timestamp
+
+                log"StateStoreCoordinator Snapshot Lag Detected for " +
+                log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+                log"Provider: ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} " +
+                log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}, " +
+                log"latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, snapshotEvent)}, " +
+                log"version delta: ${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, versionDelta)}, " +
+                log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, timeDelta)}ms)"
+              case None =>
+                log"StateStoreCoordinator Snapshot Lag Detected for " +
+                log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+                log"Provider: ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} " +
+                log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}, " +
+                log"latest snapshot: never uploaded)"
+            }
+            logWarning(logMessage)
+          }
+        } else if (laggingStores.nonEmpty) {
+          logInfo(log"StateStoreCoordinator Snapshot Lag Report - last full report was too recent")
+        }
+      }
+      context.reply(true)
+
+    case GetLatestSnapshotVersionForTesting(providerId) =>
+      val version = stateStoreLatestUploadedSnapshot.get(providerId).map(_.version)
+      logDebug(s"Got latest snapshot version of the state store $providerId: $version")
+      context.reply(version)
+
+    case GetLaggingStoresForTesting(queryRunId, latestVersion, timestamp) =>
+      val laggingStores = findLaggingStores(queryRunId, latestVersion, timestamp)
+      logDebug(s"Got lagging state stores: ${laggingStores.mkString(", ")}")
+      context.reply(laggingStores)
+
     case StopCoordinator =>
       stop() // Stop before replying to ensure that endpoint name has been deregistered
       logInfo("StateStoreCoordinator stopped")
       context.reply(true)
   }

+  case class SnapshotUploadEvent(
+      version: Long,
+      timestamp: Long
+  ) extends Ordered[SnapshotUploadEvent] {
+
+    def isLagging(latestVersion: Long, latestTimestamp: Long): Boolean = {
+      val versionDelta = latestVersion - version
+      val timeDelta = latestTimestamp - timestamp
+
+      // Determine alert thresholds from configurations for both time and version differences.
+      val snapshotVersionDeltaMultiplier = sqlConf.getConf(
+        SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_DELTA_MULTIPLIER_FOR_MIN_VERSION_DELTA_TO_LOG)
+      val maintenanceIntervalMultiplier = sqlConf.getConf(
+        SQLConf.STATE_STORE_COORDINATOR_MAINTENANCE_MULTIPLIER_FOR_MIN_TIME_DELTA_TO_LOG)
+      val minDeltasForSnapshot = sqlConf.getConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
+      val maintenanceInterval = sqlConf.getConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL)
+
+      // Use the configured multipliers to determine the proper alert thresholds
+      val minVersionDeltaForLogging = snapshotVersionDeltaMultiplier * minDeltasForSnapshot
+      val minTimeDeltaForLogging = maintenanceIntervalMultiplier * maintenanceInterval
+
+      // Mark a state store as lagging if it is behind in both version and time.
+      // In the case that a snapshot was never uploaded, we treat version -1 as the preceding
+      // version of 0, and only rely on the version delta condition.
+      // Time requirement will be automatically satisfied as the initial timestamp is 0.
+      versionDelta >= minVersionDeltaForLogging && timeDelta > minTimeDeltaForLogging
+    }
+
+    override def compare(otherEvent: SnapshotUploadEvent): Int = {
+      // Compare by version first, then by timestamp as tiebreaker
+      val versionCompare = this.version.compare(otherEvent.version)
+      if (versionCompare == 0) {
+        this.timestamp.compare(otherEvent.timestamp)
+      } else {
+        versionCompare
+      }
+    }
+
+    override def toString(): String = {
+      s"SnapshotUploadEvent(version=$version, timestamp=$timestamp)"
+    }
+  }
+
+  private def findLaggingStores(
+      queryRunId: UUID,
+      referenceVersion: Long,
+      referenceTimestamp: Long): Seq[StateStoreProviderId] = {
+    // Do not report any instance as lagging if the snapshot report upload is disabled,
+    // since it will treat all active instances as stores that have never uploaded.
+    if (!sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_REPORT_UPLOAD_ENABLED)) {

Review Comment:
   Why check this conf again? It is already checked before calling this func, right? And this is a private func that is only called from one place in the coordinator.

########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala: ##########
@@ -55,6 +56,45 @@ private case class GetLocation(storeId: StateStoreProviderId)

 private case class DeactivateInstances(runId: UUID)
   extends StateStoreCoordinatorMessage

+/**
+ * This message is used to report a state store instance has just finished uploading a snapshot,
+ * along with the timestamp in milliseconds and the snapshot version.
+ */
+private case class ReportSnapshotUploaded(
+    storeId: StateStoreProviderId,
+    version: Long,
+    timestamp: Long)
+  extends StateStoreCoordinatorMessage
+
+/**
+ * This message is used for the coordinator to look for all state stores that are lagging behind
+ * in snapshot uploads. The coordinator will then log a warning message for each lagging instance.
+ */
+private case class ConstructLaggingInstanceReport(

Review Comment:
   nit: `LogLaggingStateStores` ? "Instance" is not applicable here, since it means store instances, i.e. a single store can have multiple instances across executors
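The suggested rename might look like this (fields copied from the PR's declaration; only the name changes):

```scala
// Same message as ConstructLaggingInstanceReport, renamed to describe the action
// rather than "instances" (assumes the file's existing java.util.UUID import).
private case class LogLaggingStateStores(
    queryRunId: UUID,
    latestVersion: Long,
    timestamp: Long)
  extends StateStoreCoordinatorMessage
```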
+ ) + .version("4.1.0") + .intConf + .checkValue(k => k >= 1, "Must be greater than or equal to 1") + .createWithDefault(5) + + val STATE_STORE_COORDINATOR_MAINTENANCE_MULTIPLIER_FOR_MIN_TIME_DELTA_TO_LOG = + buildConf("spark.sql.streaming.stateStore.maintenanceMultiplierForMinTimeDeltaToLog") + .internal() + .doc( + "This multiplier determines the minimum time threshold for logging warnings when a " + + "state store instance falls behind. The coordinator logs a warning if a state store's " + + "last snapshot upload time lags behind the current time by this threshold. " + + "The threshold is calculated as the maintenance interval multiplied by this multiplier." + ) + .version("4.1.0") + .intConf + .checkValue(k => k >= 1, "Must be greater than or equal to 1") + .createWithDefault(10) + + val STATE_STORE_COORDINATOR_REPORT_UPLOAD_ENABLED = + buildConf("spark.sql.streaming.stateStore.coordinatorReportUpload.enabled") + .internal() + .doc( + "If enabled, state store instances will send a message to the state store " + + "coordinator whenever they complete a snapshot upload." + ) + .version("4.1.0") + .booleanConf + .createWithDefault(false) + + val STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL = + buildConf("spark.sql.streaming.stateStore.snapshotLagReportInterval") + .internal() + .doc( + "The minimum amount of time between the state store coordinator's full report on " + Review Comment: ditto ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala: ########## @@ -283,6 +287,14 @@ abstract class ProgressContext( progressReporter.lastNoExecutionProgressEventTime = triggerClock.getTimeMillis() progressReporter.updateProgress(newProgress) + // Ask the state store coordinator to look for any lagging instances and report them. + progressReporter.stateStoreCoordinator + .constructLaggingInstanceReport( + lastExecution.runId, + lastEpochId, Review Comment: fyi this is a bug. lastEpochId (aka batchId) != latest store version. latest store version = batchId + 1. You can confirm. Because when batch x commits, it produces store version x + 1. Your test is not catching this because you're doing >= 0 verification in your test. Also because you don't have a test for when changelog is disabled. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala: ########## @@ -38,6 +38,7 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.unsafe.Platform import org.apache.spark.util.{NonFateSharingCache, Utils} + Review Comment: nit: why? ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala: ########## @@ -1472,6 +1475,8 @@ class RocksDB( log"Current lineage: ${MDC(LogKeys.LINEAGE, lineageManager)}") // Compare and update with the version that was just uploaded. lastUploadedSnapshotVersion.updateAndGet(v => Math.max(snapshot.version, v)) + // Report snapshot upload event to the coordinator. + eventListener.foreach(_.reportSnapshotUploaded(snapshot.version)) Review Comment: So we are reporting this even if changelog checkpointing is not being used? Is there a need for this if we are uploading snapshot for every batch (i.e. no changelog)? Or Is it just for simplicity of the coordinator side logic for detection, since this is only specific to rocksdb? Just asking. 
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala: ##########
@@ -55,6 +56,45 @@ private case class GetLocation(storeId: StateStoreProviderId)

 private case class DeactivateInstances(runId: UUID)
   extends StateStoreCoordinatorMessage

+/**
+ * This message is used to report a state store instance has just finished uploading a snapshot,
+ * along with the timestamp in milliseconds and the snapshot version.
+ */
+private case class ReportSnapshotUploaded(
+    storeId: StateStoreProviderId,
+    version: Long,
+    timestamp: Long)
+  extends StateStoreCoordinatorMessage
+
+/**
+ * This message is used for the coordinator to look for all state stores that are lagging behind
+ * in snapshot uploads. The coordinator will then log a warning message for each lagging instance.
+ */
+private case class ConstructLaggingInstanceReport(
+    queryRunId: UUID,
+    latestVersion: Long,
+    timestamp: Long)
+  extends StateStoreCoordinatorMessage
+
+/**
+ * Message used for testing.
+ * This message is used to retrieve the latest snapshot version reported for upload from a
+ * specific state store instance.
+ */
+private case class GetLatestSnapshotVersionForTesting(storeId: StateStoreProviderId)
+  extends StateStoreCoordinatorMessage
+
+/**
+ * Message used for testing.
+ * This message is used to retrieve the all active state store instance falling behind in

Review Comment:
   nit: remove "the"

########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala: ##########
@@ -168,9 +264,141 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)

+    case ReportSnapshotUploaded(providerId, version, timestamp) =>
+      // Ignore this upload event if the registered latest version for the provider is more recent,
+      // since it's possible that an older version gets uploaded after a new executor uploads for
+      // the same provider but with a newer snapshot.
+      logDebug(s"Snapshot version $version was uploaded for provider $providerId")
+      if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version >= version)) {
+        stateStoreLatestUploadedSnapshot.put(providerId, SnapshotUploadEvent(version, timestamp))
+      }
+      context.reply(true)
+
+    case ConstructLaggingInstanceReport(queryRunId, latestVersion, endOfBatchTimestamp) =>
+      // Only log lagging instances if the snapshot report upload is enabled,
+      // otherwise all instances will be considered lagging.
+      if (sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_REPORT_UPLOAD_ENABLED)) {
+        val laggingStores = findLaggingStores(queryRunId, latestVersion, endOfBatchTimestamp)
+        logWarning(
+          log"StateStoreCoordinator Snapshot Lag Report for " +
+          log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+          log"Number of state stores falling behind: " +
+          log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)}"
+        )
+        // Report all stores that are behind in snapshot uploads.
+        // Only report the full list of providers lagging behind if the last reported time
+        // is not recent. The lag report interval denotes the minimum time between these
+        // full reports.
+        val coordinatorLagReportInterval =
+          sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL)
+        val currentTimestamp = System.currentTimeMillis()
+        if (laggingStores.nonEmpty &&
+          currentTimestamp - lastFullSnapshotLagReport > coordinatorLagReportInterval) {
+          // Mark timestamp of the full report and log the lagging instances
+          lastFullSnapshotLagReport = currentTimestamp
+          laggingStores.foreach { providerId =>
+            val logMessage = stateStoreLatestUploadedSnapshot.get(providerId) match {
+              case Some(snapshotEvent) =>
+                val versionDelta = latestVersion - snapshotEvent.version
+                val timeDelta = endOfBatchTimestamp - snapshotEvent.timestamp

Review Comment:
   Why are you using `endOfBatchTimestamp`? Why not the current timestamp? It is possible the event was sent after the end of the batch but before we called this function, in which case timeDelta will be negative.

########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala: ##########
@@ -97,6 +97,13 @@ class StateStoreConf(
   val enableStateStoreCheckpointIds =
     StatefulOperatorStateInfo.enableStateStoreCheckpointIds(sqlConf)

+
+  /**
+   * Whether to report snapshot uploaded messages from the internal RocksDB instance
+   * to the state store coordinator.
+   */

Review Comment:
   Wrong comment; this conf isn't about the internal RocksDB instance.
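A corrected doc along the lines of the comment (a sketch; the wording and the initializer shown are assumptions, only the conf constant comes from this PR):

```scala
/**
 * Whether state store instances should report their snapshot upload events to
 * the state store coordinator. Applies to any provider, not just the internal
 * RocksDB instance.
 */
val stateStoreCoordinatorReportUploadEnabled: Boolean =
  sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_REPORT_UPLOAD_ENABLED)
```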
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala: ##########
@@ -168,9 +264,141 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)

+    case ReportSnapshotUploaded(providerId, version, timestamp) =>
+      // Ignore this upload event if the registered latest version for the provider is more recent,
+      // since it's possible that an older version gets uploaded after a new executor uploads for
+      // the same provider but with a newer snapshot.
+      logDebug(s"Snapshot version $version was uploaded for provider $providerId")
+      if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version >= version)) {
+        stateStoreLatestUploadedSnapshot.put(providerId, SnapshotUploadEvent(version, timestamp))
+      }
+      context.reply(true)
+
+    case ConstructLaggingInstanceReport(queryRunId, latestVersion, endOfBatchTimestamp) =>
+      // Only log lagging instances if the snapshot report upload is enabled,
+      // otherwise all instances will be considered lagging.
+      if (sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_REPORT_UPLOAD_ENABLED)) {
+        val laggingStores = findLaggingStores(queryRunId, latestVersion, endOfBatchTimestamp)
+        logWarning(
+          log"StateStoreCoordinator Snapshot Lag Report for " +
+          log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+          log"Number of state stores falling behind: " +
+          log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)}"
+        )
+        // Report all stores that are behind in snapshot uploads.
+        // Only report the full list of providers lagging behind if the last reported time
+        // is not recent. The lag report interval denotes the minimum time between these
+        // full reports.
+        val coordinatorLagReportInterval =
+          sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL)
+        val currentTimestamp = System.currentTimeMillis()
+        if (laggingStores.nonEmpty &&
+          currentTimestamp - lastFullSnapshotLagReport > coordinatorLagReportInterval) {
+          // Mark timestamp of the full report and log the lagging instances
+          lastFullSnapshotLagReport = currentTimestamp
+          laggingStores.foreach { providerId =>
+            val logMessage = stateStoreLatestUploadedSnapshot.get(providerId) match {
+              case Some(snapshotEvent) =>
+                val versionDelta = latestVersion - snapshotEvent.version
+                val timeDelta = endOfBatchTimestamp - snapshotEvent.timestamp
+
+                log"StateStoreCoordinator Snapshot Lag Detected for " +
+                log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+                log"Provider: ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} " +
+                log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}, " +
+                log"latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, snapshotEvent)}, " +
+                log"version delta: ${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, versionDelta)}, " +
+                log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, timeDelta)}ms)"
+              case None =>
+                log"StateStoreCoordinator Snapshot Lag Detected for " +
+                log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+                log"Provider: ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} " +
+                log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}, " +
+                log"latest snapshot: never uploaded)"
+            }
+            logWarning(logMessage)
+          }
+        } else if (laggingStores.nonEmpty) {
+          logInfo(log"StateStoreCoordinator Snapshot Lag Report - last full report was too recent")
+        }
+      }
+      context.reply(true)
+
+    case GetLatestSnapshotVersionForTesting(providerId) =>
+      val version = stateStoreLatestUploadedSnapshot.get(providerId).map(_.version)
+      logDebug(s"Got latest snapshot version of the state store $providerId: $version")
+      context.reply(version)
+
+    case GetLaggingStoresForTesting(queryRunId, latestVersion, timestamp) =>
+      val laggingStores = findLaggingStores(queryRunId, latestVersion, timestamp)
+      logDebug(s"Got lagging state stores: ${laggingStores.mkString(", ")}")
+      context.reply(laggingStores)
+
     case StopCoordinator =>
       stop() // Stop before replying to ensure that endpoint name has been deregistered
       logInfo("StateStoreCoordinator stopped")
       context.reply(true)
   }

+  case class SnapshotUploadEvent(
+      version: Long,
+      timestamp: Long
+  ) extends Ordered[SnapshotUploadEvent] {
+
+    def isLagging(latestVersion: Long, latestTimestamp: Long): Boolean = {
+      val versionDelta = latestVersion - version

Review Comment:
   I think I mentioned this in my last review: for the default event, version is -1, right? Hence there is a bug here.
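A self-contained sketch of one way to keep the -1 sentinel out of the delta arithmetic (the fallback policy here is an assumption, not the PR's code):

```scala
case class SnapshotUploadEvent(version: Long, timestampMs: Long) {
  def isLagging(
      latestVersion: Long,
      latestTimestampMs: Long,
      minVersionDelta: Long,
      minTimeDeltaMs: Long): Boolean = {
    if (version < 0) {
      // No upload seen yet: decide via an explicit policy instead of mixing the
      // -1 sentinel into the subtraction below.
      latestVersion >= minVersionDelta
    } else {
      (latestVersion - version) >= minVersionDelta &&
        (latestTimestampMs - timestampMs) > minTimeDeltaMs
    }
  }
}
```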
+ logDebug(s"Snapshot version $version was uploaded for provider $providerId") + if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version >= version)) { + stateStoreLatestUploadedSnapshot.put(providerId, SnapshotUploadEvent(version, timestamp)) + } + context.reply(true) + + case ConstructLaggingInstanceReport(queryRunId, latestVersion, endOfBatchTimestamp) => + // Only log lagging instances if the snapshot report upload is enabled, + // otherwise all instances will be considered lagging. + if (sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_REPORT_UPLOAD_ENABLED)) { + val laggingStores = findLaggingStores(queryRunId, latestVersion, endOfBatchTimestamp) + logWarning( + log"StateStoreCoordinator Snapshot Lag Report for " + + log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " + + log"Number of state stores falling behind: " + + log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)}" + ) + // Report all stores that are behind in snapshot uploads. + // Only report the full list of providers lagging behind if the last reported time + // is not recent. The lag report interval denotes the minimum time between these + // full reports. + val coordinatorLagReportInterval = + sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL) + val currentTimestamp = System.currentTimeMillis() + if (laggingStores.nonEmpty && + currentTimestamp - lastFullSnapshotLagReport > coordinatorLagReportInterval) { + // Mark timestamp of the full report and log the lagging instances + lastFullSnapshotLagReport = currentTimestamp + laggingStores.foreach { providerId => Review Comment: Should we limit the number of stores we log here? To avoid a situation where a customer has many partitions e.g. 500 partitions and half of them are lagging. This would mean we would write 250 messages in the log here. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala: ########## @@ -168,9 +264,141 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv) storeIdsToRemove.mkString(", ")) context.reply(true) + case ReportSnapshotUploaded(providerId, version, timestamp) => + // Ignore this upload event if the registered latest version for the provider is more recent, + // since it's possible that an older version gets uploaded after a new executor uploads for + // the same provider but with a newer snapshot. + logDebug(s"Snapshot version $version was uploaded for provider $providerId") + if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version >= version)) { + stateStoreLatestUploadedSnapshot.put(providerId, SnapshotUploadEvent(version, timestamp)) + } + context.reply(true) + + case ConstructLaggingInstanceReport(queryRunId, latestVersion, endOfBatchTimestamp) => + // Only log lagging instances if the snapshot report upload is enabled, + // otherwise all instances will be considered lagging. + if (sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_REPORT_UPLOAD_ENABLED)) { + val laggingStores = findLaggingStores(queryRunId, latestVersion, endOfBatchTimestamp) + logWarning( + log"StateStoreCoordinator Snapshot Lag Report for " + + log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " + + log"Number of state stores falling behind: " + + log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)}" + ) + // Report all stores that are behind in snapshot uploads. + // Only report the full list of providers lagging behind if the last reported time + // is not recent. 
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala: ##########
@@ -168,9 +264,141 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)

+    case ReportSnapshotUploaded(providerId, version, timestamp) =>
+      // Ignore this upload event if the registered latest version for the provider is more recent,
+      // since it's possible that an older version gets uploaded after a new executor uploads for
+      // the same provider but with a newer snapshot.
+      logDebug(s"Snapshot version $version was uploaded for provider $providerId")
+      if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version >= version)) {
+        stateStoreLatestUploadedSnapshot.put(providerId, SnapshotUploadEvent(version, timestamp))
+      }
+      context.reply(true)
+
+    case ConstructLaggingInstanceReport(queryRunId, latestVersion, endOfBatchTimestamp) =>
+      // Only log lagging instances if the snapshot report upload is enabled,
+      // otherwise all instances will be considered lagging.
+      if (sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_REPORT_UPLOAD_ENABLED)) {
+        val laggingStores = findLaggingStores(queryRunId, latestVersion, endOfBatchTimestamp)
+        logWarning(
+          log"StateStoreCoordinator Snapshot Lag Report for " +
+          log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+          log"Number of state stores falling behind: " +
+          log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)}"
+        )
+        // Report all stores that are behind in snapshot uploads.
+        // Only report the full list of providers lagging behind if the last reported time
+        // is not recent. The lag report interval denotes the minimum time between these
+        // full reports.
+        val coordinatorLagReportInterval =
+          sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL)
+        val currentTimestamp = System.currentTimeMillis()
+        if (laggingStores.nonEmpty &&
+          currentTimestamp - lastFullSnapshotLagReport > coordinatorLagReportInterval) {
+          // Mark timestamp of the full report and log the lagging instances
+          lastFullSnapshotLagReport = currentTimestamp
+          laggingStores.foreach { providerId =>
+            val logMessage = stateStoreLatestUploadedSnapshot.get(providerId) match {
+              case Some(snapshotEvent) =>
+                val versionDelta = latestVersion - snapshotEvent.version
+                val timeDelta = endOfBatchTimestamp - snapshotEvent.timestamp
+
+                log"StateStoreCoordinator Snapshot Lag Detected for " +
+                log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+                log"Provider: ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} " +
+                log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}, " +
+                log"latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, snapshotEvent)}, " +
+                log"version delta: ${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, versionDelta)}, " +
+                log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, timeDelta)}ms)"
+              case None =>
+                log"StateStoreCoordinator Snapshot Lag Detected for " +
+                log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+                log"Provider: ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} " +
+                log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}, " +
+                log"latest snapshot: never uploaded)"
+            }
+            logWarning(logMessage)
+          }
+        } else if (laggingStores.nonEmpty) {
+          logInfo(log"StateStoreCoordinator Snapshot Lag Report - last full report was too recent")

Review Comment:
   Do we need this log? We already logged a warning with the number of lagging stores found, so if we don't log the full report, that implies the interval hasn't elapsed. Hence no need for this additional one.

########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala: ##########
@@ -168,9 +264,141 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)

+    case ReportSnapshotUploaded(providerId, version, timestamp) =>
+      // Ignore this upload event if the registered latest version for the provider is more recent,
+      // since it's possible that an older version gets uploaded after a new executor uploads for
+      // the same provider but with a newer snapshot.
+      logDebug(s"Snapshot version $version was uploaded for provider $providerId")
+      if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version >= version)) {
+        stateStoreLatestUploadedSnapshot.put(providerId, SnapshotUploadEvent(version, timestamp))
+      }
+      context.reply(true)
+
+    case ConstructLaggingInstanceReport(queryRunId, latestVersion, endOfBatchTimestamp) =>
+      // Only log lagging instances if the snapshot report upload is enabled,
+      // otherwise all instances will be considered lagging.
+      if (sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_REPORT_UPLOAD_ENABLED)) {
+        val laggingStores = findLaggingStores(queryRunId, latestVersion, endOfBatchTimestamp)
+        logWarning(
+          log"StateStoreCoordinator Snapshot Lag Report for " +
+          log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+          log"Number of state stores falling behind: " +
+          log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)}"
+        )
+        // Report all stores that are behind in snapshot uploads.
+        // Only report the full list of providers lagging behind if the last reported time
+        // is not recent. The lag report interval denotes the minimum time between these
+        // full reports.
+        val coordinatorLagReportInterval =
+          sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL)
+        val currentTimestamp = System.currentTimeMillis()
+        if (laggingStores.nonEmpty &&
+          currentTimestamp - lastFullSnapshotLagReport > coordinatorLagReportInterval) {
+          // Mark timestamp of the full report and log the lagging instances
+          lastFullSnapshotLagReport = currentTimestamp
+          laggingStores.foreach { providerId =>
+            val logMessage = stateStoreLatestUploadedSnapshot.get(providerId) match {
+              case Some(snapshotEvent) =>
+                val versionDelta = latestVersion - snapshotEvent.version
+                val timeDelta = endOfBatchTimestamp - snapshotEvent.timestamp
+
+                log"StateStoreCoordinator Snapshot Lag Detected for " +
+                log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+                log"Provider: ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} " +
+                log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}, " +
+                log"latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, snapshotEvent)}, " +
+                log"version delta: ${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, versionDelta)}, " +
+                log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, timeDelta)}ms)"
+              case None =>
+                log"StateStoreCoordinator Snapshot Lag Detected for " +
+                log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+                log"Provider: ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} " +
+                log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}, " +
+                log"latest snapshot: never uploaded)"

Review Comment:
   Let's use this instead: `no upload for query run`? It is not that it has never been uploaded; it could have been uploaded in a previous run, just not in this new run. So let's not say never.

########## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala: ##########
@@ -155,16 +156,327 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
       StateStore.stop()
     }
   }
+

Review Comment:
   Also add a test for when changelog is disabled.
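The requested test might take a shape like this (a sketch: the conf-setting helper and its availability in this suite are assumptions; the changelog key is RocksDB's `spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled`):

```scala
test("snapshot upload reporting with changelog checkpointing disabled") {
  // Assumes a withSQLConf-style helper; without changelog checkpointing every
  // commit uploads a snapshot, so the coordinator should see an upload per batch
  // and report no lagging stores.
  withSQLConf(
    SQLConf.STATE_STORE_COORDINATOR_REPORT_UPLOAD_ENABLED.key -> "true",
    "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled" -> "false") {
    // run a few batches of a stateful query, then assert on the coordinator state
  }
}
```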
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala: ##########
@@ -168,9 +264,141 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)

+    case ReportSnapshotUploaded(providerId, version, timestamp) =>
+      // Ignore this upload event if the registered latest version for the provider is more recent,
+      // since it's possible that an older version gets uploaded after a new executor uploads for
+      // the same provider but with a newer snapshot.
+      logDebug(s"Snapshot version $version was uploaded for provider $providerId")
+      if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version >= version)) {
+        stateStoreLatestUploadedSnapshot.put(providerId, SnapshotUploadEvent(version, timestamp))
+      }
+      context.reply(true)
+
+    case ConstructLaggingInstanceReport(queryRunId, latestVersion, endOfBatchTimestamp) =>
+      // Only log lagging instances if the snapshot report upload is enabled,
+      // otherwise all instances will be considered lagging.
+      if (sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_REPORT_UPLOAD_ENABLED)) {
+        val laggingStores = findLaggingStores(queryRunId, latestVersion, endOfBatchTimestamp)
+        logWarning(
+          log"StateStoreCoordinator Snapshot Lag Report for " +
+          log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+          log"Number of state stores falling behind: " +
+          log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)}"
+        )
+        // Report all stores that are behind in snapshot uploads.
+        // Only report the full list of providers lagging behind if the last reported time
+        // is not recent. The lag report interval denotes the minimum time between these
+        // full reports.
+        val coordinatorLagReportInterval =
+          sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL)
+        val currentTimestamp = System.currentTimeMillis()
+        if (laggingStores.nonEmpty &&
+          currentTimestamp - lastFullSnapshotLagReport > coordinatorLagReportInterval) {
+          // Mark timestamp of the full report and log the lagging instances
+          lastFullSnapshotLagReport = currentTimestamp
+          laggingStores.foreach { providerId =>
+            val logMessage = stateStoreLatestUploadedSnapshot.get(providerId) match {
+              case Some(snapshotEvent) =>
+                val versionDelta = latestVersion - snapshotEvent.version
+                val timeDelta = endOfBatchTimestamp - snapshotEvent.timestamp
+
+                log"StateStoreCoordinator Snapshot Lag Detected for " +
+                log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+                log"Provider: ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} " +
+                log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}, " +
+                log"latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, snapshotEvent)}, " +
+                log"version delta: ${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, versionDelta)}, " +
+                log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, timeDelta)}ms)"
+              case None =>
+                log"StateStoreCoordinator Snapshot Lag Detected for " +
+                log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+                log"Provider: ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} " +
+                log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}, " +
+                log"latest snapshot: never uploaded)"
+            }
+            logWarning(logMessage)
+          }
+        } else if (laggingStores.nonEmpty) {
+          logInfo(log"StateStoreCoordinator Snapshot Lag Report - last full report was too recent")
+        }
+      }
+      context.reply(true)
+
+    case GetLatestSnapshotVersionForTesting(providerId) =>
+      val version = stateStoreLatestUploadedSnapshot.get(providerId).map(_.version)
+      logDebug(s"Got latest snapshot version of the state store $providerId: $version")
+      context.reply(version)
+
+    case GetLaggingStoresForTesting(queryRunId, latestVersion, timestamp) =>
+      val laggingStores = findLaggingStores(queryRunId, latestVersion, timestamp)
+      logDebug(s"Got lagging state stores: ${laggingStores.mkString(", ")}")
+      context.reply(laggingStores)
+
     case StopCoordinator =>
       stop() // Stop before replying to ensure that endpoint name has been deregistered
       logInfo("StateStoreCoordinator stopped")
       context.reply(true)
   }

+  case class SnapshotUploadEvent(
+      version: Long,
+      timestamp: Long
+  ) extends Ordered[SnapshotUploadEvent] {
+
+    def isLagging(latestVersion: Long, latestTimestamp: Long): Boolean = {
+      val versionDelta = latestVersion - version
+      val timeDelta = latestTimestamp - timestamp
+
+      // Determine alert thresholds from configurations for both time and version differences.
+      val snapshotVersionDeltaMultiplier = sqlConf.getConf(
+        SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_DELTA_MULTIPLIER_FOR_MIN_VERSION_DELTA_TO_LOG)
+      val maintenanceIntervalMultiplier = sqlConf.getConf(
+        SQLConf.STATE_STORE_COORDINATOR_MAINTENANCE_MULTIPLIER_FOR_MIN_TIME_DELTA_TO_LOG)
+      val minDeltasForSnapshot = sqlConf.getConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
+      val maintenanceInterval = sqlConf.getConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL)
+
+      // Use the configured multipliers to determine the proper alert thresholds
+      val minVersionDeltaForLogging = snapshotVersionDeltaMultiplier * minDeltasForSnapshot
+      val minTimeDeltaForLogging = maintenanceIntervalMultiplier * maintenanceInterval
+
+      // Mark a state store as lagging if it is behind in both version and time.
+      // In the case that a snapshot was never uploaded, we treat version -1 as the preceding
+      // version of 0, and only rely on the version delta condition.
+      // Time requirement will be automatically satisfied as the initial timestamp is 0.
+      versionDelta >= minVersionDeltaForLogging && timeDelta > minTimeDeltaForLogging
+    }
+
+    override def compare(otherEvent: SnapshotUploadEvent): Int = {
+      // Compare by version first, then by timestamp as tiebreaker
+      val versionCompare = this.version.compare(otherEvent.version)
+      if (versionCompare == 0) {
+        this.timestamp.compare(otherEvent.timestamp)
+      } else {
+        versionCompare
+      }
+    }
+
+    override def toString(): String = {
+      s"SnapshotUploadEvent(version=$version, timestamp=$timestamp)"
+    }
+  }
+
+  private def findLaggingStores(
+      queryRunId: UUID,
+      referenceVersion: Long,
+      referenceTimestamp: Long): Seq[StateStoreProviderId] = {
+    // Do not report any instance as lagging if the snapshot report upload is disabled,
+    // since it will treat all active instances as stores that have never uploaded.
+    if (!sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_REPORT_UPLOAD_ENABLED)) {
+      return Seq.empty
+    }
+    // Look for instances that are lagging behind in snapshot uploads
+    instances.keys.filter { storeProviderId =>

Review Comment:
   This PR only reports uploads to the coordinator for the RocksDB state store. Since you are not reporting for HDFS, this code will say all HDFS state stores are lagging. Is there a reason why you're not reporting for the HDFS state store too?

########## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala: ##########
@@ -155,16 +156,327 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
       StateStore.stop()
     }
   }
+
+  test(
+    "SPARK-51358: Snapshot uploads in RocksDB are not reported if changelog " +
+    "checkpointing is disabled"
+  ) {
+    withCoordinatorAndSQLConf(

Review Comment:
   FYI, your tests are not exercising the construct-lag RPC code path, because you are not reducing `snapshotLagReportInterval` (default is 5 mins) in your test.
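For example, the test confs could shrink the interval so the report path actually fires (a sketch; both keys come from this PR, and the "0ms" value format assumes the interval is a time conf):

```scala
// Extra entries for the test's SQL conf map, forcing the lag-report RPC to run.
val lagReportTestConfs: Seq[(String, String)] = Seq(
  "spark.sql.streaming.stateStore.coordinatorReportUpload.enabled" -> "true",
  "spark.sql.streaming.stateStore.snapshotLagReportInterval" -> "0ms" // default: 5 min
)
```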