This is an automated email from the ASF dual-hosted git repository.

liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4b3af9bef4 [Improve][Doris Connector] Unified serialization method,Use 
RowToJsonConverter and TextSerializationSchema (#7229)
4b3af9bef4 is described below

commit 4b3af9bef4e1753838a7750e86bde71bae8562ae
Author: Guangdong Liu <804167...@qq.com>
AuthorDate: Mon Jul 22 13:04:19 2024 +0800

    [Improve][Doris Connector] Unified serialization method,Use 
RowToJsonConverter and TextSerializationSchema (#7229)
    
    * 1
    
    * 1
    
    * 1
    
    * Update 
seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/serialization/RowBatch.java
    
    Co-authored-by: Jia Fan <fanjiaemi...@qq.com>
    
    ---------
    
    Co-authored-by: gdliu3 <gdl...@iflytek.com>
    Co-authored-by: Jia Fan <fanjiaemi...@qq.com>
---
 .../doris/datatype/DorisTypeConverterFactory.java  |   4 +-
 .../doris/serialize/SeaTunnelRowConverter.java     | 107 -----------------
 .../doris/serialize/SeaTunnelRowSerializer.java    | 130 +++++++++------------
 .../doris/source/serialization/RowBatch.java       |  74 ++++++++++--
 .../doris/serialize/SeaTunnelRowConverterTest.java |  54 ---------
 .../format/json/JsonSerializationSchema.java       |   7 ++
 .../seatunnel/format/json/RowToJsonConverters.java |  37 +++---
 .../format/json/JsonRowDataSerDeSchemaTest.java    |  77 ++++++++++++
 .../format/text/TextSerializationSchema.java       |  16 ++-
 .../format/text/CsvTextFormatSchemaTest.java       |  40 +++++++
 .../format/text/TextFormatSchemaTest.java          |  41 +++++++
 11 files changed, 322 insertions(+), 265 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterFactory.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterFactory.java
index 4206e4fdc6..04b33f3364 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterFactory.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterFactory.java
@@ -19,8 +19,6 @@ package org.apache.seatunnel.connectors.doris.datatype;
 
 import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
 import org.apache.seatunnel.api.table.converter.TypeConverter;
-import org.apache.seatunnel.common.exception.CommonError;
-import org.apache.seatunnel.connectors.doris.config.DorisConfig;
 
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
@@ -37,7 +35,7 @@ public class DorisTypeConverterFactory {
                 || 
dorisVersion.toLowerCase(Locale.ROOT).startsWith("selectdb-doris-2.")) {
             return DorisTypeConverterV2.INSTANCE;
         } else {
-            throw CommonError.unsupportedVersion(DorisConfig.IDENTIFIER, 
dorisVersion);
+            return DorisTypeConverterV2.INSTANCE;
         }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowConverter.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowConverter.java
deleted file mode 100644
index 0fd8e27306..0000000000
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowConverter.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.serialize;
-
-import org.apache.seatunnel.api.table.type.ArrayType;
-import org.apache.seatunnel.api.table.type.DecimalArrayType;
-import org.apache.seatunnel.api.table.type.MapType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import org.apache.seatunnel.common.utils.DateTimeUtils;
-import org.apache.seatunnel.common.utils.DateUtils;
-import org.apache.seatunnel.common.utils.TimeUtils;
-import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
-
-import lombok.Builder;
-
-import java.math.BigDecimal;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-public class SeaTunnelRowConverter {
-    @Builder.Default private DateUtils.Formatter dateFormatter = 
DateUtils.Formatter.YYYY_MM_DD;
-
-    @Builder.Default
-    private DateTimeUtils.Formatter dateTimeFormatter =
-            DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS_SSSSSS;
-
-    @Builder.Default private TimeUtils.Formatter timeFormatter = 
TimeUtils.Formatter.HH_MM_SS;
-
-    protected Object convert(SeaTunnelDataType dataType, Object val) {
-        if (val == null) {
-            return null;
-        }
-        switch (dataType.getSqlType()) {
-            case TINYINT:
-            case SMALLINT:
-            case INT:
-            case BIGINT:
-            case FLOAT:
-            case DOUBLE:
-            case DECIMAL:
-            case BOOLEAN:
-            case STRING:
-                return val;
-            case DATE:
-                return DateUtils.toString((LocalDate) val, dateFormatter);
-            case TIME:
-                return TimeUtils.toString((LocalTime) val, timeFormatter);
-            case TIMESTAMP:
-                return DateTimeUtils.toString((LocalDateTime) val, 
dateTimeFormatter);
-            case ARRAY:
-                return convertArray(dataType, val);
-            case MAP:
-                return convertMap(dataType, val);
-            case BYTES:
-                return new String((byte[]) val);
-            default:
-                throw new DorisConnectorException(
-                        CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
-                        dataType + " is not supported ");
-        }
-    }
-
-    public Object[] convertArray(SeaTunnelDataType dataType, Object val) {
-        if (dataType instanceof DecimalArrayType) {
-            return (BigDecimal[]) val;
-        }
-
-        SeaTunnelDataType elementType = ((ArrayType) 
dataType).getElementType();
-        Object[] realValue = (Object[]) val;
-        Object[] newArrayValue = new Object[realValue.length];
-        for (int i = 0; i < realValue.length; i++) {
-            newArrayValue[i] = convert(elementType, realValue[i]);
-        }
-        return newArrayValue;
-    }
-
-    public Map<Object, Object> convertMap(SeaTunnelDataType dataType, Object 
val) {
-        MapType valueMapType = (MapType) dataType;
-        Map<Object, Object> realValue = (Map<Object, Object>) val;
-        Map<Object, Object> newMapValue = new LinkedHashMap<>();
-        for (Map.Entry entry : realValue.entrySet()) {
-            newMapValue.put(
-                    convert(valueMapType.getKeyType(), entry.getKey()),
-                    convert(valueMapType.getValueType(), entry.getValue()));
-        }
-        return newMapValue;
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java
index 4bfc148d86..0c5b9c0c42 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java
@@ -17,27 +17,28 @@
 
 package org.apache.seatunnel.connectors.doris.serialize;
 
+import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+
 import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.doris.sink.writer.LoadConstants;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+import org.apache.seatunnel.format.text.TextSerializationSchema;
 
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.StringJoiner;
+import java.util.Arrays;
+import java.util.List;
 
-import static com.google.common.base.Preconditions.checkState;
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
 import static 
org.apache.seatunnel.connectors.doris.sink.writer.LoadConstants.CSV;
 import static 
org.apache.seatunnel.connectors.doris.sink.writer.LoadConstants.JSON;
 import static 
org.apache.seatunnel.connectors.doris.sink.writer.LoadConstants.NULL_VALUE;
 
-public class SeaTunnelRowSerializer extends SeaTunnelRowConverter implements 
DorisSerializer {
+public class SeaTunnelRowSerializer implements DorisSerializer {
     String type;
-    private ObjectMapper objectMapper;
     private final SeaTunnelRowType seaTunnelRowType;
     private final String fieldDelimiter;
     private final boolean enableDelete;
@@ -51,48 +52,29 @@ public class SeaTunnelRowSerializer extends 
SeaTunnelRowConverter implements Dor
         this.seaTunnelRowType = seaTunnelRowType;
         this.fieldDelimiter = fieldDelimiter;
         this.enableDelete = enableDelete;
-        if (JSON.equals(type)) {
-            objectMapper = new ObjectMapper();
-        }
     }
 
-    @Override
-    public byte[] serialize(SeaTunnelRow seaTunnelRow) throws IOException {
-        String valString;
-        if (JSON.equals(type)) {
-            valString = buildJsonString(seaTunnelRow);
-        } else if (CSV.equals(type)) {
-            valString = buildCSVString(seaTunnelRow);
-        } else {
-            throw new IllegalArgumentException("The type " + type + " is not 
supported!");
-        }
-        return valString.getBytes(StandardCharsets.UTF_8);
+    public byte[] buildJsonString(SeaTunnelRow row, SeaTunnelRowType 
seaTunnelRowType)
+            throws IOException {
+
+        JsonSerializationSchema jsonSerializationSchema =
+                new JsonSerializationSchema(seaTunnelRowType, NULL_VALUE);
+        ObjectMapper mapper = jsonSerializationSchema.getMapper();
+        mapper.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, 
true);
+        return jsonSerializationSchema.serialize(row);
     }
 
-    public String buildJsonString(SeaTunnelRow row) throws IOException {
-        Map<String, Object> rowMap = new HashMap<>(row.getFields().length);
+    public byte[] buildCSVString(SeaTunnelRow row, SeaTunnelRowType 
seaTunnelRowType)
+            throws IOException {
 
-        for (int i = 0; i < row.getFields().length; i++) {
-            Object value = convert(seaTunnelRowType.getFieldType(i), 
row.getField(i));
-            rowMap.put(seaTunnelRowType.getFieldName(i), value);
-        }
-        if (enableDelete) {
-            rowMap.put(LoadConstants.DORIS_DELETE_SIGN, 
parseDeleteSign(row.getRowKind()));
-        }
-        return objectMapper.writeValueAsString(rowMap);
-    }
+        TextSerializationSchema build =
+                TextSerializationSchema.builder()
+                        .seaTunnelRowType(seaTunnelRowType)
+                        .delimiter(fieldDelimiter)
+                        .nullValue(NULL_VALUE)
+                        .build();
 
-    public String buildCSVString(SeaTunnelRow row) throws IOException {
-        StringJoiner joiner = new StringJoiner(fieldDelimiter);
-        for (int i = 0; i < row.getFields().length; i++) {
-            Object field = convert(seaTunnelRowType.getFieldType(i), 
row.getField(i));
-            String value = field != null ? field.toString() : NULL_VALUE;
-            joiner.add(value);
-        }
-        if (enableDelete) {
-            joiner.add(parseDeleteSign(row.getRowKind()));
-        }
-        return joiner.toString();
+        return build.serialize(row);
     }
 
     public String parseDeleteSign(RowKind rowKind) {
@@ -105,46 +87,40 @@ public class SeaTunnelRowSerializer extends 
SeaTunnelRowConverter implements Dor
         }
     }
 
-    public static Builder builder() {
-        return new Builder();
-    }
-
-    /** Builder for RowDataSerializer. */
-    public static class Builder {
-        private SeaTunnelRowType seaTunnelRowType;
-        private String type;
-        private String fieldDelimiter;
-        private boolean deletable;
-
-        public Builder setType(String type) {
-            this.type = type;
-            return this;
-        }
+    @Override
+    public void open() throws IOException {}
 
-        public Builder setSeaTunnelRowType(SeaTunnelRowType seaTunnelRowType) {
-            this.seaTunnelRowType = seaTunnelRowType;
-            return this;
-        }
+    @Override
+    public byte[] serialize(SeaTunnelRow seaTunnelRow) throws IOException {
 
-        public Builder setFieldDelimiter(String fieldDelimiter) {
-            this.fieldDelimiter = fieldDelimiter;
-            return this;
-        }
+        List<String> fieldNames = 
Arrays.asList(seaTunnelRowType.getFieldNames());
+        List<SeaTunnelDataType<?>> fieldTypes = 
Arrays.asList(seaTunnelRowType.getFieldTypes());
 
-        public Builder enableDelete(boolean deletable) {
-            this.deletable = deletable;
-            return this;
+        if (enableDelete) {
+            SeaTunnelRow seaTunnelRowEnableDelete = seaTunnelRow.copy();
+            seaTunnelRowEnableDelete.setField(
+                    seaTunnelRow.getFields().length, 
parseDeleteSign(seaTunnelRow.getRowKind()));
+            fieldNames.add(LoadConstants.DORIS_DELETE_SIGN);
+            fieldTypes.add(STRING_TYPE);
         }
 
-        public SeaTunnelRowSerializer build() {
-            checkState(CSV.equals(type) && fieldDelimiter != null || 
JSON.equals(type));
-            return new SeaTunnelRowSerializer(type, seaTunnelRowType, 
fieldDelimiter, deletable);
+        if (JSON.equals(type)) {
+            return buildJsonString(
+                    seaTunnelRow,
+                    new SeaTunnelRowType(
+                            fieldNames.toArray(new String[0]),
+                            fieldTypes.toArray(new SeaTunnelDataType<?>[0])));
+        } else if (CSV.equals(type)) {
+            return buildCSVString(
+                    seaTunnelRow,
+                    new SeaTunnelRowType(
+                            fieldNames.toArray(new String[0]),
+                            fieldTypes.toArray(new SeaTunnelDataType<?>[0])));
+        } else {
+            throw new IllegalArgumentException("The type " + type + " is not 
supported!");
         }
     }
 
-    @Override
-    public void open() throws IOException {}
-
     @Override
     public void close() throws IOException {}
 }
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/serialization/RowBatch.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/serialization/RowBatch.java
index a569e2b285..930e83c568 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/serialization/RowBatch.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/serialization/RowBatch.java
@@ -20,6 +20,7 @@ package 
org.apache.seatunnel.connectors.doris.source.serialization;
 import org.apache.seatunnel.shade.org.apache.arrow.memory.RootAllocator;
 import org.apache.seatunnel.shade.org.apache.arrow.vector.BigIntVector;
 import org.apache.seatunnel.shade.org.apache.arrow.vector.BitVector;
+import org.apache.seatunnel.shade.org.apache.arrow.vector.DateDayVector;
 import org.apache.seatunnel.shade.org.apache.arrow.vector.DecimalVector;
 import org.apache.seatunnel.shade.org.apache.arrow.vector.FieldVector;
 import 
org.apache.seatunnel.shade.org.apache.arrow.vector.FixedSizeBinaryVector;
@@ -27,6 +28,7 @@ import 
org.apache.seatunnel.shade.org.apache.arrow.vector.Float4Vector;
 import org.apache.seatunnel.shade.org.apache.arrow.vector.Float8Vector;
 import org.apache.seatunnel.shade.org.apache.arrow.vector.IntVector;
 import org.apache.seatunnel.shade.org.apache.arrow.vector.SmallIntVector;
+import org.apache.seatunnel.shade.org.apache.arrow.vector.TimeStampMicroVector;
 import org.apache.seatunnel.shade.org.apache.arrow.vector.TinyIntVector;
 import org.apache.seatunnel.shade.org.apache.arrow.vector.VarCharVector;
 import org.apache.seatunnel.shade.org.apache.arrow.vector.VectorSchemaRoot;
@@ -46,6 +48,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.common.utils.DateTimeUtils;
+import org.apache.seatunnel.common.utils.DateUtils;
 import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
 import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
 
@@ -71,21 +75,21 @@ import java.util.function.IntFunction;
 
 @Slf4j
 public class RowBatch {
+    SeaTunnelDataType<?>[] fieldTypes;
+    private final ArrowStreamReader arrowStreamReader;
+    private final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
+    private final String DATETIMEV2_PATTERN = "yyyy-MM-dd HH:mm:ss.SSSSSS";
+    private final DateTimeFormatter dateTimeV2Formatter =
+            DateTimeFormatter.ofPattern(DATETIMEV2_PATTERN);
+    private final DateTimeFormatter dateFormatter = 
DateTimeFormatter.ofPattern("yyyy-MM-dd");
     // offset for iterate the rowBatch
     private int offsetInRowBatch = 0;
     private int rowCountInOneBatch = 0;
     private int readRowCount = 0;
-    SeaTunnelDataType<?>[] fieldTypes;
     private List<SeaTunnelRow> seatunnelRowBatch = new ArrayList<>();
-    private final ArrowStreamReader arrowStreamReader;
     private VectorSchemaRoot root;
     private List<FieldVector> fieldVectors;
     private RootAllocator rootAllocator;
-    private final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
-    private final String DATETIMEV2_PATTERN = "yyyy-MM-dd HH:mm:ss.SSSSSS";
-    private final DateTimeFormatter dateTimeV2Formatter =
-            DateTimeFormatter.ofPattern(DATETIMEV2_PATTERN);
-    private final DateTimeFormatter dateFormatter = 
DateTimeFormatter.ofPattern("yyyy-MM-dd");
 
     public RowBatch(TScanBatchResult nextResult, SeaTunnelRowType 
seaTunnelRowType) {
         this.rootAllocator = new RootAllocator(Integer.MAX_VALUE);
@@ -293,6 +297,19 @@ public class RowBatch {
                                 return new BigDecimal(new BigInteger(bytes), 
0);
                             });
                     break;
+                } else if (fieldVector instanceof VarCharVector) {
+                    VarCharVector varCharVector = (VarCharVector) fieldVector;
+                    Preconditions.checkArgument(
+                            minorType.equals(Types.MinorType.VARCHAR),
+                            typeMismatchMessage(currentType, minorType));
+                    addValueToRowForAllRows(
+                            col,
+                            rowIndex ->
+                                    varCharVector.isNull(rowIndex)
+                                            ? null
+                                            : new BigDecimal(
+                                                    new 
String(varCharVector.get(rowIndex))));
+                    break;
                 }
                 DecimalVector decimalVector = (DecimalVector) fieldVector;
                 Preconditions.checkArgument(
@@ -307,6 +324,21 @@ public class RowBatch {
                 break;
             case "DATE":
             case "DATEV2":
+                if (fieldVector instanceof DateDayVector) {
+                    DateDayVector dateVector = (DateDayVector) fieldVector;
+                    Preconditions.checkArgument(
+                            minorType.equals(Types.MinorType.DATEDAY),
+                            typeMismatchMessage(currentType, minorType));
+                    addValueToRowForAllRows(
+                            col,
+                            rowIndex -> {
+                                if (dateVector.isNull(rowIndex)) {
+                                    return null;
+                                }
+                                return 
LocalDate.ofEpochDay(dateVector.get(rowIndex));
+                            });
+                    break;
+                }
                 VarCharVector dateVector = (VarCharVector) fieldVector;
                 Preconditions.checkArgument(
                         minorType.equals(Types.MinorType.VARCHAR),
@@ -322,6 +354,22 @@ public class RowBatch {
                         });
                 break;
             case "TIMESTAMP":
+                if (fieldVector instanceof TimeStampMicroVector) {
+                    TimeStampMicroVector timestampVector = 
(TimeStampMicroVector) fieldVector;
+
+                    addValueToRowForAllRows(
+                            col,
+                            rowIndex -> {
+                                if (timestampVector.isNull(rowIndex)) {
+                                    return null;
+                                }
+                                String stringValue = 
timestampVector.getObject(rowIndex).toString();
+                                stringValue = 
completeMilliseconds(stringValue);
+
+                                return DateTimeUtils.parse(stringValue);
+                            });
+                    break;
+                }
                 VarCharVector timestampVector = (VarCharVector) fieldVector;
                 Preconditions.checkArgument(
                         minorType.equals(Types.MinorType.VARCHAR),
@@ -499,6 +547,9 @@ public class RowBatch {
         }
 
         if (vectorObject instanceof Integer) {
+            if (sqlType.equals(SqlType.DATE)) {
+                return LocalDate.ofEpochDay((int) vectorObject);
+            }
             return Integer.valueOf(vectorObject.toString());
         }
 
@@ -520,6 +571,8 @@ public class RowBatch {
                 return LocalDateTime.parse(stringValue, dateTimeV2Formatter);
             } else if (sqlType.equals(SqlType.DATE)) {
                 return LocalDate.parse(vectorObject.toString(), dateFormatter);
+            } else if (sqlType.equals(SqlType.DECIMAL)) {
+                return new BigDecimal(vectorObject.toString());
             }
             return vectorObject.toString();
         }
@@ -540,6 +593,13 @@ public class RowBatch {
             }
             return new BigDecimal(new BigInteger(bytes), 0);
         }
+        if (vectorObject instanceof LocalDate) {
+            return DateUtils.parse(vectorObject.toString());
+        }
+
+        if (vectorObject instanceof LocalDateTime) {
+            return DateTimeUtils.parse(vectorObject.toString());
+        }
 
         return vectorObject.toString();
     }
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowConverterTest.java
 
b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowConverterTest.java
deleted file mode 100644
index 5755beb3f7..0000000000
--- 
a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowConverterTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.serialize;
-
-import org.apache.seatunnel.api.table.type.LocalTimeType;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import java.time.LocalDateTime;
-
-public class SeaTunnelRowConverterTest {
-
-    private static final SeaTunnelRowConverter seaTunnelRowConverter = new 
SeaTunnelRowConverter();
-
-    @Test
-    void testDateTimeWithNano() {
-        Assertions.assertEquals(
-                "2021-01-01 00:00:00.123456",
-                seaTunnelRowConverter.convert(
-                        LocalTimeType.LOCAL_DATE_TIME_TYPE,
-                        LocalDateTime.of(2021, 1, 1, 0, 0, 0, 123456789)));
-        Assertions.assertEquals(
-                "2021-01-01 00:00:00.000000",
-                seaTunnelRowConverter.convert(
-                        LocalTimeType.LOCAL_DATE_TIME_TYPE,
-                        LocalDateTime.of(2021, 1, 1, 0, 0, 0, 0)));
-        Assertions.assertEquals(
-                "2021-01-01 00:00:00.000001",
-                seaTunnelRowConverter.convert(
-                        LocalTimeType.LOCAL_DATE_TIME_TYPE,
-                        LocalDateTime.of(2021, 1, 1, 0, 0, 0, 1000)));
-        Assertions.assertEquals(
-                "2021-01-01 00:00:00.000123",
-                seaTunnelRowConverter.convert(
-                        LocalTimeType.LOCAL_DATE_TIME_TYPE,
-                        LocalDateTime.of(2021, 1, 1, 0, 0, 0, 123456)));
-    }
-}
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java
 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java
index 4e2e98317b..b35710b3a0 100644
--- 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java
+++ 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java
@@ -59,6 +59,13 @@ public class JsonSerializationSchema implements 
SerializationSchema {
         this.charset = charset;
     }
 
+    public JsonSerializationSchema(SeaTunnelRowType rowType, String nullValue) 
{
+        this.rowType = rowType;
+        this.runtimeConverter =
+                new 
RowToJsonConverters().createConverter(checkNotNull(rowType), nullValue);
+        this.charset = StandardCharsets.UTF_8;
+    }
+
     @Override
     public byte[] serialize(SeaTunnelRow row) {
         if (node == null) {
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java
 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java
index 575b5bace1..2cf8ae092e 100644
--- 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java
+++ 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java
@@ -49,15 +49,25 @@ public class RowToJsonConverters implements Serializable {
 
     private static final long serialVersionUID = 6988876688930916940L;
 
+    private String nullValue;
+
     public RowToJsonConverter createConverter(SeaTunnelDataType<?> type) {
         return wrapIntoNullableConverter(createNotNullConverter(type));
     }
 
+    public RowToJsonConverter createConverter(SeaTunnelDataType<?> type, 
String nullValue) {
+        this.nullValue = nullValue;
+        return createConverter(type);
+    }
+
     private RowToJsonConverter wrapIntoNullableConverter(RowToJsonConverter 
converter) {
         return new RowToJsonConverter() {
             @Override
             public JsonNode convert(ObjectMapper mapper, JsonNode reuse, 
Object value) {
                 if (value == null) {
+                    if (nullValue != null) {
+                        return mapper.getNodeFactory().textNode(nullValue);
+                    }
                     return mapper.getNodeFactory().nullNode();
                 }
                 return converter.convert(mapper, reuse, value);
@@ -74,7 +84,9 @@ public class RowToJsonConverters implements Serializable {
                 return new RowToJsonConverter() {
                     @Override
                     public JsonNode convert(ObjectMapper mapper, JsonNode 
reuse, Object value) {
-                        return null;
+                        return nullValue == null
+                                ? null
+                                : mapper.getNodeFactory().textNode((String) 
value);
                     }
                 };
             case BOOLEAN:
@@ -175,8 +187,7 @@ public class RowToJsonConverters implements Serializable {
                 return createArrayConverter((ArrayType) type);
             case MAP:
                 MapType mapType = (MapType) type;
-                return createMapConverter(
-                        mapType.toString(), mapType.getKeyType(), 
mapType.getValueType());
+                return createMapConverter(mapType.getKeyType(), 
mapType.getValueType());
             default:
                 throw new SeaTunnelJsonFormatException(
                         CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
@@ -258,15 +269,10 @@ public class RowToJsonConverters implements Serializable {
     }
 
     private RowToJsonConverter createMapConverter(
-            String typeSummary, SeaTunnelDataType<?> keyType, 
SeaTunnelDataType<?> valueType) {
-        if (!SqlType.STRING.equals(keyType.getSqlType())) {
-            throw new SeaTunnelJsonFormatException(
-                    CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
-                    "JSON format doesn't support non-string as key type of 
map. The type is: "
-                            + typeSummary);
-        }
-
+            SeaTunnelDataType<?> keyType, SeaTunnelDataType<?> valueType) {
+        final RowToJsonConverter keyConverter = createConverter(keyType);
         final RowToJsonConverter valueConverter = createConverter(valueType);
+
         return new RowToJsonConverter() {
             @Override
             public JsonNode convert(ObjectMapper mapper, JsonNode reuse, 
Object value) {
@@ -280,9 +286,12 @@ public class RowToJsonConverters implements Serializable {
                     node.removeAll();
                 }
 
-                Map<String, ?> mapData = (Map) value;
-                for (Map.Entry<String, ?> entry : mapData.entrySet()) {
-                    String fieldName = entry.getKey();
+                Map<?, ?> mapData = (Map) value;
+                for (Map.Entry<?, ?> entry : mapData.entrySet()) {
+                    // Convert the key to a string using the key converter
+                    JsonNode keyNode = keyConverter.convert(mapper, null, 
entry.getKey());
+                    String fieldName = keyNode.isTextual() ? keyNode.asText() 
: keyNode.toString();
+
                     node.set(
                             fieldName,
                             valueConverter.convert(mapper, 
node.get(fieldName), entry.getValue()));
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
 
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
index ff1bb82005..fb6fd9da76 100644
--- 
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
+++ 
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
@@ -601,4 +601,81 @@ public class JsonRowDataSerDeSchemaTest {
                 "ErrorCode:[COMMON-33], ErrorDescription:[The datetime format 
'2022-09-24-22:45:00' of field 'timestamp_field' is not supported. Please check 
the datetime format.]",
                 exception2.getCause().getCause().getMessage());
     }
+
+    @Test
+    public void testSerializationWithNullValue() {
+        SeaTunnelRowType schema =
+                new SeaTunnelRowType(
+                        new String[] {
+                            "bool", "int", "longValue", "float", "name", 
"date", "time", "timestamp"
+                        },
+                        new SeaTunnelDataType[] {
+                            BOOLEAN_TYPE,
+                            INT_TYPE,
+                            LONG_TYPE,
+                            FLOAT_TYPE,
+                            STRING_TYPE,
+                            LocalTimeType.LOCAL_DATE_TYPE,
+                            LocalTimeType.LOCAL_TIME_TYPE,
+                            LocalTimeType.LOCAL_DATE_TIME_TYPE
+                        });
+
+        Object[] fields = new Object[] {null, null, null, null, null, null, 
null, null};
+        SeaTunnelRow expected = new SeaTunnelRow(fields);
+        assertEquals(
+                
"{\"bool\":\"\\\\N\",\"int\":\"\\\\N\",\"longValue\":\"\\\\N\",\"float\":\"\\\\N\",\"name\":\"\\\\N\",\"date\":\"\\\\N\",\"time\":\"\\\\N\",\"timestamp\":\"\\\\N\"}",
+                new String(new JsonSerializationSchema(schema, 
"\\N").serialize(expected)));
+    }
+
+    @Test
+    public void testSerializationWithMapHasNonStringKey() {
+        SeaTunnelRowType schema =
+                new SeaTunnelRowType(
+                        new String[] {"mapii", "mapbb"},
+                        new SeaTunnelDataType[] {
+                            new MapType(INT_TYPE, INT_TYPE), new 
MapType(BOOLEAN_TYPE, INT_TYPE)
+                        });
+        Map<Integer, Integer> mapII = new HashMap<>();
+        mapII.put(1, 2);
+
+        Map<Boolean, Integer> mapBI = new HashMap<>();
+        mapBI.put(true, 3);
+
+        Object[] fields = new Object[] {mapII, mapBI};
+        SeaTunnelRow expected = new SeaTunnelRow(fields);
+        assertEquals(
+                "{\"mapii\":{\"1\":2},\"mapbb\":{\"true\":3}}",
+                new String(new JsonSerializationSchema(schema, 
"\\N").serialize(expected)));
+    }
+
+    @Test
+    public void testSerializationWithTimestamp() {
+        SeaTunnelRowType schema =
+                new SeaTunnelRowType(
+                        new String[] {"timestamp"},
+                        new SeaTunnelDataType[] 
{LocalTimeType.LOCAL_DATE_TIME_TYPE});
+        LocalDateTime timestamp = LocalDateTime.of(2022, 9, 24, 22, 45, 0, 
123456000);
+        SeaTunnelRow row = new SeaTunnelRow(new Object[] {timestamp});
+        assertEquals(
+                "{\"timestamp\":\"2022-09-24T22:45:00.123456\"}",
+                new String(new JsonSerializationSchema(schema, 
"\\N").serialize(row)));
+
+        timestamp = LocalDateTime.of(2022, 9, 24, 22, 45, 0, 0);
+        row = new SeaTunnelRow(new Object[] {timestamp});
+        assertEquals(
+                "{\"timestamp\":\"2022-09-24T22:45:00\"}",
+                new String(new JsonSerializationSchema(schema, 
"\\N").serialize(row)));
+
+        timestamp = LocalDateTime.of(2022, 9, 24, 22, 45, 0, 1000);
+        row = new SeaTunnelRow(new Object[] {timestamp});
+        assertEquals(
+                "{\"timestamp\":\"2022-09-24T22:45:00.000001\"}",
+                new String(new JsonSerializationSchema(schema, 
"\\N").serialize(row)));
+
+        timestamp = LocalDateTime.of(2022, 9, 24, 22, 45, 0, 123456);
+        row = new SeaTunnelRow(new Object[] {timestamp});
+        assertEquals(
+                "{\"timestamp\":\"2022-09-24T22:45:00.000123456\"}",
+                new String(new JsonSerializationSchema(schema, 
"\\N").serialize(row)));
+    }
 }
diff --git 
a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java
 
b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java
index 6f108ee295..01ca981a11 100644
--- 
a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java
+++ 
b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java
@@ -48,6 +48,7 @@ public class TextSerializationSchema implements 
SerializationSchema {
     private final DateTimeUtils.Formatter dateTimeFormatter;
     private final TimeUtils.Formatter timeFormatter;
     private final Charset charset;
+    private final String nullValue;
 
     private TextSerializationSchema(
             @NonNull SeaTunnelRowType seaTunnelRowType,
@@ -55,13 +56,15 @@ public class TextSerializationSchema implements 
SerializationSchema {
             DateUtils.Formatter dateFormatter,
             DateTimeUtils.Formatter dateTimeFormatter,
             TimeUtils.Formatter timeFormatter,
-            Charset charset) {
+            Charset charset,
+            String nullValue) {
         this.seaTunnelRowType = seaTunnelRowType;
         this.separators = separators;
         this.dateFormatter = dateFormatter;
         this.dateTimeFormatter = dateTimeFormatter;
         this.timeFormatter = timeFormatter;
         this.charset = charset;
+        this.nullValue = nullValue;
     }
 
     public static Builder builder() {
@@ -76,6 +79,7 @@ public class TextSerializationSchema implements 
SerializationSchema {
                 DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
         private TimeUtils.Formatter timeFormatter = 
TimeUtils.Formatter.HH_MM_SS;
         private Charset charset = StandardCharsets.UTF_8;
+        private String nullValue = "";
 
         private Builder() {}
 
@@ -114,6 +118,11 @@ public class TextSerializationSchema implements 
SerializationSchema {
             return this;
         }
 
+        public Builder nullValue(String nullValue) {
+            this.nullValue = nullValue;
+            return this;
+        }
+
         public TextSerializationSchema build() {
             return new TextSerializationSchema(
                     seaTunnelRowType,
@@ -121,7 +130,8 @@ public class TextSerializationSchema implements 
SerializationSchema {
                     dateFormatter,
                     dateTimeFormatter,
                     timeFormatter,
-                    charset);
+                    charset,
+                    nullValue);
         }
     }
 
@@ -141,7 +151,7 @@ public class TextSerializationSchema implements 
SerializationSchema {
 
     private String convert(Object field, SeaTunnelDataType<?> fieldType, int 
level) {
         if (field == null) {
-            return "";
+            return nullValue;
         }
         switch (fieldType.getSqlType()) {
             case DOUBLE:
diff --git 
a/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/CsvTextFormatSchemaTest.java
 
b/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/CsvTextFormatSchemaTest.java
index 0f58e32f14..77c80a4bb8 100644
--- 
a/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/CsvTextFormatSchemaTest.java
+++ 
b/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/CsvTextFormatSchemaTest.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.DateTimeUtils.Formatter;
 import org.apache.seatunnel.format.text.splitor.CsvLineSplitor;
 
 import org.junit.jupiter.api.Assertions;
@@ -34,9 +35,12 @@ import org.junit.jupiter.api.Test;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.util.Arrays;
 import java.util.Map;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
 public class CsvTextFormatSchemaTest {
     public String content =
             "\"mess,age\","
@@ -150,4 +154,40 @@ public class CsvTextFormatSchemaTest {
         Assertions.assertEquals(((Map<?, ?>) 
(seaTunnelRow.getField(15))).get("tyrantlucifer"), 18);
         Assertions.assertEquals(((Map<?, ?>) 
(seaTunnelRow.getField(15))).get("Kris"), 21);
     }
+
+    @Test
+    public void testSerializationWithTimestamp() {
+        String delimiter = ",";
+
+        SeaTunnelRowType schema =
+                new SeaTunnelRowType(
+                        new String[] {"timestamp"},
+                        new SeaTunnelDataType[] 
{LocalTimeType.LOCAL_DATE_TIME_TYPE});
+        LocalDateTime timestamp = LocalDateTime.of(2022, 9, 24, 22, 45, 0, 
123456000);
+        TextSerializationSchema textSerializationSchema =
+                TextSerializationSchema.builder()
+                        .seaTunnelRowType(schema)
+                        
.dateTimeFormatter(Formatter.YYYY_MM_DD_HH_MM_SS_SSSSSS)
+                        .delimiter(delimiter)
+                        .build();
+        SeaTunnelRow row = new SeaTunnelRow(new Object[] {timestamp});
+
+        assertEquals(
+                "2022-09-24 22:45:00.123456", new 
String(textSerializationSchema.serialize(row)));
+
+        timestamp = LocalDateTime.of(2022, 9, 24, 22, 45, 0, 0);
+        row = new SeaTunnelRow(new Object[] {timestamp});
+        assertEquals(
+                "2022-09-24 22:45:00.000000", new 
String(textSerializationSchema.serialize(row)));
+
+        timestamp = LocalDateTime.of(2022, 9, 24, 22, 45, 0, 1000);
+        row = new SeaTunnelRow(new Object[] {timestamp});
+        assertEquals(
+                "2022-09-24 22:45:00.000001", new 
String(textSerializationSchema.serialize(row)));
+
+        timestamp = LocalDateTime.of(2022, 9, 24, 22, 45, 0, 123456);
+        row = new SeaTunnelRow(new Object[] {timestamp});
+        assertEquals(
+                "2022-09-24 22:45:00.000123", new 
String(textSerializationSchema.serialize(row)));
+    }
 }
diff --git 
a/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/TextFormatSchemaTest.java
 
b/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/TextFormatSchemaTest.java
index 45574392d2..a8ab6decfa 100644
--- 
a/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/TextFormatSchemaTest.java
+++ 
b/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/TextFormatSchemaTest.java
@@ -36,6 +36,13 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Map;
 
+import static org.apache.seatunnel.api.table.type.BasicType.BOOLEAN_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.FLOAT_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.INT_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.LONG_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
 public class TextFormatSchemaTest {
     public String content =
             String.join("\u0002", Arrays.asList("1", "2", "3", "4", "5", "6"))
@@ -187,4 +194,38 @@ public class TextFormatSchemaTest {
                 "ErrorCode:[COMMON-33], ErrorDescription:[The datetime format 
'2022-09-24-22:45:00' of field 'timestamp_field' is not supported. Please check 
the datetime format.]",
                 exception2.getMessage());
     }
+
+    @Test
+    public void testSerializationWithNullValue() throws Exception {
+        SeaTunnelRowType schema =
+                new SeaTunnelRowType(
+                        new String[] {
+                            "bool", "int", "longValue", "float", "name", 
"date", "time", "timestamp"
+                        },
+                        new SeaTunnelDataType[] {
+                            BOOLEAN_TYPE,
+                            INT_TYPE,
+                            LONG_TYPE,
+                            FLOAT_TYPE,
+                            STRING_TYPE,
+                            LocalTimeType.LOCAL_DATE_TYPE,
+                            LocalTimeType.LOCAL_TIME_TYPE,
+                            LocalTimeType.LOCAL_DATE_TIME_TYPE
+                        });
+
+        Object[] fields = new Object[] {null, null, null, null, null, null, 
null, null};
+        SeaTunnelRow expected = new SeaTunnelRow(fields);
+
+        TextSerializationSchema textSerializationSchema =
+                TextSerializationSchema.builder()
+                        .seaTunnelRowType(schema)
+                        .delimiter("\u0001")
+                        .nullValue("\\N")
+                        .build();
+
+        System.out.println(new 
String(textSerializationSchema.serialize(expected)));
+        assertEquals(
+                
"\\N\u0001\\N\u0001\\N\u0001\\N\u0001\\N\u0001\\N\u0001\\N\u0001\\N",
+                new String(textSerializationSchema.serialize(expected)));
+    }
 }


Reply via email to