anishshri-db commented on code in PR #50123: URL: https://github.com/apache/spark/pull/50123#discussion_r2022119019
########## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ########## @@ -2291,6 +2291,70 @@ object SQLConf { .booleanConf .createWithDefault(true) + val STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_VERSION_DIFF_TO_LOG = + buildConf("spark.sql.streaming.stateStore.multiplierForMinVersionDiffToLog") + .internal() + .doc( + "Determines the version threshold for logging warnings when a state store falls behind. " + + "The coordinator logs a warning when the store's uploaded snapshot version trails the " + + "query's latest version by the configured number of deltas needed to create a snapshot, " + + "times this multiplier." + ) + .version("4.1.0") + .longConf + .checkValue(k => k >= 1L, "Must be greater than or equal to 1") + .createWithDefault(5L) + + val STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_TIME_DIFF_TO_LOG = + buildConf("spark.sql.streaming.stateStore.multiplierForMinTimeDiffToLog") + .internal() + .doc( + "Determines the time threshold for logging warnings when a state store falls behind. " + + "The coordinator logs a warning when the store's uploaded snapshot timestamp trails the " + + "current time by the configured maintenance interval, times this multiplier." + ) + .version("4.1.0") + .longConf + .checkValue(k => k >= 1L, "Must be greater than or equal to 1") + .createWithDefault(10L) + + val STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG = + buildConf("spark.sql.streaming.stateStore.coordinatorReportSnapshotUploadLag") + .internal() + .doc( + "When enabled, the state store coordinator will report state stores whose snapshots " + + "have not been uploaded for some time. See the conf snapshotLagReportInterval for " + + "the minimum time between reports, and the conf multiplierForMinVersionDiffToLog " + + "and multiplierForMinTimeDiffToLog for the logging thresholds." 
+ ) + .version("4.1.0") + .booleanConf + .createWithDefault(true) + + val STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL = + buildConf("spark.sql.streaming.stateStore.snapshotLagReportInterval") + .internal() + .doc( + "The minimum amount of time between the state store coordinator's reports on " + + "state store instances trailing behind in snapshot uploads." + ) + .version("4.1.0") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefault(TimeUnit.MINUTES.toMillis(5)) + + val STATE_STORE_COORDINATOR_MAX_LAGGING_STORES_TO_REPORT = + buildConf("spark.sql.streaming.stateStore.maxLaggingStoresToReport") + .internal() + .doc( + "Maximum number of state stores the coordinator will report as trailing in " + + "snapshot uploads. Stores are selected based on how far they lag behind in " + + "snapshot version." + ) + .version("4.1.0") + .intConf + .checkValue(k => k >= 0, "Must be greater than or equal to 0") + .createWithDefault(10) Review Comment: 5 is probably enough ? or maybe some % of the total num of partitions ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org