zecookiez commented on code in PR #50123: URL: https://github.com/apache/spark/pull/50123#discussion_r2014620141
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala: ########## @@ -164,13 +289,157 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv) val storeIdsToRemove = instances.keys.filter(_.queryRunId == runId).toSeq instances --= storeIdsToRemove + // Also remove these instances from snapshot upload event tracking + stateStoreLatestUploadedSnapshot --= storeIdsToRemove + // Remove the corresponding run id entries for report time and starting time + lastFullSnapshotLagReportTimeMs -= runId + queryRunStartingPoint -= runId logDebug(s"Deactivating instances related to checkpoint location $runId: " + storeIdsToRemove.mkString(", ")) context.reply(true) + case ReportSnapshotUploaded(providerId, version, timestamp) => + // Ignore this upload event if the registered latest version for the store is more recent, + // since it's possible that an older version gets uploaded after a new executor uploads for + // the same state store but with a newer snapshot. + logDebug(s"Snapshot version $version was uploaded for state store $providerId") + if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version >= version)) { + stateStoreLatestUploadedSnapshot.put(providerId, SnapshotUploadEvent(version, timestamp)) + } + context.reply(true) + + case LogLaggingStateStores(queryRunId, latestVersion) => + val currentTimestamp = System.currentTimeMillis() + // Mark the query run's starting timestamp and latest version if the coordinator + // has never seen this query run before. + if (!queryRunStartingPoint.contains(queryRunId)) { + queryRunStartingPoint.put(queryRunId, QueryStartInfo(latestVersion, currentTimestamp)) + } else if (shouldCoordinatorReportSnapshotLag(queryRunId, latestVersion, currentTimestamp)) { + // Only log lagging instances if the snapshot report upload is enabled, + // otherwise all instances will be considered lagging. + val laggingStores = findLaggingStores(queryRunId, latestVersion, currentTimestamp) + if (laggingStores.nonEmpty) { + logWarning( + log"StateStoreCoordinator Snapshot Lag Report for " + + log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " + + log"Number of state stores falling behind: " + + log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)}" + ) + // Report all stores that are behind in snapshot uploads. + // Only report the list of providers lagging behind if the last reported time + // is not recent for this query run. The lag report interval denotes the minimum + // time between these full reports. + val timeSinceLastReport = + currentTimestamp - lastFullSnapshotLagReportTimeMs.getOrElse(queryRunId, 0L) + if (timeSinceLastReport > coordinatorLagReportInterval) { + // Mark timestamp of the report and log the lagging instances + lastFullSnapshotLagReportTimeMs.put(queryRunId, currentTimestamp) + // Only report the stores that are lagging the most behind in snapshot uploads. + laggingStores + .sortBy(stateStoreLatestUploadedSnapshot.getOrElse(_, defaultSnapshotUploadEvent)) + .take(sqlConf.stateStoreCoordinatorMaxLaggingStoresToReport) + .foreach { providerId => + val baseLogMessage = + log"StateStoreCoordinator Snapshot Lag Detected for " + + log"queryRunId=${MDC(LogKeys.QUERY_RUN_ID, queryRunId)} - " + + log"Store ID: ${MDC(LogKeys.STATE_STORE_ID, providerId.storeId)} " + + log"(Latest batch ID: ${MDC(LogKeys.BATCH_ID, latestVersion)}" + + val logMessage = stateStoreLatestUploadedSnapshot.get(providerId) match { + case Some(snapshotEvent) => + val versionDelta = latestVersion - Math.max(snapshotEvent.version, 0) Review Comment: 🤦 Oops yes good catch, that is not needed. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org