ericm-db commented on code in PR #50195:
URL: https://github.com/apache/spark/pull/50195#discussion_r1989829264


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -1465,6 +1470,7 @@ class RocksDB(
         log"with uniqueId: ${MDC(LogKeys.UUID, snapshot.uniqueId)} " +
         log"time taken: ${MDC(LogKeys.TIME_UNITS, uploadTime)} ms. " +
         log"Current lineage: ${MDC(LogKeys.LINEAGE, lineageManager)}")
+      lastUploadedSnapshotVersion.set(snapshot.version)

Review Comment:
   Hm, do we need to do something like `lastUploadedSnapshotVersion.set(max(lastUploadedSnapshotVersion.get(), snapshot.version))`?
   Is there a case where this might be necessary, e.g. where an older snapshot's upload completes after a newer one's?
   cc @HeartSaVioR 
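   For illustration, a minimal standalone sketch of that monotonic update, assuming `lastUploadedSnapshotVersion` is a `java.util.concurrent.atomic.AtomicLong` (the `SnapshotVersionTracker` object and `recordUpload` helper are hypothetical, only there to make the snippet runnable):

   ```scala
   import java.util.concurrent.atomic.AtomicLong

   // Hypothetical stand-in for the field in RocksDB.scala, assumed to be an AtomicLong.
   object SnapshotVersionTracker {
     private val lastUploadedSnapshotVersion = new AtomicLong(-1L)

     // Record an uploaded snapshot version without ever moving backwards,
     // even if an older snapshot's upload completes after a newer one's.
     def recordUpload(version: Long): Unit =
       // getAndAccumulate applies the binary op atomically via a CAS loop.
       lastUploadedSnapshotVersion.getAndAccumulate(version, (a: Long, b: Long) => math.max(a, b))

     def main(args: Array[String]): Unit = {
       recordUpload(5L)
       recordUpload(3L) // out-of-order completion; max keeps 5
       assert(lastUploadedSnapshotVersion.get() == 5L)
       println(s"last uploaded snapshot version: ${lastUploadedSnapshotVersion.get()}")
     }
   }
   ```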



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -203,20 +203,40 @@ trait StateStoreWriter
 
   def operatorStateMetadataVersion: Int = 1
 
-  override lazy val metrics = statefulOperatorCustomMetrics ++ Map(
-    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
-    "numRowsDroppedByWatermark" -> SQLMetrics.createMetric(sparkContext,
-      "number of rows which are dropped by watermark"),
-    "numTotalStateRows" -> SQLMetrics.createMetric(sparkContext, "number of total state rows"),
-    "numUpdatedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of updated state rows"),
-    "allUpdatesTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to update"),
-    "numRemovedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of removed state rows"),
-    "allRemovalsTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to remove"),
-    "commitTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to commit changes"),
-    "stateMemory" -> SQLMetrics.createSizeMetric(sparkContext, "memory used by state"),
-    "numStateStoreInstances" -> SQLMetrics.createMetric(sparkContext,
-      "number of state store instances")
-  ) ++ stateStoreCustomMetrics ++ pythonMetrics
+  override lazy val metrics = {
+    // Lazy initialize instance metrics, but do not include these with regular metrics
+    instanceMetrics
+    statefulOperatorCustomMetrics ++ Map(
+      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+      "numRowsDroppedByWatermark" -> SQLMetrics
+        .createMetric(sparkContext, "number of rows which are dropped by watermark"),
+      "numTotalStateRows" -> SQLMetrics.createMetric(sparkContext, "number of total state rows"),
+      "numUpdatedStateRows" -> SQLMetrics
+        .createMetric(sparkContext, "number of updated state rows"),
+      "allUpdatesTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to update"),
+      "numRemovedStateRows" -> SQLMetrics
+        .createMetric(sparkContext, "number of removed state rows"),
+      "allRemovalsTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to remove"),
+      "commitTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to commit changes"),
+      "stateMemory" -> SQLMetrics.createSizeMetric(sparkContext, "memory used by state"),
+      "numStateStoreInstances" -> SQLMetrics
+        .createMetric(sparkContext, "number of state store instances")
+    ) ++ stateStoreCustomMetrics ++ pythonMetrics
+  }
+
+  lazy val instanceMetrics = stateStoreInstanceMetrics

Review Comment:
   Can you leave a comment explaining what this is for?
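   Something like this, perhaps (hypothetical wording, extrapolated only from the `// Lazy initialize instance metrics...` comment already in the diff; the real rationale should come from the author):

   ```scala
   // Metrics reported per state store instance rather than aggregated per
   // operator, so they are kept out of the regular `metrics` map; `metrics`
   // only forces their lazy initialization.
   lazy val instanceMetrics = stateStoreInstanceMetrics
   ```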



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

