liviazhu commented on code in PR #54298:
URL: https://github.com/apache/spark/pull/54298#discussion_r2823757200
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -2547,6 +2547,82 @@ class RocksDBStateStoreSuite extends
StateStoreSuiteBase[RocksDBStateStoreProvid
}
}
+ test("deleteRange - changelog checkpointing records and replays range
deletions") {
+ // useColumnFamilies = true is required to get changelog writer V2 which
supports
+ // DELETE_RANGE_RECORD. V1 (used when useColumnFamilies = false) does not
support it.
+ withSQLConf(
+ RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX +
".changelogCheckpointing.enabled" -> "true",
+ SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "100") {
+ val storeId = StateStoreId(newDir(), Random.nextInt(), 0)
+ val keyEncoderSpec =
RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0))
+ val cfName = "testColFamily"
+
+ // Create provider and commit version 1 with some data and a deleteRange
+ tryWithProviderResource(
+ newStoreProvider(storeId, keyEncoderSpec,
+ keySchema = keySchemaWithRangeScan,
+ useColumnFamilies = true)) { provider =>
+ val store = provider.getStore(0)
+ store.createColFamilyIfAbsent(cfName,
+ keySchemaWithRangeScan, valueSchema,
+ RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)))
+
+ // Put keys: (1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e")
+ store.put(dataToKeyRowWithRangeScan(1L, "a"), dataToValueRow(10),
cfName)
+ store.put(dataToKeyRowWithRangeScan(2L, "b"), dataToValueRow(20),
cfName)
+ store.put(dataToKeyRowWithRangeScan(3L, "c"), dataToValueRow(30),
cfName)
+ store.put(dataToKeyRowWithRangeScan(4L, "d"), dataToValueRow(40),
cfName)
+ store.put(dataToKeyRowWithRangeScan(5L, "e"), dataToValueRow(50),
cfName)
+ store.commit()
+
+ // Version 2: deleteRange [2, 4) - should delete keys 2 and 3
+ val store2 = provider.getStore(1)
+ store2.createColFamilyIfAbsent(cfName,
+ keySchemaWithRangeScan, valueSchema,
+ RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)))
+ val beginKey = dataToKeyRowWithRangeScan(2L, "")
+ val endKey = dataToKeyRowWithRangeScan(4L, "")
+ store2.deleteRange(beginKey, endKey, cfName)
+ store2.commit()
+ }
+
+ // Reload from a fresh provider (same storeId) to force changelog replay
+ tryWithProviderResource(
+ newStoreProvider(storeId, keyEncoderSpec,
+ keySchema = keySchemaWithRangeScan,
+ useColumnFamilies = true)) { reloadedProvider =>
+ val reloadedStore = reloadedProvider.getStore(2)
+ try {
+ reloadedStore.createColFamilyIfAbsent(cfName,
+ keySchemaWithRangeScan, valueSchema,
+ RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)))
+ val remainingKeys = reloadedStore.iterator(cfName).map { kv =>
+ keyRowWithRangeScanToData(kv.key)
+ }.toSeq
+
+ // Keys 1, 4, 5 should remain; keys 2, 3 should have been deleted
via replay
+ assert(remainingKeys.length === 3)
+ assert(remainingKeys.map(_._1).toSet === Set(1L, 4L, 5L))
+ } finally {
+ if (!reloadedStore.hasCommitted) reloadedStore.abort()
+ }
+
+ // Verify that the change data reader returns DELETE_RANGE_RECORD with
beginKey,
+ // null value, and endKey in the dedicated end_key field.
+ val reader = reloadedProvider.asInstanceOf[SupportsFineGrainedReplay]
Review Comment:
nit: can we move this test to StateDataSourceChangeDataReadSuite.scala?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]