HeartSaVioR commented on code in PR #49277: URL: https://github.com/apache/spark/pull/49277#discussion_r1914268373
########## sql/core/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala: ########## @@ -372,6 +373,153 @@ object SchemaConverters extends Logging { schema } } + + private def getDefaultValue(dataType: DataType): Any = { + def createNestedDefault(st: StructType): java.util.HashMap[String, Any] = { + val defaultMap = new java.util.HashMap[String, Any]() + st.fields.foreach { field => + // For nullable fields in a struct, create a wrapped default value + val fieldDefault = getDefaultValue(field.dataType) + if (field.nullable) { + defaultMap.put(field.name, null) + } else { + defaultMap.put(field.name, fieldDefault) + } + } + defaultMap + } + + dataType match { + // Basic types + case BooleanType => null + case ByteType | ShortType | IntegerType => null + case LongType => null + case FloatType => null + case DoubleType => null + case StringType => null + case BinaryType => null + + // Complex types + case ArrayType(_, _) => new java.util.ArrayList[Any]() + case MapType(StringType, _, _) => new java.util.HashMap[String, Any]() + case st: StructType => createNestedDefault(st) + + // Special types + case _: DecimalType => java.nio.ByteBuffer.allocate(0) + case DateType => null + case TimestampType => null + case TimestampNTZType => null + case NullType => null + case _ => null + } + } + + def toAvroTypeWithDefaults( + catalystType: DataType, + nullable: Boolean = false, + recordName: String = "topLevelRecord", + namespace: String = "", + nestingLevel: Int = 0): Schema = { + if (nestingLevel == 0) { + assert(catalystType.isInstanceOf[StructType], + "toAvroTypeWithDefaults should only be called with StructType") + } + val builder = SchemaBuilder.builder() + + def getNestedRecordName(baseName: String): String = { Review Comment: Does the name need to be unique across all columns? If this method aims to do so, I'm not sure this scenario is covered - `c` in `a.b.c` and `b.b.c`. If the name does not need to be unique, any reason we make the custom name? 
########## sql/core/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala: ########## @@ -372,6 +373,153 @@ object SchemaConverters extends Logging { schema } } + + private def getDefaultValue(dataType: DataType): Any = { + def createNestedDefault(st: StructType): java.util.HashMap[String, Any] = { + val defaultMap = new java.util.HashMap[String, Any]() + st.fields.foreach { field => + // For nullable fields in a struct, create a wrapped default value + val fieldDefault = getDefaultValue(field.dataType) + if (field.nullable) { + defaultMap.put(field.name, null) + } else { + defaultMap.put(field.name, fieldDefault) + } + } + defaultMap + } + + dataType match { + // Basic types + case BooleanType => null + case ByteType | ShortType | IntegerType => null + case LongType => null + case FloatType => null + case DoubleType => null + case StringType => null + case BinaryType => null + + // Complex types + case ArrayType(_, _) => new java.util.ArrayList[Any]() + case MapType(StringType, _, _) => new java.util.HashMap[String, Any]() + case st: StructType => createNestedDefault(st) + + // Special types + case _: DecimalType => java.nio.ByteBuffer.allocate(0) + case DateType => null + case TimestampType => null + case TimestampNTZType => null + case NullType => null + case _ => null + } + } + + def toAvroTypeWithDefaults( + catalystType: DataType, + nullable: Boolean = false, + recordName: String = "topLevelRecord", + namespace: String = "", + nestingLevel: Int = 0): Schema = { + if (nestingLevel == 0) { + assert(catalystType.isInstanceOf[StructType], + "toAvroTypeWithDefaults should only be called with StructType") + } + val builder = SchemaBuilder.builder() + + def getNestedRecordName(baseName: String): String = { + if (nestingLevel == 0) baseName + else s"${baseName}_nested_$nestingLevel" + } + + def processStructFields( + st: StructType, + fieldsAssembler: FieldAssembler[Schema]): FieldAssembler[Schema] = { + st.foreach { field => + val isLeafType = 
field.dataType match { + case _: StructType | _: ArrayType | _: MapType => false + case _ => true + } + + val innerType = toAvroTypeWithDefaults( + field.dataType, + nullable = isLeafType, // Only make leaf types nullable + getNestedRecordName(field.name), + namespace, + nestingLevel + 1 + ) + + // We want structs to have their proper defaults + val defaultValue = field.dataType match { + case _: StructType => getDefaultValue(field.dataType) // Use struct default + case _ => null // Leaf fields get null default + } + + fieldsAssembler.name(field.name).`type`(innerType).withDefault(defaultValue) + } + fieldsAssembler + } + + val baseSchema = catalystType match { + case st: StructType => + val nestedRecordName = getNestedRecordName(recordName) + val childNameSpace = if (namespace != "") { Review Comment: This is not used. ########## sql/core/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala: ########## @@ -372,6 +373,153 @@ object SchemaConverters extends Logging { schema } } + + private def getDefaultValue(dataType: DataType): Any = { Review Comment: I'm not sure why we can't simply have the default value as `null` for all data types, with an assertion that the column must be nullable. Unless I'm mistaken, all Spark SQL data types would work with null if they're nullable. And the corresponding type for Avro must be defined as a union which includes the null type. 
########## sql/core/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala: ########## @@ -372,6 +373,153 @@ object SchemaConverters extends Logging { schema } } + + private def getDefaultValue(dataType: DataType): Any = { + def createNestedDefault(st: StructType): java.util.HashMap[String, Any] = { + val defaultMap = new java.util.HashMap[String, Any]() + st.fields.foreach { field => + // For nullable fields in a struct, create a wrapped default value + val fieldDefault = getDefaultValue(field.dataType) + if (field.nullable) { + defaultMap.put(field.name, null) + } else { + defaultMap.put(field.name, fieldDefault) + } + } + defaultMap + } + + dataType match { + // Basic types + case BooleanType => null + case ByteType | ShortType | IntegerType => null + case LongType => null + case FloatType => null + case DoubleType => null + case StringType => null + case BinaryType => null + + // Complex types + case ArrayType(_, _) => new java.util.ArrayList[Any]() + case MapType(StringType, _, _) => new java.util.HashMap[String, Any]() + case st: StructType => createNestedDefault(st) + + // Special types + case _: DecimalType => java.nio.ByteBuffer.allocate(0) + case DateType => null + case TimestampType => null + case TimestampNTZType => null + case NullType => null + case _ => null + } + } + + def toAvroTypeWithDefaults( + catalystType: DataType, + nullable: Boolean = false, + recordName: String = "topLevelRecord", + namespace: String = "", + nestingLevel: Int = 0): Schema = { + if (nestingLevel == 0) { + assert(catalystType.isInstanceOf[StructType], + "toAvroTypeWithDefaults should only be called with StructType") + } + val builder = SchemaBuilder.builder() + + def getNestedRecordName(baseName: String): String = { + if (nestingLevel == 0) baseName + else s"${baseName}_nested_$nestingLevel" + } + + def processStructFields( + st: StructType, + fieldsAssembler: FieldAssembler[Schema]): FieldAssembler[Schema] = { Review Comment: It looks like FieldAssembler itself is 
stateful and we do not reassign (see foreach), which makes me wonder whether we need to return this. fieldsAssembler is out parameter. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala: ########## @@ -161,24 +179,67 @@ class StateSchemaCompatibilityChecker( * @param ignoreValueSchema - whether to ignore value schema or not */ private def check( - oldSchema: StateStoreColFamilySchema, + oldSchemas: List[StateStoreColFamilySchema], newSchema: StateStoreColFamilySchema, - ignoreValueSchema: Boolean) : Unit = { - val (storedKeySchema, storedValueSchema) = (oldSchema.keySchema, - oldSchema.valueSchema) + ignoreValueSchema: Boolean, + schemaEvolutionEnabled: Boolean): (StateStoreColFamilySchema, Boolean) = { + + def incrementSchemaId(id: Short): Short = (id + 1).toShort + + val mostRecentSchema = oldSchemas.last + // Initialize with old schema IDs + val resultSchema = newSchema.copy( + keySchemaId = mostRecentSchema.keySchemaId, + valueSchemaId = mostRecentSchema.valueSchemaId + ) + val (storedKeySchema, storedValueSchema) = (mostRecentSchema.keySchema, + mostRecentSchema.valueSchema) val (keySchema, valueSchema) = (newSchema.keySchema, newSchema.valueSchema) if (storedKeySchema.equals(keySchema) && (ignoreValueSchema || storedValueSchema.equals(valueSchema))) { // schema is exactly same + (mostRecentSchema, false) } else if (!schemasCompatible(storedKeySchema, keySchema)) { throw StateStoreErrors.stateStoreKeySchemaNotCompatible(storedKeySchema.toString, keySchema.toString) + } else if (!ignoreValueSchema && schemaEvolutionEnabled) { + // Check value schema evolution + // Sort schemas by most recent to least recent + val oldAvroSchemas = oldSchemas.sortBy(_.valueSchemaId).reverse.map { oldSchema => + SchemaConverters.toAvroTypeWithDefaults(oldSchema.valueSchema) + }.asJava + val l = oldSchemas.sortBy(_.valueSchemaId).reverse.map { oldSchema => + 
SchemaConverters.toAvroTypeWithDefaults(oldSchema.valueSchema) + } + val newAvroSchema = SchemaConverters.toAvroTypeWithDefaults(valueSchema) + + val validator = new SchemaValidatorBuilder().canReadStrategy.validateAll() + try { + validator.validate(newAvroSchema, oldAvroSchemas) + } catch { + case s: SchemaValidationException => + throw StateStoreErrors.stateStoreInvalidValueSchemaEvolution( Review Comment: Maybe better to provide the recent schema at least? I wouldn't like to ask users to parse Avro error message. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala: ########## @@ -161,24 +179,67 @@ class StateSchemaCompatibilityChecker( * @param ignoreValueSchema - whether to ignore value schema or not */ private def check( - oldSchema: StateStoreColFamilySchema, + oldSchemas: List[StateStoreColFamilySchema], newSchema: StateStoreColFamilySchema, - ignoreValueSchema: Boolean) : Unit = { - val (storedKeySchema, storedValueSchema) = (oldSchema.keySchema, - oldSchema.valueSchema) + ignoreValueSchema: Boolean, + schemaEvolutionEnabled: Boolean): (StateStoreColFamilySchema, Boolean) = { + + def incrementSchemaId(id: Short): Short = (id + 1).toShort + + val mostRecentSchema = oldSchemas.last + // Initialize with old schema IDs + val resultSchema = newSchema.copy( + keySchemaId = mostRecentSchema.keySchemaId, + valueSchemaId = mostRecentSchema.valueSchemaId + ) + val (storedKeySchema, storedValueSchema) = (mostRecentSchema.keySchema, + mostRecentSchema.valueSchema) val (keySchema, valueSchema) = (newSchema.keySchema, newSchema.valueSchema) if (storedKeySchema.equals(keySchema) && (ignoreValueSchema || storedValueSchema.equals(valueSchema))) { // schema is exactly same + (mostRecentSchema, false) } else if (!schemasCompatible(storedKeySchema, keySchema)) { throw StateStoreErrors.stateStoreKeySchemaNotCompatible(storedKeySchema.toString, keySchema.toString) + } else if (!ignoreValueSchema && 
schemaEvolutionEnabled) { + // Check value schema evolution + // Sort schemas by most recent to least recent + val oldAvroSchemas = oldSchemas.sortBy(_.valueSchemaId).reverse.map { oldSchema => + SchemaConverters.toAvroTypeWithDefaults(oldSchema.valueSchema) + }.asJava + val l = oldSchemas.sortBy(_.valueSchemaId).reverse.map { oldSchema => Review Comment: This seems to be unused? ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala: ########## @@ -74,47 +79,46 @@ case class StateStoreColFamilySchema( class StateSchemaCompatibilityChecker( providerId: StateStoreProviderId, hadoopConf: Configuration, - oldSchemaFilePath: Option[Path] = None, + oldSchemaFilePaths: List[Path] = List.empty, newSchemaFilePath: Option[Path] = None) extends Logging { - private val schemaFileLocation = if (oldSchemaFilePath.isEmpty) { + // For OperatorStateMetadataV1: Only one schema file present per operator + // per query + // For OperatorStateMetadataV2: Multiple schema files present per operator + // per query. This variable is the latest one + private val schemaFileLocation = if (oldSchemaFilePaths.isEmpty) { val storeCpLocation = providerId.storeId.storeCheckpointLocation() schemaFile(storeCpLocation) } else { - oldSchemaFilePath.get + oldSchemaFilePaths.last } private val fm = CheckpointFileManager.create(schemaFileLocation, hadoopConf) fm.mkdirs(schemaFileLocation.getParent) + private val conf = SparkSession.getActiveSession.get.sessionState.conf Review Comment: Now this checker is not instantiated in executor (or at least serialized from driver), right? Since we are now leveraging SparkSession which is not available in executor. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala: ########## Review Comment: I'm not sure whether the change in this file is really relevant to Avro encoding and schema evolution. 
Skimming through it, it should probably have been there already. Is it refactored, e.g. moved from some other file(s)? If you only moved the code and made no change to the code itself, please leave self PR comments to explain for reviewers e.g. where was the original code. ########## sql/core/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala: ########## @@ -372,6 +373,178 @@ object SchemaConverters extends Logging { schema } } + + /** + * Creates default values for Spark SQL data types when converting to Avro. + * This ensures fields have appropriate defaults during schema evolution. + * + * This method recursively processes Spark SQL data types and generates corresponding + * default values that are compatible with Avro schema specifications. It handles + * both primitive types (like Boolean, Integer) and complex types (Arrays, Maps, Structs). + * + * @param dataType The Spark SQL DataType to create a default value for + * @return A default value appropriate for the given data type that's compatible with Avro + */ + private def getDefaultValue(dataType: DataType): Any = { + def createNestedDefault(st: StructType): java.util.HashMap[String, Any] = { + val defaultMap = new java.util.HashMap[String, Any]() + st.fields.foreach { field => + defaultMap.put(field.name, getDefaultValue(field.dataType)) + } + defaultMap + } + + dataType match { + // Basic types + case BooleanType => false + case ByteType | ShortType | IntegerType => 0 + case LongType => 0L + case FloatType => 0.0f + case DoubleType => 0.0 + case StringType => "" + case BinaryType => java.nio.ByteBuffer.allocate(0) + + // Complex types + case ArrayType(elementType, _) => + new java.util.ArrayList[Any]() + case MapType(StringType, valueType, _) => + new java.util.HashMap[String, Any]() + case st: StructType => createNestedDefault(st) + + // Special types + case _: DecimalType => java.nio.ByteBuffer.allocate(0) + case DateType => 0 + case TimestampType => 0L + case TimestampNTZType => 0L + case 
NullType => null + case _ => null + } + } + + /** + * Converts a Spark SQL schema to a corresponding Avro schema. + * This method provides comprehensive support for schema evolution and handles + * complex nested types while maintaining type safety and compatibility. + * + * The conversion process includes: + * - Converting primitive Spark SQL types to Avro types + * - Handling complex types (arrays, maps, structs) with proper nesting + * - Supporting nullable fields through Avro unions + * - Managing logical types for dates, timestamps, and decimals + * - Generating unique names for nested records + * - Preserving namespace hierarchy for nested structures + * + * @param catalystType The Spark SQL DataType to convert + * @param nullable Whether the field can contain null values + * @param recordName The name to use for the record in the Avro schema + * @param namespace The namespace for the Avro schema + * @param nestingLevel Current nesting level for generating unique names + * @return An Avro Schema corresponding to the input Spark SQL type + * @throws IncompatibleSchemaException if the input type cannot be converted to Avro + */ + def toAvroTypeWithDefaults( + catalystType: DataType, + nullable: Boolean = false, Review Comment: Also, if possible, check whether we can mark the recursive method as `@tailrec`. 
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala: ########## @@ -192,41 +253,63 @@ class StateSchemaCompatibilityChecker( def validateAndMaybeEvolveStateSchema( newStateSchema: List[StateStoreColFamilySchema], ignoreValueSchema: Boolean, - stateSchemaVersion: Int): Boolean = { - val existingStateSchemaList = getExistingKeyAndValueSchema() - val newStateSchemaList = newStateSchema - - if (existingStateSchemaList.isEmpty) { - // write the schema file if it doesn't exist - createSchemaFile(newStateSchemaList.sortBy(_.colFamilyName), stateSchemaVersion) + stateSchemaVersion: Int, + schemaEvolutionEnabled: Boolean): Boolean = { + val existingStateSchemaMap = readSchemaFiles() + val mostRecentColFamilies = getExistingKeyAndValueSchema().map(_.colFamilyName) + if (mostRecentColFamilies.isEmpty) { + // Initialize schemas with ID 0 when no existing schema + val initializedSchemas = newStateSchema.map(schema => + schema.copy(keySchemaId = 0, valueSchemaId = 0) + ) + createSchemaFile(initializedSchemas.sortBy(_.colFamilyName), stateSchemaVersion) true } else { - // validate if the new schema is compatible with the existing schema - val existingSchemaMap = existingStateSchemaList.map { schema => - schema.colFamilyName -> schema - }.toMap - // For each new state variable, we want to compare it to the old state variable - // schema with the same name - newStateSchemaList.foreach { newSchema => - existingSchemaMap.get(newSchema.colFamilyName).foreach { existingStateSchema => - check(existingStateSchema, newSchema, ignoreValueSchema) - } + + // Process each new schema and track if any have evolved + val (evolvedSchemas, hasEvolutions) = newStateSchema.foldLeft( + (List.empty[StateStoreColFamilySchema], false)) { + case ((schemas, evolved), newSchema) => + existingStateSchemaMap.get(newSchema.colFamilyName) match { + case Some(existingSchemas) => + val (updatedSchema, hasEvolved) = check( + existingSchemas, 
newSchema, ignoreValueSchema, schemaEvolutionEnabled) + (updatedSchema :: schemas, evolved || hasEvolved) + case None => + // New column family - initialize with schema ID 0 + val newSchemaWithIds = newSchema.copy(keySchemaId = 0, valueSchemaId = 0) + (newSchemaWithIds :: schemas, true) + } } - val colFamiliesAddedOrRemoved = - (newStateSchemaList.map(_.colFamilyName).toSet != existingSchemaMap.keySet) - if (stateSchemaVersion == SCHEMA_FORMAT_V3 && colFamiliesAddedOrRemoved) { - createSchemaFile(newStateSchemaList.sortBy(_.colFamilyName), stateSchemaVersion) + + val newColFamilies = newStateSchema.map(_.colFamilyName).toSet + val oldColFamilies = mostRecentColFamilies.toSet + val colFamiliesAddedOrRemoved = newColFamilies != oldColFamilies + val newSchemaFileWritten = hasEvolutions || colFamiliesAddedOrRemoved + + if (oldSchemaFilePaths.size == conf.streamingStateSchemaFilesThreshold && + colFamiliesAddedOrRemoved) { + throw StateStoreErrors.streamingStateSchemaFilesThresholdExceeded( + oldSchemaFilePaths.size + 1, + conf.streamingStateSchemaFilesThreshold, + // need to compute symmetric diff between col family list + (newColFamilies.diff(oldColFamilies) ++ Review Comment: Why not be more informative? I think it's a lot better to provide information about prior column families vs new column families, or to differentiate the representation between cfs being added vs cfs being removed, e.g. `+cf1, +cf2, -cf3, -cf4` or `added: [cf1, cf2], removed: [cf3, cf4]`. 
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala: ########## @@ -192,41 +253,63 @@ class StateSchemaCompatibilityChecker( def validateAndMaybeEvolveStateSchema( newStateSchema: List[StateStoreColFamilySchema], ignoreValueSchema: Boolean, - stateSchemaVersion: Int): Boolean = { - val existingStateSchemaList = getExistingKeyAndValueSchema() - val newStateSchemaList = newStateSchema - - if (existingStateSchemaList.isEmpty) { - // write the schema file if it doesn't exist - createSchemaFile(newStateSchemaList.sortBy(_.colFamilyName), stateSchemaVersion) + stateSchemaVersion: Int, + schemaEvolutionEnabled: Boolean): Boolean = { + val existingStateSchemaMap = readSchemaFiles() + val mostRecentColFamilies = getExistingKeyAndValueSchema().map(_.colFamilyName) + if (mostRecentColFamilies.isEmpty) { + // Initialize schemas with ID 0 when no existing schema + val initializedSchemas = newStateSchema.map(schema => Review Comment: nit: `{` rather than `(`. 
only use () for a single line ########## sql/core/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala: ########## @@ -372,6 +373,153 @@ object SchemaConverters extends Logging { schema } } + + private def getDefaultValue(dataType: DataType): Any = { + def createNestedDefault(st: StructType): java.util.HashMap[String, Any] = { + val defaultMap = new java.util.HashMap[String, Any]() + st.fields.foreach { field => + // For nullable fields in a struct, create a wrapped default value + val fieldDefault = getDefaultValue(field.dataType) + if (field.nullable) { + defaultMap.put(field.name, null) + } else { + defaultMap.put(field.name, fieldDefault) + } + } + defaultMap + } + + dataType match { + // Basic types + case BooleanType => null + case ByteType | ShortType | IntegerType => null + case LongType => null + case FloatType => null + case DoubleType => null + case StringType => null + case BinaryType => null + + // Complex types + case ArrayType(_, _) => new java.util.ArrayList[Any]() + case MapType(StringType, _, _) => new java.util.HashMap[String, Any]() + case st: StructType => createNestedDefault(st) + + // Special types + case _: DecimalType => java.nio.ByteBuffer.allocate(0) + case DateType => null + case TimestampType => null + case TimestampNTZType => null + case NullType => null + case _ => null + } + } + + def toAvroTypeWithDefaults( + catalystType: DataType, + nullable: Boolean = false, + recordName: String = "topLevelRecord", + namespace: String = "", + nestingLevel: Int = 0): Schema = { + if (nestingLevel == 0) { + assert(catalystType.isInstanceOf[StructType], + "toAvroTypeWithDefaults should only be called with StructType") + } + val builder = SchemaBuilder.builder() + + def getNestedRecordName(baseName: String): String = { + if (nestingLevel == 0) baseName + else s"${baseName}_nested_$nestingLevel" + } + + def processStructFields( + st: StructType, + fieldsAssembler: FieldAssembler[Schema]): FieldAssembler[Schema] = { + st.foreach { field => + 
val isLeafType = field.dataType match { + case _: StructType | _: ArrayType | _: MapType => false + case _ => true + } + + val innerType = toAvroTypeWithDefaults( + field.dataType, + nullable = isLeafType, // Only make leaf types nullable Review Comment: Is this due to Avro? I don't think this is needed for Spark SQL. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala: ########## @@ -192,41 +253,63 @@ class StateSchemaCompatibilityChecker( def validateAndMaybeEvolveStateSchema( newStateSchema: List[StateStoreColFamilySchema], ignoreValueSchema: Boolean, - stateSchemaVersion: Int): Boolean = { - val existingStateSchemaList = getExistingKeyAndValueSchema() - val newStateSchemaList = newStateSchema - - if (existingStateSchemaList.isEmpty) { - // write the schema file if it doesn't exist - createSchemaFile(newStateSchemaList.sortBy(_.colFamilyName), stateSchemaVersion) + stateSchemaVersion: Int, + schemaEvolutionEnabled: Boolean): Boolean = { + val existingStateSchemaMap = readSchemaFiles() + val mostRecentColFamilies = getExistingKeyAndValueSchema().map(_.colFamilyName) + if (mostRecentColFamilies.isEmpty) { + // Initialize schemas with ID 0 when no existing schema + val initializedSchemas = newStateSchema.map(schema => + schema.copy(keySchemaId = 0, valueSchemaId = 0) + ) + createSchemaFile(initializedSchemas.sortBy(_.colFamilyName), stateSchemaVersion) true } else { - // validate if the new schema is compatible with the existing schema - val existingSchemaMap = existingStateSchemaList.map { schema => - schema.colFamilyName -> schema - }.toMap - // For each new state variable, we want to compare it to the old state variable - // schema with the same name - newStateSchemaList.foreach { newSchema => - existingSchemaMap.get(newSchema.colFamilyName).foreach { existingStateSchema => - check(existingStateSchema, newSchema, ignoreValueSchema) - } + Review Comment: nit: unnecessary new line? 
########## sql/core/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala: ########## @@ -372,6 +373,178 @@ object SchemaConverters extends Logging { schema } } + + /** + * Creates default values for Spark SQL data types when converting to Avro. + * This ensures fields have appropriate defaults during schema evolution. + * + * This method recursively processes Spark SQL data types and generates corresponding + * default values that are compatible with Avro schema specifications. It handles + * both primitive types (like Boolean, Integer) and complex types (Arrays, Maps, Structs). + * + * @param dataType The Spark SQL DataType to create a default value for + * @return A default value appropriate for the given data type that's compatible with Avro + */ + private def getDefaultValue(dataType: DataType): Any = { + def createNestedDefault(st: StructType): java.util.HashMap[String, Any] = { + val defaultMap = new java.util.HashMap[String, Any]() + st.fields.foreach { field => + defaultMap.put(field.name, getDefaultValue(field.dataType)) + } + defaultMap + } + + dataType match { + // Basic types + case BooleanType => false + case ByteType | ShortType | IntegerType => 0 + case LongType => 0L + case FloatType => 0.0f + case DoubleType => 0.0 + case StringType => "" + case BinaryType => java.nio.ByteBuffer.allocate(0) + + // Complex types + case ArrayType(elementType, _) => + new java.util.ArrayList[Any]() + case MapType(StringType, valueType, _) => + new java.util.HashMap[String, Any]() + case st: StructType => createNestedDefault(st) + + // Special types + case _: DecimalType => java.nio.ByteBuffer.allocate(0) + case DateType => 0 + case TimestampType => 0L + case TimestampNTZType => 0L + case NullType => null + case _ => null + } + } + + /** + * Converts a Spark SQL schema to a corresponding Avro schema. + * This method provides comprehensive support for schema evolution and handles + * complex nested types while maintaining type safety and compatibility. 
+ * + * The conversion process includes: + * - Converting primitive Spark SQL types to Avro types + * - Handling complex types (arrays, maps, structs) with proper nesting + * - Supporting nullable fields through Avro unions + * - Managing logical types for dates, timestamps, and decimals + * - Generating unique names for nested records + * - Preserving namespace hierarchy for nested structures + * + * @param catalystType The Spark SQL DataType to convert + * @param nullable Whether the field can contain null values + * @param recordName The name to use for the record in the Avro schema + * @param namespace The namespace for the Avro schema + * @param nestingLevel Current nesting level for generating unique names + * @return An Avro Schema corresponding to the input Spark SQL type + * @throws IncompatibleSchemaException if the input type cannot be converted to Avro + */ + def toAvroTypeWithDefaults( + catalystType: DataType, + nullable: Boolean = false, Review Comment: Sorry, I did not mean to suggest adding an assertion. Please revisit my comment above. If we look at the callers of this method, apart from the recursive call, it is only ever called with a StructType as catalystType and no further params. That form must be considered the API, and the method signature that handles the recursive call can be defined as either a nested method or a private method (it shouldn't be an API that outer callers will call). Once you do this, I don't think you are going to need "optional" parameters. Let's avoid confusion about how to use this method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org