HeartSaVioR commented on code in PR #49304:
URL: https://github.com/apache/spark/pull/49304#discussion_r1957574278


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -656,31 +803,75 @@ class RocksDB(
    *
    * @note This update is not committed to disk until commit() is called.
    */
-  def merge(key: Array[Byte], value: Array[Byte]): Unit = {
-    if (conf.trackTotalNumberOfRows) {
-      val oldValue = db.get(readOptions, key)
-      if (oldValue == null) {
-        numKeysOnWritingVersion += 1
+  def merge(
+      key: Array[Byte],
+      value: Array[Byte],
+      cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
+    val keyWithPrefix = if (useColumnFamilies) {
+      encodeStateRowWithPrefix(key, cfName)
+    } else {
+      key
+    }
+
+    if (useColumnFamilies) {
+      if (conf.trackTotalNumberOfRows) {
+        val oldValue = db.get(readOptions, keyWithPrefix)
+        if (oldValue == null) {
+          val cfInfo = getColumnFamilyInfo(cfName)
+          if (cfInfo.isInternal) {
+            numInternalKeysOnWritingVersion += 1
+          } else {
+            numKeysOnWritingVersion += 1
+          }
+        }
+      }
+    } else {
+      if (conf.trackTotalNumberOfRows) {
+        val oldValue = db.get(readOptions, keyWithPrefix)
+        if (oldValue == null) {
+          numKeysOnWritingVersion += 1
+        }
       }
     }
-    db.merge(writeOptions, key, value)
 
-    changelogWriter.foreach(_.merge(key, value))
+    db.merge(writeOptions, keyWithPrefix, value)
+    changelogWriter.foreach(_.merge(keyWithPrefix, value))
   }
 
   /**
    * Remove the key if present.
    * @note This update is not committed to disk until commit() is called.
    */
-  def remove(key: Array[Byte]): Unit = {
-    if (conf.trackTotalNumberOfRows) {
-      val value = db.get(readOptions, key)
-      if (value != null) {
-        numKeysOnWritingVersion -= 1
+  def remove(key: Array[Byte], cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
+    val keyWithPrefix = if (useColumnFamilies) {
+      encodeStateRowWithPrefix(key, cfName)
+    } else {
+      key
+    }
+
+    if (useColumnFamilies) {
+      if (conf.trackTotalNumberOfRows) {
+        val oldValue = db.get(readOptions, keyWithPrefix)
+        if (oldValue != null) {
+          val cfInfo = getColumnFamilyInfo(cfName)
+          if (cfInfo.isInternal) {
+            numInternalKeysOnWritingVersion -= 1
+          } else {
+            numKeysOnWritingVersion -= 1
+          }
+        }
+      }
+    } else {
+      if (conf.trackTotalNumberOfRows) {
+        val value = db.get(readOptions, keyWithPrefix)
+        if (value != null) {
+          numKeysOnWritingVersion -= 1
+        }
       }
     }
-    db.delete(writeOptions, key)
-    changelogWriter.foreach(_.delete(key))
+
+    db.delete(writeOptions, keyWithPrefix)
+    changelogWriter.foreach(_.delete(keyWithPrefix))
   }
 
   /**

Review Comment:
   I'm saying that the UX hasn't changed much for any of the other methods, 
only for this one. Most methods were changed to be column-family aware, while 
this one wasn't.
   
   For the multiple-column-families case, we do not expect a global scan over 
the entire key space, so this method is mostly unused. Callers now have to use 
`prefixScan()` where they previously used `iterator()`, which relies on the 
implementation details of how we construct virtual column families (and 
`prefixScan()` wasn't intended to perform a scan without actual prefix data). 
I'd rather say it'd be better to have a column-family-aware version of 
`iterator()`. 
   
   I'd leave it to you to decide which is better: 1) having both `iterator()` 
and `iterator(cfName: String)`, and leaving `iterator()` to perform a full scan 
regardless of multiple column families, or 2) only having `iterator(cfName: 
String)`, which either performs a full scan or delegates to 
`prefixScan(cfName)` based on an internal flag (multiple column families).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to