HeartSaVioR commented on code in PR #49816:
URL: https://github.com/apache/spark/pull/49816#discussion_r1952059417
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2251,6 +2251,18 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)

+  val STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT =
+    buildConf("spark.sql.streaming.stateStore.numStateStoreInstanceMetricsToReport")
+      .internal()
+      .doc(
+        "Number of state store instance metrics to include in streaming progress reporting." +

Review Comment:
   The doc is a bit confusing because it does not clarify that this limit applies per stateful operator, not to the entire plan. There is now a non-trivial number of streaming queries with more than one stateful operator. Shall we make this clear?

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -147,6 +147,10 @@ class RocksDB(
   private val enableChangelogCheckpointing: Boolean = conf.enableChangelogCheckpointing
   @volatile protected var loadedVersion: Long = -1L // -1 = nothing valid is loaded

+  // Can be updated by whichever thread uploaded, which could be either task, maintenance, or both.
+  // -1 represents no version has ever been uploaded.
+  protected val lastUploadedVersion: AtomicLong = new AtomicLong(-1L)

Review Comment:
   nit: shall we clarify "snapshot" here? We also upload changelogs.

##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2251,6 +2251,18 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)

+  val STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT =
+    buildConf("spark.sql.streaming.stateStore.numStateStoreInstanceMetricsToReport")
+      .internal()
+      .doc(
+        "Number of state store instance metrics to include in streaming progress reporting." +
+        "This is used to reduce noise in the progress report."
+      )
+      .version("4.0.0")
+      .intConf
+      .checkValue(k => k >= 0, "Must be greater than or equal to 0")
+      .createWithDefault(10)

Review Comment:
   10 sounds too noisy to me for most "healthy" workloads. I wanted this to also be controlled by a threshold, but since many folks are OK with this approach, let's just reduce the number a bit conservatively, like 5. People can adjust it to a higher number when they see a problem.
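   For reference, a minimal sketch of how the conf could read if both points above were folded in (per-operator scope in the doc, and the smaller default). The doc wording and the default of 5 here are illustrative, not the merged text:

```scala
val STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT =
  buildConf("spark.sql.streaming.stateStore.numStateStoreInstanceMetricsToReport")
    .internal()
    .doc(
      "Number of state store instance metrics to include in streaming progress reporting " +
      "per stateful operator (not across the entire plan). This is used to reduce noise " +
      "in the progress report.")
    .version("4.0.0")
    .intConf
    .checkValue(k => k >= 0, "Must be greater than or equal to 0")
    // Illustrative default following the review suggestion; the PR originally proposed 10.
    .createWithDefault(5)
```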
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -321,6 +321,38 @@ case class StateStoreCustomTimingMetric(name: String, desc: String) extends Stat
     SQLMetrics.createTimingMetric(sparkContext, desc)
 }

+case class StateStoreInstanceMetric(
+  metricPrefix: String,
+  descPrefix: String,
+  partitionId: Option[Int] = None,
+  storeName: String = StateStoreId.DEFAULT_STORE_NAME)
+  extends StateStoreCustomMetric {

Review Comment:
   nit: this is indented with 2 spaces rather than 4 spaces

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -1688,7 +1694,8 @@ case class RocksDBMetrics(
     bytesCopied: Long,
     filesReused: Long,
     zipFileBytesUncompressed: Option[Long],
-    nativeOpsMetrics: Map[String, Long]) {
+    nativeOpsMetrics: Map[String, Long],
+    lastUploadedVersion: Long) {

Review Comment:
   same here

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -391,13 +402,48 @@ trait StateStoreWriter
     }
   }

+  protected def setStoreCustomMetrics(storeMetrics: StateStoreMetrics): Unit = {
+    storeMetrics.customMetrics.foreach {
+      // Set the max for instance metrics
+      case (metric: StateStoreInstanceMetric, value) =>
+        // Check for cases where value < 0 and .value converts metric to 0
+        // Metrics like last uploaded snapshot version can have an init value of -1,
+        // which need special handling to avoid setting the metric to 0 using `.value`.
+        longMetric(metric.name).set(
+          if (longMetric(metric.name).isZero) {
+            value
+          } else {
+            // Use max to grab the most updated value across all state store instances,

Review Comment:
   Will this semantic work for every possible custom metric we can think of?

##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2251,6 +2251,18 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)

+  val STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT =
+    buildConf("spark.sql.streaming.stateStore.numStateStoreInstanceMetricsToReport")
+      .internal()
+      .doc(
+        "Number of state store instance metrics to include in streaming progress reporting." +

Review Comment:
   Also, let's clarify that this works as a top N. If you want to leave the definition of the criteria to each metric, that's fine, but we'd like to mention it explicitly (e.g. the criteria for choosing the N instances depends on each metric).

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -321,6 +321,38 @@ case class StateStoreCustomTimingMetric(name: String, desc: String) extends Stat
     SQLMetrics.createTimingMetric(sparkContext, desc)
 }

+case class StateStoreInstanceMetric(
+  metricPrefix: String,
+  descPrefix: String,
+  partitionId: Option[Int] = None,

Review Comment:
   Is this a valid metric if it doesn't have a partitionId? It looks like when you don't have a partitionId, you also ignore storeName. If you just intend to have an "uninitialized" state, let's assert proactively rather than let the uninitialized state accidentally be used.
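   A self-contained sketch of the "assert proactively" idea: the uninitialized instance is only usable as a template, and resolving the metric name without a partition fails fast. The class and name format below are illustrative stand-ins, not the PR's actual StateStoreCustomMetric hierarchy:

```scala
// Illustrative stand-in for an instance metric; not the PR's real class.
final case class InstanceMetricSketch(
    metricPrefix: String,
    partitionId: Option[Int] = None,
    storeName: String = "default") {

  // Template -> concrete instance for a specific partition/store.
  def withPartition(id: Int, store: String): InstanceMetricSketch =
    copy(partitionId = Some(id), storeName = store)

  // Fail fast instead of silently dropping partition/store information from the name.
  def name: String = {
    require(partitionId.isDefined, s"partitionId must be set before using metric '$metricPrefix'")
    s"$metricPrefix.partition_${partitionId.get}_$storeName"
  }
}
```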
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -320,11 +322,22 @@ trait StateStoreWriter
    * the driver after this SparkPlan has been executed and metrics have been updated.
    */
   def getProgress(): StateOperatorProgress = {
+    // Still publish the instance metrics that are marked as unpopulated,
+    // as they represent state store instances that never uploaded a snapshot.
+    val instanceMetrics = stateStoreInstanceMetrics.map { entry =>
+      entry._1 -> (if (longMetric(entry._1).isZero) -1L else longMetric(entry._1).value)
+    }
+    // Only keep the smallest N instance metrics to report to driver.

Review Comment:
   This is not scalable. For the last uploaded snapshot, the top N is the smallest N because a smaller value = a bigger lag. But what if we want to add a metric for memory usage and report the top N by excessive memory usage? Whenever we deal with an interface, we need to consider it as a "general" one and think through possible derivations. Since we only allow a numeric value for the metric, probably describing the sort order in the interface would be fine. (Otherwise we may have to enforce implementing Comparable for derivations of the interface.)

##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2251,6 +2251,18 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)

+  val STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT =
+    buildConf("spark.sql.streaming.stateStore.numStateStoreInstanceMetricsToReport")
+      .internal()
+      .doc(
+        "Number of state store instance metrics to include in streaming progress reporting." +
+        "This is used to reduce noise in the progress report."
+      )
+      .version("4.0.0")
+      .intConf
+      .checkValue(k => k >= 0, "Must be greater than or equal to 0")
+      .createWithDefault(10)

Review Comment:
   It's not a one-way door, so we will be able to improve this later.

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -320,11 +322,22 @@ trait StateStoreWriter
    * the driver after this SparkPlan has been executed and metrics have been updated.
    */
   def getProgress(): StateOperatorProgress = {
+    // Still publish the instance metrics that are marked as unpopulated,
+    // as they represent state store instances that never uploaded a snapshot.
+    val instanceMetrics = stateStoreInstanceMetrics.map { entry =>
+      entry._1 -> (if (longMetric(entry._1).isZero) -1L else longMetric(entry._1).value)
+    }
+    // Only keep the smallest N instance metrics to report to driver.

Review Comment:
   Once we reflect this, we'd like to generalize the comment as well, e.g. "pick N with the specified order".
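   A sketch of the direction the last two comments point at, assuming the instance metric interface carries a per-metric sort order so `getProgress` can pick N with the specified order. The names below are hypothetical, not the API that was merged:

```scala
object InstanceMetricOrderingSketch {
  // Each instance metric kind declares how values should be ranked when picking
  // the N instances to report; this trait is a hypothetical stand-in.
  trait InstanceMetricLike {
    def name: String
    // e.g. Ordering.Long for last-uploaded-snapshot version (smallest N = biggest lag),
    // or Ordering.Long.reverse for a hypothetical memory-usage metric (largest N).
    def ordering: Ordering[Long]
  }

  // Pick the first N instances of one metric kind under that metric's own ordering,
  // rather than hard-coding "smallest N" in the progress-reporting code.
  def pickInstancesToReport(
      metric: InstanceMetricLike,
      valuesByInstance: Map[String, Long],
      limit: Int): Map[String, Long] = {
    valuesByInstance.toSeq
      .sortBy(_._2)(metric.ordering)
      .take(limit)
      .toMap
  }
}
```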