HeartSaVioR commented on code in PR #49304:
URL: https://github.com/apache/spark/pull/49304#discussion_r1957574278
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -656,31 +803,75 @@ class RocksDB(
    *
    * @note This update is not committed to disk until commit() is called.
    */
-  def merge(key: Array[Byte], value: Array[Byte]): Unit = {
-    if (conf.trackTotalNumberOfRows) {
-      val oldValue = db.get(readOptions, key)
-      if (oldValue == null) {
-        numKeysOnWritingVersion += 1
+  def merge(
+      key: Array[Byte],
+      value: Array[Byte],
+      cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
+    val keyWithPrefix = if (useColumnFamilies) {
+      encodeStateRowWithPrefix(key, cfName)
+    } else {
+      key
+    }
+
+    if (useColumnFamilies) {
+      if (conf.trackTotalNumberOfRows) {
+        val oldValue = db.get(readOptions, keyWithPrefix)
+        if (oldValue == null) {
+          val cfInfo = getColumnFamilyInfo(cfName)
+          if (cfInfo.isInternal) {
+            numInternalKeysOnWritingVersion += 1
+          } else {
+            numKeysOnWritingVersion += 1
+          }
+        }
+      }
+    } else {
+      if (conf.trackTotalNumberOfRows) {
+        val oldValue = db.get(readOptions, keyWithPrefix)
+        if (oldValue == null) {
+          numKeysOnWritingVersion += 1
+        }
       }
     }
-    db.merge(writeOptions, key, value)
-    changelogWriter.foreach(_.merge(key, value))
+    db.merge(writeOptions, keyWithPrefix, value)
+    changelogWriter.foreach(_.merge(keyWithPrefix, value))
   }
 
   /**
    * Remove the key if present.
    * @note This update is not committed to disk until commit() is called.
    */
-  def remove(key: Array[Byte]): Unit = {
-    if (conf.trackTotalNumberOfRows) {
-      val value = db.get(readOptions, key)
-      if (value != null) {
-        numKeysOnWritingVersion -= 1
+  def remove(key: Array[Byte], cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
+    val keyWithPrefix = if (useColumnFamilies) {
+      encodeStateRowWithPrefix(key, cfName)
+    } else {
+      key
+    }
+
+    if (useColumnFamilies) {
+      if (conf.trackTotalNumberOfRows) {
+        val oldValue = db.get(readOptions, keyWithPrefix)
+        if (oldValue != null) {
+          val cfInfo = getColumnFamilyInfo(cfName)
+          if (cfInfo.isInternal) {
+            numInternalKeysOnWritingVersion -= 1
+          } else {
+            numKeysOnWritingVersion -= 1
+          }
+        }
+      }
+    } else {
+      if (conf.trackTotalNumberOfRows) {
+        val value = db.get(readOptions, keyWithPrefix)
+        if (value != null) {
+          numKeysOnWritingVersion -= 1
+        }
       }
     }
-    db.delete(writeOptions, key)
-    changelogWriter.foreach(_.delete(key))
+
+    db.delete(writeOptions, keyWithPrefix)
+    changelogWriter.foreach(_.delete(keyWithPrefix))
   }
 
   /**

Review Comment:
   I'm saying the UX is now inconsistent: every other method was made column-family aware, while this one wasn't. In the multiple-column-families case we don't expect a global scan over the entire key space, so this method is mostly unused there. Callers that previously used `iterator()` now have to use `prefixScan()`, which relies on the implementation detail of how we construct the virtual column family (and `prefixScan()` was never intended to perform a scan without actual prefix data).

   I'd rather say it would be better to have a column-family-aware version of `iterator()`. I'll leave it to you to decide which is better:
   1) have both `iterator()` and `iterator(cfName: String)`, and keep `iterator()` performing a full scan regardless of multiple column families, or
   2) have only `iterator(cfName: String)`, and decide internally whether to full scan or delegate to `prefixScan(cfName)` based on the flag for multiple column families.
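   To make option 2 concrete, here is a minimal sketch outside the real class. All names in it (`VirtualCfStore`, `KV`, the two stand-in scan methods) are hypothetical; only the internal flag and the delegation to `prefixScan(cfName)` reflect the proposal above.

   // Sketch of option 2: expose only iterator(cfName). The internal
   // multiple-column-families flag decides between a genuine full scan and
   // a delegation to prefixScan(cfName), so callers never depend on how the
   // virtual column family prefix is encoded into the key.
   class VirtualCfStore(useColumnFamilies: Boolean) {
     final case class KV(key: Array[Byte], value: Array[Byte])

     // Stand-in for the existing full scan over the entire key space.
     private def fullScan(): Iterator[KV] = Iterator.empty

     // Stand-in for the existing prefixScan() restricted to one column family.
     private def prefixScan(cfName: String): Iterator[KV] = Iterator.empty

     // Option 2: a single column-family-aware entry point.
     def iterator(cfName: String): Iterator[KV] =
       if (useColumnFamilies) prefixScan(cfName) else fullScan()
   }

   The trade-off versus option 1 is that even single-column-family callers must pass a `cfName`, but the prefix construction stays an implementation detail of the store.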
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org