ericm-db commented on code in PR #50195:
URL: https://github.com/apache/spark/pull/50195#discussion_r1989829264


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -1465,6 +1470,7 @@ class RocksDB(
           log"with uniqueId: ${MDC(LogKeys.UUID, snapshot.uniqueId)} " +
           log"time taken: ${MDC(LogKeys.TIME_UNITS, uploadTime)} ms. " +
           log"Current lineage: ${MDC(LogKeys.LINEAGE, lineageManager)}")
+        lastUploadedSnapshotVersion.set(snapshot.version)

Review Comment:
   Hm, do we need to do something like `lastUploadedSnapshotVersion.set(max(lastUploadedSnapshot, snapshot.version))`? Is there a case where this might be necessary? cc @HeartSaVioR
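   [Editor's note] The concern above is that if snapshot uploads can complete out of order, a plain `set` could move the tracked version backwards. A minimal sketch of the monotonic variant being suggested, assuming the field is a `java.util.concurrent.atomic.AtomicLong`; names here are illustrative, not the PR's actual code:

   ```scala
   import java.util.concurrent.atomic.AtomicLong

   object MonotonicUploadVersion {
     // Illustrative stand-in for the field in RocksDB.scala; assumed AtomicLong.
     private val lastUploadedSnapshotVersion = new AtomicLong(-1L)

     // Record an upload without ever letting the tracked version move
     // backwards, even if an older snapshot's upload finishes late.
     def recordUpload(version: Long): Unit = {
       lastUploadedSnapshotVersion.updateAndGet(current => math.max(current, version))
     }

     def main(args: Array[String]): Unit = {
       recordUpload(10L)
       recordUpload(7L) // late upload of an older snapshot is ignored
       assert(lastUploadedSnapshotVersion.get() == 10L)
     }
   }
   ```

   Whether this is actually needed depends on whether two snapshot uploads for the same store can race, which is exactly the question the comment raises.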
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -203,20 +203,40 @@ trait StateStoreWriter

   def operatorStateMetadataVersion: Int = 1

-  override lazy val metrics = statefulOperatorCustomMetrics ++ Map(
-    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
-    "numRowsDroppedByWatermark" -> SQLMetrics.createMetric(sparkContext,
-      "number of rows which are dropped by watermark"),
-    "numTotalStateRows" -> SQLMetrics.createMetric(sparkContext, "number of total state rows"),
-    "numUpdatedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of updated state rows"),
-    "allUpdatesTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to update"),
-    "numRemovedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of removed state rows"),
-    "allRemovalsTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to remove"),
-    "commitTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to commit changes"),
-    "stateMemory" -> SQLMetrics.createSizeMetric(sparkContext, "memory used by state"),
-    "numStateStoreInstances" -> SQLMetrics.createMetric(sparkContext,
-      "number of state store instances")
-  ) ++ stateStoreCustomMetrics ++ pythonMetrics
+  override lazy val metrics = {
+    // Lazy initialize instance metrics, but do not include these with regular metrics
+    instanceMetrics
+    statefulOperatorCustomMetrics ++ Map(
+      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+      "numRowsDroppedByWatermark" -> SQLMetrics
+        .createMetric(sparkContext, "number of rows which are dropped by watermark"),
+      "numTotalStateRows" -> SQLMetrics.createMetric(sparkContext, "number of total state rows"),
+      "numUpdatedStateRows" -> SQLMetrics
+        .createMetric(sparkContext, "number of updated state rows"),
+      "allUpdatesTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to update"),
+      "numRemovedStateRows" -> SQLMetrics
+        .createMetric(sparkContext, "number of removed state rows"),
+      "allRemovalsTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to remove"),
+      "commitTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to commit changes"),
+      "stateMemory" -> SQLMetrics.createSizeMetric(sparkContext, "memory used by state"),
+      "numStateStoreInstances" -> SQLMetrics
+        .createMetric(sparkContext, "number of state store instances")
+    ) ++ stateStoreCustomMetrics ++ pythonMetrics
+  }
+
+  lazy val instanceMetrics = stateStoreInstanceMetrics

Review Comment:
   Can you leave a comment explaining what `instanceMetrics` is for?
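   [Editor's note] In the spirit of that request, here is one way such a comment could read; a sketch only, with the wording and the `Map[String, SQLMetric]` type inferred from the surrounding diff rather than taken from the PR:

   ```scala
   import org.apache.spark.sql.execution.metric.SQLMetric

   trait StateStoreWriterSketch {
     // Supplied by the concrete operator; assumed shape, mirroring the diff.
     def stateStoreInstanceMetrics: Map[String, SQLMetric]

     /**
      * Metrics tracked per state store instance rather than aggregated per
      * operator. Kept out of the regular `metrics` map so that the
      * per-instance entries are only materialized lazily, on first access,
      * and are not reported alongside the regular operator metrics.
      */
     lazy val instanceMetrics: Map[String, SQLMetric] = stateStoreInstanceMetrics
   }
   ```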
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org