This is an automated email from the ASF dual-hosted git repository. jarvis pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 979a6703a0 [BUG] Fixed avro format support for storing null (#8424) 979a6703a0 is described below commit 979a6703a00d96c4cd3cddb53b57008f6fb6f128 Author: Tu-maimes <tu_mai...@163.com> AuthorDate: Fri Jan 3 09:04:01 2025 +0800 [BUG] Fixed avro format support for storing null (#8424) Co-authored-by: Tu-maimes <wangjie_...@163.com> --- .../seatunnel/format/avro/AvroToRowConverter.java | 4 ++ .../SeaTunnelRowTypeToAvroSchemaConverter.java | 43 ++++++++++++----- .../format/avro/AvroSerializationSchemaTest.java | 55 ++++++++++++++++++++++ 3 files changed, 89 insertions(+), 13 deletions(-) diff --git a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java index e80b78ee83..84b1063600 100644 --- a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java +++ b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java @@ -38,6 +38,7 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; public class AvroToRowConverter implements Serializable { @@ -82,6 +83,9 @@ public class AvroToRowConverter implements Serializable { } private Object convertField(SeaTunnelDataType<?> dataType, Object val) { + if (Objects.isNull(val)) { + return null; + } switch (dataType.getSqlType()) { case STRING: return val.toString(); diff --git a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/SeaTunnelRowTypeToAvroSchemaConverter.java b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/SeaTunnelRowTypeToAvroSchemaConverter.java index 0a990f3fc6..56a65d3e53 100644 --- a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/SeaTunnelRowTypeToAvroSchemaConverter.java +++ b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/SeaTunnelRowTypeToAvroSchemaConverter.java @@ -58,28 +58,39 @@ public class SeaTunnelRowTypeToAvroSchemaConverter { switch (seaTunnelDataType.getSqlType()) { case STRING: - return Schema.create(Schema.Type.STRING); + return Schema.createUnion( + Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)); case BYTES: - return Schema.create(Schema.Type.BYTES); + return Schema.createUnion( + Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BYTES)); case TINYINT: case SMALLINT: case INT: - return Schema.create(Schema.Type.INT); + return Schema.createUnion( + Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.INT)); case BIGINT: - return Schema.create(Schema.Type.LONG); + return Schema.createUnion( + Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.LONG)); case FLOAT: - return Schema.create(Schema.Type.FLOAT); + return Schema.createUnion( + Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.FLOAT)); case DOUBLE: - return Schema.create(Schema.Type.DOUBLE); + return Schema.createUnion( + Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.DOUBLE)); case BOOLEAN: - return Schema.create(Schema.Type.BOOLEAN); + return Schema.createUnion( + Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BOOLEAN)); case MAP: SeaTunnelDataType<?> valueType = ((MapType<?, ?>) seaTunnelDataType).getValueType(); - return Schema.createMap(seaTunnelDataType2AvroDataType(fieldName, valueType)); + return Schema.createUnion( + Schema.create(Schema.Type.NULL), + Schema.createMap(seaTunnelDataType2AvroDataType(fieldName, valueType))); case ARRAY: SeaTunnelDataType<?> elementType = ((ArrayType<?, ?>) seaTunnelDataType).getElementType(); - return Schema.createArray(seaTunnelDataType2AvroDataType(fieldName, elementType)); + return Schema.createUnion( + Schema.create(Schema.Type.NULL), + Schema.createArray(seaTunnelDataType2AvroDataType(fieldName, elementType))); case ROW: SeaTunnelDataType<?>[] fieldTypes = ((SeaTunnelRowType) seaTunnelDataType).getFieldTypes(); @@ -93,12 +104,18 @@ public class SeaTunnelRowTypeToAvroSchemaConverter { int precision = ((DecimalType) seaTunnelDataType).getPrecision(); int scale = ((DecimalType) seaTunnelDataType).getScale(); LogicalTypes.Decimal decimal = LogicalTypes.decimal(precision, scale); - return decimal.addToSchema(Schema.create(Schema.Type.BYTES)); + return Schema.createUnion( + Schema.create(Schema.Type.NULL), + decimal.addToSchema(Schema.create(Schema.Type.BYTES))); case TIMESTAMP: - return LogicalTypes.localTimestampMillis() - .addToSchema(Schema.create(Schema.Type.LONG)); + return Schema.createUnion( + Schema.create(Schema.Type.NULL), + LogicalTypes.localTimestampMillis() + .addToSchema(Schema.create(Schema.Type.LONG))); case DATE: - return LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT)); + return Schema.createUnion( + Schema.create(Schema.Type.NULL), + LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT))); case NULL: return Schema.create(Schema.Type.NULL); default: diff --git a/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java b/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java index 42b8029f16..52ba7d76e6 100644 --- a/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java +++ b/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java @@ -177,4 +177,59 @@ class AvroSerializationSchemaTest { LocalDateTime localDateTime1 = (LocalDateTime) subRow.getField(13); Assertions.assertEquals(localDateTime1.compareTo(localDateTime), 0); } + + private SeaTunnelRow buildSeaTunnelRowValueNull() { + SeaTunnelRow subSeaTunnelRow = new SeaTunnelRow(14); + subSeaTunnelRow.setField(0, null); + subSeaTunnelRow.setField(1, null); + subSeaTunnelRow.setField(2, null); + subSeaTunnelRow.setField(3, null); + subSeaTunnelRow.setField(4, null); + subSeaTunnelRow.setField(5, null); + subSeaTunnelRow.setField(6, null); + subSeaTunnelRow.setField(7, null); + subSeaTunnelRow.setField(8, null); + subSeaTunnelRow.setField(9, null); + subSeaTunnelRow.setField(10, null); + subSeaTunnelRow.setField(11, null); + subSeaTunnelRow.setField(12, null); + subSeaTunnelRow.setField(13, null); + + SeaTunnelRow seaTunnelRow = new SeaTunnelRow(15); + seaTunnelRow.setField(0, null); + seaTunnelRow.setField(1, null); + seaTunnelRow.setField(2, null); + seaTunnelRow.setField(3, null); + seaTunnelRow.setField(4, null); + seaTunnelRow.setField(5, null); + seaTunnelRow.setField(6, null); + seaTunnelRow.setField(7, null); + seaTunnelRow.setField(8, null); + seaTunnelRow.setField(9, null); + seaTunnelRow.setField(10, null); + seaTunnelRow.setField(11, null); + seaTunnelRow.setField(12, null); + seaTunnelRow.setField(13, null); + seaTunnelRow.setField(14, subSeaTunnelRow); + return seaTunnelRow; + } + + @Test + public void testSerializationValueNull() throws IOException { + SeaTunnelRowType rowType = buildSeaTunnelRowType(); + CatalogTable catalogTable = CatalogTableUtil.getCatalogTable("", "", "", "test", rowType); + SeaTunnelRow seaTunnelRow = buildSeaTunnelRowValueNull(); + AvroSerializationSchema serializationSchema = new AvroSerializationSchema(rowType); + byte[] bytes = serializationSchema.serialize(seaTunnelRow); + AvroDeserializationSchema deserializationSchema = + new AvroDeserializationSchema(catalogTable); + SeaTunnelRow deserialize = deserializationSchema.deserialize(bytes); + String[] strArray1 = (String[]) seaTunnelRow.getField(1); + String[] strArray2 = (String[]) deserialize.getField(1); + Assertions.assertArrayEquals(strArray1, strArray2); + SeaTunnelRow subRow = (SeaTunnelRow) deserialize.getField(14); + Assertions.assertEquals(subRow.getField(9), null); + Assertions.assertEquals(subRow.getField(12), null); + Assertions.assertEquals(subRow.getField(13), null); + } }