HeartSaVioR commented on code in PR #49277:
URL: https://github.com/apache/spark/pull/49277#discussion_r1909826111


##########
sql/core/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala:
##########
@@ -372,6 +373,178 @@ object SchemaConverters extends Logging {
       schema
     }
   }
+
+  /**
+   * Creates default values for Spark SQL data types when converting to Avro.
+   * This ensures fields have appropriate defaults during schema evolution.
+   *
+   * This method recursively processes Spark SQL data types and generates corresponding
+   * default values that are compatible with Avro schema specifications. It handles
+   * both primitive types (like Boolean, Integer) and complex types (Arrays, Maps, Structs).
+   *
+   * @param dataType The Spark SQL DataType to create a default value for
+   * @return A default value appropriate for the given data type that's compatible with Avro
+   */
+  private def getDefaultValue(dataType: DataType): Any = {
+    def createNestedDefault(st: StructType): java.util.HashMap[String, Any] = {
+      val defaultMap = new java.util.HashMap[String, Any]()
+      st.fields.foreach { field =>
+        defaultMap.put(field.name, getDefaultValue(field.dataType))
+      }
+      defaultMap
+    }
+
+    dataType match {
+      // Basic types
+      case BooleanType => false
+      case ByteType | ShortType | IntegerType => 0
+      case LongType => 0L
+      case FloatType => 0.0f
+      case DoubleType => 0.0
+      case StringType => ""
+      case BinaryType => java.nio.ByteBuffer.allocate(0)
+
+      // Complex types
+      case ArrayType(elementType, _) =>
+        new java.util.ArrayList[Any]()
+      case MapType(StringType, valueType, _) =>
+        new java.util.HashMap[String, Any]()
+      case st: StructType => createNestedDefault(st)
+
+      // Special types
+      case _: DecimalType => java.nio.ByteBuffer.allocate(0)
+      case DateType => 0
+      case TimestampType => 0L
+      case TimestampNTZType => 0L
+      case NullType => null
+      case _ => null
+    }
+  }
+
+  /**
+   * Converts a Spark SQL schema to a corresponding Avro schema.
+   * This method provides comprehensive support for schema evolution and handles
+   * complex nested types while maintaining type safety and compatibility.
+   *
+   * The conversion process includes:
+   * - Converting primitive Spark SQL types to Avro types
+   * - Handling complex types (arrays, maps, structs) with proper nesting
+   * - Supporting nullable fields through Avro unions
+   * - Managing logical types for dates, timestamps, and decimals
+   * - Generating unique names for nested records
+   * - Preserving namespace hierarchy for nested structures
+   *
+   * @param catalystType The Spark SQL DataType to convert
+   * @param nullable Whether the field can contain null values
+   * @param recordName The name to use for the record in the Avro schema
+   * @param namespace The namespace for the Avro schema
+   * @param nestingLevel Current nesting level for generating unique names
+   * @return An Avro Schema corresponding to the input Spark SQL type
+   * @throws IncompatibleSchemaException if the input type cannot be converted to Avro
+   */
+  def toAvroTypeWithDefaults(
+      catalystType: DataType,
+      nullable: Boolean = false,

Review Comment:
   Is there a case where the caller (the outer call, not the recursive one) will call this method with a non-struct type?
   
   If not, it would be clearer to only allow StructType in the public method and make this method private, or a nested method inside the new public method, because DataType itself has no concept of nullability and the caller could be confused by this param.
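   
   For illustration, a rough sketch of the shape I have in mind - not the PR code, and the object/method names are placeholders (the private helper simply delegates to the existing `toAvroType` to keep the sketch compilable):
   
   ```
   import org.apache.avro.Schema
   import org.apache.spark.sql.avro.SchemaConverters
   import org.apache.spark.sql.types.{DataType, StructType}
   
   object StructOnlyConverter {
     // Public surface only accepts StructType, so callers never have to reason
     // about what `nullable` means for a bare DataType.
     def toAvroTypeWithDefaults(schema: StructType, namespace: String = ""): Schema =
       convert(schema, nullable = false, namespace)
   
     // The DataType + nullable handling stays private.
     private def convert(dt: DataType, nullable: Boolean, namespace: String): Schema =
       SchemaConverters.toAvroType(dt, nullable, "topLevelRecord", namespace)
   }
   ```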



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2323,6 +2323,13 @@ object SQLConf {
       .intConf
       .createWithDefault(1)
 
+  val STREAMING_STATE_SCHEMA_FILES_THRESHOLD =

Review Comment:
   nit: let's think more about whether we want a more direct config name, e.g. `maxNumFiles`. I see both patterns exist among the configs, so "threshold" may be acceptable; I just feel a more direct config name would be easier to understand.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -960,8 +991,8 @@ case class SessionWindowStateStoreSaveExec(
   override def validateAndMaybeEvolveStateSchema(
       hadoopConf: Configuration, batchId: Long, stateSchemaVersion: Int):
     List[StateSchemaValidationResult] = {
-    val newStateSchema = List(StateStoreColFamilySchema(StateStore.DEFAULT_COL_FAMILY_NAME,
-      stateManager.getStateKeySchema, stateManager.getStateValueSchema))
+    val newStateSchema = List(StateStoreColFamilySchema(StateStore.DEFAULT_COL_FAMILY_NAME, 0,

Review Comment:
   nit: shall we make this consistent? Either remove the named params above, or use named params here and below.
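   
   For example, the named-param form could look roughly like this (a sketch; the field names are taken from how `StateStoreColFamilySchema` is used elsewhere in this PR, and the trailing parameters are assumed to keep their defaults):
   
   ```
   val newStateSchema = List(StateStoreColFamilySchema(
     StateStore.DEFAULT_COL_FAMILY_NAME,
     keySchemaId = 0,
     keySchema = stateManager.getStateKeySchema,
     valueSchemaId = 0,
     valueSchema = stateManager.getStateValueSchema))
   ```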



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala:
##########
@@ -263,8 +263,8 @@ case class StreamingSymmetricHashJoinExec(
 
     // validate and maybe evolve schema for all state stores across both sides 
of the join
     result.map { case (stateStoreName, (keySchema, valueSchema)) =>
-      val newStateSchema = List(StateStoreColFamilySchema(StateStore.DEFAULT_COL_FAMILY_NAME,
-        keySchema, valueSchema))
+      val newStateSchema = List(StateStoreColFamilySchema(StateStore.DEFAULT_COL_FAMILY_NAME, 0,

Review Comment:
   ditto



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -82,11 +82,26 @@ private[sql] class RocksDBStateStoreProvider
         stateStoreName = stateStoreId.storeName,
         colFamilyName = colFamilyName)
 
-      val dataEncoder = getDataEncoder(
-        stateStoreEncoding, dataEncoderCacheKey, keyStateEncoderSpec, valueSchema)
-
       val columnFamilyInfo = Some(ColumnFamilyInfo(colFamilyName, newColFamilyId))
 
+      // For test cases only: TestStateSchemaProvider allows dynamically adding schemas
+      // during test execution to verify schema evolution behavior. In production,
+      // schemas are loaded from checkpoint data
+      stateSchemaProvider match {
+        case Some(t: TestStateSchemaProvider) =>
+          t.addSchema(colFamilyName, keySchema, valueSchema)

Review Comment:
   I somehow left the comment on the occurrence below first - ditto here.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -51,6 +54,187 @@ sealed trait RocksDBValueStateEncoder {
   def decodeValues(valueBytes: Array[Byte]): Iterator[UnsafeRow]
 }
 
+trait StateSchemaProvider extends Serializable {
+  def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue
+
+  def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short
+}
+
+// Test implementation that can be dynamically updated
+class TestStateSchemaProvider extends StateSchemaProvider {
+  private var schemas = Map.empty[StateSchemaMetadataKey, StateSchemaMetadataValue]
+
+  def addSchema(
+      colFamilyName: String,
+      keySchema: StructType,
+      valueSchema: StructType,
+      keySchemaId: Short = 0,
+      valueSchemaId: Short = 0): Unit = {
+    schemas ++= Map(
+      StateSchemaMetadataKey(colFamilyName, keySchemaId, isKey = true) ->
+        StateSchemaMetadataValue(keySchema, SchemaConverters.toAvroTypeWithDefaults(keySchema)),
+      StateSchemaMetadataKey(colFamilyName, valueSchemaId, isKey = false) ->
+        StateSchemaMetadataValue(valueSchema, SchemaConverters.toAvroTypeWithDefaults(valueSchema))
+    )
+  }
+
+  override def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue = {
+    schemas(key)
+  }
+
+  override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short = {
+    schemas.keys
+      .filter(key =>
+        key.colFamilyName == colFamilyName &&
+          key.isKey == isKey)
+      .map(_.schemaId).max
+  }
+}
+
+class InMemoryStateSchemaProvider(metadata: StateSchemaMetadata) extends StateSchemaProvider {
+  override def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue = {
+    metadata.activeSchemas(key)
+  }
+
+  override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short = {
+    metadata.activeSchemas
+      .keys
+      .filter(key =>

Review Comment:
   ditto



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateTable.scala:
##########
@@ -26,19 +26,25 @@ import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions.JoinSideValues
 import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
 import org.apache.spark.sql.execution.streaming.TransformWithStateVariableInfo
-import org.apache.spark.sql.execution.streaming.state.{KeyStateEncoderSpec, StateStoreColFamilySchema, StateStoreConf}
+import org.apache.spark.sql.execution.streaming.state.{KeyStateEncoderSpec, StateSchemaProvider, StateStoreColFamilySchema, StateStoreConf}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-/** An implementation of [[Table]] with [[SupportsRead]] for State Store data source. */
+/**
+ * An implementation of [[Table]] with [[SupportsRead]] for State Store data source.
+ * @param stateSchemaProviderOpt Optional provider that maintains mapping between schema IDs and

Review Comment:
   One question: what would be the schema of the state when we do time-travel? And what would be the schema of the state when we read from the changelog over a version range?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2334,6 +2341,13 @@ object SQLConf {
         "Valid values are 'unsaferow' and 'avro'")
       .createWithDefault("unsaferow")
 
+  val STREAMING_VALUE_STATE_SCHEMA_EVOLUTION_THRESHOLD =

Review Comment:
   Btw, any reason we have to restrict this?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -233,6 +234,33 @@ trait StateStoreWriter
     }
   }
 
+  def stateSchemaList(
+      stateSchemaValidationResults: List[StateSchemaValidationResult],
+      oldMetadata: Option[OperatorStateMetadata]): List[List[String]] = {
+
+    def getExistingStateInfo(metadata: OperatorStateMetadataV2): List[String] = {
+      val ssInfo = metadata.stateStoreInfo.head
+      ssInfo.stateSchemaFilePaths
+    }
+
+    val validationResult = stateSchemaValidationResults.head

Review Comment:
   I forgot the details, but it's a bit odd to just pick the head. It sounds to me like the param does not need to be a list. Are there cases where the validation results contain multiple entries, and if so, is it safe for us to pick the head here?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -233,6 +234,33 @@ trait StateStoreWriter
     }
   }
 
+  def stateSchemaList(
+      stateSchemaValidationResults: List[StateSchemaValidationResult],
+      oldMetadata: Option[OperatorStateMetadata]): List[List[String]] = {
+
+    def getExistingStateInfo(metadata: OperatorStateMetadataV2): List[String] = {

Review Comment:
   nit: why not name it directly, say, `getExistingSchemaFilePaths`?



##########
sql/core/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala:
##########
@@ -372,6 +373,178 @@ object SchemaConverters extends Logging {
       schema
     }
   }
+
+  /**
+   * Creates default values for Spark SQL data types when converting to Avro.
+   * This ensures fields have appropriate defaults during schema evolution.
+   *
+   * This method recursively processes Spark SQL data types and generates corresponding
+   * default values that are compatible with Avro schema specifications. It handles
+   * both primitive types (like Boolean, Integer) and complex types (Arrays, Maps, Structs).
+   *
+   * @param dataType The Spark SQL DataType to create a default value for
+   * @return A default value appropriate for the given data type that's compatible with Avro
+   */
+  private def getDefaultValue(dataType: DataType): Any = {
+    def createNestedDefault(st: StructType): java.util.HashMap[String, Any] = {
+      val defaultMap = new java.util.HashMap[String, Any]()
+      st.fields.foreach { field =>
+        defaultMap.put(field.name, getDefaultValue(field.dataType))
+      }
+      defaultMap
+    }
+
+    dataType match {
+      // Basic types
+      case BooleanType => false
+      case ByteType | ShortType | IntegerType => 0
+      case LongType => 0L
+      case FloatType => 0.0f
+      case DoubleType => 0.0
+      case StringType => ""
+      case BinaryType => java.nio.ByteBuffer.allocate(0)
+
+      // Complex types
+      case ArrayType(elementType, _) =>
+        new java.util.ArrayList[Any]()
+      case MapType(StringType, valueType, _) =>
+        new java.util.HashMap[String, Any]()
+      case st: StructType => createNestedDefault(st)
+
+      // Special types
+      case _: DecimalType => java.nio.ByteBuffer.allocate(0)
+      case DateType => 0
+      case TimestampType => 0L
+      case TimestampNTZType => 0L
+      case NullType => null
+      case _ => null
+    }
+  }
+
+  /**

Review Comment:
   Would you mind providing an example? It does not look like we are going to name the columns the way Spark SQL refers to nested columns, say, `a.b.c`. It would be great to show a complicated Spark SQL schema and how it transforms to Avro.
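   
   Something along these lines would make the mapping easy to eyeball (a sketch; the schema below is just an illustrative example, and printing the result avoids guessing the exact record/namespace naming the converter produces):
   
   ```
   import org.apache.spark.sql.avro.SchemaConverters
   import org.apache.spark.sql.types._
   
   // Nested struct plus array and map fields, to exercise nested-record naming.
   val schema = StructType(Seq(
     StructField("id", LongType, nullable = false),
     StructField("profile", StructType(Seq(
       StructField("name", StringType),
       StructField("scores", ArrayType(DoubleType)),
       StructField("attrs", MapType(StringType, StringType))
     )))
   ))
   
   // Inspect how nested records are named and namespaced in the generated Avro schema.
   println(SchemaConverters.toAvroTypeWithDefaults(schema).toString(true))
   ```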



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -140,7 +140,13 @@ case class TransformWithStateExec(
    * after init is called.
    */
   override def getColFamilySchemas(): Map[String, StateStoreColFamilySchema] = {
-    val columnFamilySchemas = getDriverProcessorHandle().getColumnFamilySchemas
+    val keySchema = keyExpressions.toStructType
+    val defaultSchema = StateStoreColFamilySchema(StateStore.DEFAULT_COL_FAMILY_NAME,

Review Comment:
   Shall we add a comment explaining why we put a dummy entry for the default cf?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2334,6 +2341,13 @@ object SQLConf {
         "Valid values are 'unsaferow' and 'avro'")
       .createWithDefault("unsaferow")
 
+  val STREAMING_VALUE_STATE_SCHEMA_EVOLUTION_THRESHOLD =

Review Comment:
   ditto



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -233,6 +234,33 @@ trait StateStoreWriter
     }
   }
 
+  def stateSchemaList(
+      stateSchemaValidationResults: List[StateSchemaValidationResult],
+      oldMetadata: Option[OperatorStateMetadata]): List[List[String]] = {
+
+    def getExistingStateInfo(metadata: OperatorStateMetadataV2): List[String] = {
+      val ssInfo = metadata.stateStoreInfo.head
+      ssInfo.stateSchemaFilePaths
+    }
+
+    val validationResult = stateSchemaValidationResults.head
+
+    oldMetadata match {
+      case Some(v2: OperatorStateMetadataV2) =>
+        val oldSchemaPaths = getExistingStateInfo(v2)
+        if (validationResult.evolvedSchema) {
+          List(oldSchemaPaths ++ List(validationResult.schemaPath))

Review Comment:
   What's the reason to make this `List[List[String]]`?
   
   ```
   List(List('a', 'b', 'c') ++ List('d')) = List(List('a', 'b', 'c', 'd'))
   ```
   
   I don't see a scenario where the outer List will have more than one element.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -140,7 +140,13 @@ case class TransformWithStateExec(
    * after init is called.
    */
   override def getColFamilySchemas(): Map[String, StateStoreColFamilySchema] = {
-    val columnFamilySchemas = getDriverProcessorHandle().getColumnFamilySchemas
+    val keySchema = keyExpressions.toStructType
+    val defaultSchema = StateStoreColFamilySchema(StateStore.DEFAULT_COL_FAMILY_NAME,
+      0, keyExpressions.toStructType, 0, DUMMY_VALUE_ROW_SCHEMA,

Review Comment:
   nit: ditto, named param?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala:
##########
@@ -468,7 +468,8 @@ class OperatorStateMetadataV2FileManager(
     // find the batchId of the earliest schema file we need to keep
     val earliestBatchToKeep = latestMetadata match {
       case Some(OperatorStateMetadataV2(_, stateStoreInfo, _)) =>
-        val schemaFilePath = stateStoreInfo.head.stateSchemaFilePath
+        val ssInfo = stateStoreInfo.head
+        val schemaFilePath = ssInfo.stateSchemaFilePaths.head

Review Comment:
   ditto, reasoning about simply picking the head



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala:
##########
@@ -468,7 +468,8 @@ class OperatorStateMetadataV2FileManager(
     // find the batchId of the earliest schema file we need to keep
     val earliestBatchToKeep = latestMetadata match {
       case Some(OperatorStateMetadataV2(_, stateStoreInfo, _)) =>
-        val schemaFilePath = stateStoreInfo.head.stateSchemaFilePath
+        val ssInfo = stateStoreInfo.head

Review Comment:
   Same: is it safe to simply pick the head, and if it is safe, what are the other elements?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -51,6 +54,187 @@ sealed trait RocksDBValueStateEncoder {
   def decodeValues(valueBytes: Array[Byte]): Iterator[UnsafeRow]
 }
 
+trait StateSchemaProvider extends Serializable {
+  def getSchemaMetadataValue(key: StateSchemaMetadataKey): 
StateSchemaMetadataValue
+
+  def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short
+}
+
+// Test implementation that can be dynamically updated
+class TestStateSchemaProvider extends StateSchemaProvider {
+  private var schemas = Map.empty[StateSchemaMetadataKey, 
StateSchemaMetadataValue]
+
+  def addSchema(
+      colFamilyName: String,
+      keySchema: StructType,
+      valueSchema: StructType,
+      keySchemaId: Short = 0,
+      valueSchemaId: Short = 0): Unit = {
+    schemas ++= Map(
+      StateSchemaMetadataKey(colFamilyName, keySchemaId, isKey = true) ->
+        StateSchemaMetadataValue(keySchema, 
SchemaConverters.toAvroTypeWithDefaults(keySchema)),
+      StateSchemaMetadataKey(colFamilyName, valueSchemaId, isKey = false) ->
+        StateSchemaMetadataValue(valueSchema, 
SchemaConverters.toAvroTypeWithDefaults(valueSchema))
+    )
+  }
+
+  override def getSchemaMetadataValue(key: StateSchemaMetadataKey): 
StateSchemaMetadataValue = {
+    schemas(key)
+  }
+
+  override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): 
Short = {
+    schemas.keys
+      .filter(key =>
+        key.colFamilyName == colFamilyName &&
+          key.isKey == isKey)
+      .map(_.schemaId).max
+  }
+}
+
+class InMemoryStateSchemaProvider(metadata: StateSchemaMetadata) extends 
StateSchemaProvider {
+  override def getSchemaMetadataValue(key: StateSchemaMetadataKey): 
StateSchemaMetadataValue = {
+    metadata.activeSchemas(key)
+  }
+
+  override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): 
Short = {
+    metadata.activeSchemas
+      .keys
+      .filter(key =>
+        key.colFamilyName == colFamilyName &&
+          key.isKey == isKey
+      )
+      .map(_.schemaId).max
+  }
+}
+
+/**
+ * Broadcasts schema metadata information for stateful operators in a 
streaming query.
+ *
+ * This class provides a way to distribute schema evolution information to all 
executors
+ * via Spark's broadcast mechanism. Each stateful operator in a streaming 
query maintains
+ * its own instance of this class to track schema versions and evolution.
+ *
+ * @param broadcast Spark broadcast variable containing the schema metadata
+ */
+case class StateSchemaBroadcast(
+    broadcast: Broadcast[StateSchemaMetadata]
+) extends Logging with StateSchemaProvider {
+
+  /**
+   * Retrieves the schema information for a given column family and schema 
version
+   *
+   * @param key A combination of column family name and schema ID
+   * @return The corresponding schema metadata value containing both SQL and 
Avro schemas
+   */
+  override def getSchemaMetadataValue(key: StateSchemaMetadataKey): 
StateSchemaMetadataValue = {
+    broadcast.value.activeSchemas(key)
+  }
+
+  override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): 
Short = {
+    broadcast.value.activeSchemas
+      .keys
+      .filter(key =>

Review Comment:
   ditto



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -51,6 +54,187 @@ sealed trait RocksDBValueStateEncoder {
   def decodeValues(valueBytes: Array[Byte]): Iterator[UnsafeRow]
 }
 
+trait StateSchemaProvider extends Serializable {
+  def getSchemaMetadataValue(key: StateSchemaMetadataKey): 
StateSchemaMetadataValue
+
+  def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short
+}
+
+// Test implementation that can be dynamically updated
+class TestStateSchemaProvider extends StateSchemaProvider {
+  private var schemas = Map.empty[StateSchemaMetadataKey, 
StateSchemaMetadataValue]
+
+  def addSchema(
+      colFamilyName: String,
+      keySchema: StructType,
+      valueSchema: StructType,
+      keySchemaId: Short = 0,
+      valueSchemaId: Short = 0): Unit = {
+    schemas ++= Map(
+      StateSchemaMetadataKey(colFamilyName, keySchemaId, isKey = true) ->
+        StateSchemaMetadataValue(keySchema, 
SchemaConverters.toAvroTypeWithDefaults(keySchema)),
+      StateSchemaMetadataKey(colFamilyName, valueSchemaId, isKey = false) ->
+        StateSchemaMetadataValue(valueSchema, 
SchemaConverters.toAvroTypeWithDefaults(valueSchema))
+    )
+  }
+
+  override def getSchemaMetadataValue(key: StateSchemaMetadataKey): 
StateSchemaMetadataValue = {
+    schemas(key)
+  }
+
+  override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): 
Short = {
+    schemas.keys
+      .filter(key =>
+        key.colFamilyName == colFamilyName &&
+          key.isKey == isKey)
+      .map(_.schemaId).max
+  }
+}
+
+class InMemoryStateSchemaProvider(metadata: StateSchemaMetadata) extends 
StateSchemaProvider {
+  override def getSchemaMetadataValue(key: StateSchemaMetadataKey): 
StateSchemaMetadataValue = {
+    metadata.activeSchemas(key)
+  }
+
+  override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): 
Short = {
+    metadata.activeSchemas
+      .keys
+      .filter(key =>
+        key.colFamilyName == colFamilyName &&
+          key.isKey == isKey
+      )
+      .map(_.schemaId).max
+  }
+}
+
+/**
+ * Broadcasts schema metadata information for stateful operators in a 
streaming query.
+ *
+ * This class provides a way to distribute schema evolution information to all 
executors
+ * via Spark's broadcast mechanism. Each stateful operator in a streaming 
query maintains
+ * its own instance of this class to track schema versions and evolution.
+ *
+ * @param broadcast Spark broadcast variable containing the schema metadata
+ */
+case class StateSchemaBroadcast(
+    broadcast: Broadcast[StateSchemaMetadata]
+) extends Logging with StateSchemaProvider {
+
+  /**
+   * Retrieves the schema information for a given column family and schema 
version
+   *
+   * @param key A combination of column family name and schema ID
+   * @return The corresponding schema metadata value containing both SQL and 
Avro schemas
+   */
+  override def getSchemaMetadataValue(key: StateSchemaMetadataKey): 
StateSchemaMetadataValue = {
+    broadcast.value.activeSchemas(key)
+  }
+
+  override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): 
Short = {
+    broadcast.value.activeSchemas
+      .keys
+      .filter(key =>
+        key.colFamilyName == colFamilyName &&
+          key.isKey == isKey
+      )
+      .map(_.schemaId).max
+  }
+}
+
+/**
+ * Contains schema evolution metadata for a stateful operator.
+ *
+ * @param currentSchemaId The schema version currently being used for writing 
new state
+ * @param activeSchemas Map of all active schema versions, keyed by column 
family and schema ID.
+ *                      This includes both the current schema and any previous 
schemas that
+ *                      may still exist in the state store.
+ */
+case class StateSchemaMetadata(
+    activeSchemas: Map[StateSchemaMetadataKey, StateSchemaMetadataValue]
+)
+
+object StateSchemaMetadata {
+
+  def createStateSchemaMetadata(
+      checkpointLocation: String,
+      hadoopConf: Configuration,
+      stateSchemaFiles: List[String]
+  ): StateSchemaMetadata = {
+    val fm = CheckpointFileManager.create(new Path(checkpointLocation), 
hadoopConf)
+
+    // Build up our map of schema metadata
+    val activeSchemas = stateSchemaFiles.zipWithIndex.foldLeft(
+      Map.empty[StateSchemaMetadataKey, StateSchemaMetadataValue]) {
+      case (schemas, (stateSchemaFile, schemaIndex)) =>
+        val fsDataInputStream = fm.open(new Path(stateSchemaFile))
+        val colFamilySchemas = 
StateSchemaCompatibilityChecker.readSchemaFile(fsDataInputStream)
+
+        // For each column family, create metadata entries for both key and 
value schemas
+        val schemaEntries = colFamilySchemas.flatMap { colFamilySchema =>
+          // Create key schema metadata
+          val keyAvroSchema = SchemaConverters.toAvroTypeWithDefaults(
+            colFamilySchema.keySchema)
+          val keyEntry = StateSchemaMetadataKey(
+            colFamilySchema.colFamilyName,
+            colFamilySchema.keySchemaId,
+            isKey = true
+          ) -> StateSchemaMetadataValue(
+            colFamilySchema.keySchema,
+            keyAvroSchema
+          )
+
+          // Create value schema metadata
+          val valueAvroSchema = SchemaConverters.toAvroTypeWithDefaults(
+            colFamilySchema.valueSchema)
+          val valueEntry = StateSchemaMetadataKey(
+            colFamilySchema.colFamilyName,
+            colFamilySchema.valueSchemaId,
+            isKey = false
+          ) -> StateSchemaMetadataValue(
+            colFamilySchema.valueSchema,
+            valueAvroSchema
+          )
+
+          Seq(keyEntry, valueEntry)
+        }
+
+        // Add new entries to our accumulated map
+        schemas ++ schemaEntries.toMap
+    }
+
+    // Create the final metadata and wrap it in a broadcast

Review Comment:
   We don't seem to handle broadcast "here" - the comment is misleading.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -51,6 +54,187 @@ sealed trait RocksDBValueStateEncoder {
   def decodeValues(valueBytes: Array[Byte]): Iterator[UnsafeRow]
 }
 
+trait StateSchemaProvider extends Serializable {
+  def getSchemaMetadataValue(key: StateSchemaMetadataKey): 
StateSchemaMetadataValue
+
+  def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short
+}
+
+// Test implementation that can be dynamically updated
+class TestStateSchemaProvider extends StateSchemaProvider {
+  private var schemas = Map.empty[StateSchemaMetadataKey, 
StateSchemaMetadataValue]
+
+  def addSchema(
+      colFamilyName: String,
+      keySchema: StructType,
+      valueSchema: StructType,
+      keySchemaId: Short = 0,
+      valueSchemaId: Short = 0): Unit = {
+    schemas ++= Map(
+      StateSchemaMetadataKey(colFamilyName, keySchemaId, isKey = true) ->
+        StateSchemaMetadataValue(keySchema, 
SchemaConverters.toAvroTypeWithDefaults(keySchema)),
+      StateSchemaMetadataKey(colFamilyName, valueSchemaId, isKey = false) ->
+        StateSchemaMetadataValue(valueSchema, 
SchemaConverters.toAvroTypeWithDefaults(valueSchema))
+    )
+  }
+
+  override def getSchemaMetadataValue(key: StateSchemaMetadataKey): 
StateSchemaMetadataValue = {
+    schemas(key)
+  }
+
+  override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): 
Short = {
+    schemas.keys
+      .filter(key =>

Review Comment:
   nit: use `{` `}` for multi-line block
   
   ```
   .filter { key =>
   
   }
   .map
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -51,6 +54,187 @@ sealed trait RocksDBValueStateEncoder {
   def decodeValues(valueBytes: Array[Byte]): Iterator[UnsafeRow]
 }
 
+trait StateSchemaProvider extends Serializable {
+  def getSchemaMetadataValue(key: StateSchemaMetadataKey): 
StateSchemaMetadataValue
+
+  def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short
+}
+
+// Test implementation that can be dynamically updated
+class TestStateSchemaProvider extends StateSchemaProvider {
+  private var schemas = Map.empty[StateSchemaMetadataKey, StateSchemaMetadataValue]

Review Comment:
   nit: why not use a mutable collection and make this a val?
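   
   i.e. something along these lines (a fragment-level sketch of the suggested shape, not the PR code):
   
   ```
   import scala.collection.mutable
   
   // The reference never changes; only the map contents do.
   private val schemas =
     mutable.Map.empty[StateSchemaMetadataKey, StateSchemaMetadataValue]
   ```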



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -51,6 +54,187 @@ sealed trait RocksDBValueStateEncoder {
   def decodeValues(valueBytes: Array[Byte]): Iterator[UnsafeRow]
 }
 
+trait StateSchemaProvider extends Serializable {
+  def getSchemaMetadataValue(key: StateSchemaMetadataKey): 
StateSchemaMetadataValue
+
+  def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short
+}
+
+// Test implementation that can be dynamically updated
+class TestStateSchemaProvider extends StateSchemaProvider {
+  private var schemas = Map.empty[StateSchemaMetadataKey, 
StateSchemaMetadataValue]
+
+  def addSchema(
+      colFamilyName: String,
+      keySchema: StructType,
+      valueSchema: StructType,
+      keySchemaId: Short = 0,
+      valueSchemaId: Short = 0): Unit = {
+    schemas ++= Map(
+      StateSchemaMetadataKey(colFamilyName, keySchemaId, isKey = true) ->
+        StateSchemaMetadataValue(keySchema, 
SchemaConverters.toAvroTypeWithDefaults(keySchema)),
+      StateSchemaMetadataKey(colFamilyName, valueSchemaId, isKey = false) ->
+        StateSchemaMetadataValue(valueSchema, 
SchemaConverters.toAvroTypeWithDefaults(valueSchema))
+    )
+  }
+
+  override def getSchemaMetadataValue(key: StateSchemaMetadataKey): 
StateSchemaMetadataValue = {
+    schemas(key)
+  }
+
+  override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): 
Short = {
+    schemas.keys
+      .filter(key =>
+        key.colFamilyName == colFamilyName &&
+          key.isKey == isKey)
+      .map(_.schemaId).max
+  }
+}
+
+class InMemoryStateSchemaProvider(metadata: StateSchemaMetadata) extends 
StateSchemaProvider {
+  override def getSchemaMetadataValue(key: StateSchemaMetadataKey): 
StateSchemaMetadataValue = {
+    metadata.activeSchemas(key)
+  }
+
+  override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): 
Short = {
+    metadata.activeSchemas
+      .keys
+      .filter(key =>
+        key.colFamilyName == colFamilyName &&
+          key.isKey == isKey
+      )
+      .map(_.schemaId).max
+  }
+}
+
+/**
+ * Broadcasts schema metadata information for stateful operators in a 
streaming query.
+ *
+ * This class provides a way to distribute schema evolution information to all 
executors
+ * via Spark's broadcast mechanism. Each stateful operator in a streaming 
query maintains
+ * its own instance of this class to track schema versions and evolution.
+ *
+ * @param broadcast Spark broadcast variable containing the schema metadata
+ */
+case class StateSchemaBroadcast(
+    broadcast: Broadcast[StateSchemaMetadata]
+) extends Logging with StateSchemaProvider {
+
+  /**
+   * Retrieves the schema information for a given column family and schema 
version
+   *
+   * @param key A combination of column family name and schema ID
+   * @return The corresponding schema metadata value containing both SQL and 
Avro schemas
+   */
+  override def getSchemaMetadataValue(key: StateSchemaMetadataKey): 
StateSchemaMetadataValue = {
+    broadcast.value.activeSchemas(key)
+  }
+
+  override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): 
Short = {
+    broadcast.value.activeSchemas
+      .keys
+      .filter(key =>
+        key.colFamilyName == colFamilyName &&
+          key.isKey == isKey
+      )
+      .map(_.schemaId).max
+  }
+}
+
+/**
+ * Contains schema evolution metadata for a stateful operator.
+ *
+ * @param currentSchemaId The schema version currently being used for writing 
new state

Review Comment:
   nit: there is no param for this



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -528,15 +713,28 @@ class UnsafeRowDataEncoder(
 class AvroStateEncoder(
     keyStateEncoderSpec: KeyStateEncoderSpec,
     valueSchema: StructType,
-    stateSchemaInfo: Option[StateSchemaInfo]
+    stateSchemaProvider: Option[StateSchemaProvider],
+    columnFamilyInfo: Option[ColumnFamilyInfo]
 ) extends RocksDBDataEncoder(keyStateEncoderSpec, valueSchema) with Logging {
 
+
+  // schema information
+  private lazy val currentKeySchemaId: Short = getStateSchemaProvider.getCurrentStateSchemaId(
+    getColFamilyName,
+    isKey = true
+  )
+
+  private lazy val currentValSchemaId: Short = getStateSchemaProvider.getCurrentStateSchemaId(
+    getColFamilyName,
+    isKey = false
+  )
+
   private val avroEncoder = createAvroEnc(keyStateEncoderSpec, valueSchema)

Review Comment:
   nit: I understand you are grouping related fields together. But we have so many lazy vals here that it's hard to figure out which fields are initialized during instantiation and which aren't.
   
   Shall we move the non-lazy vals to the top, or below the lazy vals?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -528,15 +713,28 @@ class UnsafeRowDataEncoder(
 class AvroStateEncoder(
     keyStateEncoderSpec: KeyStateEncoderSpec,
     valueSchema: StructType,
-    stateSchemaInfo: Option[StateSchemaInfo]
+    stateSchemaProvider: Option[StateSchemaProvider],
+    columnFamilyInfo: Option[ColumnFamilyInfo]
 ) extends RocksDBDataEncoder(keyStateEncoderSpec, valueSchema) with Logging {
 
+

Review Comment:
   nit: unnecessary empty line



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -51,6 +54,187 @@ sealed trait RocksDBValueStateEncoder {
   def decodeValues(valueBytes: Array[Byte]): Iterator[UnsafeRow]
 }
 
+trait StateSchemaProvider extends Serializable {
+  def getSchemaMetadataValue(key: StateSchemaMetadataKey): 
StateSchemaMetadataValue
+
+  def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short
+}
+
+// Test implementation that can be dynamically updated
+class TestStateSchemaProvider extends StateSchemaProvider {
+  private var schemas = Map.empty[StateSchemaMetadataKey, 
StateSchemaMetadataValue]
+
+  def addSchema(
+      colFamilyName: String,
+      keySchema: StructType,
+      valueSchema: StructType,
+      keySchemaId: Short = 0,
+      valueSchemaId: Short = 0): Unit = {
+    schemas ++= Map(
+      StateSchemaMetadataKey(colFamilyName, keySchemaId, isKey = true) ->
+        StateSchemaMetadataValue(keySchema, 
SchemaConverters.toAvroTypeWithDefaults(keySchema)),
+      StateSchemaMetadataKey(colFamilyName, valueSchemaId, isKey = false) ->
+        StateSchemaMetadataValue(valueSchema, 
SchemaConverters.toAvroTypeWithDefaults(valueSchema))
+    )
+  }
+
+  override def getSchemaMetadataValue(key: StateSchemaMetadataKey): 
StateSchemaMetadataValue = {
+    schemas(key)
+  }
+
+  override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): 
Short = {
+    schemas.keys
+      .filter(key =>
+        key.colFamilyName == colFamilyName &&
+          key.isKey == isKey)
+      .map(_.schemaId).max
+  }
+}
+
+class InMemoryStateSchemaProvider(metadata: StateSchemaMetadata) extends 
StateSchemaProvider {
+  override def getSchemaMetadataValue(key: StateSchemaMetadataKey): 
StateSchemaMetadataValue = {
+    metadata.activeSchemas(key)
+  }
+
+  override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): 
Short = {
+    metadata.activeSchemas
+      .keys
+      .filter(key =>
+        key.colFamilyName == colFamilyName &&
+          key.isKey == isKey
+      )
+      .map(_.schemaId).max
+  }
+}
+
+/**
+ * Broadcasts schema metadata information for stateful operators in a 
streaming query.
+ *
+ * This class provides a way to distribute schema evolution information to all 
executors
+ * via Spark's broadcast mechanism. Each stateful operator in a streaming 
query maintains
+ * its own instance of this class to track schema versions and evolution.
+ *
+ * @param broadcast Spark broadcast variable containing the schema metadata
+ */
+case class StateSchemaBroadcast(
+    broadcast: Broadcast[StateSchemaMetadata]
+) extends Logging with StateSchemaProvider {
+
+  /**
+   * Retrieves the schema information for a given column family and schema 
version
+   *
+   * @param key A combination of column family name and schema ID
+   * @return The corresponding schema metadata value containing both SQL and 
Avro schemas
+   */
+  override def getSchemaMetadataValue(key: StateSchemaMetadataKey): 
StateSchemaMetadataValue = {
+    broadcast.value.activeSchemas(key)
+  }
+
+  override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): 
Short = {
+    broadcast.value.activeSchemas
+      .keys
+      .filter(key =>
+        key.colFamilyName == colFamilyName &&
+          key.isKey == isKey
+      )
+      .map(_.schemaId).max
+  }
+}
+
+/**
+ * Contains schema evolution metadata for a stateful operator.
+ *
+ * @param currentSchemaId The schema version currently being used for writing 
new state
+ * @param activeSchemas Map of all active schema versions, keyed by column 
family and schema ID.
+ *                      This includes both the current schema and any previous 
schemas that
+ *                      may still exist in the state store.
+ */
+case class StateSchemaMetadata(
+    activeSchemas: Map[StateSchemaMetadataKey, StateSchemaMetadataValue]
+)
+
+object StateSchemaMetadata {
+
+  def createStateSchemaMetadata(
+      checkpointLocation: String,
+      hadoopConf: Configuration,
+      stateSchemaFiles: List[String]
+  ): StateSchemaMetadata = {
+    val fm = CheckpointFileManager.create(new Path(checkpointLocation), 
hadoopConf)
+
+    // Build up our map of schema metadata
+    val activeSchemas = stateSchemaFiles.zipWithIndex.foldLeft(
+      Map.empty[StateSchemaMetadataKey, StateSchemaMetadataValue]) {
+      case (schemas, (stateSchemaFile, schemaIndex)) =>

Review Comment:
   nit: index does not seem to be used



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -297,7 +481,8 @@ abstract class RocksDBDataEncoder(
 class UnsafeRowDataEncoder(
     keyStateEncoderSpec: KeyStateEncoderSpec,
     valueSchema: StructType,
-    stateSchemaInfo: Option[StateSchemaInfo]
+    stateSchemaProvider: Option[StateSchemaProvider],

Review Comment:
   I see these two aren't used. Unless we create the instance dynamically, I'm not sure there is merit in making the params consistent.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -189,7 +373,7 @@ trait DataEncoder {
 
 abstract class RocksDBDataEncoder(
     keyStateEncoderSpec: KeyStateEncoderSpec,
-    valueSchema: StructType) extends DataEncoder {
+    valueSchema: StructType) extends DataEncoder with Logging {

Review Comment:
   Is this used?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -1007,8 +1248,19 @@ class AvroStateEncoder(
 
   override def decodeValue(bytes: Array[Byte]): UnsafeRow = {
     val schemaIdRow = decodeStateSchemaIdRow(bytes)
+    val writerSchema = stateSchemaProvider.get.getSchemaMetadataValue(

Review Comment:
   Is it safe? If we are very sure stateSchemaProvider is Some(x), is there a reason to keep it as an Option? If it has to be an Option but we should always see Some(x) here, shall we add an assert to raise an exception with a better message?
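   
   For example, something like this (a sketch of one possible shape inside the encoder, not the PR code):
   
   ```
   // Resolve the provider once with a descriptive error, instead of calling
   // .get at each use site.
   private lazy val resolvedSchemaProvider: StateSchemaProvider =
     stateSchemaProvider.getOrElse {
       throw new IllegalStateException(
         "stateSchemaProvider must be defined when the Avro state encoder is used")
     }
   ```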



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala:
##########
@@ -196,8 +196,8 @@ trait FlatMapGroupsWithStateExecBase
       hadoopConf: Configuration,
       batchId: Long,
       stateSchemaVersion: Int): List[StateSchemaValidationResult] = {
-    val newStateSchema = List(StateStoreColFamilySchema(StateStore.DEFAULT_COL_FAMILY_NAME,
-      groupingAttributes.toStructType, stateManager.stateSchema))
+    val newStateSchema = List(StateStoreColFamilySchema(StateStore.DEFAULT_COL_FAMILY_NAME, 0,

Review Comment:
   ditto about named parameter



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -231,11 +234,17 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging
           s"for state variable $stateVarName in operator ${sourceOptions.operatorId}")
         val stateVarInfo = stateVarInfoList.head
         transformWithStateVariableInfoOpt = Some(stateVarInfo)
-        val schemaFilePath = new Path(storeMetadataEntry.stateSchemaFilePath.get)
-        Some(schemaFilePath)
+        schemaFilePaths = storeMetadataEntry.stateSchemaFilePaths
+        val stateSchemaMetadata = StateSchemaMetadata.createStateSchemaMetadata(
+          sourceOptions.stateCheckpointLocation.toString,
+          hadoopConf,
+          schemaFilePaths
+        )
+        stateSchemaProvider = Some(new InMemoryStateSchemaProvider(stateSchemaMetadata))

Review Comment:
   Just to double-confirm, is the reason we use InMemoryStateSchemaProvider rather than a broadcast that this is a short-lived query?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -231,11 +234,17 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging
           s"for state variable $stateVarName in operator ${sourceOptions.operatorId}")
         val stateVarInfo = stateVarInfoList.head
         transformWithStateVariableInfoOpt = Some(stateVarInfo)
-        val schemaFilePath = new Path(storeMetadataEntry.stateSchemaFilePath.get)
-        Some(schemaFilePath)
+        schemaFilePaths = storeMetadataEntry.stateSchemaFilePaths

Review Comment:
   nit: This does not need to be in the outer scope. It's not used outside this scope.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala:
##########


Review Comment:
   ditto about named params



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -402,15 +419,27 @@ private[sql] class RocksDBStateStoreProvider
       stateStoreName = stateStoreId.storeName,
       colFamilyName = StateStore.DEFAULT_COL_FAMILY_NAME)
 
-    val dataEncoder = getDataEncoder(
-      stateStoreEncoding, dataEncoderCacheKey, keyStateEncoderSpec, valueSchema)
+    // For test cases only: TestStateSchemaProvider allows dynamically adding schemas
+    // during test execution to verify schema evolution behavior. In production,
+    // schemas are loaded from checkpoint data
+    stateSchemaProvider match {
+      case Some(t: TestStateSchemaProvider) =>

Review Comment:
   Also, this is not even for e2e tests, right? Constructing a streaming query won't give us a chance to inject this schema provider. Probably better to scope the explanation down, as I had to consider the scenario of distributed execution.



##########
sql/core/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala:
##########
@@ -372,6 +373,178 @@ object SchemaConverters extends Logging {
       schema
     }
   }
+
+  /**
+   * Creates default values for Spark SQL data types when converting to Avro.
+   * This ensures fields have appropriate defaults during schema evolution.
+   *
+   * This method recursively processes Spark SQL data types and generates corresponding
+   * default values that are compatible with Avro schema specifications. It handles
+   * both primitive types (like Boolean, Integer) and complex types (Arrays, Maps, Structs).
+   *
+   * @param dataType The Spark SQL DataType to create a default value for
+   * @return A default value appropriate for the given data type that's compatible with Avro
+   */
+  private def getDefaultValue(dataType: DataType): Any = {
+    def createNestedDefault(st: StructType): java.util.HashMap[String, Any] = {
+      val defaultMap = new java.util.HashMap[String, Any]()
+      st.fields.foreach { field =>
+        defaultMap.put(field.name, getDefaultValue(field.dataType))
+      }
+      defaultMap
+    }
+
+    dataType match {
+      // Basic types
+      case BooleanType => false
+      case ByteType | ShortType | IntegerType => 0
+      case LongType => 0L
+      case FloatType => 0.0f

Review Comment:
   Let's simply follow the path of adding a column to an existing table in an RDBMS: either you have to make the new column nullable, or you have to provide the default value explicitly. Let's not try to define the default values ourselves.
   
   That said, unless we are going to provide a UX that lets users specify default values for new columns, new columns must be nullable, which is interpreted as "optional" in Avro.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala:
##########
@@ -58,7 +64,8 @@ class StateScan(
     stateStoreConf: StateStoreConf,
     keyStateEncoderSpec: KeyStateEncoderSpec,
     stateVariableInfoOpt: Option[TransformWithStateVariableInfo],
-    stateStoreColFamilySchemaOpt: Option[StateStoreColFamilySchema]) extends Scan
+    stateStoreColFamilySchemaOpt: Option[StateStoreColFamilySchema],
+    stateSchemaProviderOpt: Option[StateSchemaProvider]) extends Scan

Review Comment:
   nit: place `extends Scan` on the line below



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -402,15 +419,27 @@ private[sql] class RocksDBStateStoreProvider
       stateStoreName = stateStoreId.storeName,
       colFamilyName = StateStore.DEFAULT_COL_FAMILY_NAME)
 
-    val dataEncoder = getDataEncoder(
-      stateStoreEncoding, dataEncoderCacheKey, keyStateEncoderSpec, valueSchema)
+    // For test cases only: TestStateSchemaProvider allows dynamically adding schemas
+    // during test execution to verify schema evolution behavior. In production,
+    // schemas are loaded from checkpoint data
+    stateSchemaProvider match {
+      case Some(t: TestStateSchemaProvider) =>

Review Comment:
   I can't think of another way, except adding an interface to perform a callback. I agree we don't want to complicate the code for test-only needs, but adding a callback interface is even more complicated, so I'm OK with this code.
   
   Though TestStateSchemaProvider and addSchema() are probably too generic and it's unclear what they do. Either add class and method docs to explain, or use better naming - e.g. it sounds like it is "capturing" the state schema.



##########
sql/core/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala:
##########
@@ -372,6 +373,178 @@ object SchemaConverters extends Logging {
       schema
     }
   }
+
+  /**
+   * Creates default values for Spark SQL data types when converting to Avro.
+   * This ensures fields have appropriate defaults during schema evolution.
+   *
+   * This method recursively processes Spark SQL data types and generates corresponding
+   * default values that are compatible with Avro schema specifications. It handles
+   * both primitive types (like Boolean, Integer) and complex types (Arrays, Maps, Structs).
+   *
+   * @param dataType The Spark SQL DataType to create a default value for
+   * @return A default value appropriate for the given data type that's compatible with Avro
+   */
+  private def getDefaultValue(dataType: DataType): Any = {
+    def createNestedDefault(st: StructType): java.util.HashMap[String, Any] = {
+      val defaultMap = new java.util.HashMap[String, Any]()
+      st.fields.foreach { field =>
+        defaultMap.put(field.name, getDefaultValue(field.dataType))
+      }
+      defaultMap
+    }
+
+    dataType match {
+      // Basic types
+      case BooleanType => false
+      case ByteType | ShortType | IntegerType => 0
+      case LongType => 0L
+      case FloatType => 0.0f

Review Comment:
   > although - if we do try to rewrite in the future, the state store layer won't allow us to write nulls?
   
   This is going to be a value for a column, not for a row itself.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

