ericm-db commented on code in PR #50123:
URL: https://github.com/apache/spark/pull/50123#discussion_r2003726543
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala: ##########

@@ -283,6 +284,9 @@ abstract class ProgressContext(
     progressReporter.lastNoExecutionProgressEventTime = triggerClock.getTimeMillis()
     progressReporter.updateProgress(newProgress)
+    // Ask the state store coordinator to look for any lagging instances and report them.
+    StateStore.constructLaggingInstanceReport(lastExecution.runId, lastEpochId)

Review Comment:
   Is there any way you can call into the StateStoreCoordinator object so this is clearer?

########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala: ##########

@@ -168,9 +264,153 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case ReportSnapshotUploaded(providerId, version, timestamp) =>
+      // Ignore this upload event if the registered latest version for the provider is more
+      // recent, since it's possible that an older version gets uploaded after a new executor
+      // uploads for the same provider but with a newer snapshot.
+      logDebug(s"Snapshot version $version was uploaded for provider $providerId")
+      if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version >= version)) {
+        stateStoreLatestUploadedSnapshot.put(providerId, SnapshotUploadEvent(version, timestamp))
+      }
+      context.reply(true)
+
+    case ConstructLaggingInstanceReport(queryRunId, latestVersion, timestamp) =>
+      // Only log lagging instances if snapshot upload reporting is enabled,
+      // otherwise all instances will be considered lagging.
+      if (isSnapshotUploadReportEnabled) {
+        val laggingStores = findLaggingStores(queryRunId, latestVersion, timestamp)

Review Comment:
   Right now we're passing in the same timestamp for every single partition to see whether it's lagging or not. I'm not sure this makes sense, since every partition could have last uploaded at a different time. Would it be difficult to pass this information in per partition?
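   To make the suggestion concrete, here's roughly what I had in mind — just a sketch, leaning on the `stateStoreLatestUploadedSnapshot` map and `SnapshotUploadEvent(version, timestamp)` from this PR; `minVersionDeltaForLag` and `minTimeDeltaForLag` are placeholder thresholds, not real configs:

   ```scala
   private def findLaggingStoresPerPartition(
       queryRunId: UUID,
       latestVersion: Long,
       currentTimeMs: Long): Seq[StateStoreProviderId] = {
     instances.keys.filter { providerId =>
       providerId.queryRunId == queryRunId &&
         (stateStoreLatestUploadedSnapshot.get(providerId) match {
           case Some(lastUpload) =>
             // Judge each partition by its own last upload rather than one shared
             // timestamp: lagging if its snapshot version or upload time is too far behind.
             latestVersion - lastUpload.version > minVersionDeltaForLag ||
               currentTimeMs - lastUpload.timestamp > minTimeDeltaForLag
           case None =>
             // No upload was ever reported for this partition, so treat it as lagging.
             true
         })
     }.toSeq
   }
   ```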
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala: ##########

@@ -168,9 +264,153 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
         storeIdsToRemove.mkString(", "))
       context.reply(true)
 
+    case ReportSnapshotUploaded(providerId, version, timestamp) =>
+      // Ignore this upload event if the registered latest version for the provider is more
+      // recent, since it's possible that an older version gets uploaded after a new executor
+      // uploads for the same provider but with a newer snapshot.
+      logDebug(s"Snapshot version $version was uploaded for provider $providerId")
+      if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version >= version)) {
+        stateStoreLatestUploadedSnapshot.put(providerId, SnapshotUploadEvent(version, timestamp))
+      }
+      context.reply(true)
+
+    case ConstructLaggingInstanceReport(queryRunId, latestVersion, timestamp) =>
+      // Only log lagging instances if snapshot upload reporting is enabled,
+      // otherwise all instances will be considered lagging.
+      if (isSnapshotUploadReportEnabled) {
+        val laggingStores = findLaggingStores(queryRunId, latestVersion, timestamp)
+        logWarning(
+          log"StateStoreCoordinator Snapshot Lag Report for " +
+          log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " +
+          log"Number of state stores falling behind: " +
+          log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)}"
+        )
+        // Report all stores that are behind in snapshot uploads.
+        // Only report the full list of lagging providers if the last report was not
+        // recent. The lag report interval denotes the minimum time between these
+        // full reports.
+        val coordinatorLagReportInterval =
+          sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL)
+        if (laggingStores.nonEmpty &&
+            System.currentTimeMillis() - lastFullSnapshotLagReport > coordinatorLagReportInterval) {

Review Comment:
   nit: Can we save System.currentTimeMillis() in a variable here, since we use it a couple of times?

########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala: ##########

@@ -38,9 +38,20 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.util.{NonFateSharingCache, Utils}
 
+/**
+ * Trait representing events reported from a RocksDB instance.
+ *
+ * The internal RocksDB instance can use a provider with a `RocksDBEventListener` reference to
+ * report specific events like snapshot uploads. This should only be used to report back to the
+ * coordinator for metrics and monitoring purposes.
+ */
+trait RocksDBEventListener {
+  def reportSnapshotUploaded(version: Long): Unit
+}
+
 private[sql] class RocksDBStateStoreProvider extends StateStoreProvider
   with Logging with Closeable
-  with SupportsFineGrainedReplay {
+  with SupportsFineGrainedReplay with RocksDBEventListener {

Review Comment:
   @zecookiez would that be a big change to make? It should be fine, right? I'm okay with how this is currently, though.

########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala: ##########

@@ -129,10 +210,25 @@ class StateStoreCoordinatorRef private(rpcEndpointRef: RpcEndpointRef) {
 /**
  * Class for coordinating instances of [[StateStore]]s loaded in executors across the cluster,
  * and get their locations for job scheduling.
  */
-private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
-  extends ThreadSafeRpcEndpoint with Logging {
+private class StateStoreCoordinator(
+    override val rpcEnv: RpcEnv,
+    val sqlConf: SQLConf)
+  extends ThreadSafeRpcEndpoint
+  with Logging {
   private val instances = new mutable.HashMap[StateStoreProviderId, ExecutorCacheTaskLocation]
 
+  // Stores the latest snapshot upload event for a specific state store provider instance
+  private val stateStoreLatestUploadedSnapshot =

Review Comment:
   Kind of a meta-question: do we populate this map only once we receive an RPC? What happens if we never receive an RPC for a particular partition?
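   If it's the latter, one way to make the "never heard from this partition" case explicit would be a default lookup along these lines — `defaultSnapshotUploadEvent` and `latestUploadFor` are made-up names, not something in this PR:

   ```scala
   // Hypothetical sentinel: compares as older than any real upload, so a provider
   // that never sent a ReportSnapshotUploaded RPC is always flagged by the lag check.
   private val defaultSnapshotUploadEvent = SnapshotUploadEvent(version = 0L, timestamp = 0L)

   private def latestUploadFor(providerId: StateStoreProviderId): SnapshotUploadEvent =
     stateStoreLatestUploadedSnapshot.getOrElse(providerId, defaultSnapshotUploadEvent)
   ```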
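   And to spell out the earlier nit on the ConstructLaggingInstanceReport handler — something like this, assuming lastFullSnapshotLagReport is updated right after the full report (the quoted diff cuts off before that point):

   ```scala
   // Read the clock once so the lag check and the bookkeeping below stay consistent.
   val currentTimeMs = System.currentTimeMillis()
   if (laggingStores.nonEmpty &&
       currentTimeMs - lastFullSnapshotLagReport > coordinatorLagReportInterval) {
     // ... log the full list of lagging providers ...
     lastFullSnapshotLagReport = currentTimeMs
   }
   ```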