ericm-db commented on code in PR #50123:
URL: https://github.com/apache/spark/pull/50123#discussion_r1977928582
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -119,6 +129,25 @@ class StateStoreCoordinatorRef private(rpcEndpointRef: RpcEndpointRef) {
     rpcEndpointRef.askSync[Boolean](DeactivateInstances(runId))
   }
 
+  /** Inform that an executor has uploaded a snapshot */
+  private[sql] def snapshotUploaded(
+      storeProviderId: StateStoreProviderId,
+      version: Long,
+      timestamp: Long): Unit = {
+    rpcEndpointRef.askSync[Boolean](SnapshotUploaded(storeProviderId, version, timestamp))
+  }
+
+  /** Get the latest snapshot version uploaded for a state store */

Review Comment:
   Please add comments indicating that both of these endpoints are for tests only

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -55,6 +56,15 @@ private case class GetLocation(storeId: StateStoreProviderId)
 private case class DeactivateInstances(runId: UUID)
   extends StateStoreCoordinatorMessage
 
+private case class SnapshotUploaded(storeId: StateStoreProviderId, version: Long, timestamp: Long)

Review Comment:
   Indicate which RPCs are for tests here too

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -1467,6 +1475,11 @@ class RocksDB(
         log"time taken: ${MDC(LogKeys.TIME_UNITS, uploadTime)} ms. " +
         log"Current lineage: ${MDC(LogKeys.LINEAGE, lineageManager)}")
       lastUploadedSnapshotVersion.set(snapshot.version)
+      // Report to coordinator that the snapshot has been uploaded when
+      // changelog checkpointing is enabled, since that is when stores can lag behind.
+      if(enableChangelogCheckpointing) {
+        providerListener.foreach(_.reportSnapshotUploaded(snapshot.version))

Review Comment:
   Do we also need to do this for the `shouldForceSnapshot` case?
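If that path is covered too, a minimal sketch of what it could look like; it assumes `shouldForceSnapshot` is an `AtomicBoolean` flag in `RocksDB.scala`, which is not confirmed by the hunk above:

```scala
// Sketch only: report every successful snapshot upload, whether it came from
// the changelog-checkpointing path or from a forced snapshot.
// `shouldForceSnapshot.get()` is an assumed accessor, not taken from the PR.
if (enableChangelogCheckpointing || shouldForceSnapshot.get()) {
  providerListener.foreach(_.reportSnapshotUploaded(snapshot.version))
}
```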
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +204,85 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case SnapshotUploaded(providerId, version, timestamp) =>
+      stateStoreSnapshotVersions.put(providerId, SnapshotUploadEvent(version, timestamp))
+      logDebug(s"Snapshot uploaded at ${providerId} with version ${version}")
+      // Report all stores that are behind in snapshot uploads
+      val (laggingStores, latestSnapshot) = findLaggingStores()
+      if (laggingStores.nonEmpty) {
+        logWarning(s"Number of state stores falling behind: ${laggingStores.size}")
+        laggingStores.foreach { storeProviderId =>
+          val snapshotEvent =
+            stateStoreSnapshotVersions.getOrElse(storeProviderId, SnapshotUploadEvent(-1, 0))
+          logWarning(
+            s"State store falling behind $storeProviderId " +
+            s"(current: $snapshotEvent, latest: $latestSnapshot)"
+          )
+        }
+      }
+      context.reply(true)
+
+    case GetLatestSnapshotVersion(providerId) =>
+      val version = stateStoreSnapshotVersions.get(providerId).map(_.version)
+      logDebug(s"Got latest snapshot version of the state store $providerId: $version")
+      context.reply(version)
+
+    case GetLaggingStores =>
+      val (laggingStores, _) = findLaggingStores()
+      logDebug(s"Got lagging state stores ${laggingStores
+        .map(
+          id =>
+            s"StateStoreId(operatorId=${id.storeId.operatorId}, " +
+            s"partitionId=${id.storeId.partitionId}, " +
+            s"storeName=${id.storeId.storeName})"
+        )
+        .mkString(", ")}")
+      context.reply(laggingStores)
+
     case StopCoordinator =>
       stop() // Stop before replying to ensure that endpoint name has been deregistered
       logInfo("StateStoreCoordinator stopped")
       context.reply(true)
   }
 
+  case class SnapshotUploadEvent(
+      version: Long,
+      timestamp: Long
+  ) extends Ordered[SnapshotUploadEvent] {
+    def isLagging(latest: SnapshotUploadEvent): Boolean = {
+      val versionDelta = latest.version - version
+      val timeDelta = latest.timestamp - timestamp
+      val minVersionDeltaForLogging =
+        sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_MIN_SNAPSHOT_VERSION_DELTA_TO_LOG)
+      // Use 10 times the maintenance interval as the minimum time delta for logging
+      val minTimeDeltaForLogging = 10 * sqlConf.getConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL)
+
+      versionDelta >= minVersionDeltaForLogging ||
+      (version >= 0 && timeDelta > minTimeDeltaForLogging)

Review Comment:
   nit: indent

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +204,85 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
+          logWarning(
+            s"State store falling behind $storeProviderId " +
+            s"(current: $snapshotEvent, latest: $latestSnapshot)"

Review Comment:
   Also - if this is the log line we have to look for, maybe you can add a prefix that is easy to grep for.
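As a sketch of the grep-friendly prefix idea: `SNAPSHOT_LAG` below is an invented marker, not anything from the PR, and the rest of the message is unchanged from the hunk above:

```scala
// Sketch: lead the warning with a fixed marker so operators can grep for it.
logWarning(
  s"SNAPSHOT_LAG: state store falling behind $storeProviderId " +
  s"(current: $snapshotEvent, latest: $latestSnapshot)"
)
```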
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +204,85 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
+    override def compare(that: SnapshotUploadEvent): Int = {
+      this.version.compare(that.version)
+    }
+
+    override def toString(): String = {
+      s"SnapshotUploadEvent(version=$version, timestamp=$timestamp)"
+    }
+  }
+
+  private def findLaggingStores(): (Seq[StateStoreProviderId], SnapshotUploadEvent) = {
+    // Find the most updated instance to use as reference point
+    val latestSnapshot = instances
+      .map(
+        instance => stateStoreSnapshotVersions.getOrElse(instance._1, SnapshotUploadEvent(-1, 0))
+      )
+      .max

Review Comment:
   nit: move to line above

##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala:
##########
@@ -102,7 +103,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
   test("multiple references have same underlying coordinator") {
     withCoordinatorRef(sc) { coordRef1 =>
-      val coordRef2 = StateStoreCoordinatorRef.forDriver(sc.env)
+      val coordRef2 = StateStoreCoordinatorRef.forDriver(sc.env, new SQLConf)

Review Comment:
   Why `coordRef2`?
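Returning to the `.max` nit on `findLaggingStores` above, one possible reading as a sketch; the switch from `instance._1` to a pattern match is a stylistic assumption, not part of the suggestion:

```scala
// Sketch: same logic, with `.max` attached to the closing brace instead of
// dangling on its own line.
val latestSnapshot = instances.map { case (providerId, _) =>
  stateStoreSnapshotVersions.getOrElse(providerId, SnapshotUploadEvent(-1, 0))
}.max
```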
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +204,85 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
+          logWarning(
+            s"State store falling behind $storeProviderId " +
+            s"(current: $snapshotEvent, latest: $latestSnapshot)"

Review Comment:
   Maybe you can log the number of versions behind/time since last upload?
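A sketch of what logging those deltas could look like, computed from the two events already in scope in the hunk above; the message wording is illustrative:

```scala
// Sketch: surface the concrete lag instead of the raw events, so the number of
// versions behind and the time since the last upload are readable at a glance.
val versionDelta = latestSnapshot.version - snapshotEvent.version
val timeDeltaMs = latestSnapshot.timestamp - snapshotEvent.timestamp
logWarning(
  s"State store falling behind $storeProviderId " +
  s"(versions behind: $versionDelta, ms since last upload: $timeDeltaMs)"
)
```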