zecookiez commented on code in PR #50123: URL: https://github.com/apache/spark/pull/50123#discussion_r2006510983
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala: ########## @@ -168,9 +264,141 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv) storeIdsToRemove.mkString(", ")) context.reply(true) + case ReportSnapshotUploaded(providerId, version, timestamp) => + // Ignore this upload event if the registered latest version for the provider is more recent, + // since it's possible that an older version gets uploaded after a new executor uploads for + // the same provider but with a newer snapshot. + logDebug(s"Snapshot version $version was uploaded for provider $providerId") + if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version >= version)) { + stateStoreLatestUploadedSnapshot.put(providerId, SnapshotUploadEvent(version, timestamp)) + } + context.reply(true) + + case ConstructLaggingInstanceReport(queryRunId, latestVersion, endOfBatchTimestamp) => + // Only log lagging instances if the snapshot report upload is enabled, + // otherwise all instances will be considered lagging. + if (sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_REPORT_UPLOAD_ENABLED)) { + val laggingStores = findLaggingStores(queryRunId, latestVersion, endOfBatchTimestamp) + logWarning( + log"StateStoreCoordinator Snapshot Lag Report for " + + log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " + + log"Number of state stores falling behind: " + + log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)}" + ) + // Report all stores that are behind in snapshot uploads. + // Only report the full list of providers lagging behind if the last reported time + // is not recent. The lag report interval denotes the minimum time between these + // full reports. 
+ val coordinatorLagReportInterval = + sqlConf.getConf(SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL) + val currentTimestamp = System.currentTimeMillis() + if (laggingStores.nonEmpty && + currentTimestamp - lastFullSnapshotLagReport > coordinatorLagReportInterval) { + // Mark timestamp of the full report and log the lagging instances + lastFullSnapshotLagReport = currentTimestamp + laggingStores.foreach { providerId => + val logMessage = stateStoreLatestUploadedSnapshot.get(providerId) match { + case Some(snapshotEvent) => + val versionDelta = latestVersion - snapshotEvent.version + val timeDelta = endOfBatchTimestamp - snapshotEvent.timestamp + + log"StateStoreCoordinator Snapshot Lag Detected for " + + log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " + + log"Provider: ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} " + + log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}, " + + log"latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, snapshotEvent)}, " + + log"version delta: ${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, versionDelta)}, " + + log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, timeDelta)}ms)" + case None => + log"StateStoreCoordinator Snapshot Lag Detected for " + + log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " + + log"Provider: ${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} " + + log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}, " + + log"latest snapshot: never uploaded)" + } + logWarning(logMessage) + } + } else if (laggingStores.nonEmpty) { + logInfo(log"StateStoreCoordinator Snapshot Lag Report - last full report was too recent") + } + } + context.reply(true) + + case GetLatestSnapshotVersionForTesting(providerId) => + val version = stateStoreLatestUploadedSnapshot.get(providerId).map(_.version) + logDebug(s"Got latest snapshot version of the state store $providerId: $version") + context.reply(version) + + case GetLaggingStoresForTesting(queryRunId, latestVersion, timestamp) 
=> + val laggingStores = findLaggingStores(queryRunId, latestVersion, timestamp) + logDebug(s"Got lagging state stores: ${laggingStores.mkString(", ")}") + context.reply(laggingStores) + case StopCoordinator => stop() // Stop before replying to ensure that endpoint name has been deregistered logInfo("StateStoreCoordinator stopped") context.reply(true) } + + case class SnapshotUploadEvent( + version: Long, + timestamp: Long + ) extends Ordered[SnapshotUploadEvent] { + + def isLagging(latestVersion: Long, latestTimestamp: Long): Boolean = { + val versionDelta = latestVersion - version + val timeDelta = latestTimestamp - timestamp + + // Determine alert thresholds from configurations for both time and version differences. + val snapshotVersionDeltaMultiplier = sqlConf.getConf( + SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_DELTA_MULTIPLIER_FOR_MIN_VERSION_DELTA_TO_LOG) + val maintenanceIntervalMultiplier = sqlConf.getConf( + SQLConf.STATE_STORE_COORDINATOR_MAINTENANCE_MULTIPLIER_FOR_MIN_TIME_DELTA_TO_LOG) + val minDeltasForSnapshot = sqlConf.getConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT) + val maintenanceInterval = sqlConf.getConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL) + + // Use the configured multipliers to determine the proper alert thresholds + val minVersionDeltaForLogging = snapshotVersionDeltaMultiplier * minDeltasForSnapshot + val minTimeDeltaForLogging = maintenanceIntervalMultiplier * maintenanceInterval + + // Mark a state store as lagging if it is behind in both version and time. + // In the case that a snapshot was never uploaded, we treat version -1 as the preceding + // version of 0, and only rely on the version delta condition. + // Time requirement will be automatically satisfied as the initial timestamp is 0. Review Comment: Hm this is an interesting case. I'll add a new test for this, thanks! -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org