HeartSaVioR commented on code in PR #49277: URL: https://github.com/apache/spark/pull/49277#discussion_r1909826111
########## sql/core/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala: ########## @@ -372,6 +373,178 @@ object SchemaConverters extends Logging { schema } } + + /** + * Creates default values for Spark SQL data types when converting to Avro. + * This ensures fields have appropriate defaults during schema evolution. + * + * This method recursively processes Spark SQL data types and generates corresponding + * default values that are compatible with Avro schema specifications. It handles + * both primitive types (like Boolean, Integer) and complex types (Arrays, Maps, Structs). + * + * @param dataType The Spark SQL DataType to create a default value for + * @return A default value appropriate for the given data type that's compatible with Avro + */ + private def getDefaultValue(dataType: DataType): Any = { + def createNestedDefault(st: StructType): java.util.HashMap[String, Any] = { + val defaultMap = new java.util.HashMap[String, Any]() + st.fields.foreach { field => + defaultMap.put(field.name, getDefaultValue(field.dataType)) + } + defaultMap + } + + dataType match { + // Basic types + case BooleanType => false + case ByteType | ShortType | IntegerType => 0 + case LongType => 0L + case FloatType => 0.0f + case DoubleType => 0.0 + case StringType => "" + case BinaryType => java.nio.ByteBuffer.allocate(0) + + // Complex types + case ArrayType(elementType, _) => + new java.util.ArrayList[Any]() + case MapType(StringType, valueType, _) => + new java.util.HashMap[String, Any]() + case st: StructType => createNestedDefault(st) + + // Special types + case _: DecimalType => java.nio.ByteBuffer.allocate(0) + case DateType => 0 + case TimestampType => 0L + case TimestampNTZType => 0L + case NullType => null + case _ => null + } + } + + /** + * Converts a Spark SQL schema to a corresponding Avro schema. + * This method provides comprehensive support for schema evolution and handles + * complex nested types while maintaining type safety and compatibility. + * + * The conversion process includes: + * - Converting primitive Spark SQL types to Avro types + * - Handling complex types (arrays, maps, structs) with proper nesting + * - Supporting nullable fields through Avro unions + * - Managing logical types for dates, timestamps, and decimals + * - Generating unique names for nested records + * - Preserving namespace hierarchy for nested structures + * + * @param catalystType The Spark SQL DataType to convert + * @param nullable Whether the field can contain null values + * @param recordName The name to use for the record in the Avro schema + * @param namespace The namespace for the Avro schema + * @param nestingLevel Current nesting level for generating unique names + * @return An Avro Schema corresponding to the input Spark SQL type + * @throws IncompatibleSchemaException if the input type cannot be converted to Avro + */ + def toAvroTypeWithDefaults( + catalystType: DataType, + nullable: Boolean = false, Review Comment: Is there a case where the caller (outer, not recursive call) will call this method with non-struct type? If there is no case, it'd be more clear to only allow StructType in public method and make this method be private or nested method in the new public method, because DataType itself has no concept of being nullable and caller could be confusing about this param. 
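A minimal sketch of what this suggestion might look like, with illustrative names and signatures rather than the PR's actual code: the public entry point accepts only a StructType, and the recursive DataType handling (where per-field nullability matters) stays private.

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.types.{DataType, StructType}

object SchemaConvertersSketch {
  // Public entry point restricted to StructType: callers never pass a bare
  // DataType or reason about a top-level `nullable` flag.
  def toAvroTypeWithDefaults(
      schema: StructType,
      recordName: String = "topLevelRecord",
      namespace: String = ""): Schema =
    convert(schema, nullable = false, recordName, namespace, nestingLevel = 0)

  // Recursive worker kept private; nullability is applied per field while
  // walking the schema, so it never leaks into the public signature.
  private def convert(
      dataType: DataType,
      nullable: Boolean,
      recordName: String,
      namespace: String,
      nestingLevel: Int): Schema =
    ??? // the existing per-type conversion logic would live here
}
```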
########## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ########## @@ -2323,6 +2323,13 @@ object SQLConf { .intConf .createWithDefault(1) + val STREAMING_STATE_SCHEMA_FILES_THRESHOLD = Review Comment: nit: let's think more on whether we want to use more direct config name e.g. `maxNumFiles`. I see both patterns exist in the config so threshold may be acceptable. I just feel like making the config name more direct would be easier to understand. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala: ########## @@ -960,8 +991,8 @@ case class SessionWindowStateStoreSaveExec( override def validateAndMaybeEvolveStateSchema( hadoopConf: Configuration, batchId: Long, stateSchemaVersion: Int): List[StateSchemaValidationResult] = { - val newStateSchema = List(StateStoreColFamilySchema(StateStore.DEFAULT_COL_FAMILY_NAME, - stateManager.getStateKeySchema, stateManager.getStateValueSchema)) + val newStateSchema = List(StateStoreColFamilySchema(StateStore.DEFAULT_COL_FAMILY_NAME, 0, Review Comment: nit: shall we make this be consistent? Either remove named params in above or put named params here and below. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala: ########## @@ -263,8 +263,8 @@ case class StreamingSymmetricHashJoinExec( // validate and maybe evolve schema for all state stores across both sides of the join result.map { case (stateStoreName, (keySchema, valueSchema)) => - val newStateSchema = List(StateStoreColFamilySchema(StateStore.DEFAULT_COL_FAMILY_NAME, - keySchema, valueSchema)) + val newStateSchema = List(StateStoreColFamilySchema(StateStore.DEFAULT_COL_FAMILY_NAME, 0, Review Comment: ditto ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala: ########## @@ -82,11 +82,26 @@ private[sql] class RocksDBStateStoreProvider stateStoreName = stateStoreId.storeName, colFamilyName = colFamilyName) - val dataEncoder = getDataEncoder( - stateStoreEncoding, dataEncoderCacheKey, keyStateEncoderSpec, valueSchema) - val columnFamilyInfo = Some(ColumnFamilyInfo(colFamilyName, newColFamilyId)) + // For test cases only: TestStateSchemaProvider allows dynamically adding schemas + // during test execution to verify schema evolution behavior. 
In production, + // schemas are loaded from checkpoint data + stateSchemaProvider match { + case Some(t: TestStateSchemaProvider) => + t.addSchema(colFamilyName, keySchema, valueSchema) Review Comment: somehow left the comment below, ditto here ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala: ########## @@ -51,6 +54,187 @@ sealed trait RocksDBValueStateEncoder { def decodeValues(valueBytes: Array[Byte]): Iterator[UnsafeRow] } +trait StateSchemaProvider extends Serializable { + def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue + + def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short +} + +// Test implementation that can be dynamically updated +class TestStateSchemaProvider extends StateSchemaProvider { + private var schemas = Map.empty[StateSchemaMetadataKey, StateSchemaMetadataValue] + + def addSchema( + colFamilyName: String, + keySchema: StructType, + valueSchema: StructType, + keySchemaId: Short = 0, + valueSchemaId: Short = 0): Unit = { + schemas ++= Map( + StateSchemaMetadataKey(colFamilyName, keySchemaId, isKey = true) -> + StateSchemaMetadataValue(keySchema, SchemaConverters.toAvroTypeWithDefaults(keySchema)), + StateSchemaMetadataKey(colFamilyName, valueSchemaId, isKey = false) -> + StateSchemaMetadataValue(valueSchema, SchemaConverters.toAvroTypeWithDefaults(valueSchema)) + ) + } + + override def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue = { + schemas(key) + } + + override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short = { + schemas.keys + .filter(key => + key.colFamilyName == colFamilyName && + key.isKey == isKey) + .map(_.schemaId).max + } +} + +class InMemoryStateSchemaProvider(metadata: StateSchemaMetadata) extends StateSchemaProvider { + override def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue = { + metadata.activeSchemas(key) + } + + override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short = { + metadata.activeSchemas + .keys + .filter(key => Review Comment: ditto ########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateTable.scala: ########## @@ -26,19 +26,25 @@ import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions.JoinSideValues import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil import org.apache.spark.sql.execution.streaming.TransformWithStateVariableInfo -import org.apache.spark.sql.execution.streaming.state.{KeyStateEncoderSpec, StateStoreColFamilySchema, StateStoreConf} +import org.apache.spark.sql.execution.streaming.state.{KeyStateEncoderSpec, StateSchemaProvider, StateStoreColFamilySchema, StateStoreConf} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap -/** An implementation of [[Table]] with [[SupportsRead]] for State Store data source. */ +/** + * An implementation of [[Table]] with [[SupportsRead]] for State Store data source. + * @param stateSchemaProviderOpt Optional provider that maintains mapping between schema IDs and Review Comment: One question: what would be the schema of the state when we do time-travel? And what would be the schema of the state when we read from changelog in version range? 
########## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ########## @@ -2334,6 +2341,13 @@ object SQLConf { "Valid values are 'unsaferow' and 'avro'") .createWithDefault("unsaferow") + val STREAMING_VALUE_STATE_SCHEMA_EVOLUTION_THRESHOLD = Review Comment: Btw, any reason we have to restrict this? ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala: ########## @@ -233,6 +234,33 @@ trait StateStoreWriter } } + def stateSchemaList( + stateSchemaValidationResults: List[StateSchemaValidationResult], + oldMetadata: Option[OperatorStateMetadata]): List[List[String]] = { + + def getExistingStateInfo(metadata: OperatorStateMetadataV2): List[String] = { + val ssInfo = metadata.stateStoreInfo.head + ssInfo.stateSchemaFilePaths + } + + val validationResult = stateSchemaValidationResults.head Review Comment: I forgot about the detail, but it's a bit odd to just pick the head. It sounds to me the param does not need to be a list. Do we have other case where the validation results are multiple, and if then is it safe for us to pick the head here? ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala: ########## @@ -233,6 +234,33 @@ trait StateStoreWriter } } + def stateSchemaList( + stateSchemaValidationResults: List[StateSchemaValidationResult], + oldMetadata: Option[OperatorStateMetadata]): List[List[String]] = { + + def getExistingStateInfo(metadata: OperatorStateMetadataV2): List[String] = { Review Comment: nit: why not name it directly, say, `getExistingSchemaFilePaths`? ########## sql/core/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala: ########## @@ -372,6 +373,178 @@ object SchemaConverters extends Logging { schema } } + + /** + * Creates default values for Spark SQL data types when converting to Avro. + * This ensures fields have appropriate defaults during schema evolution. + * + * This method recursively processes Spark SQL data types and generates corresponding + * default values that are compatible with Avro schema specifications. It handles + * both primitive types (like Boolean, Integer) and complex types (Arrays, Maps, Structs). + * + * @param dataType The Spark SQL DataType to create a default value for + * @return A default value appropriate for the given data type that's compatible with Avro + */ + private def getDefaultValue(dataType: DataType): Any = { + def createNestedDefault(st: StructType): java.util.HashMap[String, Any] = { + val defaultMap = new java.util.HashMap[String, Any]() + st.fields.foreach { field => + defaultMap.put(field.name, getDefaultValue(field.dataType)) + } + defaultMap + } + + dataType match { + // Basic types + case BooleanType => false + case ByteType | ShortType | IntegerType => 0 + case LongType => 0L + case FloatType => 0.0f + case DoubleType => 0.0 + case StringType => "" + case BinaryType => java.nio.ByteBuffer.allocate(0) + + // Complex types + case ArrayType(elementType, _) => + new java.util.ArrayList[Any]() + case MapType(StringType, valueType, _) => + new java.util.HashMap[String, Any]() + case st: StructType => createNestedDefault(st) + + // Special types + case _: DecimalType => java.nio.ByteBuffer.allocate(0) + case DateType => 0 + case TimestampType => 0L + case TimestampNTZType => 0L + case NullType => null + case _ => null + } + } + + /** Review Comment: Would you mind providing an example? It does not look like we are going to name the column with the way Spark SQL refers to the nested column, say, `a.b.c`. 
It would be great to include a complicated Spark SQL schema and show how it transforms to Avro (a rough sketch of such an example follows after this batch of comments). ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ########## @@ -140,7 +140,13 @@ case class TransformWithStateExec( * after init is called. */ override def getColFamilySchemas(): Map[String, StateStoreColFamilySchema] = { - val columnFamilySchemas = getDriverProcessorHandle().getColumnFamilySchemas + val keySchema = keyExpressions.toStructType + val defaultSchema = StateStoreColFamilySchema(StateStore.DEFAULT_COL_FAMILY_NAME, Review Comment: Shall we add a comment explaining why we put a dummy entry for the default cf? ########## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ########## @@ -2334,6 +2341,13 @@ object SQLConf { "Valid values are 'unsaferow' and 'avro'") .createWithDefault("unsaferow") + val STREAMING_VALUE_STATE_SCHEMA_EVOLUTION_THRESHOLD = Review Comment: ditto ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala: ########## @@ -233,6 +234,33 @@ trait StateStoreWriter } } + def stateSchemaList( + stateSchemaValidationResults: List[StateSchemaValidationResult], + oldMetadata: Option[OperatorStateMetadata]): List[List[String]] = { + + def getExistingStateInfo(metadata: OperatorStateMetadataV2): List[String] = { + val ssInfo = metadata.stateStoreInfo.head + ssInfo.stateSchemaFilePaths + } + + val validationResult = stateSchemaValidationResults.head + + oldMetadata match { + case Some(v2: OperatorStateMetadataV2) => + val oldSchemaPaths = getExistingStateInfo(v2) + if (validationResult.evolvedSchema) { + List(oldSchemaPaths ++ List(validationResult.schemaPath)) Review Comment: What's the reason to make this `List[List[String]]`? ``` List(List('a', 'b', 'c') ++ List('d')) = List(List('a', 'b', 'c', 'd')) ``` I don't see a scenario where the outer List will have more than one element. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ########## @@ -140,7 +140,13 @@ case class TransformWithStateExec( * after init is called. */ override def getColFamilySchemas(): Map[String, StateStoreColFamilySchema] = { - val columnFamilySchemas = getDriverProcessorHandle().getColumnFamilySchemas + val keySchema = keyExpressions.toStructType + val defaultSchema = StateStoreColFamilySchema(StateStore.DEFAULT_COL_FAMILY_NAME, + 0, keyExpressions.toStructType, 0, DUMMY_VALUE_ROW_SCHEMA, Review Comment: nit: ditto, named param?
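Returning to the earlier request for a worked example of a complicated Spark SQL schema and its Avro translation, here is a hypothetical sketch. The field names are invented, and the Avro output in the comment is an assumption about the general shape (nullable fields as unions with null, nested records given generated names), not verified output of toAvroTypeWithDefaults.

```scala
import org.apache.spark.sql.types._

// A hypothetical nested state schema (invented field names).
val userState: StructType = new StructType()
  .add("userId", StringType, nullable = false)
  .add("profile", new StructType()
    .add("age", IntegerType)
    .add("tags", ArrayType(StringType)), nullable = true)
  .add("counts", MapType(StringType, LongType), nullable = true)

// Expected rough shape of the generated Avro schema (assumption, not verified output):
//   record topLevelRecord {
//     string userId;
//     union { null, record <generated nested record name> {
//       union { null, int } age;
//       union { null, array<string> } tags;
//     } } profile;
//     union { null, map<long> } counts;
//   }
```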
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala: ########## @@ -468,7 +468,8 @@ class OperatorStateMetadataV2FileManager( // find the batchId of the earliest schema file we need to keep val earliestBatchToKeep = latestMetadata match { case Some(OperatorStateMetadataV2(_, stateStoreInfo, _)) => - val schemaFilePath = stateStoreInfo.head.stateSchemaFilePath + val ssInfo = stateStoreInfo.head + val schemaFilePath = ssInfo.stateSchemaFilePaths.head Review Comment: ditto, reasoning about simply picking the head ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala: ########## @@ -468,7 +468,8 @@ class OperatorStateMetadataV2FileManager( // find the batchId of the earliest schema file we need to keep val earliestBatchToKeep = latestMetadata match { case Some(OperatorStateMetadataV2(_, stateStoreInfo, _)) => - val schemaFilePath = stateStoreInfo.head.stateSchemaFilePath + val ssInfo = stateStoreInfo.head Review Comment: Same: is it safe to simply pick the head, and if it's safe, what are other elements? ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala: ########## @@ -51,6 +54,187 @@ sealed trait RocksDBValueStateEncoder { def decodeValues(valueBytes: Array[Byte]): Iterator[UnsafeRow] } +trait StateSchemaProvider extends Serializable { + def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue + + def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short +} + +// Test implementation that can be dynamically updated +class TestStateSchemaProvider extends StateSchemaProvider { + private var schemas = Map.empty[StateSchemaMetadataKey, StateSchemaMetadataValue] + + def addSchema( + colFamilyName: String, + keySchema: StructType, + valueSchema: StructType, + keySchemaId: Short = 0, + valueSchemaId: Short = 0): Unit = { + schemas ++= Map( + StateSchemaMetadataKey(colFamilyName, keySchemaId, isKey = true) -> + StateSchemaMetadataValue(keySchema, SchemaConverters.toAvroTypeWithDefaults(keySchema)), + StateSchemaMetadataKey(colFamilyName, valueSchemaId, isKey = false) -> + StateSchemaMetadataValue(valueSchema, SchemaConverters.toAvroTypeWithDefaults(valueSchema)) + ) + } + + override def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue = { + schemas(key) + } + + override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short = { + schemas.keys + .filter(key => + key.colFamilyName == colFamilyName && + key.isKey == isKey) + .map(_.schemaId).max + } +} + +class InMemoryStateSchemaProvider(metadata: StateSchemaMetadata) extends StateSchemaProvider { + override def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue = { + metadata.activeSchemas(key) + } + + override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short = { + metadata.activeSchemas + .keys + .filter(key => + key.colFamilyName == colFamilyName && + key.isKey == isKey + ) + .map(_.schemaId).max + } +} + +/** + * Broadcasts schema metadata information for stateful operators in a streaming query. + * + * This class provides a way to distribute schema evolution information to all executors + * via Spark's broadcast mechanism. Each stateful operator in a streaming query maintains + * its own instance of this class to track schema versions and evolution. 
+ * + * @param broadcast Spark broadcast variable containing the schema metadata + */ +case class StateSchemaBroadcast( + broadcast: Broadcast[StateSchemaMetadata] +) extends Logging with StateSchemaProvider { + + /** + * Retrieves the schema information for a given column family and schema version + * + * @param key A combination of column family name and schema ID + * @return The corresponding schema metadata value containing both SQL and Avro schemas + */ + override def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue = { + broadcast.value.activeSchemas(key) + } + + override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short = { + broadcast.value.activeSchemas + .keys + .filter(key => Review Comment: ditto ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala: ########## @@ -51,6 +54,187 @@ sealed trait RocksDBValueStateEncoder { def decodeValues(valueBytes: Array[Byte]): Iterator[UnsafeRow] } +trait StateSchemaProvider extends Serializable { + def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue + + def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short +} + +// Test implementation that can be dynamically updated +class TestStateSchemaProvider extends StateSchemaProvider { + private var schemas = Map.empty[StateSchemaMetadataKey, StateSchemaMetadataValue] + + def addSchema( + colFamilyName: String, + keySchema: StructType, + valueSchema: StructType, + keySchemaId: Short = 0, + valueSchemaId: Short = 0): Unit = { + schemas ++= Map( + StateSchemaMetadataKey(colFamilyName, keySchemaId, isKey = true) -> + StateSchemaMetadataValue(keySchema, SchemaConverters.toAvroTypeWithDefaults(keySchema)), + StateSchemaMetadataKey(colFamilyName, valueSchemaId, isKey = false) -> + StateSchemaMetadataValue(valueSchema, SchemaConverters.toAvroTypeWithDefaults(valueSchema)) + ) + } + + override def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue = { + schemas(key) + } + + override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short = { + schemas.keys + .filter(key => + key.colFamilyName == colFamilyName && + key.isKey == isKey) + .map(_.schemaId).max + } +} + +class InMemoryStateSchemaProvider(metadata: StateSchemaMetadata) extends StateSchemaProvider { + override def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue = { + metadata.activeSchemas(key) + } + + override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short = { + metadata.activeSchemas + .keys + .filter(key => + key.colFamilyName == colFamilyName && + key.isKey == isKey + ) + .map(_.schemaId).max + } +} + +/** + * Broadcasts schema metadata information for stateful operators in a streaming query. + * + * This class provides a way to distribute schema evolution information to all executors + * via Spark's broadcast mechanism. Each stateful operator in a streaming query maintains + * its own instance of this class to track schema versions and evolution. 
+ * + * @param broadcast Spark broadcast variable containing the schema metadata + */ +case class StateSchemaBroadcast( + broadcast: Broadcast[StateSchemaMetadata] +) extends Logging with StateSchemaProvider { + + /** + * Retrieves the schema information for a given column family and schema version + * + * @param key A combination of column family name and schema ID + * @return The corresponding schema metadata value containing both SQL and Avro schemas + */ + override def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue = { + broadcast.value.activeSchemas(key) + } + + override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short = { + broadcast.value.activeSchemas + .keys + .filter(key => + key.colFamilyName == colFamilyName && + key.isKey == isKey + ) + .map(_.schemaId).max + } +} + +/** + * Contains schema evolution metadata for a stateful operator. + * + * @param currentSchemaId The schema version currently being used for writing new state + * @param activeSchemas Map of all active schema versions, keyed by column family and schema ID. + * This includes both the current schema and any previous schemas that + * may still exist in the state store. + */ +case class StateSchemaMetadata( + activeSchemas: Map[StateSchemaMetadataKey, StateSchemaMetadataValue] +) + +object StateSchemaMetadata { + + def createStateSchemaMetadata( + checkpointLocation: String, + hadoopConf: Configuration, + stateSchemaFiles: List[String] + ): StateSchemaMetadata = { + val fm = CheckpointFileManager.create(new Path(checkpointLocation), hadoopConf) + + // Build up our map of schema metadata + val activeSchemas = stateSchemaFiles.zipWithIndex.foldLeft( + Map.empty[StateSchemaMetadataKey, StateSchemaMetadataValue]) { + case (schemas, (stateSchemaFile, schemaIndex)) => + val fsDataInputStream = fm.open(new Path(stateSchemaFile)) + val colFamilySchemas = StateSchemaCompatibilityChecker.readSchemaFile(fsDataInputStream) + + // For each column family, create metadata entries for both key and value schemas + val schemaEntries = colFamilySchemas.flatMap { colFamilySchema => + // Create key schema metadata + val keyAvroSchema = SchemaConverters.toAvroTypeWithDefaults( + colFamilySchema.keySchema) + val keyEntry = StateSchemaMetadataKey( + colFamilySchema.colFamilyName, + colFamilySchema.keySchemaId, + isKey = true + ) -> StateSchemaMetadataValue( + colFamilySchema.keySchema, + keyAvroSchema + ) + + // Create value schema metadata + val valueAvroSchema = SchemaConverters.toAvroTypeWithDefaults( + colFamilySchema.valueSchema) + val valueEntry = StateSchemaMetadataKey( + colFamilySchema.colFamilyName, + colFamilySchema.valueSchemaId, + isKey = false + ) -> StateSchemaMetadataValue( + colFamilySchema.valueSchema, + valueAvroSchema + ) + + Seq(keyEntry, valueEntry) + } + + // Add new entries to our accumulated map + schemas ++ schemaEntries.toMap + } + + // Create the final metadata and wrap it in a broadcast Review Comment: We don't seem to handle broadcast "here" - the comment is misleading. 
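For context on the comment above, a sketch of where the broadcast wrapping presumably happens instead: on the driver, at the call site. This is an assumption about the intended flow, not the PR's actual call site; the helper name is invented.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession

// createStateSchemaMetadata only builds the schema map; wrapping it in a
// Broadcast would happen in the caller, roughly like this (illustrative only).
def buildSchemaProvider(
    spark: SparkSession,
    checkpointLocation: String,
    hadoopConf: Configuration,
    stateSchemaFiles: List[String]): StateSchemaBroadcast = {
  val metadata = StateSchemaMetadata.createStateSchemaMetadata(
    checkpointLocation, hadoopConf, stateSchemaFiles)
  StateSchemaBroadcast(spark.sparkContext.broadcast(metadata))
}
```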
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala: ########## @@ -51,6 +54,187 @@ sealed trait RocksDBValueStateEncoder { def decodeValues(valueBytes: Array[Byte]): Iterator[UnsafeRow] } +trait StateSchemaProvider extends Serializable { + def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue + + def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short +} + +// Test implementation that can be dynamically updated +class TestStateSchemaProvider extends StateSchemaProvider { + private var schemas = Map.empty[StateSchemaMetadataKey, StateSchemaMetadataValue] + + def addSchema( + colFamilyName: String, + keySchema: StructType, + valueSchema: StructType, + keySchemaId: Short = 0, + valueSchemaId: Short = 0): Unit = { + schemas ++= Map( + StateSchemaMetadataKey(colFamilyName, keySchemaId, isKey = true) -> + StateSchemaMetadataValue(keySchema, SchemaConverters.toAvroTypeWithDefaults(keySchema)), + StateSchemaMetadataKey(colFamilyName, valueSchemaId, isKey = false) -> + StateSchemaMetadataValue(valueSchema, SchemaConverters.toAvroTypeWithDefaults(valueSchema)) + ) + } + + override def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue = { + schemas(key) + } + + override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short = { + schemas.keys + .filter(key => Review Comment: nit: use `{` `}` for multi-line block ``` .filter { key => } .map ``` ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala: ########## @@ -51,6 +54,187 @@ sealed trait RocksDBValueStateEncoder { def decodeValues(valueBytes: Array[Byte]): Iterator[UnsafeRow] } +trait StateSchemaProvider extends Serializable { + def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue + + def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short +} + +// Test implementation that can be dynamically updated +class TestStateSchemaProvider extends StateSchemaProvider { + private var schemas = Map.empty[StateSchemaMetadataKey, StateSchemaMetadataValue] Review Comment: nit: why not using mutable collection and set this to val? 
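A small sketch of the nit above, assuming nothing else changes: hold a mutable map in a val instead of reassigning an immutable map held in a var. Both `++=` and apply-style lookup (`schemas(key)`) behave the same on scala.collection.mutable.Map, so addSchema and getSchemaMetadataValue would not need other changes.

```scala
import scala.collection.mutable

// val + mutable map instead of var + immutable map; addSchema can keep using ++=
// and getSchemaMetadataValue can keep using schemas(key).
private val schemas =
  mutable.Map.empty[StateSchemaMetadataKey, StateSchemaMetadataValue]
```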
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala: ########## @@ -51,6 +54,187 @@ sealed trait RocksDBValueStateEncoder { def decodeValues(valueBytes: Array[Byte]): Iterator[UnsafeRow] } +trait StateSchemaProvider extends Serializable { + def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue + + def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short +} + +// Test implementation that can be dynamically updated +class TestStateSchemaProvider extends StateSchemaProvider { + private var schemas = Map.empty[StateSchemaMetadataKey, StateSchemaMetadataValue] + + def addSchema( + colFamilyName: String, + keySchema: StructType, + valueSchema: StructType, + keySchemaId: Short = 0, + valueSchemaId: Short = 0): Unit = { + schemas ++= Map( + StateSchemaMetadataKey(colFamilyName, keySchemaId, isKey = true) -> + StateSchemaMetadataValue(keySchema, SchemaConverters.toAvroTypeWithDefaults(keySchema)), + StateSchemaMetadataKey(colFamilyName, valueSchemaId, isKey = false) -> + StateSchemaMetadataValue(valueSchema, SchemaConverters.toAvroTypeWithDefaults(valueSchema)) + ) + } + + override def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue = { + schemas(key) + } + + override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short = { + schemas.keys + .filter(key => + key.colFamilyName == colFamilyName && + key.isKey == isKey) + .map(_.schemaId).max + } +} + +class InMemoryStateSchemaProvider(metadata: StateSchemaMetadata) extends StateSchemaProvider { + override def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue = { + metadata.activeSchemas(key) + } + + override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short = { + metadata.activeSchemas + .keys + .filter(key => + key.colFamilyName == colFamilyName && + key.isKey == isKey + ) + .map(_.schemaId).max + } +} + +/** + * Broadcasts schema metadata information for stateful operators in a streaming query. + * + * This class provides a way to distribute schema evolution information to all executors + * via Spark's broadcast mechanism. Each stateful operator in a streaming query maintains + * its own instance of this class to track schema versions and evolution. + * + * @param broadcast Spark broadcast variable containing the schema metadata + */ +case class StateSchemaBroadcast( + broadcast: Broadcast[StateSchemaMetadata] +) extends Logging with StateSchemaProvider { + + /** + * Retrieves the schema information for a given column family and schema version + * + * @param key A combination of column family name and schema ID + * @return The corresponding schema metadata value containing both SQL and Avro schemas + */ + override def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue = { + broadcast.value.activeSchemas(key) + } + + override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short = { + broadcast.value.activeSchemas + .keys + .filter(key => + key.colFamilyName == colFamilyName && + key.isKey == isKey + ) + .map(_.schemaId).max + } +} + +/** + * Contains schema evolution metadata for a stateful operator. 
+ * + * @param currentSchemaId The schema version currently being used for writing new state Review Comment: nit: there is no param for this ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala: ########## @@ -528,15 +713,28 @@ class UnsafeRowDataEncoder( class AvroStateEncoder( keyStateEncoderSpec: KeyStateEncoderSpec, valueSchema: StructType, - stateSchemaInfo: Option[StateSchemaInfo] + stateSchemaProvider: Option[StateSchemaProvider], + columnFamilyInfo: Option[ColumnFamilyInfo] ) extends RocksDBDataEncoder(keyStateEncoderSpec, valueSchema) with Logging { + + // schema information + private lazy val currentKeySchemaId: Short = getStateSchemaProvider.getCurrentStateSchemaId( + getColFamilyName, + isKey = true + ) + + private lazy val currentValSchemaId: Short = getStateSchemaProvider.getCurrentStateSchemaId( + getColFamilyName, + isKey = false + ) + private val avroEncoder = createAvroEnc(keyStateEncoderSpec, valueSchema) Review Comment: nit: I understand you are putting fields which are relevant. But here we have so many lazy values and it's uneasy to figure out which fields are initialized during instantiation, and which fields aren't. Shall we move non-lazy val to the top, or bottom of lazy val? ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala: ########## @@ -528,15 +713,28 @@ class UnsafeRowDataEncoder( class AvroStateEncoder( keyStateEncoderSpec: KeyStateEncoderSpec, valueSchema: StructType, - stateSchemaInfo: Option[StateSchemaInfo] + stateSchemaProvider: Option[StateSchemaProvider], + columnFamilyInfo: Option[ColumnFamilyInfo] ) extends RocksDBDataEncoder(keyStateEncoderSpec, valueSchema) with Logging { + Review Comment: nit: unnecessary empty line ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala: ########## @@ -51,6 +54,187 @@ sealed trait RocksDBValueStateEncoder { def decodeValues(valueBytes: Array[Byte]): Iterator[UnsafeRow] } +trait StateSchemaProvider extends Serializable { + def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue + + def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short +} + +// Test implementation that can be dynamically updated +class TestStateSchemaProvider extends StateSchemaProvider { + private var schemas = Map.empty[StateSchemaMetadataKey, StateSchemaMetadataValue] + + def addSchema( + colFamilyName: String, + keySchema: StructType, + valueSchema: StructType, + keySchemaId: Short = 0, + valueSchemaId: Short = 0): Unit = { + schemas ++= Map( + StateSchemaMetadataKey(colFamilyName, keySchemaId, isKey = true) -> + StateSchemaMetadataValue(keySchema, SchemaConverters.toAvroTypeWithDefaults(keySchema)), + StateSchemaMetadataKey(colFamilyName, valueSchemaId, isKey = false) -> + StateSchemaMetadataValue(valueSchema, SchemaConverters.toAvroTypeWithDefaults(valueSchema)) + ) + } + + override def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue = { + schemas(key) + } + + override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short = { + schemas.keys + .filter(key => + key.colFamilyName == colFamilyName && + key.isKey == isKey) + .map(_.schemaId).max + } +} + +class InMemoryStateSchemaProvider(metadata: StateSchemaMetadata) extends StateSchemaProvider { + override def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue = { + metadata.activeSchemas(key) + } + + override 
def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short = { + metadata.activeSchemas + .keys + .filter(key => + key.colFamilyName == colFamilyName && + key.isKey == isKey + ) + .map(_.schemaId).max + } +} + +/** + * Broadcasts schema metadata information for stateful operators in a streaming query. + * + * This class provides a way to distribute schema evolution information to all executors + * via Spark's broadcast mechanism. Each stateful operator in a streaming query maintains + * its own instance of this class to track schema versions and evolution. + * + * @param broadcast Spark broadcast variable containing the schema metadata + */ +case class StateSchemaBroadcast( + broadcast: Broadcast[StateSchemaMetadata] +) extends Logging with StateSchemaProvider { + + /** + * Retrieves the schema information for a given column family and schema version + * + * @param key A combination of column family name and schema ID + * @return The corresponding schema metadata value containing both SQL and Avro schemas + */ + override def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue = { + broadcast.value.activeSchemas(key) + } + + override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short = { + broadcast.value.activeSchemas + .keys + .filter(key => + key.colFamilyName == colFamilyName && + key.isKey == isKey + ) + .map(_.schemaId).max + } +} + +/** + * Contains schema evolution metadata for a stateful operator. + * + * @param currentSchemaId The schema version currently being used for writing new state + * @param activeSchemas Map of all active schema versions, keyed by column family and schema ID. + * This includes both the current schema and any previous schemas that + * may still exist in the state store. + */ +case class StateSchemaMetadata( + activeSchemas: Map[StateSchemaMetadataKey, StateSchemaMetadataValue] +) + +object StateSchemaMetadata { + + def createStateSchemaMetadata( + checkpointLocation: String, + hadoopConf: Configuration, + stateSchemaFiles: List[String] + ): StateSchemaMetadata = { + val fm = CheckpointFileManager.create(new Path(checkpointLocation), hadoopConf) + + // Build up our map of schema metadata + val activeSchemas = stateSchemaFiles.zipWithIndex.foldLeft( + Map.empty[StateSchemaMetadataKey, StateSchemaMetadataValue]) { + case (schemas, (stateSchemaFile, schemaIndex)) => Review Comment: nit: index does not seem to be used ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala: ########## @@ -297,7 +481,8 @@ abstract class RocksDBDataEncoder( class UnsafeRowDataEncoder( keyStateEncoderSpec: KeyStateEncoderSpec, valueSchema: StructType, - stateSchemaInfo: Option[StateSchemaInfo] + stateSchemaProvider: Option[StateSchemaProvider], Review Comment: I see these two aren't used. Unless we create the instance dynamically, I'm not sure there is a merit to make the params be consistent. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala: ########## @@ -189,7 +373,7 @@ trait DataEncoder { abstract class RocksDBDataEncoder( keyStateEncoderSpec: KeyStateEncoderSpec, - valueSchema: StructType) extends DataEncoder { + valueSchema: StructType) extends DataEncoder with Logging { Review Comment: Is this used? 
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala: ########## @@ -1007,8 +1248,19 @@ class AvroStateEncoder( override def decodeValue(bytes: Array[Byte]): UnsafeRow = { val schemaIdRow = decodeStateSchemaIdRow(bytes) + val writerSchema = stateSchemaProvider.get.getSchemaMetadataValue( Review Comment: Is it safe? If we are very sure stateSchemaProvider is Some(x), then is there a reason we put this as Option? If we have to put this as Option but we should always see this as Some(x), shall we add assert to raise an exception with better message? ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala: ########## @@ -196,8 +196,8 @@ trait FlatMapGroupsWithStateExecBase hadoopConf: Configuration, batchId: Long, stateSchemaVersion: Int): List[StateSchemaValidationResult] = { - val newStateSchema = List(StateStoreColFamilySchema(StateStore.DEFAULT_COL_FAMILY_NAME, - groupingAttributes.toStructType, stateManager.stateSchema)) + val newStateSchema = List(StateStoreColFamilySchema(StateStore.DEFAULT_COL_FAMILY_NAME, 0, Review Comment: ditto about named parameter ########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala: ########## @@ -231,11 +234,17 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging s"for state variable $stateVarName in operator ${sourceOptions.operatorId}") val stateVarInfo = stateVarInfoList.head transformWithStateVariableInfoOpt = Some(stateVarInfo) - val schemaFilePath = new Path(storeMetadataEntry.stateSchemaFilePath.get) - Some(schemaFilePath) + schemaFilePaths = storeMetadataEntry.stateSchemaFilePaths + val stateSchemaMetadata = StateSchemaMetadata.createStateSchemaMetadata( + sourceOptions.stateCheckpointLocation.toString, + hadoopConf, + schemaFilePaths + ) + stateSchemaProvider = Some(new InMemoryStateSchemaProvider(stateSchemaMetadata)) Review Comment: Just to double confirm, is the reason we use InMemoryStateSchemaProvider rather than broadcast because it's short live query? ########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala: ########## @@ -231,11 +234,17 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging s"for state variable $stateVarName in operator ${sourceOptions.operatorId}") val stateVarInfo = stateVarInfoList.head transformWithStateVariableInfoOpt = Some(stateVarInfo) - val schemaFilePath = new Path(storeMetadataEntry.stateSchemaFilePath.get) - Some(schemaFilePath) + schemaFilePaths = storeMetadataEntry.stateSchemaFilePaths Review Comment: nit: This does not need to be outer scope. It's not used outer this scope. ########## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala: ########## Review Comment: ditto about named params ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala: ########## @@ -402,15 +419,27 @@ private[sql] class RocksDBStateStoreProvider stateStoreName = stateStoreId.storeName, colFamilyName = StateStore.DEFAULT_COL_FAMILY_NAME) - val dataEncoder = getDataEncoder( - stateStoreEncoding, dataEncoderCacheKey, keyStateEncoderSpec, valueSchema) + // For test cases only: TestStateSchemaProvider allows dynamically adding schemas + // during test execution to verify schema evolution behavior. 
In production, + // schemas are loaded from checkpoint data + stateSchemaProvider match { + case Some(t: TestStateSchemaProvider) => Review Comment: Also, this is not even for e2e test, right? Since constructing streaming query won't give us a chance to put this schema provider. Probably better to scope it down when explaining, as I had to consider the scenario of distributed execution. ########## sql/core/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala: ########## @@ -372,6 +373,178 @@ object SchemaConverters extends Logging { schema } } + + /** + * Creates default values for Spark SQL data types when converting to Avro. + * This ensures fields have appropriate defaults during schema evolution. + * + * This method recursively processes Spark SQL data types and generates corresponding + * default values that are compatible with Avro schema specifications. It handles + * both primitive types (like Boolean, Integer) and complex types (Arrays, Maps, Structs). + * + * @param dataType The Spark SQL DataType to create a default value for + * @return A default value appropriate for the given data type that's compatible with Avro + */ + private def getDefaultValue(dataType: DataType): Any = { + def createNestedDefault(st: StructType): java.util.HashMap[String, Any] = { + val defaultMap = new java.util.HashMap[String, Any]() + st.fields.foreach { field => + defaultMap.put(field.name, getDefaultValue(field.dataType)) + } + defaultMap + } + + dataType match { + // Basic types + case BooleanType => false + case ByteType | ShortType | IntegerType => 0 + case LongType => 0L + case FloatType => 0.0f Review Comment: Let's simply go through the path of adding column in existing table for RDBMS. Either you have to set the new column as nullable, or you have to provide the default value explicitly. Let's not trying to define the default value by ourselves. That said, unless we are going to provide the UX to let user specify the default values for new columns, new columns must be nullable, which is interpreted as "optional" in Avro. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala: ########## @@ -58,7 +64,8 @@ class StateScan( stateStoreConf: StateStoreConf, keyStateEncoderSpec: KeyStateEncoderSpec, stateVariableInfoOpt: Option[TransformWithStateVariableInfo], - stateStoreColFamilySchemaOpt: Option[StateStoreColFamilySchema]) extends Scan + stateStoreColFamilySchemaOpt: Option[StateStoreColFamilySchema], + stateSchemaProviderOpt: Option[StateSchemaProvider]) extends Scan Review Comment: nit: place `extends Scan` in below line ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala: ########## @@ -402,15 +419,27 @@ private[sql] class RocksDBStateStoreProvider stateStoreName = stateStoreId.storeName, colFamilyName = StateStore.DEFAULT_COL_FAMILY_NAME) - val dataEncoder = getDataEncoder( - stateStoreEncoding, dataEncoderCacheKey, keyStateEncoderSpec, valueSchema) + // For test cases only: TestStateSchemaProvider allows dynamically adding schemas + // during test execution to verify schema evolution behavior. In production, + // schemas are loaded from checkpoint data + stateSchemaProvider match { + case Some(t: TestStateSchemaProvider) => Review Comment: I can't imagine the way, except having an interface to perform callback. I agree we don't like to make the code complicated based on test stuff, but adding interface to do callback is more complicated. I'm OK with this code. 
Though TestStateSchemaProvider and addSchema() are probably too generic, and it's unclear what they do. Either add class and method docs to explain, or use better naming; e.g. it sounds like it is "capturing" the state schema. ########## sql/core/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala: ########## @@ -372,6 +373,178 @@ object SchemaConverters extends Logging { schema } } + + /** + * Creates default values for Spark SQL data types when converting to Avro. + * This ensures fields have appropriate defaults during schema evolution. + * + * This method recursively processes Spark SQL data types and generates corresponding + * default values that are compatible with Avro schema specifications. It handles + * both primitive types (like Boolean, Integer) and complex types (Arrays, Maps, Structs). + * + * @param dataType The Spark SQL DataType to create a default value for + * @return A default value appropriate for the given data type that's compatible with Avro + */ + private def getDefaultValue(dataType: DataType): Any = { + def createNestedDefault(st: StructType): java.util.HashMap[String, Any] = { + val defaultMap = new java.util.HashMap[String, Any]() + st.fields.foreach { field => + defaultMap.put(field.name, getDefaultValue(field.dataType)) + } + defaultMap + } + + dataType match { + // Basic types + case BooleanType => false + case ByteType | ShortType | IntegerType => 0 + case LongType => 0L + case FloatType => 0.0f Review Comment: > although - if we do try to rewrite in the future, the state store layer wont allow us to write nulls ? This is going to be a value for a column, not a row itself. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org