ericm-db commented on code in PR #49816:
URL: https://github.com/apache/spark/pull/49816#discussion_r1949568167


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala:
##########
@@ -527,8 +528,16 @@ case class StreamingSymmetricHashJoinExec(
           (leftSideJoiner.numUpdatedStateRows + rightSideJoiner.numUpdatedStateRows)
         numTotalStateRows += combinedMetrics.numKeys
         stateMemory += combinedMetrics.memoryUsedBytes
-        combinedMetrics.customMetrics.foreach { case (metric, value) =>
-          longMetric(metric.name) += value
+        combinedMetrics.customMetrics.foreach {
+          // Set for custom partition metrics
+          case (metric: StateStoreCustomPartitionMetric, value) =>
+            // Check for cases where value < 0 and .value converts metric to 0

Review Comment:
   Can you add a code comment explaining why we are doing this (for the uploaded version)?
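
   For reference, a hedged sketch of how such a comment might read, inferred from the diff above (the exact intent and `SQLMetric` semantics are my assumption, not the author's confirmed reasoning):

   ```scala
   combinedMetrics.customMetrics.foreach {
     case (metric: StateStoreCustomPartitionMetric, value) =>
       // Partition-level metrics report one value per partition, so rather than
       // summing we keep the maximum value seen across partitions. SQLMetric
       // clamps unset/negative values to 0 on read, so an isZero metric is
       // treated as "not yet set" and overwritten with the first reported value.
       longMetric(metric.name).set(
         if (longMetric(metric.name).isZero) value
         else Math.max(value, longMetric(metric.name).value))
   }
   ```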



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -104,6 +105,10 @@ object StatefulOperatorStateInfo {
  * [[IncrementalExecution]].
  */
 trait StatefulOperator extends SparkPlan {
+  // Used to determine the state store names used when allocating metric names,
+  // since join operations use multiple state stores with non-default names.
+  val isJoinOperator: Boolean = false

Review Comment:
   Make this a function, not a val
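
   i.e. roughly this (a sketch; the rationale in the comment is my assumption about the reviewer's intent):

   ```scala
   // Used to determine the state store names used when allocating metric names,
   // since join operations use multiple state stores with non-default names.
   // A def (rather than a val) avoids allocating a field in every concrete
   // operator and sidesteps trait early-initialization ordering issues when
   // subclasses override it.
   def isJoinOperator: Boolean = false
   ```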



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala:
##########
@@ -527,8 +528,16 @@ case class StreamingSymmetricHashJoinExec(
           (leftSideJoiner.numUpdatedStateRows + rightSideJoiner.numUpdatedStateRows)
         numTotalStateRows += combinedMetrics.numKeys
         stateMemory += combinedMetrics.memoryUsedBytes
-        combinedMetrics.customMetrics.foreach { case (metric, value) =>
-          longMetric(metric.name) += value
+        combinedMetrics.customMetrics.foreach {
+          // Set for custom partition metrics
+          case (metric: StateStoreCustomPartitionMetric, value) =>
+            // Check for cases where value < 0 and .value converts metric to 0
+            longMetric(metric.name).set(
+              if (longMetric(metric.name).isZero) value
+              else Math.max(value, longMetric(metric.name).value)

Review Comment:
   nit: use brackets
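
   i.e. bracing the if/else branches, roughly:

   ```scala
   longMetric(metric.name).set(
     if (longMetric(metric.name).isZero) {
       value
     } else {
       Math.max(value, longMetric(metric.name).value)
     }
   )
   ```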



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala:
##########
@@ -270,4 +287,232 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest
     assert(changelogVersionsPresent(dirForPartition0) == List(3L, 4L))
     assert(snapshotVersionsPresent(dirForPartition0).contains(5L))
   }
+
+  private def snapshotLagMetricName(
+      partitionId: Long,
+      storeName: String = StateStoreId.DEFAULT_STORE_NAME): String =

Review Comment:
   nit: use brackets
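
   i.e. giving the method body explicit braces, roughly (body elided, since it is not shown in the diff):

   ```scala
   private def snapshotLagMetricName(
       partitionId: Long,
       storeName: String = StateStoreId.DEFAULT_STORE_NAME): String = {
     // existing body unchanged
   }
   ```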



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

