This is an automated email from the ASF dual-hosted git repository.

jarvis pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 979a6703a0 [BUG] Fixed avro format support for storing null (#8424)
979a6703a0 is described below

commit 979a6703a00d96c4cd3cddb53b57008f6fb6f128
Author: Tu-maimes <tu_mai...@163.com>
AuthorDate: Fri Jan 3 09:04:01 2025 +0800

    [BUG] Fixed avro format support for storing null (#8424)
    
    Co-authored-by: Tu-maimes <wangjie_...@163.com>
---
 .../seatunnel/format/avro/AvroToRowConverter.java  |  4 ++
 .../SeaTunnelRowTypeToAvroSchemaConverter.java     | 43 ++++++++++++-----
 .../format/avro/AvroSerializationSchemaTest.java   | 55 ++++++++++++++++++++++
 3 files changed, 89 insertions(+), 13 deletions(-)

diff --git a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java
index e80b78ee83..84b1063600 100644
--- a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java
+++ b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java
@@ -38,6 +38,7 @@ import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 public class AvroToRowConverter implements Serializable {
 
@@ -82,6 +83,9 @@ public class AvroToRowConverter implements Serializable {
     }
 
     private Object convertField(SeaTunnelDataType<?> dataType, Object val) {
+        if (Objects.isNull(val)) {
+            return null;
+        }
         switch (dataType.getSqlType()) {
             case STRING:
                 return val.toString();
diff --git a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/SeaTunnelRowTypeToAvroSchemaConverter.java b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/SeaTunnelRowTypeToAvroSchemaConverter.java
index 0a990f3fc6..56a65d3e53 100644
--- a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/SeaTunnelRowTypeToAvroSchemaConverter.java
+++ b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/SeaTunnelRowTypeToAvroSchemaConverter.java
@@ -58,28 +58,39 @@ public class SeaTunnelRowTypeToAvroSchemaConverter {
 
         switch (seaTunnelDataType.getSqlType()) {
             case STRING:
-                return Schema.create(Schema.Type.STRING);
+                return Schema.createUnion(
+                        Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING));
             case BYTES:
-                return Schema.create(Schema.Type.BYTES);
+                return Schema.createUnion(
+                        Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BYTES));
             case TINYINT:
             case SMALLINT:
             case INT:
-                return Schema.create(Schema.Type.INT);
+                return Schema.createUnion(
+                        Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.INT));
             case BIGINT:
-                return Schema.create(Schema.Type.LONG);
+                return Schema.createUnion(
+                        Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.LONG));
             case FLOAT:
-                return Schema.create(Schema.Type.FLOAT);
+                return Schema.createUnion(
+                        Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.FLOAT));
             case DOUBLE:
-                return Schema.create(Schema.Type.DOUBLE);
+                return Schema.createUnion(
+                        Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.DOUBLE));
             case BOOLEAN:
-                return Schema.create(Schema.Type.BOOLEAN);
+                return Schema.createUnion(
+                        Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BOOLEAN));
             case MAP:
                 SeaTunnelDataType<?> valueType = ((MapType<?, ?>) seaTunnelDataType).getValueType();
-                return Schema.createMap(seaTunnelDataType2AvroDataType(fieldName, valueType));
+                return Schema.createUnion(
+                        Schema.create(Schema.Type.NULL),
+                        Schema.createMap(seaTunnelDataType2AvroDataType(fieldName, valueType)));
             case ARRAY:
                 SeaTunnelDataType<?> elementType =
                         ((ArrayType<?, ?>) seaTunnelDataType).getElementType();
-                return Schema.createArray(seaTunnelDataType2AvroDataType(fieldName, elementType));
+                return Schema.createUnion(
+                        Schema.create(Schema.Type.NULL),
+                        Schema.createArray(seaTunnelDataType2AvroDataType(fieldName, elementType)));
             case ROW:
                 SeaTunnelDataType<?>[] fieldTypes =
                         ((SeaTunnelRowType) seaTunnelDataType).getFieldTypes();
