HeartSaVioR commented on code in PR #50195:
URL: https://github.com/apache/spark/pull/50195#discussion_r1994940008


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -1465,6 +1470,7 @@ class RocksDB(
         log"with uniqueId: ${MDC(LogKeys.UUID, snapshot.uniqueId)} " +
         log"time taken: ${MDC(LogKeys.TIME_UNITS, uploadTime)} ms. " +
         log"Current lineage: ${MDC(LogKeys.LINEAGE, lineageManager)}")
+      lastUploadedSnapshotVersion.set(snapshot.version)

Review Comment:
   I guess there shouldn't be concurrent maintenance tasks to run for the same 
state store provider ID.
   
   That said, we shouldn't allow multiple state stores to create multiple 
RocksDB instances.
   
   But of course, it'd be better to do so to simply ensure that we never go 
back.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2233,6 +2233,19 @@ object SQLConf {
       .intConf
       .createWithDefault(10)
 
+  val STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT =
+    
buildConf("spark.sql.streaming.stateStore.numStateStoreInstanceMetricsToReport")
+      .internal()
+      .doc(
+        "Number of state store instance metrics included in streaming query 
progress messages " +
+        "per stateful operator. Instance metrics are selected based on 
metric-specific ordering " +
+        "to minimize noise in the progress report."
+      )
+      .version("4.0.0")

Review Comment:
   Let's punt this out to 4.1.0.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -203,20 +203,46 @@ trait StateStoreWriter
 
   def operatorStateMetadataVersion: Int = 1
 
-  override lazy val metrics = statefulOperatorCustomMetrics ++ Map(
-    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
-    "numRowsDroppedByWatermark" -> SQLMetrics.createMetric(sparkContext,
-      "number of rows which are dropped by watermark"),
-    "numTotalStateRows" -> SQLMetrics.createMetric(sparkContext, "number of 
total state rows"),
-    "numUpdatedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of 
updated state rows"),
-    "allUpdatesTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to 
update"),
-    "numRemovedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of 
removed state rows"),
-    "allRemovalsTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time 
to remove"),
-    "commitTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to 
commit changes"),
-    "stateMemory" -> SQLMetrics.createSizeMetric(sparkContext, "memory used by 
state"),
-    "numStateStoreInstances" -> SQLMetrics.createMetric(sparkContext,
-      "number of state store instances")
-  ) ++ stateStoreCustomMetrics ++ pythonMetrics
+  override lazy val metrics = {
+    // Lazy initialize instance metrics, but do not include these with regular 
metrics

Review Comment:
   So this is mostly the only diff from the reverted commit, do I understand 
correctly?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to