zecookiez commented on code in PR #50572:
URL: https://github.com/apache/spark/pull/50572#discussion_r2049370318
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala:
##########
@@ -302,8 +302,16 @@ class SymmetricHashJoinStateManagerSuite extends StreamTest with BeforeAndAfter
while (iter.hasNext) iter.next()
}
- def numRows(implicit manager: SymmetricHashJoinStateManager): Long = {
- manager.metrics.numKeys
+ def assertNumRows(stateFormatVersion: Int, target: Long)(
+ implicit manager: SymmetricHashJoinStateManager): Unit = {
+ // This suite originally uses HDFSBackStateStoreProvider, which provides instantaneous metrics
+ // for numRows.
+ // But for version 3 with virtual column families, RocksDBStateStoreProvider updates metrics
+ // asynchronously. This means the number of keys obtained from the metrics are very likely
+ // to be outdated right after a put/remove.
+ if (stateFormatVersion <= 2) {
+ assert(manager.metrics.numKeys == target)
+ }
Review Comment:
We skip this check for v3 only. The test still runs for the new version and
goes through its other assertions (get/append/update/remove), so it is not
completely voided.
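
To illustrate the gating described in this comment, here is a minimal, self-contained sketch. `FakeMetrics` and `AssertNumRowsSketch` are hypothetical stand-ins invented for illustration and are not part of Spark. Only the `stateFormatVersion <= 2` condition is taken from the diff.

```scala
object AssertNumRowsSketch {
  // Hypothetical stand-in for the metrics exposed by the state manager (not Spark's API).
  final case class FakeMetrics(numKeys: Long)

  // Enforce the key-count check only for state format versions 1 and 2.
  // For version 3 the metrics may lag behind a put/remove, so the check is skipped.
  def assertNumRows(stateFormatVersion: Int, target: Long, metrics: FakeMetrics): Unit = {
    if (stateFormatVersion <= 2) {
      assert(metrics.numKeys == target, s"expected $target keys, got ${metrics.numKeys}")
    }
  }

  def main(args: Array[String]): Unit = {
    // Version 2: metrics are assumed current, so the count must match.
    assertNumRows(2, 3L, FakeMetrics(3L))
    // Version 3: a stale count does not fail, because the check is skipped.
    assertNumRows(3, 3L, FakeMetrics(0L))
    println("both calls passed")
  }
}
```

With this shape, a stale count under version 3 never fails the test, while a wrong count under versions 1 and 2 still raises an `AssertionError`.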
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]