anishshri-db commented on code in PR #49277:
URL: https://github.com/apache/spark/pull/49277#discussion_r1902056259


##########
sql/core/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala:
##########
@@ -372,6 +373,158 @@ object SchemaConverters extends Logging {
       schema
     }
   }
+
+  /**
+   * Creates default values for Spark SQL data types when converting to Avro.
+   * This ensures fields have appropriate defaults during schema evolution.
+   */
+  private def getDefaultValue(dataType: DataType): Any = {
+    def createNestedDefault(st: StructType): java.util.HashMap[String, Any] = {
+      val defaultMap = new java.util.HashMap[String, Any]()
+      st.fields.foreach { field =>
+        defaultMap.put(field.name, getDefaultValue(field.dataType))
+      }
+      defaultMap
+    }
+
+    dataType match {
+      // Basic types
+      case BooleanType => false
+      case ByteType | ShortType | IntegerType => 0
+      case LongType => 0L
+      case FloatType => 0.0f
+      case DoubleType => 0.0
+      case StringType => ""
+      case BinaryType => java.nio.ByteBuffer.allocate(0)
+
+      // Complex types
+      case ArrayType(elementType, _) =>
+        val defaultArray = new java.util.ArrayList[Any]()

Review Comment:
   Why not have empty collections, i.e. an empty array/map, etc.?



##########
sql/core/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala:
##########
@@ -372,6 +373,158 @@ object SchemaConverters extends Logging {
       schema
     }
   }
+
+  /**
+   * Creates default values for Spark SQL data types when converting to Avro.
+   * This ensures fields have appropriate defaults during schema evolution.
+   */
+  private def getDefaultValue(dataType: DataType): Any = {
+    def createNestedDefault(st: StructType): java.util.HashMap[String, Any] = {
+      val defaultMap = new java.util.HashMap[String, Any]()
+      st.fields.foreach { field =>
+        defaultMap.put(field.name, getDefaultValue(field.dataType))
+      }
+      defaultMap
+    }
+
+    dataType match {
+      // Basic types
+      case BooleanType => false

Review Comment:
   Are these Avro defaults too?



##########
sql/core/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala:
##########
@@ -299,73 +300,145 @@ object SchemaConverters extends Logging {
     }
   }
 
-  /**
-   * Converts a Spark SQL schema to a corresponding Avro schema.
-   *
-   * @since 2.4.0
-   */
+  def getDefaultValue(dataType: DataType): Any = {
+    def createNestedDefault(st: StructType): java.util.HashMap[String, Any] = {
+      val defaultMap = new java.util.HashMap[String, Any]()
+      st.fields.foreach { field =>
+        defaultMap.put(field.name, getDefaultValue(field.dataType))
+      }
+      defaultMap
+    }
+
+    dataType match {
+      case BooleanType => false
+      case ByteType | ShortType | IntegerType => 0
+      case LongType => 0L
+      case FloatType => 0.0f
+      case DoubleType => 0.0
+      case StringType => ""
+      case BinaryType => java.nio.ByteBuffer.allocate(0)
+      case ArrayType(elementType, _) =>
+        val defaultArray = new java.util.ArrayList[Any]()
+        defaultArray.add(getDefaultValue(elementType))  // Add one default 
element
+        defaultArray
+      case MapType(StringType, valueType, _) =>
+        val defaultMap = new java.util.HashMap[String, Any]()
+        defaultMap.put("defaultKey", getDefaultValue(valueType))  // Add one 
default entry
+        defaultMap
+      case st: StructType => createNestedDefault(st)  // Handle nested structs 
recursively
+      case _: DecimalType => java.nio.ByteBuffer.allocate(0)
+      case DateType => 0
+      case TimestampType => 0L
+      case TimestampNTZType => 0L
+      case NullType => null
+      case _ => null
+    }
+  }
+
   def toAvroType(
       catalystType: DataType,
       nullable: Boolean = false,
       recordName: String = "topLevelRecord",
-      nameSpace: String = "")
-    : Schema = {
+      nameSpace: String = "",
+      withDefaults: Boolean = true,
+      nestingLevel: Int = 0): Schema = {
+
     val builder = SchemaBuilder.builder()
 
+    // Helper function to generate unique names for nested records
+    def getNestedRecordName(baseName: String): String = {
+      if (nestingLevel == 0) baseName

Review Comment:
   Don't think we handled this?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -242,20 +242,42 @@ class IncrementalExecution(
                     log"versions: ${MDC(ERROR, e.getMessage)}")
                 None
               }
-              oldMetadata match {
-                case Some(oldMetadata) => ssw.validateNewMetadata(oldMetadata, 
metadata)
-                case None =>
-              }
+            } else {
+              None
+            }
+            val stateSchemaList = ssw.stateSchemaList(schemaValidationResult,
+              oldMetadata)
+            val metadata = ssw.operatorStateMetadata(stateSchemaList)
+            oldMetadata match {
+              case Some(oldMetadata) => ssw.validateNewMetadata(oldMetadata, 
metadata)
+              case None =>
             }
             val metadataWriter = OperatorStateMetadataWriter.createWriter(
                 new Path(checkpointLocation, 
ssw.getStateInfo.operatorId.toString),
                 hadoopConf,
                 ssw.operatorStateMetadataVersion,
                 Some(currentBatchId))
             metadataWriter.write(metadata)
-          case _ =>
+            if (ssw.supportsSchemaEvolution) {
+              val stateSchemaMetadata = StateSchemaMetadata.
+                createStateSchemaMetadata(checkpointLocation, hadoopConf, 
stateSchemaList.head)
+
+              val stateSchemaBroadcast =

Review Comment:
   Let's note that the broadcast happens here for the first run.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -64,7 +64,9 @@ class IncrementalExecution(
     val watermarkPropagator: WatermarkPropagator,
     val isFirstBatch: Boolean,
     val currentStateStoreCkptId:
-      MutableMap[Long, Array[Array[String]]] = MutableMap[Long, 
Array[Array[String]]]())
+      MutableMap[Long, Array[Array[String]]] = MutableMap[Long, 
Array[Array[String]]](),
+    val stateSchemaMetadatas: MutableMap[Long, StateSchemaBroadcast] =

Review Comment:
   Let's add some comments for this?



##########
sql/core/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala:
##########
@@ -372,6 +373,158 @@ object SchemaConverters extends Logging {
       schema
     }
   }
