HeartSaVioR commented on code in PR #50195:
URL: https://github.com/apache/spark/pull/50195#discussion_r1994940008

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -1465,6 +1470,7 @@ class RocksDB(
           log"with uniqueId: ${MDC(LogKeys.UUID, snapshot.uniqueId)} " +
           log"time taken: ${MDC(LogKeys.TIME_UNITS, uploadTime)} ms. " +
           log"Current lineage: ${MDC(LogKeys.LINEAGE, lineageManager)}")
+        lastUploadedSnapshotVersion.set(snapshot.version)

Review Comment:
   I guess there shouldn't be concurrent maintenance tasks running for the same state store provider ID. That said, we shouldn't allow multiple state stores to create multiple RocksDB instances. But of course, it'd be better to do so to simply ensure that we never go back.


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2233,6 +2233,19 @@ object SQLConf {
       .intConf
       .createWithDefault(10)
+  val STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT =
+    buildConf("spark.sql.streaming.stateStore.numStateStoreInstanceMetricsToReport")
+      .internal()
+      .doc(
+        "Number of state store instance metrics included in streaming query progress messages " +
+        "per stateful operator. Instance metrics are selected based on metric-specific ordering " +
+        "to minimize noise in the progress report."
+      )
+      .version("4.0.0")

Review Comment:
   Let's punt this out to 4.1.0.

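To make the "never go back" point in the RocksDB.scala comment above concrete, here is a minimal sketch of a monotonic update. It assumes `lastUploadedSnapshotVersion` is a `java.util.concurrent.atomic.AtomicLong`; the diff only shows `.set(...)` being called, so the type and the initial value are assumptions. `SnapshotVersionTracker`, `recordUpload`, and `current` are hypothetical names used for illustration and are not part of the PR.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for the field touched in the diff. The real type and
// initial value in RocksDB.scala are assumptions here.
object SnapshotVersionTracker {
  private val lastUploadedSnapshotVersion = new AtomicLong(-1L)

  // Record a finished upload. The stored value only ever moves forward, so an
  // upload that completes late with an older version cannot overwrite a newer one.
  def recordUpload(version: Long): Long =
    lastUploadedSnapshotVersion.updateAndGet(prev => math.max(prev, version))

  // Read the highest version recorded so far.
  def current: Long = lastUploadedSnapshotVersion.get()
}
```

Compared with a plain `set`, this costs one extra comparison per upload and removes the dependence on maintenance tasks never overlapping for the same provider.
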
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -203,20 +203,46 @@ trait StateStoreWriter
   def operatorStateMetadataVersion: Int = 1
-  override lazy val metrics = statefulOperatorCustomMetrics ++ Map(
-    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
-    "numRowsDroppedByWatermark" -> SQLMetrics.createMetric(sparkContext,
-      "number of rows which are dropped by watermark"),
-    "numTotalStateRows" -> SQLMetrics.createMetric(sparkContext, "number of total state rows"),
-    "numUpdatedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of updated state rows"),
-    "allUpdatesTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to update"),
-    "numRemovedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of removed state rows"),
-    "allRemovalsTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to remove"),
-    "commitTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to commit changes"),
-    "stateMemory" -> SQLMetrics.createSizeMetric(sparkContext, "memory used by state"),
-    "numStateStoreInstances" -> SQLMetrics.createMetric(sparkContext,
-      "number of state store instances")
-  ) ++ stateStoreCustomMetrics ++ pythonMetrics
+  override lazy val metrics = {
+    // Lazy initialize instance metrics, but do not include these with regular metrics

Review Comment:
   So this is mostly the only diff from the reverted commit, do I understand correctly?