zecookiez commented on code in PR #50572:
URL: https://github.com/apache/spark/pull/50572#discussion_r2049379277


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreInstanceMetricSuite.scala:
##########
@@ -353,6 +353,81 @@ class StateStoreInstanceMetricSuite extends StreamTest 
with AlsoTestWithRocksDBF
       }
   }
 
+  testWithChangelogCheckpointingEnabled(
+    "SPARK-51779 Verify snapshot lag metrics are updated correctly for join " +
+    "using virtual column families with RocksDB"
+  ) {
+    withSQLConf(
+      SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+        classOf[RocksDBSkipMaintenanceOnCertainPartitionsProvider].getName,
+      SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
+      SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
+      SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+      SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
+      SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "4",
+      SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION.key -> "3"
+    ) {
+      withTempDir { checkpointDir =>
+        val input1 = MemoryStream[Int]
+        val input2 = MemoryStream[Int]
+
+        val df1 = input1.toDF().select($"value" as "leftKey", ($"value" * 2) 
as "leftValue")
+        val df2 = input2
+          .toDF()
+          .select($"value" as "rightKey", ($"value" * 3) as "rightValue")
+        val joined = df1.join(df2, expr("leftKey = rightKey"))
+
+        testStream(joined)(
+          StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
+          AddData(input1, 1, 5),
+          ProcessAllAvailable(),
+          AddData(input2, 1, 5, 10),
+          ProcessAllAvailable(),
+          AddData(input1, 2, 3),
+          ProcessAllAvailable(),
+          CheckNewAnswer((1, 2, 1, 3), (5, 10, 5, 15)),
+          AddData(input1, 2),
+          ProcessAllAvailable(),
+          AddData(input2, 3),
+          ProcessAllAvailable(),
+          AddData(input1, 4),
+          ProcessAllAvailable(),
+          Execute { q =>
+            eventually(timeout(10.seconds)) {
+              // Make sure only smallest K active metrics are published.
+              // There are 5 metrics in total, but only 4 are published.
+              val allInstanceMetrics = q.lastProgress
+                .stateOperators(0)
+                .customMetrics
+                .asScala
+                .filter(_._1.startsWith(SNAPSHOT_LAG_METRIC_PREFIX))
+              val badInstanceMetrics = allInstanceMetrics.filter {
+                case (key, _) =>
+                  key.startsWith(snapshotLagMetricName(0, "")) ||
+                    key.startsWith(snapshotLagMetricName(1, ""))
+              }
+              // Determined by STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT
+              assert(
+                allInstanceMetrics.size == q.sparkSession.conf
+                  .get(SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT)
+              )
+              // Two ids are blocked, making two lagging stores
+              // However, creating a family column forces a snapshot 
regardless of maintenance
+              // Thus, the version will be 1 for this case.
+              assert(badInstanceMetrics.count(_._2 == 1) == 2)

Review Comment:
   I'll add an additional comment for this, but if it were multiple metrics per 
partition like in the v2 join the check would be `== Math.min(reportLimit, 2 * 
4)` instead of `== 2`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to