@@ -93,12 +104,18 @@ public class SeaTunnelRowTypeToAvroSchemaConverter {
                 int precision = ((DecimalType) seaTunnelDataType).getPrecision();
                 int scale = ((DecimalType) seaTunnelDataType).getScale();
                 LogicalTypes.Decimal decimal = LogicalTypes.decimal(precision, scale);
-                return decimal.addToSchema(Schema.create(Schema.Type.BYTES));
+                return Schema.createUnion(
+                        Schema.create(Schema.Type.NULL),
+                        decimal.addToSchema(Schema.create(Schema.Type.BYTES)));
             case TIMESTAMP:
-                return LogicalTypes.localTimestampMillis()
-                        .addToSchema(Schema.create(Schema.Type.LONG));
+                return Schema.createUnion(
+                        Schema.create(Schema.Type.NULL),
+                        LogicalTypes.localTimestampMillis()
+                                .addToSchema(Schema.create(Schema.Type.LONG)));
             case DATE:
-                return LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
+                return Schema.createUnion(
+                        Schema.create(Schema.Type.NULL),
+                        LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT)));
             case NULL:
                 return Schema.create(Schema.Type.NULL);
             default:
diff --git a/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java b/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java
index 42b8029f16..52ba7d76e6 100644
--- a/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java
+++ b/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java
@@ -177,4 +177,59 @@ class AvroSerializationSchemaTest {
         LocalDateTime localDateTime1 = (LocalDateTime) subRow.getField(13);
         Assertions.assertEquals(localDateTime1.compareTo(localDateTime), 0);
     }
+
+    private SeaTunnelRow buildSeaTunnelRowValueNull() {
+        SeaTunnelRow subSeaTunnelRow = new SeaTunnelRow(14);
+        subSeaTunnelRow.setField(0, null);
+        subSeaTunnelRow.setField(1, null);
+        subSeaTunnelRow.setField(2, null);
+        subSeaTunnelRow.setField(3, null);
+        subSeaTunnelRow.setField(4, null);
+        subSeaTunnelRow.setField(5, null);
+        subSeaTunnelRow.setField(6, null);
+        subSeaTunnelRow.setField(7, null);
+        subSeaTunnelRow.setField(8, null);
+        subSeaTunnelRow.setField(9, null);
+        subSeaTunnelRow.setField(10, null);
+        subSeaTunnelRow.setField(11, null);
+        subSeaTunnelRow.setField(12, null);
+        subSeaTunnelRow.setField(13, null);
+
+        SeaTunnelRow seaTunnelRow = new SeaTunnelRow(15);
+        seaTunnelRow.setField(0, null);
+        seaTunnelRow.setField(1, null);
+        seaTunnelRow.setField(2, null);
+        seaTunnelRow.setField(3, null);
+        seaTunnelRow.setField(4, null);
+        seaTunnelRow.setField(5, null);
+        seaTunnelRow.setField(6, null);
+        seaTunnelRow.setField(7, null);
+        seaTunnelRow.setField(8, null);
+        seaTunnelRow.setField(9, null);
+        seaTunnelRow.setField(10, null);
+        seaTunnelRow.setField(11, null);
+        seaTunnelRow.setField(12, null);
+        seaTunnelRow.setField(13, null);
+        seaTunnelRow.setField(14, subSeaTunnelRow);
+        return seaTunnelRow;
+    }
+
+    @Test
+    public void testSerializationValueNull() throws IOException {
+        SeaTunnelRowType rowType = buildSeaTunnelRowType();
+        CatalogTable catalogTable = CatalogTableUtil.getCatalogTable("", "", "", "test", rowType);
+        SeaTunnelRow seaTunnelRow = buildSeaTunnelRowValueNull();
+        AvroSerializationSchema serializationSchema = new AvroSerializationSchema(rowType);
+        byte[] bytes = serializationSchema.serialize(seaTunnelRow);
+        AvroDeserializationSchema deserializationSchema =
+                new AvroDeserializationSchema(catalogTable);
+        SeaTunnelRow deserialize = deserializationSchema.deserialize(bytes);
+        String[] strArray1 = (String[]) seaTunnelRow.getField(1);
+        String[] strArray2 = (String[]) deserialize.getField(1);
+        Assertions.assertArrayEquals(strArray1, strArray2);
+        SeaTunnelRow subRow = (SeaTunnelRow) deserialize.getField(14);
+        Assertions.assertEquals(subRow.getField(9), null);
+        Assertions.assertEquals(subRow.getField(12), null);
+        Assertions.assertEquals(subRow.getField(13), null);
+    }
 }

Reply via email to