zecookiez commented on code in PR #50123: URL: https://github.com/apache/spark/pull/50123#discussion_r1992390682
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala: ########## @@ -168,9 +220,99 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv) storeIdsToRemove.mkString(", ")) context.reply(true) + case SnapshotUploaded(providerId, version, timestamp) => + stateStoreSnapshotVersions.put(providerId, SnapshotUploadEvent(version, timestamp)) + logDebug(s"Snapshot uploaded at ${providerId} with version ${version}") + // Report all stores that are behind in snapshot uploads + val (laggingStores, latestSnapshot) = findLaggingStores() + if (laggingStores.nonEmpty) { + logWarning( + log"StateStoreCoordinator Snapshot Lag - Number of state stores falling behind: " + + log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)} " + + log"(Latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT, latestSnapshot)})" + ) + laggingStores.foreach { storeProviderId => + val logMessage = stateStoreSnapshotVersions.get(storeProviderId) match { + case Some(snapshotEvent) => + val versionDelta = latestSnapshot.version - snapshotEvent.version + val timeDelta = latestSnapshot.timestamp - snapshotEvent.timestamp + + log"StateStoreCoordinator Snapshot Lag - State store falling behind " + + log"${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} " + + log"(version delta: ${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA, versionDelta)}, " + + log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA, timeDelta)}ms)" + case None => + log"StateStoreCoordinator Snapshot Lag - State store falling behind " + + log"${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} (never uploaded)" + } + logWarning(logMessage) + } + } + context.reply(true) + + case GetLatestSnapshotVersion(providerId) => + val version = stateStoreSnapshotVersions.get(providerId).map(_.version) + logDebug(s"Got latest snapshot version of the state store $providerId: $version") + context.reply(version) + + case GetLaggingStores => + val (laggingStores, _) = findLaggingStores() + 
logDebug(s"Got lagging state stores ${laggingStores + .map( + id => Review Comment: Gotcha, I saw that it already contained a toString method but wasn't sure how verbose we wanted this to be. Switched it back to use the pre-existing one instead :+1: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org