anishshri-db commented on code in PR #49304:
URL: https://github.com/apache/spark/pull/49304#discussion_r1959235264


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -178,55 +183,77 @@ class RocksDB(
   // This is accessed and updated only between load and commit
   // which means it is implicitly guarded by acquireLock
   @GuardedBy("acquireLock")
-  private val colFamilyNameToIdMap = new ConcurrentHashMap[String, Short]()
+  private val colFamilyNameToInfoMap = new ConcurrentHashMap[String, 
ColumnFamilyInfo]()
+
+  @GuardedBy("acquireLock")
+  private val colFamilyIdToNameMap = new ConcurrentHashMap[Short, String]()
 
   @GuardedBy("acquireLock")
   private val maxColumnFamilyId: AtomicInteger = new AtomicInteger(-1)
 
   @GuardedBy("acquireLock")
   private val shouldForceSnapshot: AtomicBoolean = new AtomicBoolean(false)
 
-  /**
-   * Check whether the column family name is for internal column families.
-   *
-   * @param cfName - column family name
-   * @return - true if the column family is for internal use, false otherwise
-   */
-  private def checkInternalColumnFamilies(cfName: String): Boolean = 
cfName.charAt(0) == '_'
+  private def getColumnFamilyInfo(cfName: String): ColumnFamilyInfo = {
+    colFamilyNameToInfoMap.get(cfName)
+  }
+
+  private def getColumnFamilyNameForId(cfId: Short): String = {
+    colFamilyIdToNameMap.get(cfId)
+  }
 
-  // Methods to fetch column family mapping for this State Store version
-  def getColumnFamilyMapping: Map[String, Short] = {
-    colFamilyNameToIdMap.asScala
+  private def addToColFamilyMaps(cfName: String, cfId: Short, isInternal: 
Boolean): Unit = {
+    colFamilyNameToInfoMap.putIfAbsent(cfName, ColumnFamilyInfo(cfId, 
isInternal))
+    colFamilyIdToNameMap.putIfAbsent(cfId, cfName)
+  }
+
+  private def removeFromColFamilyMaps(cfName: String): Unit = {
+    val colFamilyInfo = colFamilyNameToInfoMap.get(cfName)
+    if (colFamilyInfo != null) {
+      colFamilyNameToInfoMap.remove(cfName)
+      colFamilyIdToNameMap.remove(colFamilyInfo.cfId)
+    }
   }
 
-  def getColumnFamilyId(cfName: String): Short = {
-    colFamilyNameToIdMap.get(cfName)
+  private def clearColFamilyMaps(): Unit = {
+    colFamilyNameToInfoMap.clear()
+    colFamilyIdToNameMap.clear()
   }
 
   /**
-   * Create RocksDB column family, if not created already
+   * Check if the column family exists with given name and create one if it doesn't. Users can
+   * create external column families storing user facing data as well as internal column families
+   * such as secondary indexes. Metrics for both of these types are tracked separately.
+   *
+   * @param colFamilyName - column family name
+   * @param isInternal - whether the column family is for internal use or not
+   * @return - virtual column family id
    */
-  def createColFamilyIfAbsent(colFamilyName: String): Short = {
+  def createColFamilyIfAbsent(colFamilyName: String, isInternal: Boolean): 
Short = {
     if (!checkColFamilyExists(colFamilyName)) {
       val newColumnFamilyId = maxColumnFamilyId.incrementAndGet().toShort
-      colFamilyNameToIdMap.putIfAbsent(colFamilyName, newColumnFamilyId)
+      addToColFamilyMaps(colFamilyName, newColumnFamilyId, isInternal)
       shouldForceSnapshot.set(true)
       newColumnFamilyId
     } else {
-      colFamilyNameToIdMap.get(colFamilyName)
+      colFamilyNameToInfoMap.get(colFamilyName).cfId
     }
   }
 
   /**
    * Remove RocksDB column family, if exists
    * @return columnFamilyId if it exists, else None
    */
-  def removeColFamilyIfExists(colFamilyName: String): Option[Short] = {
+  def removeColFamilyIfExists(colFamilyName: String): Boolean = {
     if (checkColFamilyExists(colFamilyName)) {
       shouldForceSnapshot.set(true)
-      Some(colFamilyNameToIdMap.remove(colFamilyName))
+      prefixScan(Array.empty[Byte], colFamilyName).foreach { kv =>

Review Comment:
   Done



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -467,29 +492,62 @@ class RocksDB(
     this
   }
 
+  /**
+   * Function to check if col family is internal or not based on information recorded in
+   * checkpoint metadata.
+   * @param cfName - column family name
+   * @param metadata - checkpoint metadata
+   * @return - type of column family (internal or otherwise)
+   */
+  private def checkColFamilyType(

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to