micheal-o commented on code in PR #54298:
URL: https://github.com/apache/spark/pull/54298#discussion_r2819191260


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -1438,29 +1442,34 @@ class RocksDB(
    * Delete all keys in the range [beginKey, endKey).
    * Uses RocksDB's native deleteRange for efficient bulk deletion.
    *
-   * @param beginKey The start key of the range (inclusive)
-   * @param endKey   The end key of the range (exclusive)
-   * @param cfName   The column family name
+   * @param beginKey       The start key of the range (inclusive)
+   * @param endKey         The end key of the range (exclusive)
+   * @param cfName         The column family name
+   * @param includesPrefix Whether the keys already include the column family 
prefix.
+   *                       Set to true during changelog replay to avoid 
double-encoding.
    */
   def deleteRange(
       beginKey: Array[Byte],
       endKey: Array[Byte],
-      cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
+      cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME,
+      includesPrefix: Boolean = false): Unit = {
     updateMemoryUsageIfNeeded()
 
-    val beginKeyWithPrefix = if (useColumnFamilies) {
+    val beginKeyWithPrefix = if (useColumnFamilies && !includesPrefix) {
       encodeStateRowWithPrefix(beginKey, cfName)
     } else {
       beginKey
     }
 
-    val endKeyWithPrefix = if (useColumnFamilies) {
+    val endKeyWithPrefix = if (useColumnFamilies && !includesPrefix) {
       encodeStateRowWithPrefix(endKey, cfName)
     } else {
       endKey
     }
 
     db.deleteRange(writeOptions, beginKeyWithPrefix, endKeyWithPrefix)
+    changelogWriter.foreach(_.deleteRange(beginKeyWithPrefix, 
endKeyWithPrefix))

Review Comment:
   as mentioned above, add checksum before writing to changelog



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -981,6 +981,10 @@ class RocksDB(
               verifyChangelogRecord(kvVerifier, key, Some(value))
               merge(key, value, includesPrefix = useColumnFamilies,
                 deriveCfName = useColumnFamilies, includesChecksum = 
conf.rowChecksumEnabled)
+
+            case RecordType.DELETE_RANGE_RECORD =>
+              // For deleteRange, 'key' is beginKey and 'value' is endKey

Review Comment:
   Lets add checksum in `deleteRange` otherwise your rocksdb changedatareader 
code below would fail when checksum is enabled. Our test with catch it too. 
Just add checksum when enabled, just like we do for `put`. Actually simple to 
add, see: 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala#L1222



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -1476,12 +1476,18 @@ class RocksDBStateStoreChangeDataReader(
       }
     }
 
-    val keyRow = currEncoder._1.decodeKey(currRecord._2)
-    if (currRecord._3 == null) {
-      (currRecord._1, keyRow, null, currentChangelogVersion - 1)
+    if (currRecord._1 == RecordType.DELETE_RANGE_RECORD) {
+      // For delete_range entries, the key and value have different schemas so 
we cannot
+      // put endKey into the value field. Leave both key and value as null.
+      (currRecord._1, null, null, currentChangelogVersion - 1)

Review Comment:
   This will lead to incorrect result for state data source cdf reader. That 
means, we will not tell the user that, that range of keys has been deleted. 
Please lets find a better way to handle this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to