+
+  /**
+   * Creates default values for Spark SQL data types when converting to Avro.
+   * This ensures fields have appropriate defaults during schema evolution.
+   */
+  private def getDefaultValue(dataType: DataType): Any = {
+    def createNestedDefault(st: StructType): java.util.HashMap[String, Any] = {
+      val defaultMap = new java.util.HashMap[String, Any]()
+      st.fields.foreach { field =>
+        defaultMap.put(field.name, getDefaultValue(field.dataType))
+      }
+      defaultMap
+    }
+
+    dataType match {
+      // Basic types
+      case BooleanType => false
+      case ByteType | ShortType | IntegerType => 0
+      case LongType => 0L
+      case FloatType => 0.0f
+      case DoubleType => 0.0
+      case StringType => ""
+      case BinaryType => java.nio.ByteBuffer.allocate(0)
+
+      // Complex types
+      case ArrayType(elementType, _) =>
+        val defaultArray = new java.util.ArrayList[Any]()
+        defaultArray.add(getDefaultValue(elementType))
+        defaultArray
+      case MapType(StringType, valueType, _) =>
+        val defaultMap = new java.util.HashMap[String, Any]()
+        defaultMap.put("defaultKey", getDefaultValue(valueType))
+        defaultMap
+      case st: StructType => createNestedDefault(st)
+
+      // Special types
+      case _: DecimalType => java.nio.ByteBuffer.allocate(0)
+      case DateType => 0
+      case TimestampType => 0L
+      case TimestampNTZType => 0L
+      case NullType => null
+      case _ => null
+    }
+  }
+
+  /**
+   * Converts a Spark SQL schema to a corresponding Avro schema.
+   * Handles nested types and adds support for schema evolution.
+   */
+  def toAvroTypeWithDefaults(
+      catalystType: DataType,
+      nullable: Boolean = false,
+      recordName: String = "topLevelRecord",
+      nameSpace: String = "",

Review Comment:
   Just say `namespace`? Also, what does this refer to?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to