anishshri-db commented on code in PR #49277:
URL: https://github.com/apache/spark/pull/49277#discussion_r1899788343


##########
sql/core/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala:
##########
@@ -299,73 +300,145 @@ object SchemaConverters extends Logging {
     }
   }
 
-  /**
-   * Converts a Spark SQL schema to a corresponding Avro schema.
-   *
-   * @since 2.4.0
-   */
+  def getDefaultValue(dataType: DataType): Any = {
+    def createNestedDefault(st: StructType): java.util.HashMap[String, Any] = {
+      val defaultMap = new java.util.HashMap[String, Any]()
+      st.fields.foreach { field =>
+        defaultMap.put(field.name, getDefaultValue(field.dataType))
+      }
+      defaultMap
+    }
+
+    dataType match {
+      case BooleanType => false
+      case ByteType | ShortType | IntegerType => 0
+      case LongType => 0L
+      case FloatType => 0.0f
+      case DoubleType => 0.0
+      case StringType => ""
+      case BinaryType => java.nio.ByteBuffer.allocate(0)
+      case ArrayType(elementType, _) =>
+        val defaultArray = new java.util.ArrayList[Any]()
+        defaultArray.add(getDefaultValue(elementType))  // Add one default 
element
+        defaultArray
+      case MapType(StringType, valueType, _) =>
+        val defaultMap = new java.util.HashMap[String, Any]()
+        defaultMap.put("defaultKey", getDefaultValue(valueType))  // Add one 
default entry
+        defaultMap
+      case st: StructType => createNestedDefault(st)  // Handle nested structs 
recursively
+      case _: DecimalType => java.nio.ByteBuffer.allocate(0)
+      case DateType => 0
+      case TimestampType => 0L
+      case TimestampNTZType => 0L
+      case NullType => null
+      case _ => null
+    }
+  }
+
   def toAvroType(
       catalystType: DataType,
       nullable: Boolean = false,
       recordName: String = "topLevelRecord",
-      nameSpace: String = "")
-    : Schema = {
+      nameSpace: String = "",
+      withDefaults: Boolean = true,
+      nestingLevel: Int = 0): Schema = {
+
     val builder = SchemaBuilder.builder()
 
+    // Helper function to generate unique names for nested records
+    def getNestedRecordName(baseName: String): String = {
+      if (nestingLevel == 0) baseName
+      else s"${baseName}_nested_$nestingLevel"
+    }
+
+    // Helper function to handle struct fields
+    def processStructFields(
+        st: StructType,
+        fieldsAssembler: FieldAssembler[Schema]): FieldAssembler[Schema] = {
+      st.foreach { field =>
+        val fieldAvroType = toAvroType(
+          field.dataType,
+          field.nullable,
+          getNestedRecordName(field.name),
+          nameSpace,
+          withDefaults,
+          nestingLevel + 1
+        )
+
+        val fieldAssembler = 
fieldsAssembler.name(field.name).`type`(fieldAvroType)
+
+        if (withDefaults) {
+          fieldAssembler.withDefault(getDefaultValue(field.dataType))
+        } else {
+          fieldAssembler.noDefault()
+        }
+      }
+      fieldsAssembler
+    }
+
     val schema = catalystType match {
+      // Basic types remain the same
       case BooleanType => builder.booleanType()
       case ByteType | ShortType | IntegerType => builder.intType()
       case LongType => builder.longType()
+      case FloatType => builder.floatType()

Review Comment:
   should we move to new functions ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to