zecookiez commented on code in PR #50572:
URL: https://github.com/apache/spark/pull/50572#discussion_r2049370318


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala:
##########
@@ -302,8 +302,16 @@ class SymmetricHashJoinStateManagerSuite extends StreamTest with BeforeAndAfter
     while (iter.hasNext) iter.next()
   }
 
-  def numRows(implicit manager: SymmetricHashJoinStateManager): Long = {
-    manager.metrics.numKeys
+  def assertNumRows(stateFormatVersion: Int, target: Long)(
+    implicit manager: SymmetricHashJoinStateManager): Unit = {
+    // This suite originally used HDFSBackedStateStoreProvider, which provides instantaneous metrics
+    // for numRows.
+    // But for version 3 with virtual column families, RocksDBStateStoreProvider updates metrics
+    // asynchronously. This means the number of keys obtained from the metrics is very likely
+    // to be outdated right after a put/remove.
+    if (stateFormatVersion <= 2) {
+      assert(manager.metrics.numKeys == target)
+    }

Review Comment:
   We skip this check for v3. The test still runs for the new version, and it
has other assertions to go through (get/append/update/remove), so the test is
not completely voided.
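For illustration only, here is a minimal, self-contained sketch of the version-gated check. `MetricsSource`, `InstantSource`, `LaggingSource`, and `JoinStateMetrics` are hypothetical stand-ins, not Spark APIs. `LaggingSource` only exposes its key count after `publish()` is called, mimicking metrics that are refreshed asynchronously, so a read right after a put is stale. The helper returns whether the metric was actually checked.

```scala
// Hypothetical stand-in for the state manager's metrics (not Spark's class).
final case class JoinStateMetrics(numKeys: Long)

trait MetricsSource {
  def metrics: JoinStateMetrics
}

// Hypothetical source whose metrics reflect every put immediately
// (the behavior the suite relied on for format versions <= 2).
final class InstantSource extends MetricsSource {
  private var keys = 0L
  def put(): Unit = keys += 1
  def metrics: JoinStateMetrics = JoinStateMetrics(keys)
}

// Hypothetical source whose metrics only change when publish() runs,
// simulating a provider that refreshes metrics asynchronously.
final class LaggingSource extends MetricsSource {
  private var keys = 0L
  private var published = 0L
  def put(): Unit = keys += 1
  def publish(): Unit = published = keys
  def metrics: JoinStateMetrics = JoinStateMetrics(published)
}

object VersionGatedAssertions {
  // Checks numKeys only for versions whose metrics are up to date;
  // returns true if the assertion was evaluated, false if it was skipped.
  def assertNumRows(stateFormatVersion: Int, target: Long)(
      implicit source: MetricsSource): Boolean = {
    if (stateFormatVersion <= 2) {
      val actual = source.metrics.numKeys
      assert(actual == target, s"expected $target keys but metrics reported $actual")
      true
    } else {
      false
    }
  }
}
```

With the lagging source, an immediate read would report 0 keys after one put, which is why the gated helper skips the check instead of failing.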



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
