ericm-db commented on code in PR #49277: URL: https://github.com/apache/spark/pull/49277#discussion_r1904660082
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala: ########## @@ -67,39 +70,125 @@ object StateStoreColumnFamilySchemaUtils { stateName: String, keyEncoder: ExpressionEncoder[Any], valEncoder: Encoder[T], - hasTtl: Boolean): StateStoreColFamilySchema = { - StateStoreColFamilySchema( + hasTtl: Boolean): Map[String, StateStoreColFamilySchema] = { + val schemas = mutable.Map[String, StateStoreColFamilySchema]() + + // Add main value state schema + schemas.put(stateName, StateStoreColFamilySchema( stateName, + keySchemaId = 0, keyEncoder.schema, + valueSchemaId = 0, getValueSchemaWithTTL(valEncoder.schema, hasTtl), - Some(NoPrefixKeyStateEncoderSpec(keyEncoder.schema))) + Some(NoPrefixKeyStateEncoderSpec(keyEncoder.schema)))) + + // Add TTL index if needed + if (hasTtl) { + val ttlIndexSchema = StateStoreColFamilySchema( + getTtlColFamilyName(stateName), + keySchemaId = 0, + getTTLRowKeySchema(keyEncoder.schema), + valueSchemaId = 0, + StructType(Array(StructField("__empty__", NullType))), + Some(RangeKeyScanStateEncoderSpec(getTTLRowKeySchema(keyEncoder.schema), Seq(0)))) + schemas.put(ttlIndexSchema.colFamilyName, ttlIndexSchema) + } + + schemas.toMap } def getListStateSchema[T]( stateName: String, keyEncoder: ExpressionEncoder[Any], valEncoder: Encoder[T], - hasTtl: Boolean): StateStoreColFamilySchema = { - StateStoreColFamilySchema( + hasTtl: Boolean): Map[String, StateStoreColFamilySchema] = { + val schemas = mutable.Map[String, StateStoreColFamilySchema]() + + // Add main list state schema + schemas.put(stateName, StateStoreColFamilySchema( stateName, + keySchemaId = 0, keyEncoder.schema, + valueSchemaId = 0, getValueSchemaWithTTL(valEncoder.schema, hasTtl), + Some(NoPrefixKeyStateEncoderSpec(keyEncoder.schema)))) + // Add row counter schema + val counterSchema = StateStoreColFamilySchema( + getRowCounterCFName(stateName), keySchemaId = 0, + keyEncoder.schema, + valueSchemaId = 0, + 
StructType(Seq(StructField("count", LongType, nullable = false))), Some(NoPrefixKeyStateEncoderSpec(keyEncoder.schema))) + schemas.put(counterSchema.colFamilyName, counterSchema) + + // Add TTL-related schemas if needed + if (hasTtl) { + // TTL index + val ttlIndexSchema = StateStoreColFamilySchema( + getTtlColFamilyName(stateName), + keySchemaId = 0, + getTTLRowKeySchema(keyEncoder.schema), + valueSchemaId = 0, + StructType(Array(StructField("__empty__", NullType))), + Some(RangeKeyScanStateEncoderSpec(getTTLRowKeySchema(keyEncoder.schema), Seq(0)))) + schemas.put(ttlIndexSchema.colFamilyName, ttlIndexSchema) + + // Min expiry index + val minIndexSchema = StateStoreColFamilySchema( + s"$$min_$stateName", + keySchemaId = 0, + keyEncoder.schema, + valueSchemaId = 0, + getExpirationMsRowSchema(), + Some(NoPrefixKeyStateEncoderSpec(keyEncoder.schema))) + schemas.put(minIndexSchema.colFamilyName, minIndexSchema) + + // Count index + val countSchema = StateStoreColFamilySchema( + s"$$count_$stateName", + keySchemaId = 0, + keyEncoder.schema, + valueSchemaId = 0, + StructType(Seq(StructField("count", LongType, nullable = false))), + Some(NoPrefixKeyStateEncoderSpec(keyEncoder.schema))) + schemas.put(countSchema.colFamilyName, countSchema) + } + + schemas.toMap } def getMapStateSchema[K, V]( stateName: String, keyEncoder: ExpressionEncoder[Any], userKeyEnc: Encoder[K], valEncoder: Encoder[V], - hasTtl: Boolean): StateStoreColFamilySchema = { + hasTtl: Boolean): Map[String, StateStoreColFamilySchema] = { + val schemas = mutable.Map[String, StateStoreColFamilySchema]() val compositeKeySchema = getCompositeKeySchema(keyEncoder.schema, userKeyEnc.schema) - StateStoreColFamilySchema( + + // Add main map state schema + schemas.put(stateName, StateStoreColFamilySchema( stateName, + keySchemaId = 0, compositeKeySchema, + valueSchemaId = 0, getValueSchemaWithTTL(valEncoder.schema, hasTtl), Some(PrefixKeyScanStateEncoderSpec(compositeKeySchema, 1)), - Some(userKeyEnc.schema)) + 
Some(userKeyEnc.schema))) + + // Add TTL index if needed + if (hasTtl) { + val ttlIndexSchema = StateStoreColFamilySchema( + getTtlColFamilyName(stateName), + keySchemaId = 0, + getTTLRowKeySchema(compositeKeySchema), + valueSchemaId = 0, + StructType(Array(StructField("__empty__", NullType))), Review Comment: This is just the equivalent of the DUMMY_VALUE_ROW that we use for TTL, where we only care about the key. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org