micheal-o commented on code in PR #54298:
URL: https://github.com/apache/spark/pull/54298#discussion_r2819191260
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -1438,29 +1442,34 @@ class RocksDB(
* Delete all keys in the range [beginKey, endKey).
* Uses RocksDB's native deleteRange for efficient bulk deletion.
*
- * @param beginKey The start key of the range (inclusive)
- * @param endKey The end key of the range (exclusive)
- * @param cfName The column family name
+ * @param beginKey The start key of the range (inclusive)
+ * @param endKey The end key of the range (exclusive)
+ * @param cfName The column family name
+   * @param includesPrefix Whether the keys already include the column family prefix.
+   *                       Set to true during changelog replay to avoid double-encoding.
*/
def deleteRange(
beginKey: Array[Byte],
endKey: Array[Byte],
- cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
+ cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME,
+ includesPrefix: Boolean = false): Unit = {
updateMemoryUsageIfNeeded()
- val beginKeyWithPrefix = if (useColumnFamilies) {
+ val beginKeyWithPrefix = if (useColumnFamilies && !includesPrefix) {
encodeStateRowWithPrefix(beginKey, cfName)
} else {
beginKey
}
- val endKeyWithPrefix = if (useColumnFamilies) {
+ val endKeyWithPrefix = if (useColumnFamilies && !includesPrefix) {
encodeStateRowWithPrefix(endKey, cfName)
} else {
endKey
}
db.deleteRange(writeOptions, beginKeyWithPrefix, endKeyWithPrefix)
+    changelogWriter.foreach(_.deleteRange(beginKeyWithPrefix, endKeyWithPrefix))
Review Comment:
As mentioned above, add a checksum before writing to the changelog.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -981,6 +981,10 @@ class RocksDB(
verifyChangelogRecord(kvVerifier, key, Some(value))
merge(key, value, includesPrefix = useColumnFamilies,
  deriveCfName = useColumnFamilies, includesChecksum = conf.rowChecksumEnabled)
+
+ case RecordType.DELETE_RANGE_RECORD =>
+ // For deleteRange, 'key' is beginKey and 'value' is endKey
Review Comment:
Let's add a checksum in `deleteRange`, otherwise your RocksDB change data reader
code below would fail when checksums are enabled. Our test will catch it too.
Just add the checksum when enabled, just like we do for `put`. It is actually
simple to add, see:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala#L1222
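The pattern the reviewer is asking for can be sketched outside of Spark. This is a hypothetical, self-contained illustration only: append a checksum to each raw record before it goes to the changelog, and verify it on replay. The names (`ChecksumSketch`, `withChecksum`, `verifyAndStrip`) and the CRC32 layout are assumptions for illustration; Spark's actual helper in the linked `put` path may use a different checksum and encoding.

```scala
import java.nio.ByteBuffer
import java.util.zip.CRC32

// Hypothetical sketch of the "checksum when enabled" pattern: a trailing
// 8-byte CRC32 is appended on write and verified/stripped on replay. For
// DELETE_RANGE_RECORD, both beginKey and endKey would be encoded this way
// before changelogWriter.deleteRange is called.
object ChecksumSketch {
  def withChecksum(bytes: Array[Byte]): Array[Byte] = {
    val crc = new CRC32()
    crc.update(bytes)
    val out = ByteBuffer.allocate(bytes.length + java.lang.Long.BYTES)
    out.put(bytes)
    out.putLong(crc.getValue) // trailing 8-byte checksum
    out.array()
  }

  def verifyAndStrip(bytes: Array[Byte]): Array[Byte] = {
    val buf = ByteBuffer.wrap(bytes)
    val payload = new Array[Byte](bytes.length - java.lang.Long.BYTES)
    buf.get(payload)
    val stored = buf.getLong
    val crc = new CRC32()
    crc.update(payload)
    require(crc.getValue == stored, "changelog record checksum mismatch")
    payload
  }
}
```

With this shape, the replay path for `DELETE_RANGE_RECORD` would call `verifyAndStrip` on both keys before passing them to `deleteRange(..., includesPrefix = true)`, mirroring what the `put` replay already does.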
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -1476,12 +1476,18 @@ class RocksDBStateStoreChangeDataReader(
}
}
- val keyRow = currEncoder._1.decodeKey(currRecord._2)
- if (currRecord._3 == null) {
- (currRecord._1, keyRow, null, currentChangelogVersion - 1)
+ if (currRecord._1 == RecordType.DELETE_RANGE_RECORD) {
+      // For delete_range entries, the key and value have different schemas, so we
+      // cannot put endKey into the value field. Leave both key and value as null.
+ (currRecord._1, null, null, currentChangelogVersion - 1)
Review Comment:
This will lead to incorrect results for the state data source CDF reader: we
would not tell the user that that range of keys has been deleted. Please let's
find a better way to handle this.
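One way to avoid the `(null, null)` result could be to model the reader's output as a small ADT so a range deletion carries both boundary keys. This is purely an illustrative sketch, not Spark's API: `ChangeRecord`, `RangeDeleted`, and `describe` are hypothetical names, and the real fix would have to fit the existing row-based output schema of `RocksDBStateStoreChangeDataReader`.

```scala
// Hypothetical sketch: surface a range deletion to the CDF consumer with its
// begin and end keys, instead of emitting nulls and losing the information.
sealed trait ChangeRecord
case class Upserted(key: Array[Byte], value: Array[Byte], version: Long) extends ChangeRecord
case class Deleted(key: Array[Byte], version: Long) extends ChangeRecord
case class RangeDeleted(beginKey: Array[Byte], endKey: Array[Byte], version: Long) extends ChangeRecord

def describe(r: ChangeRecord): String = r match {
  case Upserted(k, _, v)     => s"put ${k.length}-byte key @ $v"
  case Deleted(k, v)         => s"delete ${k.length}-byte key @ $v"
  case RangeDeleted(b, e, v) => s"delete range [${b.length}B, ${e.length}B) @ $v"
}
```

The design point is that the deleted range [beginKey, endKey) is information the user needs; whatever encoding is chosen (a new record type column, or the boundary keys packed into the existing key/value fields), it should survive to the reader's output.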
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]