brkyvz commented on code in PR #48401: URL: https://github.com/apache/spark/pull/48401#discussion_r1844340161
########## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ########## @@ -2204,6 +2204,16 @@ object SQLConf { .intConf .createWithDefault(3) + val STREAMING_STATE_STORE_ENCODING_FORMAT = + buildConf("spark.sql.streaming.stateStore.encodingFormat") + .doc("The encoding format used for stateful operators to store information " + + "in the state store") + .version("4.0.0") + .stringConf + .checkValue(v => Set("UnsafeRow", "Avro").contains(v), + "Valid values are 'UnsafeRow' and 'Avro'") Review Comment: nit: do we want to be case insensitive here? ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ########## @@ -115,7 +119,7 @@ case class TransformWithStateExec( * Fetching the columnFamilySchemas from the StatefulProcessorHandle * after init is called. */ - private def getColFamilySchemas(): Map[String, StateStoreColFamilySchema] = { + def getColFamilySchemas(): Map[String, StateStoreColFamilySchema] = { val columnFamilySchemas = getDriverProcessorHandle().getColumnFamilySchemas closeProcessorHandle() columnFamilySchemas Review Comment: should this be moved to a static method? ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala: ########## @@ -552,9 +570,9 @@ class IncrementalExecution( // The rule below doesn't change the plan but can cause the side effect that // metadata/schema is written in the checkpoint directory of stateful operator. Review Comment: existing: oof, this is not great... ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala: ########## @@ -552,9 +570,9 @@ class IncrementalExecution( // The rule below doesn't change the plan but can cause the side effect that // metadata/schema is written in the checkpoint directory of stateful operator. planWithStateOpId transform StateSchemaAndOperatorMetadataRule.rule - - simulateWatermarkPropagation(planWithStateOpId) - planWithStateOpId transform WatermarkPropagationRule.rule + val planWithStateSchemas = planWithStateOpId transform StateStoreColumnFamilySchemasRule.rule + simulateWatermarkPropagation(planWithStateSchemas) + planWithStateSchemas transform WatermarkPropagationRule.rule Review Comment: ditto ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala: ########## @@ -259,6 +259,24 @@ class IncrementalExecution( } } + /** + * This rule populates the column family schemas for the TransformWithStateExec + * operator to ship them from the driver, where the schema and serializer objects + * are created, to the executor. + */ + object StateStoreColumnFamilySchemasRule extends SparkPlanPartialRule { + override val rule: PartialFunction[SparkPlan, SparkPlan] = { + case statefulOp: StatefulOperator => + statefulOp match { + case op: TransformWithStateExec => + op.copy( + columnFamilySchemas = op.getColFamilySchemas() + ) Review Comment: this is a bit confusing imho. If you have the `getColFamilySchemas` method as part of the class available, why do you have to set it on the class with a copy. Two possible suggestions: 1. Make the `getColFamilySchemas` a static method. Not sure if that's possible though looking at the logic a bit more in TransformWithStateExec. It feels weird that you're opening and closing these handles just to get some of the information out. 2. 
Add a comment here that this needs to be run on the Driver, and also instead rename the method to: `withColumnFamilySchemas` which calls copy internally. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala: ########## @@ -71,6 +224,33 @@ object StateStoreColumnFamilySchemaUtils { stateName, keySchema, valSchema, - Some(PrefixKeyScanStateEncoderSpec(keySchema, 1))) + Some(PrefixKeyScanStateEncoderSpec(keySchema, 1)), + avroEnc = getAvroSerde( + StructType(keySchema.take(1)), + valSchema, + Some(StructType(keySchema.drop(1))) + )) + } + + // This function creates the StateStoreColFamilySchema for + // Timers' secondary index. + // Because we want to encode fixed-length types as binary types + // if we are using Avro, we need to do some schema conversion to ensure + // we can use range scan + def getTimerStateSchemaForSecIndex( + stateName: String, + keySchema: StructType, + valSchema: StructType): StateStoreColFamilySchema = { + val avroKeySchema = StateStoreColumnFamilySchemaUtils. + convertForRangeScan(keySchema, Seq(0)) + StateStoreColFamilySchema( + stateName, + keySchema, + valSchema, + Some(RangeKeyScanStateEncoderSpec(keySchema, Seq(0))), + avroEnc = getAvroSerde( + StructType(avroKeySchema.drop(2)), Review Comment: ditto on comment ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala: ########## @@ -55,12 +152,68 @@ object StateStoreColumnFamilySchemaUtils { valEncoder: Encoder[V], hasTtl: Boolean): StateStoreColFamilySchema = { val compositeKeySchema = getCompositeKeySchema(keyEncoder.schema, userKeyEnc.schema) + val valSchema = getValueSchemaWithTTL(valEncoder.schema, hasTtl) StateStoreColFamilySchema( stateName, compositeKeySchema, getValueSchemaWithTTL(valEncoder.schema, hasTtl), Some(PrefixKeyScanStateEncoderSpec(compositeKeySchema, 1)), - Some(userKeyEnc.schema)) + Some(userKeyEnc.schema), + avroEnc = getAvroSerde( + StructType(compositeKeySchema.take(1)), + valSchema, + Some(StructType(compositeKeySchema.drop(1))) + ) + ) + } + + // This function creates the StateStoreColFamilySchema for + // the TTL secondary index. + // Because we want to encode fixed-length types as binary types + // if we are using Avro, we need to do some schema conversion to ensure + // we can use range scan + def getTtlStateSchema( + stateName: String, + keyEncoder: ExpressionEncoder[Any]): StateStoreColFamilySchema = { + val ttlKeySchema = StateStoreColumnFamilySchemaUtils.convertForRangeScan( + getSingleKeyTTLRowSchema(keyEncoder.schema), Seq(0)) + val ttlValSchema = StructType( + Array(StructField("__dummy__", NullType))) + StateStoreColFamilySchema( + stateName, + ttlKeySchema, + ttlValSchema, + Some(RangeKeyScanStateEncoderSpec(ttlKeySchema, Seq(0))), + avroEnc = getAvroSerde( + StructType(ttlKeySchema.drop(2)), + ttlValSchema + ) + ) + } + + // This function creates the StateStoreColFamilySchema for + // the TTL secondary index. + // Because we want to encode fixed-length types as binary types + // if we are using Avro, we need to do some schema conversion to ensure + // we can use range scan Review Comment: ditto on docs ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala: ########## @@ -552,9 +570,9 @@ class IncrementalExecution( // The rule below doesn't change the plan but can cause the side effect that // metadata/schema is written in the checkpoint directory of stateful operator. 
planWithStateOpId transform StateSchemaAndOperatorMetadataRule.rule - - simulateWatermarkPropagation(planWithStateOpId) - planWithStateOpId transform WatermarkPropagationRule.rule + val planWithStateSchemas = planWithStateOpId transform StateStoreColumnFamilySchemasRule.rule Review Comment: nit: please avoid using scala specific magic, write this out as `planWithStateOpId.transform(StateStoreColumnFamilySchemasRule.rule)` ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala: ########## @@ -55,12 +152,68 @@ object StateStoreColumnFamilySchemaUtils { valEncoder: Encoder[V], hasTtl: Boolean): StateStoreColFamilySchema = { val compositeKeySchema = getCompositeKeySchema(keyEncoder.schema, userKeyEnc.schema) + val valSchema = getValueSchemaWithTTL(valEncoder.schema, hasTtl) StateStoreColFamilySchema( stateName, compositeKeySchema, getValueSchemaWithTTL(valEncoder.schema, hasTtl), Some(PrefixKeyScanStateEncoderSpec(compositeKeySchema, 1)), - Some(userKeyEnc.schema)) + Some(userKeyEnc.schema), + avroEnc = getAvroSerde( + StructType(compositeKeySchema.take(1)), + valSchema, + Some(StructType(compositeKeySchema.drop(1))) + ) + ) + } + + // This function creates the StateStoreColFamilySchema for + // the TTL secondary index. + // Because we want to encode fixed-length types as binary types + // if we are using Avro, we need to do some schema conversion to ensure + // we can use range scan Review Comment: nit: can you make this a proper scaladoc please: ```scala /** * ... */ ``` ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala: ########## @@ -55,12 +152,68 @@ object StateStoreColumnFamilySchemaUtils { valEncoder: Encoder[V], hasTtl: Boolean): StateStoreColFamilySchema = { val compositeKeySchema = getCompositeKeySchema(keyEncoder.schema, userKeyEnc.schema) + val valSchema = getValueSchemaWithTTL(valEncoder.schema, hasTtl) StateStoreColFamilySchema( stateName, compositeKeySchema, getValueSchemaWithTTL(valEncoder.schema, hasTtl), Some(PrefixKeyScanStateEncoderSpec(compositeKeySchema, 1)), - Some(userKeyEnc.schema)) + Some(userKeyEnc.schema), + avroEnc = getAvroSerde( + StructType(compositeKeySchema.take(1)), + valSchema, + Some(StructType(compositeKeySchema.drop(1))) + ) + ) + } + + // This function creates the StateStoreColFamilySchema for + // the TTL secondary index. + // Because we want to encode fixed-length types as binary types + // if we are using Avro, we need to do some schema conversion to ensure + // we can use range scan Review Comment: also mention that the range scan being an optimization in RocksDB ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala: ########## @@ -55,12 +152,68 @@ object StateStoreColumnFamilySchemaUtils { valEncoder: Encoder[V], hasTtl: Boolean): StateStoreColFamilySchema = { val compositeKeySchema = getCompositeKeySchema(keyEncoder.schema, userKeyEnc.schema) + val valSchema = getValueSchemaWithTTL(valEncoder.schema, hasTtl) StateStoreColFamilySchema( stateName, compositeKeySchema, getValueSchemaWithTTL(valEncoder.schema, hasTtl), Some(PrefixKeyScanStateEncoderSpec(compositeKeySchema, 1)), - Some(userKeyEnc.schema)) + Some(userKeyEnc.schema), + avroEnc = getAvroSerde( + StructType(compositeKeySchema.take(1)), + valSchema, + Some(StructType(compositeKeySchema.drop(1))) + ) + ) + } + + // This function creates the StateStoreColFamilySchema for + // the TTL secondary index. 
+ // Because we want to encode fixed-length types as binary types + // if we are using Avro, we need to do some schema conversion to ensure + // we can use range scan + def getTtlStateSchema( + stateName: String, + keyEncoder: ExpressionEncoder[Any]): StateStoreColFamilySchema = { + val ttlKeySchema = StateStoreColumnFamilySchemaUtils.convertForRangeScan( + getSingleKeyTTLRowSchema(keyEncoder.schema), Seq(0)) + val ttlValSchema = StructType( + Array(StructField("__dummy__", NullType))) + StateStoreColFamilySchema( + stateName, + ttlKeySchema, + ttlValSchema, + Some(RangeKeyScanStateEncoderSpec(ttlKeySchema, Seq(0))), + avroEnc = getAvroSerde( + StructType(ttlKeySchema.drop(2)), + ttlValSchema + ) + ) + } + + // This function creates the StateStoreColFamilySchema for + // the TTL secondary index. + // Because we want to encode fixed-length types as binary types + // if we are using Avro, we need to do some schema conversion to ensure + // we can use range scan + def getTtlStateSchema( + stateName: String, + keyEncoder: ExpressionEncoder[Any], + userKeySchema: StructType): StateStoreColFamilySchema = { + val ttlKeySchema = StateStoreColumnFamilySchemaUtils.convertForRangeScan( + getCompositeKeyTTLRowSchema(keyEncoder.schema, userKeySchema), Seq(0)) + val ttlValSchema = StructType( + Array(StructField("__dummy__", NullType))) + StateStoreColFamilySchema( + stateName, + ttlKeySchema, + ttlValSchema, + Some(RangeKeyScanStateEncoderSpec(ttlKeySchema, Seq(0))), + avroEnc = getAvroSerde( + StructType(ttlKeySchema.drop(2)), Review Comment: ditto ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala: ########## @@ -552,9 +570,9 @@ class IncrementalExecution( // The rule below doesn't change the plan but can cause the side effect that // metadata/schema is written in the checkpoint directory of stateful operator. planWithStateOpId transform StateSchemaAndOperatorMetadataRule.rule Review Comment: existing: here too ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala: ########## @@ -16,36 +16,133 @@ */ package org.apache.spark.sql.execution.streaming +import org.apache.spark.internal.Logging import org.apache.spark.sql.Encoder +import org.apache.spark.sql.avro.{AvroDeserializer, AvroOptions, AvroSerializer, SchemaConverters} import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchemaUtils._ -import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec, StateStoreColFamilySchema} -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.execution.streaming.state.{AvroEncoder, NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec, RangeKeyScanStateEncoderSpec, StateStoreColFamilySchema} +import org.apache.spark.sql.types._ -object StateStoreColumnFamilySchemaUtils { +object StateStoreColumnFamilySchemaUtils extends Serializable { + + def apply(initializeAvroSerde: Boolean): StateStoreColumnFamilySchemaUtils = + new StateStoreColumnFamilySchemaUtils(initializeAvroSerde) + + /** + * Avro uses zig-zag encoding for some fixed-length types, like Longs and Ints. For range scans + * we want to use big-endian encoding, so we need to convert the source schema to replace these + * types with BinaryType. + * + * @param schema The schema to convert + * @param ordinals If non-empty, only convert fields at these ordinals. 
+ * If empty, convert all fields. + */ + def convertForRangeScan(schema: StructType, ordinals: Seq[Int] = Seq.empty): StructType = { + val ordinalSet = ordinals.toSet + + StructType(schema.fields.zipWithIndex.flatMap { case (field, idx) => + if ((ordinals.isEmpty || ordinalSet.contains(idx)) && isFixedSize(field.dataType)) { + // For each numeric field, create two fields: + // 1. Byte marker for null, positive, or negative values + // 2. The original numeric value in big-endian format + // Byte type is converted to Int in Avro, which doesn't work for us as Avro + // uses zig-zag encoding as opposed to big-endian for Ints + Seq( + StructField(s"${field.name}_marker", BinaryType, nullable = false), + field.copy(name = s"${field.name}_value", BinaryType) + ) + } else { + Seq(field) + } + }) + } + + private def isFixedSize(dataType: DataType): Boolean = dataType match { + case _: ByteType | _: BooleanType | _: ShortType | _: IntegerType | _: LongType | + _: FloatType | _: DoubleType => true + case _ => false + } + + def getTtlColFamilyName(stateName: String): String = { + "$ttl_" + stateName + } +} + +/** + * + * @param initializeAvroSerde Whether or not to create the Avro serializers and deserializers + * for this state type. This class is used to create the + * StateStoreColumnFamilySchema for each state variable from the driver + */ +class StateStoreColumnFamilySchemaUtils(initializeAvroSerde: Boolean) + extends Logging with Serializable { + private def getAvroSerializer(schema: StructType): AvroSerializer = { + val avroType = SchemaConverters.toAvroType(schema) + new AvroSerializer(schema, avroType, nullable = false) + } + + private def getAvroDeserializer(schema: StructType): AvroDeserializer = { + val avroType = SchemaConverters.toAvroType(schema) + val avroOptions = AvroOptions(Map.empty) + new AvroDeserializer(avroType, schema, + avroOptions.datetimeRebaseModeInRead, avroOptions.useStableIdForUnionType, + avroOptions.stableIdPrefixForUnionType, avroOptions.recursiveFieldMaxDepth) + } + + /** + * If initializeAvroSerde is true, this method will create an Avro Serializer and Deserializer + * for a particular key and value schema. + */ + private[sql] def getAvroSerde( + keySchema: StructType, + valSchema: StructType, + suffixKeySchema: Option[StructType] = None + ): Option[AvroEncoder] = { + if (initializeAvroSerde) { + val (suffixKeySer, suffixKeyDe) = if (suffixKeySchema.isDefined) { + (Some(getAvroSerializer(suffixKeySchema.get)), + Some(getAvroDeserializer(suffixKeySchema.get))) + } else { + (None, None) + } + Some(AvroEncoder( + getAvroSerializer(keySchema), + getAvroDeserializer(keySchema), + getAvroSerializer(valSchema), + getAvroDeserializer(valSchema), + suffixKeySer, suffixKeyDe)) Review Comment: how about: `suffixKeySchema.map(getAvroSerializer)`, `suffixKeySchema.map(getAvroDeserializer)` here instead ########## connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala: ########## @@ -24,6 +24,7 @@ import org.apache.avro.generic.GenericDatumReader import org.apache.avro.io.{BinaryDecoder, DecoderFactory} import org.apache.spark.SparkException +import org.apache.spark.sql.avro.SchemaConverters Review Comment: is this a stray change? 
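To illustrate the `Option.map` suggestion on `getAvroSerde` above, here is a minimal sketch of how that branch could collapse. The names and signatures are taken from the diff; treat it as illustrative rather than the PR's final code.

```scala
private[sql] def getAvroSerde(
    keySchema: StructType,
    valSchema: StructType,
    suffixKeySchema: Option[StructType] = None): Option[AvroEncoder] = {
  if (initializeAvroSerde) {
    // Option.map replaces the manual isDefined/get branching: when no suffix key
    // schema is supplied, both suffix serde fields simply stay None.
    Some(AvroEncoder(
      getAvroSerializer(keySchema),
      getAvroDeserializer(keySchema),
      getAvroSerializer(valSchema),
      getAvroDeserializer(valSchema),
      suffixKeySchema.map(getAvroSerializer),
      suffixKeySchema.map(getAvroDeserializer)))
  } else {
    None
  }
}
```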
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala: ########## @@ -71,6 +224,33 @@ object StateStoreColumnFamilySchemaUtils { stateName, keySchema, valSchema, - Some(PrefixKeyScanStateEncoderSpec(keySchema, 1))) + Some(PrefixKeyScanStateEncoderSpec(keySchema, 1)), + avroEnc = getAvroSerde( + StructType(keySchema.take(1)), + valSchema, + Some(StructType(keySchema.drop(1))) + )) + } + + // This function creates the StateStoreColFamilySchema for Review Comment: ditto ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala: ########## @@ -55,12 +152,68 @@ object StateStoreColumnFamilySchemaUtils { valEncoder: Encoder[V], hasTtl: Boolean): StateStoreColFamilySchema = { val compositeKeySchema = getCompositeKeySchema(keyEncoder.schema, userKeyEnc.schema) + val valSchema = getValueSchemaWithTTL(valEncoder.schema, hasTtl) StateStoreColFamilySchema( stateName, compositeKeySchema, getValueSchemaWithTTL(valEncoder.schema, hasTtl), Some(PrefixKeyScanStateEncoderSpec(compositeKeySchema, 1)), - Some(userKeyEnc.schema)) + Some(userKeyEnc.schema), + avroEnc = getAvroSerde( + StructType(compositeKeySchema.take(1)), + valSchema, + Some(StructType(compositeKeySchema.drop(1))) + ) + ) + } + + // This function creates the StateStoreColFamilySchema for + // the TTL secondary index. + // Because we want to encode fixed-length types as binary types + // if we are using Avro, we need to do some schema conversion to ensure + // we can use range scan + def getTtlStateSchema( + stateName: String, + keyEncoder: ExpressionEncoder[Any]): StateStoreColFamilySchema = { + val ttlKeySchema = StateStoreColumnFamilySchemaUtils.convertForRangeScan( + getSingleKeyTTLRowSchema(keyEncoder.schema), Seq(0)) + val ttlValSchema = StructType( + Array(StructField("__dummy__", NullType))) + StateStoreColFamilySchema( + stateName, + ttlKeySchema, + ttlValSchema, + Some(RangeKeyScanStateEncoderSpec(ttlKeySchema, Seq(0))), + avroEnc = getAvroSerde( + StructType(ttlKeySchema.drop(2)), Review Comment: this drop(2) looks magical. Can you add a comment mentioning that these represent the null/positive/negative byte and the big endian representation? ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala: ########## @@ -16,36 +16,133 @@ */ package org.apache.spark.sql.execution.streaming +import org.apache.spark.internal.Logging import org.apache.spark.sql.Encoder +import org.apache.spark.sql.avro.{AvroDeserializer, AvroOptions, AvroSerializer, SchemaConverters} import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchemaUtils._ -import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec, StateStoreColFamilySchema} -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.execution.streaming.state.{AvroEncoder, NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec, RangeKeyScanStateEncoderSpec, StateStoreColFamilySchema} +import org.apache.spark.sql.types._ -object StateStoreColumnFamilySchemaUtils { +object StateStoreColumnFamilySchemaUtils extends Serializable { + + def apply(initializeAvroSerde: Boolean): StateStoreColumnFamilySchemaUtils = + new StateStoreColumnFamilySchemaUtils(initializeAvroSerde) + + /** + * Avro uses zig-zag encoding for some fixed-length types, like Longs and Ints. 
For range scans + * we want to use big-endian encoding, so we need to convert the source schema to replace these + * types with BinaryType. + * + * @param schema The schema to convert + * @param ordinals If non-empty, only convert fields at these ordinals. + * If empty, convert all fields. + */ + def convertForRangeScan(schema: StructType, ordinals: Seq[Int] = Seq.empty): StructType = { + val ordinalSet = ordinals.toSet + + StructType(schema.fields.zipWithIndex.flatMap { case (field, idx) => + if ((ordinals.isEmpty || ordinalSet.contains(idx)) && isFixedSize(field.dataType)) { + // For each numeric field, create two fields: + // 1. Byte marker for null, positive, or negative values + // 2. The original numeric value in big-endian format + // Byte type is converted to Int in Avro, which doesn't work for us as Avro + // uses zig-zag encoding as opposed to big-endian for Ints + Seq( + StructField(s"${field.name}_marker", BinaryType, nullable = false), + field.copy(name = s"${field.name}_value", BinaryType) + ) + } else { + Seq(field) + } + }) + } + + private def isFixedSize(dataType: DataType): Boolean = dataType match { + case _: ByteType | _: BooleanType | _: ShortType | _: IntegerType | _: LongType | + _: FloatType | _: DoubleType => true + case _ => false + } + + def getTtlColFamilyName(stateName: String): String = { + "$ttl_" + stateName + } +} + +/** + * + * @param initializeAvroSerde Whether or not to create the Avro serializers and deserializers + * for this state type. This class is used to create the + * StateStoreColumnFamilySchema for each state variable from the driver + */ +class StateStoreColumnFamilySchemaUtils(initializeAvroSerde: Boolean) + extends Logging with Serializable { + private def getAvroSerializer(schema: StructType): AvroSerializer = { + val avroType = SchemaConverters.toAvroType(schema) + new AvroSerializer(schema, avroType, nullable = false) + } + + private def getAvroDeserializer(schema: StructType): AvroDeserializer = { + val avroType = SchemaConverters.toAvroType(schema) + val avroOptions = AvroOptions(Map.empty) + new AvroDeserializer(avroType, schema, + avroOptions.datetimeRebaseModeInRead, avroOptions.useStableIdForUnionType, + avroOptions.stableIdPrefixForUnionType, avroOptions.recursiveFieldMaxDepth) + } + + /** + * If initializeAvroSerde is true, this method will create an Avro Serializer and Deserializer + * for a particular key and value schema. + */ + private[sql] def getAvroSerde( + keySchema: StructType, + valSchema: StructType, + suffixKeySchema: Option[StructType] = None Review Comment: can you add a `@param` explanation for this one please? ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala: ########## @@ -259,6 +259,24 @@ class IncrementalExecution( } } + /** + * This rule populates the column family schemas for the TransformWithStateExec + * operator to ship them from the driver, where the schema and serializer objects + * are created, to the executor. + */ + object StateStoreColumnFamilySchemasRule extends SparkPlanPartialRule { + override val rule: PartialFunction[SparkPlan, SparkPlan] = { + case statefulOp: StatefulOperator => + statefulOp match { + case op: TransformWithStateExec => Review Comment: nit: I assume you're doing this two step matching, because the avro serde will be added to other operators too in follow ups? 
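For reference, a rough sketch of suggestion 2 from the `StateStoreColumnFamilySchemasRule` thread earlier: the `withColumnFamilySchemas` name is the reviewer's proposal, not code in the PR, and the single-level match below glosses over the two-step `StatefulOperator` match questioned just above.

```scala
// In TransformWithStateExec: wrap the existing getColFamilySchemas() so callers
// never touch copy() directly. This must run on the driver, where the processor
// handle can be opened and closed to extract the schemas.
def withColumnFamilySchemas(): TransformWithStateExec =
  copy(columnFamilySchemas = getColFamilySchemas())

// In IncrementalExecution, the rule then reduces to:
object StateStoreColumnFamilySchemasRule extends SparkPlanPartialRule {
  override val rule: PartialFunction[SparkPlan, SparkPlan] = {
    case op: TransformWithStateExec => op.withColumnFamilySchemas()
  }
}
```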
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala: ########## @@ -55,12 +152,68 @@ object StateStoreColumnFamilySchemaUtils { valEncoder: Encoder[V], hasTtl: Boolean): StateStoreColFamilySchema = { val compositeKeySchema = getCompositeKeySchema(keyEncoder.schema, userKeyEnc.schema) + val valSchema = getValueSchemaWithTTL(valEncoder.schema, hasTtl) StateStoreColFamilySchema( stateName, compositeKeySchema, getValueSchemaWithTTL(valEncoder.schema, hasTtl), Some(PrefixKeyScanStateEncoderSpec(compositeKeySchema, 1)), - Some(userKeyEnc.schema)) + Some(userKeyEnc.schema), + avroEnc = getAvroSerde( + StructType(compositeKeySchema.take(1)), + valSchema, + Some(StructType(compositeKeySchema.drop(1))) + ) + ) + } + + // This function creates the StateStoreColFamilySchema for + // the TTL secondary index. + // Because we want to encode fixed-length types as binary types + // if we are using Avro, we need to do some schema conversion to ensure + // we can use range scan + def getTtlStateSchema( + stateName: String, + keyEncoder: ExpressionEncoder[Any]): StateStoreColFamilySchema = { + val ttlKeySchema = StateStoreColumnFamilySchemaUtils.convertForRangeScan( + getSingleKeyTTLRowSchema(keyEncoder.schema), Seq(0)) + val ttlValSchema = StructType( + Array(StructField("__dummy__", NullType))) + StateStoreColFamilySchema( + stateName, + ttlKeySchema, + ttlValSchema, + Some(RangeKeyScanStateEncoderSpec(ttlKeySchema, Seq(0))), + avroEnc = getAvroSerde( + StructType(ttlKeySchema.drop(2)), + ttlValSchema + ) + ) + } + + // This function creates the StateStoreColFamilySchema for + // the TTL secondary index. + // Because we want to encode fixed-length types as binary types + // if we are using Avro, we need to do some schema conversion to ensure + // we can use range scan Review Comment: Also please specify when this method should be used and not the one above
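As a concrete response to the last two asks (documenting when each `getTtlStateSchema` overload applies, and explaining the `drop(2)`), one possible shape for the scaladoc and inline comment follows. The method body is copied from the composite-key overload quoted earlier in this review; the documentation wording is illustrative, not taken from the PR.

```scala
/**
 * Creates the StateStoreColFamilySchema for the TTL secondary index of a state
 * variable whose key combines the grouping key with a user-provided key; the
 * overload above covers state keyed only by the grouping key. Fixed-length key
 * fields are rewritten to BinaryType via convertForRangeScan so the big-endian
 * encoded expiration timestamp supports RocksDB's range scan optimization.
 */
def getTtlStateSchema(
    stateName: String,
    keyEncoder: ExpressionEncoder[Any],
    userKeySchema: StructType): StateStoreColFamilySchema = {
  val ttlKeySchema = StateStoreColumnFamilySchemaUtils.convertForRangeScan(
    getCompositeKeyTTLRowSchema(keyEncoder.schema, userKeySchema), Seq(0))
  val ttlValSchema = StructType(
    Array(StructField("__dummy__", NullType)))
  StateStoreColFamilySchema(
    stateName,
    ttlKeySchema,
    ttlValSchema,
    Some(RangeKeyScanStateEncoderSpec(ttlKeySchema, Seq(0))),
    avroEnc = getAvroSerde(
      // drop(2) skips the two fields convertForRangeScan generated for the
      // expiration timestamp: the null/positive/negative marker byte and the
      // big-endian encoded value.
      StructType(ttlKeySchema.drop(2)),
      ttlValSchema))
}
```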