MaxGekk commented on code in PR #49590:
URL: https://github.com/apache/spark/pull/49590#discussion_r1925572716


##########
connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala:
##########
@@ -241,6 +241,25 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite
     }
   }
 
+  test("schema nullability mismatch") {
+    val innerSchema = StructType(Seq(StructField("inner", StringType, false)))
+    val nestedSchema = StructType(Seq(StructField("outer", innerSchema, 
false)))
+
+    val nestedRow = Row.fromSeq(Seq(null))
+    val converter = 
CatalystTypeConverters.createToCatalystConverter(nestedSchema)
+    val input = Literal.create(converter(nestedRow), nestedSchema)
+    checkError(
+      exception = intercept[SparkRuntimeException] {
+        CatalystDataToAvro(input, None).eval()

Review Comment:
   Can you reproduce the error using the public API?



##########
sql/core/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -273,20 +275,29 @@ private[sql] class AvroSerializer(
     avroSchemaHelper.validateNoExtraCatalystFields(ignoreNullable = false)
     avroSchemaHelper.validateNoExtraRequiredAvroFields()
 
-    val (avroIndices, fieldConverters) = avroSchemaHelper.matchedFields.map {
+    val (avroIndices, fieldConverters, catalystFields) = 
avroSchemaHelper.matchedFields.map {
       case AvroMatchedField(catalystField, _, avroField) =>
         val converter = newConverter(catalystField.dataType,
           resolveNullableType(avroField.schema(), catalystField.nullable),
           catalystPath :+ catalystField.name, avroPath :+ avroField.name)
-        (avroField.pos(), converter)
-    }.toArray.unzip
+        (avroField.pos(), converter, catalystField)
+    }.toArray.unzip3
 
     val numFields = catalystStruct.length
     row: InternalRow =>
       val result = new Record(avroStruct)
       var i = 0
       while (i < numFields) {
         if (row.isNullAt(i)) {
+          if (!catalystFields(i).nullable) {
+            throw new SparkRuntimeException(
+              errorClass = "AVRO_CANNOT_WRITE_NULL_FIELD",
+              messageParameters = Map(
+                "name" -> catalystFields(i).name,

Review Comment:
   The name should be quoted with `toSQLId`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to