ericm-db commented on code in PR #49277:
URL: https://github.com/apache/spark/pull/49277#discussion_r1904660082


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala:
##########
@@ -67,39 +70,125 @@ object StateStoreColumnFamilySchemaUtils {
       stateName: String,
       keyEncoder: ExpressionEncoder[Any],
       valEncoder: Encoder[T],
-      hasTtl: Boolean): StateStoreColFamilySchema = {
-   StateStoreColFamilySchema(
+      hasTtl: Boolean): Map[String, StateStoreColFamilySchema] = {
+    val schemas = mutable.Map[String, StateStoreColFamilySchema]()
+
+    // Add main value state schema
+    schemas.put(stateName, StateStoreColFamilySchema(
       stateName,
+      keySchemaId = 0,
       keyEncoder.schema,
+      valueSchemaId = 0,
       getValueSchemaWithTTL(valEncoder.schema, hasTtl),
-      Some(NoPrefixKeyStateEncoderSpec(keyEncoder.schema)))
+      Some(NoPrefixKeyStateEncoderSpec(keyEncoder.schema))))
+
+    // Add TTL index if needed
+    if (hasTtl) {
+      val ttlIndexSchema = StateStoreColFamilySchema(
+        getTtlColFamilyName(stateName),
+        keySchemaId = 0,
+        getTTLRowKeySchema(keyEncoder.schema),
+        valueSchemaId = 0,
+        StructType(Array(StructField("__empty__", NullType))),
+        
Some(RangeKeyScanStateEncoderSpec(getTTLRowKeySchema(keyEncoder.schema), 
Seq(0))))
+      schemas.put(ttlIndexSchema.colFamilyName, ttlIndexSchema)
+    }
+
+    schemas.toMap
   }
 
   def getListStateSchema[T](
       stateName: String,
       keyEncoder: ExpressionEncoder[Any],
       valEncoder: Encoder[T],
-      hasTtl: Boolean): StateStoreColFamilySchema = {
-  StateStoreColFamilySchema(
+      hasTtl: Boolean): Map[String, StateStoreColFamilySchema] = {
+    val schemas = mutable.Map[String, StateStoreColFamilySchema]()
+
+    // Add main list state schema
+    schemas.put(stateName, StateStoreColFamilySchema(
       stateName,
+      keySchemaId = 0,
       keyEncoder.schema,
+      valueSchemaId = 0,
       getValueSchemaWithTTL(valEncoder.schema, hasTtl),
+      Some(NoPrefixKeyStateEncoderSpec(keyEncoder.schema))))
+    // Add row counter schema
+    val counterSchema = StateStoreColFamilySchema(
+      getRowCounterCFName(stateName), keySchemaId = 0,
+      keyEncoder.schema,
+      valueSchemaId = 0,
+      StructType(Seq(StructField("count", LongType, nullable = false))),
       Some(NoPrefixKeyStateEncoderSpec(keyEncoder.schema)))
+    schemas.put(counterSchema.colFamilyName, counterSchema)
+
+    // Add TTL-related schemas if needed
+    if (hasTtl) {
+      // TTL index
+      val ttlIndexSchema = StateStoreColFamilySchema(
+        getTtlColFamilyName(stateName),
+        keySchemaId = 0,
+        getTTLRowKeySchema(keyEncoder.schema),
+        valueSchemaId = 0,
+        StructType(Array(StructField("__empty__", NullType))),
+        
Some(RangeKeyScanStateEncoderSpec(getTTLRowKeySchema(keyEncoder.schema), 
Seq(0))))
+      schemas.put(ttlIndexSchema.colFamilyName, ttlIndexSchema)
+
+      // Min expiry index
+      val minIndexSchema = StateStoreColFamilySchema(
+        s"$$min_$stateName",
+        keySchemaId = 0,
+        keyEncoder.schema,
+        valueSchemaId = 0,
+        getExpirationMsRowSchema(),
+        Some(NoPrefixKeyStateEncoderSpec(keyEncoder.schema)))
+      schemas.put(minIndexSchema.colFamilyName, minIndexSchema)
+
+      // Count index
+      val countSchema = StateStoreColFamilySchema(
+        s"$$count_$stateName",
+        keySchemaId = 0,
+        keyEncoder.schema,
+        valueSchemaId = 0,
+        StructType(Seq(StructField("count", LongType, nullable = false))),
+        Some(NoPrefixKeyStateEncoderSpec(keyEncoder.schema)))
+      schemas.put(countSchema.colFamilyName, countSchema)
+    }
+
+    schemas.toMap
   }
 
   def getMapStateSchema[K, V](
       stateName: String,
       keyEncoder: ExpressionEncoder[Any],
       userKeyEnc: Encoder[K],
       valEncoder: Encoder[V],
-      hasTtl: Boolean): StateStoreColFamilySchema = {
+      hasTtl: Boolean): Map[String, StateStoreColFamilySchema] = {
+    val schemas = mutable.Map[String, StateStoreColFamilySchema]()
     val compositeKeySchema = getCompositeKeySchema(keyEncoder.schema, 
userKeyEnc.schema)
-    StateStoreColFamilySchema(
+
+    // Add main map state schema
+    schemas.put(stateName, StateStoreColFamilySchema(
       stateName,
+      keySchemaId = 0,
       compositeKeySchema,
+      valueSchemaId = 0,
       getValueSchemaWithTTL(valEncoder.schema, hasTtl),
       Some(PrefixKeyScanStateEncoderSpec(compositeKeySchema, 1)),
-      Some(userKeyEnc.schema))
+      Some(userKeyEnc.schema)))
+
+    // Add TTL index if needed
+    if (hasTtl) {
+      val ttlIndexSchema = StateStoreColFamilySchema(
+        getTtlColFamilyName(stateName),
+        keySchemaId = 0,
+        getTTLRowKeySchema(compositeKeySchema),
+        valueSchemaId = 0,
+        StructType(Array(StructField("__empty__", NullType))),

Review Comment:
   This is just the equivalent of the DUMMY_VALUE_ROW that we use for TTL, 
where we only care about the key.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to