micheal-o commented on code in PR #50123:
URL: https://github.com/apache/spark/pull/50123#discussion_r1990598816
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -38,9 +38,20 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.util.{NonFateSharingCache, Utils}
 
+/**
+ * Trait representing events reported from a RocksDB instance.
+ *
+ * The internal RocksDB instance can use a provider with a `RocksDBEventListener` reference to

Review Comment:
> The internal RocksDB instance can use a provider with a `RocksDBEventListener` reference

This statement is a bit confusing. Should we say just "We pass this into the internal RocksDB instance to report specific events...".

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -65,6 +65,7 @@ case object StoreTaskCompletionListener extends RocksDBOpType("store_task_comple
  * @param localRootDir Root directory in local disk that is used to working and checkpointing dirs
  * @param hadoopConf Hadoop configuration for talking to the remote file system
  * @param loggingId Id that will be prepended in logs for isolating concurrent RocksDBs
+ * @param providerListener A reference to the state store provider for event callback reporting

Review Comment:
nit: confusing comment? I mean the "A reference to the state store provider" part

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -119,6 +139,31 @@ class StateStoreCoordinatorRef private(rpcEndpointRef: RpcEndpointRef) {
     rpcEndpointRef.askSync[Boolean](DeactivateInstances(runId))
   }
 
+  /** Inform that an executor has uploaded a snapshot */
+  private[sql] def snapshotUploaded(
+      storeProviderId: StateStoreProviderId,
+      version: Long,
+      timestamp: Long): Unit = {
+    rpcEndpointRef.askSync[Boolean](SnapshotUploaded(storeProviderId, version, timestamp))
+  }
+
+  /**
+   * Endpoint used for testing.
+   * Get the latest snapshot version uploaded for a state store.
+   */
+  private[sql] def getLatestSnapshotVersion(

Review Comment:
Add a `ForTesting` suffix? Also, this should be `private[state]` since it is only for testing?

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -66,9 +86,9 @@ object StateStoreCoordinatorRef extends Logging {
   /**
    * Create a reference to a [[StateStoreCoordinator]]
   */
-  def forDriver(env: SparkEnv): StateStoreCoordinatorRef = synchronized {
+  def forDriver(env: SparkEnv, conf: SQLConf): StateStoreCoordinatorRef = synchronized {
     try {
-      val coordinator = new StateStoreCoordinator(env.rpcEnv)
+      val coordinator = new StateStoreCoordinator(env.rpcEnv, conf)

Review Comment:
You are passing in the current session conf, but if I remember correctly we only have one coordinator across all sessions. I don't think we create a coordinator per session. Can you confirm?
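For illustration, the `ForTesting` rename and tightened visibility suggested a couple of comments above might look like the sketch below. This is only a sketch based on the method shown in that diff, not the final API:

```scala
  /**
   * Endpoint used for testing.
   * Get the latest snapshot version uploaded for a state store.
   */
  private[state] def getLatestSnapshotVersionForTesting(
      stateStoreProviderId: StateStoreProviderId): Option[Long] = {
    rpcEndpointRef.askSync[Option[Long]](GetLatestSnapshotVersion(stateStoreProviderId))
  }
```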
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +220,99 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case SnapshotUploaded(providerId, version, timestamp) =>
+      stateStoreSnapshotVersions.put(providerId, SnapshotUploadEvent(version, timestamp))
+      logDebug(s"Snapshot uploaded at ${providerId} with version ${version}")

Review Comment:
nit: better to say: snapshot version 'foo' was uploaded for provider 'bar'

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +220,99 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case SnapshotUploaded(providerId, version, timestamp) =>
+      stateStoreSnapshotVersions.put(providerId, SnapshotUploadEvent(version, timestamp))
+      logDebug(s"Snapshot uploaded at ${providerId} with version ${version}")
+      // Report all stores that are behind in snapshot uploads
+      val (laggingStores, latestSnapshot) = findLaggingStores()
+      if (laggingStores.nonEmpty) {
+        logWarning(
+          log"StateStoreCoordinator Snapshot Lag - Number of state stores falling behind: " +
+          log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)} " +
+          log"(Latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, latestSnapshot)})"
+        )
+        laggingStores.foreach { storeProviderId =>
+          val logMessage = stateStoreSnapshotVersions.get(storeProviderId) match {
+            case Some(snapshotEvent) =>
+              val versionDelta = latestSnapshot.version - snapshotEvent.version
+              val timeDelta = latestSnapshot.timestamp - snapshotEvent.timestamp
+
+              log"StateStoreCoordinator Snapshot Lag - State store falling behind " +
+              log"${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} " +
+              log"(version delta: ${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, versionDelta)}, " +
+              log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, timeDelta)}ms)"
+            case None =>
+              log"StateStoreCoordinator Snapshot Lag - State store falling behind "

Review Comment:
If changelog checkpointing is off, and hence there is no snapshot upload report, it seems this will report those stores as lagging? That is incorrect.

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -55,6 +56,25 @@ private case class GetLocation(storeId: StateStoreProviderId)
 
 private case class DeactivateInstances(runId: UUID)
   extends StateStoreCoordinatorMessage
 
+private case class SnapshotUploaded(storeId: StateStoreProviderId, version: Long, timestamp: Long)

Review Comment:
Also rename to `ReportSnapshotUploaded`?

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -55,6 +56,25 @@ private case class GetLocation(storeId: StateStoreProviderId)
 
 private case class DeactivateInstances(runId: UUID)
   extends StateStoreCoordinatorMessage
 
+private case class SnapshotUploaded(storeId: StateStoreProviderId, version: Long, timestamp: Long)
+  extends StateStoreCoordinatorMessage
+
+/**
+ * Message used for testing.
+ * This message is used to retrieve the latest snapshot version reported for upload from a
+ * specific state store instance.
+ */
+private case class GetLatestSnapshotVersion(storeId: StateStoreProviderId)
+  extends StateStoreCoordinatorMessage
+
+/**
+ * Message used for testing.
+ * This message is used to retrieve the all active state store instance falling behind in
+ * snapshot uploads, whether it is through version or time criteria.
+ */
+private case class GetLaggingStores()

Review Comment:
nit: add `ForTesting` suffix to method name, so it is super obvious it is only for testing

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -38,9 +38,20 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.util.{NonFateSharingCache, Utils}
 
+/**
+ * Trait representing events reported from a RocksDB instance.
+ *
+ * The internal RocksDB instance can use a provider with a `RocksDBEventListener` reference to
+ * report specific events like snapshot uploads. This should only be used to report back to the
+ * coordinator for metrics and monitoring purposes.
+ */
+trait RocksDBEventListener {
+  def reportSnapshotUploaded(version: Long): Unit
+}
+
 private[sql] class RocksDBStateStoreProvider extends StateStoreProvider
   with Logging with Closeable
-  with SupportsFineGrainedReplay {
+  with SupportsFineGrainedReplay with RocksDBEventListener {

Review Comment:
I would rather implement a separate class instead of adding this to the provider.

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -129,10 +174,17 @@ class StateStoreCoordinatorRef private(rpcEndpointRef: RpcEndpointRef) {
  * Class for coordinating instances of [[StateStore]]s loaded in executors across the cluster,
 * and get their locations for job scheduling.
  */
-private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
-  extends ThreadSafeRpcEndpoint with Logging {
+private class StateStoreCoordinator(
+    override val rpcEnv: RpcEnv,
+    val sqlConf: SQLConf)
+  extends ThreadSafeRpcEndpoint
+  with Logging {
   private val instances = new mutable.HashMap[StateStoreProviderId, ExecutorCacheTaskLocation]
 
+  // Stores the latest snapshot version of a specific state store provider instance
+  private val stateStoreSnapshotVersions =

Review Comment:
nit: should this be `stateStoreLatestUploadedSnapshotVersion` or something more clear?

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -55,6 +56,25 @@ private case class GetLocation(storeId: StateStoreProviderId)
 
 private case class DeactivateInstances(runId: UUID)
   extends StateStoreCoordinatorMessage
 
+private case class SnapshotUploaded(storeId: StateStoreProviderId, version: Long, timestamp: Long)

Review Comment:
nit: add comment

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -1033,6 +1033,12 @@ object StateStore extends Logging {
     }
   }
 
+  def reportSnapshotUploaded(storeProviderId: StateStoreProviderId, snapshotVersion: Long): Unit = {

Review Comment:
nit: make private and only accessible within state package?
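For the "separate class" suggestion above, one possible shape is sketched below. The forwarder class name and wiring are hypothetical; only `RocksDBEventListener`, `StateStoreProviderId`, and `StateStore.reportSnapshotUploaded` come from the PR's diffs, and the sketch assumes it lives in the same `state` package:

```scala
/** Hypothetical standalone listener that forwards RocksDB events to the coordinator. */
class RocksDBEventForwarder(providerId: StateStoreProviderId) extends RocksDBEventListener {
  override def reportSnapshotUploaded(version: Long): Unit = {
    // Relay the upload event to the driver-side coordinator via the StateStore object.
    StateStore.reportSnapshotUploaded(providerId, version)
  }
}
```

The provider would then construct one of these and hand it to its internal RocksDB instance, instead of mixing the trait into `RocksDBStateStoreProvider` itself.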
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2236,6 +2236,19 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val STATE_STORE_COORDINATOR_MIN_SNAPSHOT_VERSION_DELTA_TO_LOG =
+    buildConf("spark.sql.streaming.stateStore.minSnapshotVersionDeltaToLog")
+      .internal()
+      .doc(
+        "Minimum number of versions between the most recent uploaded snapshot version of a " +
+        "single state store instance and the most recent version across all state store " +
+        "instances to log a warning message."
+      )
+      .version("4.0.0")
+      .intConf
+      .checkValue(k => k >= 0, "Must be greater than or equal to 0")

Review Comment:
What happens if it is equal to 0? Should we be able to turn this feature off?

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -119,6 +139,31 @@ class StateStoreCoordinatorRef private(rpcEndpointRef: RpcEndpointRef) {
     rpcEndpointRef.askSync[Boolean](DeactivateInstances(runId))
   }
 
+  /** Inform that an executor has uploaded a snapshot */
+  private[sql] def snapshotUploaded(
+      storeProviderId: StateStoreProviderId,
+      version: Long,
+      timestamp: Long): Unit = {
+    rpcEndpointRef.askSync[Boolean](SnapshotUploaded(storeProviderId, version, timestamp))
+  }
+
+  /**
+   * Endpoint used for testing.
+   * Get the latest snapshot version uploaded for a state store.
+   */
+  private[sql] def getLatestSnapshotVersion(
+      stateStoreProviderId: StateStoreProviderId): Option[Long] = {
+    rpcEndpointRef.askSync[Option[Long]](GetLatestSnapshotVersion(stateStoreProviderId))
+  }
+
+  /**
+   * Endpoint used for testing.
+   * Get the state store instances that are falling behind in snapshot uploads.
+   */
+  private[sql] def getLaggingStores(): Seq[StateStoreProviderId] = {

Review Comment:
ditto

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +220,99 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case SnapshotUploaded(providerId, version, timestamp) =>
+      stateStoreSnapshotVersions.put(providerId, SnapshotUploadEvent(version, timestamp))

Review Comment:
I think we should only update if it is a higher version than the current recorded version, to avoid a situation where the provider was moved to another executor, but the previous executor's maintenance thread uploaded an old version after the new executor uploaded a newer one. The chances of this happening are low but not zero, so let's handle it and add a comment.
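A minimal sketch of the guarded update asked for in the last comment above, assuming the `stateStoreSnapshotVersions` map and `SnapshotUploadEvent` type from the diff (the exact comparison is illustrative):

```scala
case SnapshotUploaded(providerId, version, timestamp) =>
  // Only accept reports that advance the recorded version, so a late upload of an
  // old snapshot (e.g. from the provider's previous executor) cannot overwrite a
  // newer event already reported by the current executor.
  // Option.forall is true when there is no entry yet, so first reports still land.
  val isNewer = stateStoreSnapshotVersions.get(providerId).forall(_.version < version)
  if (isNewer) {
    stateStoreSnapshotVersions.put(providerId, SnapshotUploadEvent(version, timestamp))
  }
  context.reply(true)
```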
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +220,99 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case SnapshotUploaded(providerId, version, timestamp) =>
+      stateStoreSnapshotVersions.put(providerId, SnapshotUploadEvent(version, timestamp))
+      logDebug(s"Snapshot uploaded at ${providerId} with version ${version}")
+      // Report all stores that are behind in snapshot uploads
+      val (laggingStores, latestSnapshot) = findLaggingStores()
+      if (laggingStores.nonEmpty) {
+        logWarning(
+          log"StateStoreCoordinator Snapshot Lag - Number of state stores falling behind: " +
+          log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)} " +
+          log"(Latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, latestSnapshot)})"
+        )
+        laggingStores.foreach { storeProviderId =>
+          val logMessage = stateStoreSnapshotVersions.get(storeProviderId) match {
+            case Some(snapshotEvent) =>
+              val versionDelta = latestSnapshot.version - snapshotEvent.version
+              val timeDelta = latestSnapshot.timestamp - snapshotEvent.timestamp
+
+              log"StateStoreCoordinator Snapshot Lag - State store falling behind " +
+              log"${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} " +
+              log"(version delta: ${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, versionDelta)}, " +
+              log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, timeDelta)}ms)"
+            case None =>
+              log"StateStoreCoordinator Snapshot Lag - State store falling behind " +
+              log"${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} (never uploaded)"
+          }
+          logWarning(logMessage)
+        }
+      }
+      context.reply(true)
+
+    case GetLatestSnapshotVersion(providerId) =>
+      val version = stateStoreSnapshotVersions.get(providerId).map(_.version)
+      logDebug(s"Got latest snapshot version of the state store $providerId: $version")
+      context.reply(version)
+
+    case GetLaggingStores =>
+      val (laggingStores, _) = findLaggingStores()
+      logDebug(s"Got lagging state stores ${laggingStores
+        .map(
+          id =>

Review Comment:
id.storeId already implements `toString`

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +220,99 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case SnapshotUploaded(providerId, version, timestamp) =>
+      stateStoreSnapshotVersions.put(providerId, SnapshotUploadEvent(version, timestamp))
+      logDebug(s"Snapshot uploaded at ${providerId} with version ${version}")
+      // Report all stores that are behind in snapshot uploads
+      val (laggingStores, latestSnapshot) = findLaggingStores()
+      if (laggingStores.nonEmpty) {
+        logWarning(
+          log"StateStoreCoordinator Snapshot Lag - Number of state stores falling behind: " +
+          log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)} " +
+          log"(Latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, latestSnapshot)})"
+        )
+        laggingStores.foreach { storeProviderId =>
+          val logMessage = stateStoreSnapshotVersions.get(storeProviderId) match {
+            case Some(snapshotEvent) =>
+              val versionDelta = latestSnapshot.version - snapshotEvent.version
+              val timeDelta = latestSnapshot.timestamp - snapshotEvent.timestamp
+
+              log"StateStoreCoordinator Snapshot Lag - State store falling behind " +
+              log"${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} " +
+              log"(version delta: ${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, versionDelta)}, " +
+              log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, timeDelta)}ms)"
+            case None =>
+              log"StateStoreCoordinator Snapshot Lag - State store falling behind " +
+              log"${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} (never uploaded)"
+          }
+          logWarning(logMessage)
+        }
+      }
+      context.reply(true)
+
+    case GetLatestSnapshotVersion(providerId) =>
+      val version = stateStoreSnapshotVersions.get(providerId).map(_.version)
+      logDebug(s"Got latest snapshot version of the state store $providerId: $version")
+      context.reply(version)
+
+    case GetLaggingStores =>
+      val (laggingStores, _) = findLaggingStores()
+      logDebug(s"Got lagging state stores ${laggingStores
+        .map(
+          id =>
+            s"StateStoreId(operatorId=${id.storeId.operatorId}, " +
+            s"partitionId=${id.storeId.partitionId}, " +
+            s"storeName=${id.storeId.storeName})"
+        )
+        .mkString(", ")}")
+      context.reply(laggingStores)
+
     case StopCoordinator =>
       stop() // Stop before replying to ensure that endpoint name has been deregistered
       logInfo("StateStoreCoordinator stopped")
       context.reply(true)
   }
+
+  case class SnapshotUploadEvent(
+      version: Long,
+      timestamp: Long
+  ) extends Ordered[SnapshotUploadEvent] {
+    def isLagging(latest: SnapshotUploadEvent): Boolean = {
+      val versionDelta = latest.version - version
+      val timeDelta = latest.timestamp - timestamp
+      val minVersionDeltaForLogging =
+        sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_MIN_SNAPSHOT_VERSION_DELTA_TO_LOG)
+      // Use 10 times the maintenance interval as the minimum time delta for logging
+      val minTimeDeltaForLogging = 10 * sqlConf.getConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL)

Review Comment:
Why hard-code 10? Should this be a conf or something?

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +220,99 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case SnapshotUploaded(providerId, version, timestamp) =>
+      stateStoreSnapshotVersions.put(providerId, SnapshotUploadEvent(version, timestamp))
+      logDebug(s"Snapshot uploaded at ${providerId} with version ${version}")
+      // Report all stores that are behind in snapshot uploads
+      val (laggingStores, latestSnapshot) = findLaggingStores()
+      if (laggingStores.nonEmpty) {
+        logWarning(
+          log"StateStoreCoordinator Snapshot Lag - Number of state stores falling behind: " +
+          log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)} " +
+          log"(Latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, latestSnapshot)})"
+        )
+        laggingStores.foreach { storeProviderId =>
+          val logMessage = stateStoreSnapshotVersions.get(storeProviderId) match {
+            case Some(snapshotEvent) =>
+              val versionDelta = latestSnapshot.version - snapshotEvent.version
+              val timeDelta = latestSnapshot.timestamp - snapshotEvent.timestamp
+
+              log"StateStoreCoordinator Snapshot Lag - State store falling behind " +
+              log"${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} " +
+              log"(version delta: ${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, versionDelta)}, " +
+              log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, timeDelta)}ms)"
+            case None =>
+              log"StateStoreCoordinator Snapshot Lag - State store falling behind " +
+              log"${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} (never uploaded)"
+          }
+          logWarning(logMessage)
+        }
+      }
+      context.reply(true)
+
+    case GetLatestSnapshotVersion(providerId) =>
+      val version = stateStoreSnapshotVersions.get(providerId).map(_.version)
+      logDebug(s"Got latest snapshot version of the state store $providerId: $version")
+      context.reply(version)
+
+    case GetLaggingStores =>
+      val (laggingStores, _) = findLaggingStores()
+      logDebug(s"Got lagging state stores ${laggingStores
+        .map(
+          id =>
+            s"StateStoreId(operatorId=${id.storeId.operatorId}, " +
+            s"partitionId=${id.storeId.partitionId}, " +
+            s"storeName=${id.storeId.storeName})"
+        )
+        .mkString(", ")}")
+      context.reply(laggingStores)
+
     case StopCoordinator =>
       stop() // Stop before replying to ensure that endpoint name has been deregistered
       logInfo("StateStoreCoordinator stopped")
       context.reply(true)
   }
+
+  case class SnapshotUploadEvent(
+      version: Long,
+      timestamp: Long
+  ) extends Ordered[SnapshotUploadEvent] {
+    def isLagging(latest: SnapshotUploadEvent): Boolean = {
+      val versionDelta = latest.version - version

Review Comment:
What if this is called with the version -1?

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +220,99 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case SnapshotUploaded(providerId, version, timestamp) =>
+      stateStoreSnapshotVersions.put(providerId, SnapshotUploadEvent(version, timestamp))
+      logDebug(s"Snapshot uploaded at ${providerId} with version ${version}")
+      // Report all stores that are behind in snapshot uploads
+      val (laggingStores, latestSnapshot) = findLaggingStores()
+      if (laggingStores.nonEmpty) {
+        logWarning(
+          log"StateStoreCoordinator Snapshot Lag - Number of state stores falling behind: " +

Review Comment:
nit: `StateStoreCoordinator Snapshot Lag Detected` ?

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +220,99 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case SnapshotUploaded(providerId, version, timestamp) =>
+      stateStoreSnapshotVersions.put(providerId, SnapshotUploadEvent(version, timestamp))
+      logDebug(s"Snapshot uploaded at ${providerId} with version ${version}")
+      // Report all stores that are behind in snapshot uploads
+      val (laggingStores, latestSnapshot) = findLaggingStores()

Review Comment:
Should we do this on an interval, instead of every time? E.g. in a situation where we have, let's say, 200 providers and 5 are lagging, currently every time there is an upload for the 195 other providers, we will log. This may become noisy. Let's do it every x secs or mins to avoid noise?
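One possible shape for the throttling suggested in the comment above: track the last report time and skip the lagging-store scan until a minimum interval has elapsed. All names and the interval value here are illustrative, not from the PR:

```scala
// Hypothetical throttle for the lagging-store warning, so that frequent uploads
// from healthy providers do not trigger a log line on every single report.
private var lastLaggingReportTimeMs: Long = 0L
private val laggingReportIntervalMs: Long = 5 * 60 * 1000 // e.g. at most every 5 minutes

private def maybeReportLaggingStores(): Unit = {
  val now = System.currentTimeMillis()
  if (now - lastLaggingReportTimeMs >= laggingReportIntervalMs) {
    lastLaggingReportTimeMs = now
    val (laggingStores, latestSnapshot) = findLaggingStores()
    if (laggingStores.nonEmpty) {
      logWarning(s"Snapshot lag detected: ${laggingStores.size} store(s) behind " +
        s"latest snapshot $latestSnapshot")
    }
  }
}
```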
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +220,99 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case SnapshotUploaded(providerId, version, timestamp) =>
+      stateStoreSnapshotVersions.put(providerId, SnapshotUploadEvent(version, timestamp))
+      logDebug(s"Snapshot uploaded at ${providerId} with version ${version}")
+      // Report all stores that are behind in snapshot uploads
+      val (laggingStores, latestSnapshot) = findLaggingStores()
+      if (laggingStores.nonEmpty) {
+        logWarning(
+          log"StateStoreCoordinator Snapshot Lag - Number of state stores falling behind: " +
+          log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)} " +
+          log"(Latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, latestSnapshot)})"
+        )
+        laggingStores.foreach { storeProviderId =>
+          val logMessage = stateStoreSnapshotVersions.get(storeProviderId) match {
+            case Some(snapshotEvent) =>
+              val versionDelta = latestSnapshot.version - snapshotEvent.version
+              val timeDelta = latestSnapshot.timestamp - snapshotEvent.timestamp
+
+              log"StateStoreCoordinator Snapshot Lag - State store falling behind " +
+              log"${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} " +
+              log"(version delta: ${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, versionDelta)}, " +
+              log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, timeDelta)}ms)"
+            case None =>
+              log"StateStoreCoordinator Snapshot Lag - State store falling behind " +
+              log"${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} (never uploaded)"
+          }
+          logWarning(logMessage)
+        }
+      }
+      context.reply(true)
+
+    case GetLatestSnapshotVersion(providerId) =>
+      val version = stateStoreSnapshotVersions.get(providerId).map(_.version)
+      logDebug(s"Got latest snapshot version of the state store $providerId: $version")
+      context.reply(version)
+
+    case GetLaggingStores =>
+      val (laggingStores, _) = findLaggingStores()
+      logDebug(s"Got lagging state stores ${laggingStores
+        .map(
+          id =>
+            s"StateStoreId(operatorId=${id.storeId.operatorId}, " +
+            s"partitionId=${id.storeId.partitionId}, " +
+            s"storeName=${id.storeId.storeName})"
+        )
+        .mkString(", ")}")
+      context.reply(laggingStores)
+
     case StopCoordinator =>
       stop() // Stop before replying to ensure that endpoint name has been deregistered
       logInfo("StateStoreCoordinator stopped")
       context.reply(true)
   }
+
+  case class SnapshotUploadEvent(
+      version: Long,
+      timestamp: Long
+  ) extends Ordered[SnapshotUploadEvent] {
+    def isLagging(latest: SnapshotUploadEvent): Boolean = {
+      val versionDelta = latest.version - version
+      val timeDelta = latest.timestamp - timestamp
+      val minVersionDeltaForLogging =
+        sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_MIN_SNAPSHOT_VERSION_DELTA_TO_LOG)
+      // Use 10 times the maintenance interval as the minimum time delta for logging
+      val minTimeDeltaForLogging = 10 * sqlConf.getConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL)
+
+      versionDelta >= minVersionDeltaForLogging ||
+      (version >= 0 && timeDelta > minTimeDeltaForLogging)
+    }
+
+    override def compare(that: SnapshotUploadEvent): Int = {
+      this.version.compare(that.version)
+    }
+
+    override def toString(): String = {
+      s"SnapshotUploadEvent(version=$version, timestamp=$timestamp)"
+    }
+  }
+
+  private def findLaggingStores(): (Seq[StateStoreProviderId], SnapshotUploadEvent) = {
+    if (instances.isEmpty) {
+      return (Seq.empty, SnapshotUploadEvent(-1, 0))
+    }
+    // Find the most updated instance to use as reference point
+    val latestSnapshot = instances
+      .map(
+        instance => stateStoreSnapshotVersions.getOrElse(instance._1, SnapshotUploadEvent(-1, 0))
+      ).max

Review Comment:
This is incorrect, right? It is checking all providers, even providers that belong to other queries. E.g. if query a has 100 batches but query b has only 2 batches, this will say query b's providers are lagging, which isn't correct.

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +220,99 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case SnapshotUploaded(providerId, version, timestamp) =>
+      stateStoreSnapshotVersions.put(providerId, SnapshotUploadEvent(version, timestamp))
+      logDebug(s"Snapshot uploaded at ${providerId} with version ${version}")
+      // Report all stores that are behind in snapshot uploads
+      val (laggingStores, latestSnapshot) = findLaggingStores()
+      if (laggingStores.nonEmpty) {
+        logWarning(
+          log"StateStoreCoordinator Snapshot Lag - Number of state stores falling behind: " +
+          log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)} " +
+          log"(Latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, latestSnapshot)})"
+        )
+        laggingStores.foreach { storeProviderId =>
+          val logMessage = stateStoreSnapshotVersions.get(storeProviderId) match {
+            case Some(snapshotEvent) =>
+              val versionDelta = latestSnapshot.version - snapshotEvent.version
+              val timeDelta = latestSnapshot.timestamp - snapshotEvent.timestamp
+
+              log"StateStoreCoordinator Snapshot Lag - State store falling behind " +
+              log"${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} " +
+              log"(version delta: ${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, versionDelta)}, " +
+              log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, timeDelta)}ms)"
+            case None =>
+              log"StateStoreCoordinator Snapshot Lag - State store falling behind " +
+              log"${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} (never uploaded)"
+          }
+          logWarning(logMessage)
+        }
+      }
+      context.reply(true)
+
+    case GetLatestSnapshotVersion(providerId) =>
+      val version = stateStoreSnapshotVersions.get(providerId).map(_.version)
+      logDebug(s"Got latest snapshot version of the state store $providerId: $version")
+      context.reply(version)
+
+    case GetLaggingStores =>
+      val (laggingStores, _) = findLaggingStores()
+      logDebug(s"Got lagging state stores ${laggingStores
+        .map(
+          id =>
+            s"StateStoreId(operatorId=${id.storeId.operatorId}, " +
+            s"partitionId=${id.storeId.partitionId}, " +
+            s"storeName=${id.storeId.storeName})"
+        )
+        .mkString(", ")}")
+      context.reply(laggingStores)
+
     case StopCoordinator =>
       stop() // Stop before replying to ensure that endpoint name has been deregistered
       logInfo("StateStoreCoordinator stopped")
       context.reply(true)
   }
+
+  case class SnapshotUploadEvent(
+      version: Long,
+      timestamp: Long
+  ) extends Ordered[SnapshotUploadEvent] {
+    def isLagging(latest: SnapshotUploadEvent): Boolean = {
+      val versionDelta = latest.version - version
+      val timeDelta = latest.timestamp - timestamp
+      val minVersionDeltaForLogging =
+        sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_MIN_SNAPSHOT_VERSION_DELTA_TO_LOG)
+      // Use 10 times the maintenance interval as the minimum time delta for logging
+      val minTimeDeltaForLogging = 10 * sqlConf.getConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL)
+
+      versionDelta >= minVersionDeltaForLogging ||
+      (version >= 0 && timeDelta > minTimeDeltaForLogging)
+    }
+
+    override def compare(that: SnapshotUploadEvent): Int = {
+      this.version.compare(that.version)
+    }
+
+    override def toString(): String = {
+      s"SnapshotUploadEvent(version=$version, timestamp=$timestamp)"
+    }
+  }
+
+  private def findLaggingStores(): (Seq[StateStoreProviderId], SnapshotUploadEvent) = {
+    if (instances.isEmpty) {
+      return (Seq.empty, SnapshotUploadEvent(-1, 0))

Review Comment:
nit: should we define `SnapshotUploadEvent(-1, 0)` somewhere?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org