brkyvz commented on code in PR #48401:
URL: https://github.com/apache/spark/pull/48401#discussion_r1844340161


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2204,6 +2204,16 @@ object SQLConf {
       .intConf
       .createWithDefault(3)
 
+  val STREAMING_STATE_STORE_ENCODING_FORMAT =
+    buildConf("spark.sql.streaming.stateStore.encodingFormat")
+      .doc("The encoding format used for stateful operators to store 
information " +
+        "in the state store")
+      .version("4.0.0")
+      .stringConf
+      .checkValue(v => Set("UnsafeRow", "Avro").contains(v),
+        "Valid values are 'UnsafeRow' and 'Avro'")

Review Comment:
   nit: do we want to be case insensitive here?
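   For example, something along these lines (a rough sketch, assuming the `transform` helper on the config builder that other SQLConf entries use; `java.util.Locale` would need to be in scope):
   ```scala
   buildConf("spark.sql.streaming.stateStore.encodingFormat")
     .doc("The encoding format used for stateful operators to store information " +
       "in the state store")
     .version("4.0.0")
     .stringConf
     // normalize once so the validation below is case-insensitive
     .transform(_.toLowerCase(Locale.ROOT))
     .checkValue(v => Set("unsaferow", "avro").contains(v),
       "Valid values are 'unsaferow' and 'avro'")
   ```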



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -115,7 +119,7 @@ case class TransformWithStateExec(
    * Fetching the columnFamilySchemas from the StatefulProcessorHandle
    * after init is called.
    */
-  private def getColFamilySchemas(): Map[String, StateStoreColFamilySchema] = {
+  def getColFamilySchemas(): Map[String, StateStoreColFamilySchema] = {
     val columnFamilySchemas = getDriverProcessorHandle().getColumnFamilySchemas
     closeProcessorHandle()
     columnFamilySchemas

Review Comment:
   should this be moved to a static method?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -552,9 +570,9 @@ class IncrementalExecution(
       // The rule below doesn't change the plan but can cause the side effect that
       // metadata/schema is written in the checkpoint directory of stateful operator.

Review Comment:
   existing: oof, this is not great...



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -552,9 +570,9 @@ class IncrementalExecution(
       // The rule below doesn't change the plan but can cause the side effect that
       // metadata/schema is written in the checkpoint directory of stateful operator.
       planWithStateOpId transform StateSchemaAndOperatorMetadataRule.rule
-
-      simulateWatermarkPropagation(planWithStateOpId)
-      planWithStateOpId transform WatermarkPropagationRule.rule
+      val planWithStateSchemas = planWithStateOpId transform StateStoreColumnFamilySchemasRule.rule
+      simulateWatermarkPropagation(planWithStateSchemas)
+      planWithStateSchemas transform WatermarkPropagationRule.rule

Review Comment:
   ditto



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -259,6 +259,24 @@ class IncrementalExecution(
     }
   }
 
+  /**
+   * This rule populates the column family schemas for the TransformWithStateExec
+   * operator to ship them from the driver, where the schema and serializer objects
+   * are created, to the executor.
+   */
+  object StateStoreColumnFamilySchemasRule extends SparkPlanPartialRule {
+    override val rule: PartialFunction[SparkPlan, SparkPlan] = {
+      case statefulOp: StatefulOperator =>
+        statefulOp match {
+          case op: TransformWithStateExec =>
+            op.copy(
+              columnFamilySchemas = op.getColFamilySchemas()
+            )

Review Comment:
   this is a bit confusing imho. If you have the `getColFamilySchemas` method available as part of the class, why do you have to set it on the class with a `copy`?
   
   Two possible suggestions:
   1. Make `getColFamilySchemas` a static method. Not sure if that's possible though, looking at the logic a bit more in TransformWithStateExec. It feels weird that you're opening and closing these handles just to get some of the information out.
   2. Add a comment here that this needs to be run on the driver, and instead rename the method to `withColumnFamilySchemas`, which calls `copy` internally (rough sketch below).
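   For option 2, roughly (a sketch only, inside TransformWithStateExec; must run on the driver since it opens and closes the driver processor handle):
   ```scala
   def withColumnFamilySchemas: TransformWithStateExec =
     copy(columnFamilySchemas = getColFamilySchemas())
   ```
   and the rule body then becomes `case op: TransformWithStateExec => op.withColumnFamilySchemas`.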



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala:
##########
@@ -71,6 +224,33 @@ object StateStoreColumnFamilySchemaUtils {
       stateName,
       keySchema,
       valSchema,
-      Some(PrefixKeyScanStateEncoderSpec(keySchema, 1)))
+      Some(PrefixKeyScanStateEncoderSpec(keySchema, 1)),
+      avroEnc = getAvroSerde(
+        StructType(keySchema.take(1)),
+        valSchema,
+        Some(StructType(keySchema.drop(1)))
+      ))
+  }
+
+  // This function creates the StateStoreColFamilySchema for
+  // Timers' secondary index.
+  // Because we want to encode fixed-length types as binary types
+  // if we are using Avro, we need to do some schema conversion to ensure
+  // we can use range scan
+  def getTimerStateSchemaForSecIndex(
+      stateName: String,
+      keySchema: StructType,
+      valSchema: StructType): StateStoreColFamilySchema = {
+    val avroKeySchema = StateStoreColumnFamilySchemaUtils.
+      convertForRangeScan(keySchema, Seq(0))
+    StateStoreColFamilySchema(
+      stateName,
+      keySchema,
+      valSchema,
+      Some(RangeKeyScanStateEncoderSpec(keySchema, Seq(0))),
+      avroEnc = getAvroSerde(
+        StructType(avroKeySchema.drop(2)),

Review Comment:
   ditto on comment



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala:
##########
@@ -55,12 +152,68 @@ object StateStoreColumnFamilySchemaUtils {
       valEncoder: Encoder[V],
       hasTtl: Boolean): StateStoreColFamilySchema = {
     val compositeKeySchema = getCompositeKeySchema(keyEncoder.schema, userKeyEnc.schema)
+    val valSchema = getValueSchemaWithTTL(valEncoder.schema, hasTtl)
     StateStoreColFamilySchema(
       stateName,
       compositeKeySchema,
       getValueSchemaWithTTL(valEncoder.schema, hasTtl),
       Some(PrefixKeyScanStateEncoderSpec(compositeKeySchema, 1)),
-      Some(userKeyEnc.schema))
+      Some(userKeyEnc.schema),
+      avroEnc = getAvroSerde(
+        StructType(compositeKeySchema.take(1)),
+        valSchema,
+        Some(StructType(compositeKeySchema.drop(1)))
+      )
+    )
+  }
+
+  // This function creates the StateStoreColFamilySchema for
+  // the TTL secondary index.
+  // Because we want to encode fixed-length types as binary types
+  // if we are using Avro, we need to do some schema conversion to ensure
+  // we can use range scan
+  def getTtlStateSchema(
+      stateName: String,
+      keyEncoder: ExpressionEncoder[Any]): StateStoreColFamilySchema = {
+    val ttlKeySchema = StateStoreColumnFamilySchemaUtils.convertForRangeScan(
+      getSingleKeyTTLRowSchema(keyEncoder.schema), Seq(0))
+    val ttlValSchema = StructType(
+      Array(StructField("__dummy__", NullType)))
+    StateStoreColFamilySchema(
+      stateName,
+      ttlKeySchema,
+      ttlValSchema,
+      Some(RangeKeyScanStateEncoderSpec(ttlKeySchema, Seq(0))),
+      avroEnc = getAvroSerde(
+        StructType(ttlKeySchema.drop(2)),
+        ttlValSchema
+      )
+    )
+  }
+
+  // This function creates the StateStoreColFamilySchema for
+  // the TTL secondary index.
+  // Because we want to encode fixed-length types as binary types
+  // if we are using Avro, we need to do some schema conversion to ensure
+  // we can use range scan

Review Comment:
   ditto on docs



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -552,9 +570,9 @@ class IncrementalExecution(
       // The rule below doesn't change the plan but can cause the side effect that
       // metadata/schema is written in the checkpoint directory of stateful operator.
       planWithStateOpId transform StateSchemaAndOperatorMetadataRule.rule
-
-      simulateWatermarkPropagation(planWithStateOpId)
-      planWithStateOpId transform WatermarkPropagationRule.rule
+      val planWithStateSchemas = planWithStateOpId transform StateStoreColumnFamilySchemasRule.rule

Review Comment:
   nit: please avoid using Scala-specific magic; write this out as `planWithStateOpId.transform(StateStoreColumnFamilySchemasRule.rule)`
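   i.e. something like:
   ```scala
   val planWithStateSchemas =
     planWithStateOpId.transform(StateStoreColumnFamilySchemasRule.rule)
   ```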



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala:
##########
@@ -55,12 +152,68 @@ object StateStoreColumnFamilySchemaUtils {
       valEncoder: Encoder[V],
       hasTtl: Boolean): StateStoreColFamilySchema = {
     val compositeKeySchema = getCompositeKeySchema(keyEncoder.schema, userKeyEnc.schema)
+    val valSchema = getValueSchemaWithTTL(valEncoder.schema, hasTtl)
     StateStoreColFamilySchema(
       stateName,
       compositeKeySchema,
       getValueSchemaWithTTL(valEncoder.schema, hasTtl),
       Some(PrefixKeyScanStateEncoderSpec(compositeKeySchema, 1)),
-      Some(userKeyEnc.schema))
+      Some(userKeyEnc.schema),
+      avroEnc = getAvroSerde(
+        StructType(compositeKeySchema.take(1)),
+        valSchema,
+        Some(StructType(compositeKeySchema.drop(1)))
+      )
+    )
+  }
+
+  // This function creates the StateStoreColFamilySchema for
+  // the TTL secondary index.
+  // Because we want to encode fixed-length types as binary types
+  // if we are using Avro, we need to do some schema conversion to ensure
+  // we can use range scan

Review Comment:
   nit: can you make this a proper scaladoc please: 
   ```scala
   /**
    * ...
    */
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala:
##########
@@ -55,12 +152,68 @@ object StateStoreColumnFamilySchemaUtils {
       valEncoder: Encoder[V],
       hasTtl: Boolean): StateStoreColFamilySchema = {
     val compositeKeySchema = getCompositeKeySchema(keyEncoder.schema, userKeyEnc.schema)
+    val valSchema = getValueSchemaWithTTL(valEncoder.schema, hasTtl)
     StateStoreColFamilySchema(
       stateName,
       compositeKeySchema,
       getValueSchemaWithTTL(valEncoder.schema, hasTtl),
       Some(PrefixKeyScanStateEncoderSpec(compositeKeySchema, 1)),
-      Some(userKeyEnc.schema))
+      Some(userKeyEnc.schema),
+      avroEnc = getAvroSerde(
+        StructType(compositeKeySchema.take(1)),
+        valSchema,
+        Some(StructType(compositeKeySchema.drop(1)))
+      )
+    )
+  }
+
+  // This function creates the StateStoreColFamilySchema for
+  // the TTL secondary index.
+  // Because we want to encode fixed-length types as binary types
+  // if we are using Avro, we need to do some schema conversion to ensure
+  // we can use range scan

Review Comment:
   also mention that the range scan is an optimization in RocksDB
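   For example, the scaladoc could read roughly like this (wording is only a sketch):
   ```scala
   /**
    * Creates the StateStoreColFamilySchema for the TTL secondary index.
    * When Avro encoding is used, fixed-length key fields are re-encoded as
    * big-endian binary so that byte-wise key ordering matches numeric ordering,
    * which lets RocksDB serve the TTL lookup as an efficient range scan.
    */
   ```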



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala:
##########
@@ -55,12 +152,68 @@ object StateStoreColumnFamilySchemaUtils {
       valEncoder: Encoder[V],
       hasTtl: Boolean): StateStoreColFamilySchema = {
     val compositeKeySchema = getCompositeKeySchema(keyEncoder.schema, userKeyEnc.schema)
+    val valSchema = getValueSchemaWithTTL(valEncoder.schema, hasTtl)
     StateStoreColFamilySchema(
       stateName,
       compositeKeySchema,
       getValueSchemaWithTTL(valEncoder.schema, hasTtl),
       Some(PrefixKeyScanStateEncoderSpec(compositeKeySchema, 1)),
-      Some(userKeyEnc.schema))
+      Some(userKeyEnc.schema),
+      avroEnc = getAvroSerde(
+        StructType(compositeKeySchema.take(1)),
+        valSchema,
+        Some(StructType(compositeKeySchema.drop(1)))
+      )
+    )
+  }
+
+  // This function creates the StateStoreColFamilySchema for
+  // the TTL secondary index.
+  // Because we want to encode fixed-length types as binary types
+  // if we are using Avro, we need to do some schema conversion to ensure
+  // we can use range scan
+  def getTtlStateSchema(
+      stateName: String,
+      keyEncoder: ExpressionEncoder[Any]): StateStoreColFamilySchema = {
+    val ttlKeySchema = StateStoreColumnFamilySchemaUtils.convertForRangeScan(
+      getSingleKeyTTLRowSchema(keyEncoder.schema), Seq(0))
+    val ttlValSchema = StructType(
+      Array(StructField("__dummy__", NullType)))
+    StateStoreColFamilySchema(
+      stateName,
+      ttlKeySchema,
+      ttlValSchema,
+      Some(RangeKeyScanStateEncoderSpec(ttlKeySchema, Seq(0))),
+      avroEnc = getAvroSerde(
+        StructType(ttlKeySchema.drop(2)),
+        ttlValSchema
+      )
+    )
+  }
+
+  // This function creates the StateStoreColFamilySchema for
+  // the TTL secondary index.
+  // Because we want to encode fixed-length types as binary types
+  // if we are using Avro, we need to do some schema conversion to ensure
+  // we can use range scan
+  def getTtlStateSchema(
+      stateName: String,
+      keyEncoder: ExpressionEncoder[Any],
+      userKeySchema: StructType): StateStoreColFamilySchema = {
+    val ttlKeySchema = StateStoreColumnFamilySchemaUtils.convertForRangeScan(
+      getCompositeKeyTTLRowSchema(keyEncoder.schema, userKeySchema), Seq(0))
+    val ttlValSchema = StructType(
+      Array(StructField("__dummy__", NullType)))
+    StateStoreColFamilySchema(
+      stateName,
+      ttlKeySchema,
+      ttlValSchema,
+      Some(RangeKeyScanStateEncoderSpec(ttlKeySchema, Seq(0))),
+      avroEnc = getAvroSerde(
+        StructType(ttlKeySchema.drop(2)),

Review Comment:
   ditto



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -552,9 +570,9 @@ class IncrementalExecution(
       // The rule below doesn't change the plan but can cause the side effect that
       // metadata/schema is written in the checkpoint directory of stateful operator.
       planWithStateOpId transform StateSchemaAndOperatorMetadataRule.rule

Review Comment:
   existing: here too



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala:
##########
@@ -16,36 +16,133 @@
  */
 package org.apache.spark.sql.execution.streaming
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Encoder
+import org.apache.spark.sql.avro.{AvroDeserializer, AvroOptions, AvroSerializer, SchemaConverters}
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchemaUtils._
-import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec, StateStoreColFamilySchema}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.execution.streaming.state.{AvroEncoder, NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec, RangeKeyScanStateEncoderSpec, StateStoreColFamilySchema}
+import org.apache.spark.sql.types._
 
-object StateStoreColumnFamilySchemaUtils {
+object StateStoreColumnFamilySchemaUtils extends Serializable {
+
+  def apply(initializeAvroSerde: Boolean): StateStoreColumnFamilySchemaUtils =
+    new StateStoreColumnFamilySchemaUtils(initializeAvroSerde)
+
+  /**
+   * Avro uses zig-zag encoding for some fixed-length types, like Longs and Ints. For range scans
+   * we want to use big-endian encoding, so we need to convert the source schema to replace these
+   * types with BinaryType.
+   *
+   * @param schema The schema to convert
+   * @param ordinals If non-empty, only convert fields at these ordinals.
+   *                 If empty, convert all fields.
+   */
+  def convertForRangeScan(schema: StructType, ordinals: Seq[Int] = Seq.empty): StructType = {
+    val ordinalSet = ordinals.toSet
+
+    StructType(schema.fields.zipWithIndex.flatMap { case (field, idx) =>
+      if ((ordinals.isEmpty || ordinalSet.contains(idx)) && isFixedSize(field.dataType)) {
+        // For each numeric field, create two fields:
+        // 1. Byte marker for null, positive, or negative values
+        // 2. The original numeric value in big-endian format
+        // Byte type is converted to Int in Avro, which doesn't work for us as Avro
+        // uses zig-zag encoding as opposed to big-endian for Ints
+        Seq(
+          StructField(s"${field.name}_marker", BinaryType, nullable = false),
+          field.copy(name = s"${field.name}_value", BinaryType)
+        )
+      } else {
+        Seq(field)
+      }
+    })
+  }
+
+  private def isFixedSize(dataType: DataType): Boolean = dataType match {
+    case _: ByteType | _: BooleanType | _: ShortType | _: IntegerType | _: LongType |
+         _: FloatType | _: DoubleType => true
+    case _ => false
+  }
+
+  def getTtlColFamilyName(stateName: String): String = {
+    "$ttl_" + stateName
+  }
+}
+
+/**
+ *
+ * @param initializeAvroSerde Whether or not to create the Avro serializers and deserializers
+ *                            for this state type. This class is used to create the
+ *                            StateStoreColumnFamilySchema for each state variable from the driver
+ */
+class StateStoreColumnFamilySchemaUtils(initializeAvroSerde: Boolean)
+    extends Logging with Serializable {
+  private def getAvroSerializer(schema: StructType): AvroSerializer = {
+    val avroType = SchemaConverters.toAvroType(schema)
+    new AvroSerializer(schema, avroType, nullable = false)
+  }
+
+  private def getAvroDeserializer(schema: StructType): AvroDeserializer = {
+    val avroType = SchemaConverters.toAvroType(schema)
+    val avroOptions = AvroOptions(Map.empty)
+    new AvroDeserializer(avroType, schema,
+      avroOptions.datetimeRebaseModeInRead, avroOptions.useStableIdForUnionType,
+      avroOptions.stableIdPrefixForUnionType, avroOptions.recursiveFieldMaxDepth)
+  }
+
+  /**
+   * If initializeAvroSerde is true, this method will create an Avro Serializer and Deserializer
+   * for a particular key and value schema.
+   */
+  private[sql] def getAvroSerde(
+      keySchema: StructType,
+      valSchema: StructType,
+      suffixKeySchema: Option[StructType] = None
+  ): Option[AvroEncoder] = {
+    if (initializeAvroSerde) {
+      val (suffixKeySer, suffixKeyDe) = if (suffixKeySchema.isDefined) {
+        (Some(getAvroSerializer(suffixKeySchema.get)),
+          Some(getAvroDeserializer(suffixKeySchema.get)))
+      } else {
+        (None, None)
+      }
+      Some(AvroEncoder(
+        getAvroSerializer(keySchema),
+        getAvroDeserializer(keySchema),
+        getAvroSerializer(valSchema),
+        getAvroDeserializer(valSchema),
+        suffixKeySer, suffixKeyDe))

Review Comment:
   how about `suffixKeySchema.map(getAvroSerializer)` / `suffixKeySchema.map(getAvroDeserializer)` here instead?
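   i.e., roughly:
   ```scala
   Some(AvroEncoder(
     getAvroSerializer(keySchema),
     getAvroDeserializer(keySchema),
     getAvroSerializer(valSchema),
     getAvroDeserializer(valSchema),
     suffixKeySchema.map(getAvroSerializer),
     suffixKeySchema.map(getAvroDeserializer)))
   ```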



##########
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala:
##########
@@ -24,6 +24,7 @@ import org.apache.avro.generic.GenericDatumReader
 import org.apache.avro.io.{BinaryDecoder, DecoderFactory}
 
 import org.apache.spark.SparkException
+import org.apache.spark.sql.avro.SchemaConverters

Review Comment:
   is this a stray change?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala:
##########
@@ -71,6 +224,33 @@ object StateStoreColumnFamilySchemaUtils {
       stateName,
       keySchema,
       valSchema,
-      Some(PrefixKeyScanStateEncoderSpec(keySchema, 1)))
+      Some(PrefixKeyScanStateEncoderSpec(keySchema, 1)),
+      avroEnc = getAvroSerde(
+        StructType(keySchema.take(1)),
+        valSchema,
+        Some(StructType(keySchema.drop(1)))
+      ))
+  }
+
+  // This function creates the StateStoreColFamilySchema for

Review Comment:
   ditto



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala:
##########
@@ -55,12 +152,68 @@ object StateStoreColumnFamilySchemaUtils {
       valEncoder: Encoder[V],
       hasTtl: Boolean): StateStoreColFamilySchema = {
     val compositeKeySchema = getCompositeKeySchema(keyEncoder.schema, userKeyEnc.schema)
+    val valSchema = getValueSchemaWithTTL(valEncoder.schema, hasTtl)
     StateStoreColFamilySchema(
       stateName,
       compositeKeySchema,
       getValueSchemaWithTTL(valEncoder.schema, hasTtl),
       Some(PrefixKeyScanStateEncoderSpec(compositeKeySchema, 1)),
-      Some(userKeyEnc.schema))
+      Some(userKeyEnc.schema),
+      avroEnc = getAvroSerde(
+        StructType(compositeKeySchema.take(1)),
+        valSchema,
+        Some(StructType(compositeKeySchema.drop(1)))
+      )
+    )
+  }
+
+  // This function creates the StateStoreColFamilySchema for
+  // the TTL secondary index.
+  // Because we want to encode fixed-length types as binary types
+  // if we are using Avro, we need to do some schema conversion to ensure
+  // we can use range scan
+  def getTtlStateSchema(
+      stateName: String,
+      keyEncoder: ExpressionEncoder[Any]): StateStoreColFamilySchema = {
+    val ttlKeySchema = StateStoreColumnFamilySchemaUtils.convertForRangeScan(
+      getSingleKeyTTLRowSchema(keyEncoder.schema), Seq(0))
+    val ttlValSchema = StructType(
+      Array(StructField("__dummy__", NullType)))
+    StateStoreColFamilySchema(
+      stateName,
+      ttlKeySchema,
+      ttlValSchema,
+      Some(RangeKeyScanStateEncoderSpec(ttlKeySchema, Seq(0))),
+      avroEnc = getAvroSerde(
+        StructType(ttlKeySchema.drop(2)),

Review Comment:
   this drop(2) looks magical. Can you add a comment mentioning that these represent the null/positive/negative byte and the big-endian representation?
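   For example (a sketch of the comment only, no logic change):
   ```scala
   avroEnc = getAvroSerde(
     // drop(2): skip the two fields that convertForRangeScan prepended for the
     // range-scan ordinal, i.e. the null/positive/negative marker byte and the
     // big-endian binary value
     StructType(ttlKeySchema.drop(2)),
     ttlValSchema
   )
   ```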



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala:
##########
@@ -16,36 +16,133 @@
  */
 package org.apache.spark.sql.execution.streaming
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Encoder
+import org.apache.spark.sql.avro.{AvroDeserializer, AvroOptions, AvroSerializer, SchemaConverters}
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchemaUtils._
-import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec, StateStoreColFamilySchema}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.execution.streaming.state.{AvroEncoder, NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec, RangeKeyScanStateEncoderSpec, StateStoreColFamilySchema}
 
-object StateStoreColumnFamilySchemaUtils {
+object StateStoreColumnFamilySchemaUtils extends Serializable {
+
+  def apply(initializeAvroSerde: Boolean): StateStoreColumnFamilySchemaUtils =
+    new StateStoreColumnFamilySchemaUtils(initializeAvroSerde)
+
+  /**
+   * Avro uses zig-zag encoding for some fixed-length types, like Longs and Ints. For range scans
+   * we want to use big-endian encoding, so we need to convert the source schema to replace these
+   * types with BinaryType.
+   *
+   * @param schema The schema to convert
+   * @param ordinals If non-empty, only convert fields at these ordinals.
+   *                 If empty, convert all fields.
+   */
+  def convertForRangeScan(schema: StructType, ordinals: Seq[Int] = Seq.empty): StructType = {
+    val ordinalSet = ordinals.toSet
+
+    StructType(schema.fields.zipWithIndex.flatMap { case (field, idx) =>
+      if ((ordinals.isEmpty || ordinalSet.contains(idx)) && isFixedSize(field.dataType)) {
+        // For each numeric field, create two fields:
+        // 1. Byte marker for null, positive, or negative values
+        // 2. The original numeric value in big-endian format
+        // Byte type is converted to Int in Avro, which doesn't work for us as Avro
+        // uses zig-zag encoding as opposed to big-endian for Ints
+        Seq(
+          StructField(s"${field.name}_marker", BinaryType, nullable = false),
+          field.copy(name = s"${field.name}_value", BinaryType)
+        )
+      } else {
+        Seq(field)
+      }
+    })
+  }
+
+  private def isFixedSize(dataType: DataType): Boolean = dataType match {
+    case _: ByteType | _: BooleanType | _: ShortType | _: IntegerType | _: LongType |
+         _: FloatType | _: DoubleType => true
+    case _ => false
+  }
+
+  def getTtlColFamilyName(stateName: String): String = {
+    "$ttl_" + stateName
+  }
+}
+
+/**
+ *
+ * @param initializeAvroSerde Whether or not to create the Avro serializers and deserializers
+ *                            for this state type. This class is used to create the
+ *                            StateStoreColumnFamilySchema for each state variable from the driver
+ */
+class StateStoreColumnFamilySchemaUtils(initializeAvroSerde: Boolean)
+    extends Logging with Serializable {
+  private def getAvroSerializer(schema: StructType): AvroSerializer = {
+    val avroType = SchemaConverters.toAvroType(schema)
+    new AvroSerializer(schema, avroType, nullable = false)
+  }
+
+  private def getAvroDeserializer(schema: StructType): AvroDeserializer = {
+    val avroType = SchemaConverters.toAvroType(schema)
+    val avroOptions = AvroOptions(Map.empty)
+    new AvroDeserializer(avroType, schema,
+      avroOptions.datetimeRebaseModeInRead, avroOptions.useStableIdForUnionType,
+      avroOptions.stableIdPrefixForUnionType, avroOptions.recursiveFieldMaxDepth)
+  }
+
+  /**
+   * If initializeAvroSerde is true, this method will create an Avro Serializer and Deserializer
+   * for a particular key and value schema.
+   */
+  private[sql] def getAvroSerde(
+      keySchema: StructType,
+      valSchema: StructType,
+      suffixKeySchema: Option[StructType] = None

Review Comment:
   can you add a `@param` explanation for this one please?
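   Something like (wording is only a suggestion):
   ```scala
   /**
    * If initializeAvroSerde is true, this method will create an Avro Serializer
    * and Deserializer for a particular key and value schema.
    *
    * @param suffixKeySchema optional schema for the trailing (suffix) portion of
    *                        the key, used when the column family is read with a
    *                        prefix key scan
    */
   ```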



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -259,6 +259,24 @@ class IncrementalExecution(
     }
   }
 
+  /**
+   * This rule populates the column family schemas for the TransformWithStateExec
+   * operator to ship them from the driver, where the schema and serializer objects
+   * are created, to the executor.
+   */
+  object StateStoreColumnFamilySchemasRule extends SparkPlanPartialRule {
+    override val rule: PartialFunction[SparkPlan, SparkPlan] = {
+      case statefulOp: StatefulOperator =>
+        statefulOp match {
+          case op: TransformWithStateExec =>

Review Comment:
   nit: I assume you're doing this two-step matching because the Avro serde will be added to other operators too in follow-ups?
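   If not, this could collapse into a single case (sketch):
   ```scala
   case op: TransformWithStateExec =>
     op.copy(columnFamilySchemas = op.getColFamilySchemas())
   ```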



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala:
##########
@@ -55,12 +152,68 @@ object StateStoreColumnFamilySchemaUtils {
       valEncoder: Encoder[V],
       hasTtl: Boolean): StateStoreColFamilySchema = {
     val compositeKeySchema = getCompositeKeySchema(keyEncoder.schema, userKeyEnc.schema)
+    val valSchema = getValueSchemaWithTTL(valEncoder.schema, hasTtl)
     StateStoreColFamilySchema(
       stateName,
       compositeKeySchema,
       getValueSchemaWithTTL(valEncoder.schema, hasTtl),
       Some(PrefixKeyScanStateEncoderSpec(compositeKeySchema, 1)),
-      Some(userKeyEnc.schema))
+      Some(userKeyEnc.schema),
+      avroEnc = getAvroSerde(
+        StructType(compositeKeySchema.take(1)),
+        valSchema,
+        Some(StructType(compositeKeySchema.drop(1)))
+      )
+    )
+  }
+
+  // This function creates the StateStoreColFamilySchema for
+  // the TTL secondary index.
+  // Because we want to encode fixed-length types as binary types
+  // if we are using Avro, we need to do some schema conversion to ensure
+  // we can use range scan
+  def getTtlStateSchema(
+      stateName: String,
+      keyEncoder: ExpressionEncoder[Any]): StateStoreColFamilySchema = {
+    val ttlKeySchema = StateStoreColumnFamilySchemaUtils.convertForRangeScan(
+      getSingleKeyTTLRowSchema(keyEncoder.schema), Seq(0))
+    val ttlValSchema = StructType(
+      Array(StructField("__dummy__", NullType)))
+    StateStoreColFamilySchema(
+      stateName,
+      ttlKeySchema,
+      ttlValSchema,
+      Some(RangeKeyScanStateEncoderSpec(ttlKeySchema, Seq(0))),
+      avroEnc = getAvroSerde(
+        StructType(ttlKeySchema.drop(2)),
+        ttlValSchema
+      )
+    )
+  }
+
+  // This function creates the StateStoreColFamilySchema for
+  // the TTL secondary index.
+  // Because we want to encode fixed-length types as binary types
+  // if we are using Avro, we need to do some schema conversion to ensure
+  // we can use range scan

Review Comment:
   Also please specify when this method should be used and not the one above
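   e.g. something along these lines (wording is only a sketch):
   ```scala
   /**
    * Creates the StateStoreColFamilySchema for the TTL secondary index of state
    * variables keyed by a composite key (grouping key + user key). Use the
    * single-argument overload above for state keyed only by the grouping key.
    */
   ```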



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

