ericm-db commented on code in PR #49816: URL: https://github.com/apache/spark/pull/49816#discussion_r1949568167
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala: ########## @@ -527,8 +528,16 @@ case class StreamingSymmetricHashJoinExec( (leftSideJoiner.numUpdatedStateRows + rightSideJoiner.numUpdatedStateRows) numTotalStateRows += combinedMetrics.numKeys stateMemory += combinedMetrics.memoryUsedBytes - combinedMetrics.customMetrics.foreach { case (metric, value) => - longMetric(metric.name) += value + combinedMetrics.customMetrics.foreach { + // Set for custom partition metrics + case (metric: StateStoreCustomPartitionMetric, value) => + // Check for cases where value < 0 and .value converts metric to 0 Review Comment: Can you comment on why we are doing this (for the uploaded version)? ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala: ########## @@ -104,6 +105,10 @@ object StatefulOperatorStateInfo { * [[IncrementalExecution]]. */ trait StatefulOperator extends SparkPlan { + // Used to determine state store names used when allocation metric names, + // since join operations use multiple state stores with non-default names. 
+ val isJoinOperator: Boolean = false Review Comment: Make this a function, not a val ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala: ########## @@ -527,8 +528,16 @@ case class StreamingSymmetricHashJoinExec( (leftSideJoiner.numUpdatedStateRows + rightSideJoiner.numUpdatedStateRows) numTotalStateRows += combinedMetrics.numKeys stateMemory += combinedMetrics.memoryUsedBytes - combinedMetrics.customMetrics.foreach { case (metric, value) => - longMetric(metric.name) += value + combinedMetrics.customMetrics.foreach { + // Set for custom partition metrics + case (metric: StateStoreCustomPartitionMetric, value) => + // Check for cases where value < 0 and .value converts metric to 0 + longMetric(metric.name).set( + if (longMetric(metric.name).isZero) value + else Math.max(value, longMetric(metric.name).value) Review Comment: nit: use braces ########## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala: ########## @@ -270,4 +287,232 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest assert(changelogVersionsPresent(dirForPartition0) == List(3L, 4L)) assert(snapshotVersionsPresent(dirForPartition0).contains(5L)) } + + private def snapshotLagMetricName( + partitionId: Long, + storeName: String = StateStoreId.DEFAULT_STORE_NAME): String = Review Comment: nit: use braces -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org