anishshri-db commented on code in PR #49304:
URL: https://github.com/apache/spark/pull/49304#discussion_r1959234822


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -847,35 +828,47 @@ class RocksDBStateStoreChangeDataReader(
     endVersion: Long,
     compressionCodec: CompressionCodec,
     keyValueEncoderMap:
-      ConcurrentHashMap[String, (RocksDBKeyStateEncoder, 
RocksDBValueStateEncoder)],
+      ConcurrentHashMap[String, (RocksDBKeyStateEncoder, 
RocksDBValueStateEncoder, Short)],
     colFamilyNameOpt: Option[String] = None)
   extends StateStoreChangeDataReader(
     fm, stateLocation, startVersion, endVersion, compressionCodec, 
colFamilyNameOpt) {
 
   override protected var changelogSuffix: String = "changelog"
 
-  private def getColFamilyIdBytes: Option[Array[Byte]] = {
-    if (colFamilyNameOpt.isDefined) {
-      val colFamilyName = colFamilyNameOpt.get
-      if (!keyValueEncoderMap.containsKey(colFamilyName)) {
-        throw new IllegalStateException(
-          s"Column family $colFamilyName not found in the key value encoder 
map")
-      }
-      Some(keyValueEncoderMap.get(colFamilyName)._1.getColumnFamilyIdBytes())
-    } else {
-      None
-    }
+  /**
+   * Encodes a virtual column family ID into a byte array suitable for RocksDB.
+   *
+   * This method creates a fixed-size byte array prefixed with the virtual 
column family ID,
+   * which is used to partition data within RocksDB.
+   *
+   * @param virtualColFamilyId The column family identifier to encode
+   * @return A byte array containing the encoded column family ID
+   */
+  private def getColumnFamilyIdBytes(virtualColFamilyId: Short): Array[Byte] = 
{

Review Comment:
   I dont think I can consolidate without some additional refactoring. Will 
prefer to do this in a separate PR



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -847,35 +828,47 @@ class RocksDBStateStoreChangeDataReader(
     endVersion: Long,
     compressionCodec: CompressionCodec,
     keyValueEncoderMap:
-      ConcurrentHashMap[String, (RocksDBKeyStateEncoder, 
RocksDBValueStateEncoder)],
+      ConcurrentHashMap[String, (RocksDBKeyStateEncoder, 
RocksDBValueStateEncoder, Short)],
     colFamilyNameOpt: Option[String] = None)
   extends StateStoreChangeDataReader(
     fm, stateLocation, startVersion, endVersion, compressionCodec, 
colFamilyNameOpt) {
 
   override protected var changelogSuffix: String = "changelog"
 
-  private def getColFamilyIdBytes: Option[Array[Byte]] = {
-    if (colFamilyNameOpt.isDefined) {
-      val colFamilyName = colFamilyNameOpt.get
-      if (!keyValueEncoderMap.containsKey(colFamilyName)) {
-        throw new IllegalStateException(
-          s"Column family $colFamilyName not found in the key value encoder 
map")
-      }
-      Some(keyValueEncoderMap.get(colFamilyName)._1.getColumnFamilyIdBytes())
-    } else {
-      None
-    }
+  /**
+   * Encodes a virtual column family ID into a byte array suitable for RocksDB.
+   *
+   * This method creates a fixed-size byte array prefixed with the virtual 
column family ID,
+   * which is used to partition data within RocksDB.
+   *
+   * @param virtualColFamilyId The column family identifier to encode
+   * @return A byte array containing the encoded column family ID
+   */
+  private def getColumnFamilyIdBytes(virtualColFamilyId: Short): Array[Byte] = 
{
+    val encodedBytes = new 
Array[Byte](StateStore.VIRTUAL_COL_FAMILY_PREFIX_BYTES)
+    Platform.putShort(encodedBytes, Platform.BYTE_ARRAY_OFFSET, 
virtualColFamilyId)
+    encodedBytes
   }
 
-  private val colFamilyIdBytesOpt: Option[Array[Byte]] = getColFamilyIdBytes
+  private def getExtractedKey(data: Array[Byte]): Array[Byte] = {

Review Comment:
   Same here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to