This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f3963ba3ee [INLONG-9491][Sort] CSV format support ignore trailing unmappable fields (#9492)
f3963ba3ee is described below

commit f3963ba3eeeb2ad94782ea2f3a96ec0f7c9e4be2
Author: vernedeng <verned...@apache.org>
AuthorDate: Tue Jan 9 10:11:01 2024 +0800

    [INLONG-9491][Sort] CSV format support ignore trailing unmappable fields (#9492)
---
 .../sort-formats/format-inlongmsg-base/pom.xml     |   2 +-
 .../formats/inlongmsg/InLongMsgDecodingFormat.java |  64 ++++++++++-
 .../formats/inlongmsg/InLongMsgFormatFactory.java  |  12 +-
 .../sort/formats/inlongmsg/InLongMsgOptions.java   |  26 +++++
 .../inlongmsg/InLongMsgRowDataSerDeTest.java       | 123 +++++++++++++++++++++
 5 files changed, 220 insertions(+), 7 deletions(-)

diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml b/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml
index 8a3f720721..8846aa0446 100644
--- a/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml
@@ -94,7 +94,7 @@
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-csv</artifactId>
-            <scope>test</scope>
+            <scope>provided</scope>
         </dependency>
 
     </dependencies>
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
index 0f67bbc072..93e970aa85 100644
--- a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
@@ -19,8 +19,17 @@ package org.apache.inlong.sort.formats.inlongmsg;
 
 import 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgDeserializationSchema.MetadataConverter;
 
+import com.google.common.annotations.VisibleForTesting;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.formats.csv.CsvRowDataDeserializationSchema;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.DecodingFormat;
@@ -31,6 +40,7 @@ import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.DataTypeUtils;
 
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -40,6 +50,12 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.CSV_EMPTY_STRING_AS_NULL;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.CSV_IGNORE_TRAILING_UNMAPPABLE;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.CSV_INSERT_NULLS_FOR_MISSING_COLUMNS;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.IGNORE_PARSE_ERRORS;
+
+@Slf4j
 public class InLongMsgDecodingFormat implements 
DecodingFormat<DeserializationSchema<RowData>> {
 
     private final String innerFormatMetaPrefix;
@@ -50,14 +66,23 @@ public class InLongMsgDecodingFormat implements 
DecodingFormat<DeserializationSc
 
     private final boolean ignoreErrors;
 
+    private final boolean ignoreTrailingUnmappable;
+
+    private final boolean insertNullsForMissingColumns;
+
+    private final boolean emptyStringAsNull;
+
     public InLongMsgDecodingFormat(
             DecodingFormat<DeserializationSchema<RowData>> innerDecodingFormat,
             String innerFormatMetaPrefix,
-            boolean ignoreErrors) {
+            ReadableConfig formatOptions) {
         this.innerDecodingFormat = innerDecodingFormat;
         this.innerFormatMetaPrefix = innerFormatMetaPrefix;
         this.metadataKeys = Collections.emptyList();
-        this.ignoreErrors = ignoreErrors;
+        this.ignoreErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
+        this.ignoreTrailingUnmappable = 
formatOptions.get(CSV_IGNORE_TRAILING_UNMAPPABLE);
+        this.insertNullsForMissingColumns = 
formatOptions.get(CSV_INSERT_NULLS_FOR_MISSING_COLUMNS);
+        this.emptyStringAsNull = formatOptions.get(CSV_EMPTY_STRING_AS_NULL);
     }
 
     @Override
@@ -83,8 +108,15 @@ public class InLongMsgDecodingFormat implements 
DecodingFormat<DeserializationSc
         final TypeInformation<RowData> producedTypeInfo =
                 context.createTypeInformation(producedDataType);
 
+        DeserializationSchema<RowData> innerSchema =
+                innerDecodingFormat.createRuntimeDecoder(context, 
physicalDataType);
+        if (innerSchema instanceof CsvRowDataDeserializationSchema) {
+            configCsvInnerFormat(innerSchema, ignoreTrailingUnmappable,
+                    insertNullsForMissingColumns, emptyStringAsNull);
+        }
+
         return new InLongMsgDeserializationSchema(
-                innerDecodingFormat.createRuntimeDecoder(context, 
physicalDataType),
+                innerSchema,
                 metadataConverters,
                 producedTypeInfo,
                 ignoreErrors);
@@ -190,4 +222,30 @@ public class InLongMsgDecodingFormat implements 
DecodingFormat<DeserializationSc
             this.converter = converter;
         }
     }
+
+    @VisibleForTesting
+    static void configCsvInnerFormat(
+            DeserializationSchema<RowData> innerSchema,
+            boolean ignoreTrailingUnmappable,
+            boolean insertNullsForMissingColumns,
+            boolean emptyStringAsNull) {
+        try {
+            Field readerField = 
CsvRowDataDeserializationSchema.class.getDeclaredField("objectReader");
+            readerField.setAccessible(true);
+            ObjectReader oldReader = (ObjectReader) 
readerField.get(innerSchema);
+
+            Field schemaField = ObjectReader.class.getDeclaredField("_schema");
+            schemaField.setAccessible(true);
+            CsvSchema oldSchema = (CsvSchema) schemaField.get(oldReader);
+            ObjectReader newReader = new CsvMapper()
+                    .configure(CsvParser.Feature.IGNORE_TRAILING_UNMAPPABLE, 
ignoreTrailingUnmappable)
+                    
.configure(CsvParser.Feature.INSERT_NULLS_FOR_MISSING_COLUMNS, 
insertNullsForMissingColumns)
+                    .configure(CsvParser.Feature.EMPTY_STRING_AS_NULL, 
emptyStringAsNull)
+                    .readerFor(JsonNode.class)
+                    .with(oldSchema);
+            readerField.set(innerSchema, newReader);
+        } catch (Throwable t) {
+            log.error("failed to make csv inner format to ignore trailing 
unmappable, ex is ", t);
+        }
+    }
 }
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactory.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactory.java
index c9b368a364..c7caa43290 100644
--- a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactory.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactory.java
@@ -34,6 +34,10 @@ import 
org.apache.flink.table.factories.SerializationFormatFactory;
 import java.util.HashSet;
 import java.util.Set;
 
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.CSV_EMPTY_STRING_AS_NULL;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.CSV_IGNORE_PARSE_ERRORS;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.CSV_IGNORE_TRAILING_UNMAPPABLE;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.CSV_INSERT_NULLS_FOR_MISSING_COLUMNS;
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.IGNORE_PARSE_ERRORS;
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.INNER_FORMAT;
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.validateDecodingFormatOptions;
@@ -64,9 +68,7 @@ public final class InLongMsgFormatFactory
         String innerFormatPrefix = INLONG_PREFIX + innerFormatMetaPrefix;
         DecodingFormat<DeserializationSchema<RowData>> innerFormat =
                 innerFactory.createDecodingFormat(context, new 
DelegatingConfiguration(allOptions, innerFormatPrefix));
-        boolean ignoreErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
-
-        return new InLongMsgDecodingFormat(innerFormat, innerFormatMetaPrefix, 
ignoreErrors);
+        return new InLongMsgDecodingFormat(innerFormat, innerFormatMetaPrefix, 
formatOptions);
     }
 
     @Override
@@ -91,6 +93,10 @@ public final class InLongMsgFormatFactory
     public Set<ConfigOption<?>> optionalOptions() {
         Set<ConfigOption<?>> options = new HashSet<>();
         options.add(IGNORE_PARSE_ERRORS);
+        options.add(CSV_IGNORE_TRAILING_UNMAPPABLE);
+        options.add(CSV_INSERT_NULLS_FOR_MISSING_COLUMNS);
+        options.add(CSV_EMPTY_STRING_AS_NULL);
+        options.add(CSV_IGNORE_PARSE_ERRORS);
         return options;
     }
 }
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgOptions.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgOptions.java
index 92a2ebc0b7..be0a9354fd 100644
--- a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgOptions.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgOptions.java
@@ -41,6 +41,32 @@ public class InLongMsgOptions {
                     .withDescription("Optional flag to skip fields and rows 
with parse errors instead of failing;\n"
                             + "fields are set to null in case of errors");
 
+    public static final ConfigOption<Boolean> CSV_IGNORE_PARSE_ERRORS =
+            ConfigOptions.key("csv.ignore-parse-errors")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Allows the case that real size exceeds 
the expected size.\n "
+                            + "The extra column will be skipped");
+
+    public static final ConfigOption<Boolean> CSV_IGNORE_TRAILING_UNMAPPABLE =
+            ConfigOptions.key("csv.ignore-trailing-unmappable")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Allows the case that real size exceeds 
the expected size.\n "
+                            + "The extra column will be skipped");
+
+    public static final ConfigOption<Boolean> 
CSV_INSERT_NULLS_FOR_MISSING_COLUMNS =
+            ConfigOptions.key("csv.insert-nulls-for-missing-columns")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("For missing columns, insert null.");
+
+    public static final ConfigOption<Boolean> CSV_EMPTY_STRING_AS_NULL =
+            ConfigOptions.key("csv.empty-string-as-null")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("if the string value is empty, make it as 
null");
+
     public static void validateDecodingFormatOptions(ReadableConfig config) {
         String innerFormat = config.get(INNER_FORMAT);
         if (innerFormat == null) {
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgRowDataSerDeTest.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgRowDataSerDeTest.java
index 9f8ddbdaf8..64985985ab 100644
--- a/inlong-sort/sort-formats/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgRowDataSerDeTest.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgRowDataSerDeTest.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.formats.inlongmsg;
 
 import org.apache.inlong.common.msg.InLongMsg;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.flink.api.common.functions.util.ListCollector;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.configuration.Configuration;
@@ -34,6 +35,7 @@ import org.apache.flink.table.factories.utils.FactoryMocks;
 import 
org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.sql.Timestamp;
 import java.util.ArrayList;
@@ -90,6 +92,127 @@ public class InLongMsgRowDataSerDeTest {
         assertEquals(exceptedOutput, deData);
     }
 
+    @Test
+    public void testIgnoreTrailing() throws IOException {
+        // mock data
+        InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+        inLongMsg.addMsg("streamId=HAHA&t=202201011112",
+                "1,asdqw,heihei,2".getBytes(StandardCharsets.UTF_8));
+        byte[] input = inLongMsg.buildArray();
+        List<RowData> exceptedOutput = ImmutableList.of(
+                GenericRowData.of(1L, BinaryStringData.fromString("asdqw"), 
BinaryStringData.fromString("heihei")));
+        final Map<String, String> tableOptions =
+                InLongMsgFormatFactoryTest.getModifiedOptions(opts -> {
+                    opts.put("inlong-msg.inner.format", "csv");
+                    opts.put("inlong-msg.csv.ignore-trailing-unmappable", 
"true");
+                });
+        ResolvedSchema schema = ResolvedSchema.of(
+                Column.physical("id", DataTypes.BIGINT()),
+                Column.physical("f1", DataTypes.STRING()),
+                Column.physical("f2", DataTypes.STRING()));
+
+        DeserializationSchema<RowData> inLongMsgDeserializationSchema =
+                
InLongMsgFormatFactoryTest.createDeserializationSchema(tableOptions, schema);
+        List<RowData> deData = new ArrayList<>();
+        ListCollector<RowData> out = new ListCollector<>(deData);
+
+        inLongMsgDeserializationSchema.deserialize(input, out);
+
+        assertEquals(exceptedOutput, deData);
+    }
+
+    @Test
+    public void testEmptyFieldValueAsNull() throws IOException {
+        // mock data
+        InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+        inLongMsg.addMsg("streamId=HAHA&t=202201011112",
+                "1,asdqw,,2".getBytes(StandardCharsets.UTF_8));
+        byte[] input = inLongMsg.buildArray();
+        List<RowData> exceptedOutput = ImmutableList.of(
+                GenericRowData.of(1L, BinaryStringData.fromString("asdqw"), 
null, 2L));
+        final Map<String, String> tableOptions =
+                InLongMsgFormatFactoryTest.getModifiedOptions(opts -> {
+                    opts.put("inlong-msg.inner.format", "csv");
+                    opts.put("inlong-msg.csv.empty-string-as-null", "true");
+                });
+        ResolvedSchema schema = ResolvedSchema.of(
+                Column.physical("id", DataTypes.BIGINT()),
+                Column.physical("f1", DataTypes.STRING()),
+                Column.physical("f2", DataTypes.STRING()),
+                Column.physical("f3", DataTypes.BIGINT()));
+
+        DeserializationSchema<RowData> inLongMsgDeserializationSchema =
+                
InLongMsgFormatFactoryTest.createDeserializationSchema(tableOptions, schema);
+        List<RowData> deData = new ArrayList<>();
+        ListCollector<RowData> out = new ListCollector<>(deData);
+
+        inLongMsgDeserializationSchema.deserialize(input, out);
+
+        assertEquals(exceptedOutput, deData);
+    }
+
+    @Test
+    public void testWrongTypeFieldAsNull() throws IOException {
+        // mock data
+        InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+        inLongMsg.addMsg("streamId=HAHA&t=202201011112",
+                "adf,asdqw,zdf,2".getBytes(StandardCharsets.UTF_8));
+        byte[] input = inLongMsg.buildArray();
+        List<RowData> exceptedOutput = ImmutableList.of(
+                GenericRowData.of(null, BinaryStringData.fromString("asdqw"), 
BinaryStringData.fromString("zdf"), 2L));
+        final Map<String, String> tableOptions =
+                InLongMsgFormatFactoryTest.getModifiedOptions(opts -> {
+                    opts.put("inlong-msg.inner.format", "csv");
+                    opts.put("inlong-msg.csv.ignore-parse-errors", "true");
+                });
+        ResolvedSchema schema = ResolvedSchema.of(
+                Column.physical("id", DataTypes.BIGINT()),
+                Column.physical("f1", DataTypes.STRING()),
+                Column.physical("f2", DataTypes.STRING()),
+                Column.physical("f3", DataTypes.BIGINT()));
+
+        DeserializationSchema<RowData> inLongMsgDeserializationSchema =
+                
InLongMsgFormatFactoryTest.createDeserializationSchema(tableOptions, schema);
+        List<RowData> deData = new ArrayList<>();
+        ListCollector<RowData> out = new ListCollector<>(deData);
+
+        inLongMsgDeserializationSchema.deserialize(input, out);
+
+        assertEquals(exceptedOutput, deData);
+    }
+
+    @Test
+    public void testInserNullForMissingColumn() throws IOException {
+        // mock data
+        InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+        inLongMsg.addMsg("streamId=HAHA&t=202201011112",
+                "1,asdqw,123,2".getBytes(StandardCharsets.UTF_8));
+        byte[] input = inLongMsg.buildArray();
+        List<RowData> exceptedOutput = ImmutableList.of(
+                GenericRowData.of(1L, BinaryStringData.fromString("asdqw"),
+                        BinaryStringData.fromString("123"), 2L, null));
+        final Map<String, String> tableOptions =
+                InLongMsgFormatFactoryTest.getModifiedOptions(opts -> {
+                    opts.put("inlong-msg.inner.format", "csv");
+                    
opts.put("inlong-msg.csv.insert-nulls-for-missing-columns", "true");
+                });
+        ResolvedSchema schema = ResolvedSchema.of(
+                Column.physical("id", DataTypes.BIGINT()),
+                Column.physical("f1", DataTypes.STRING()),
+                Column.physical("f2", DataTypes.STRING()),
+                Column.physical("f3", DataTypes.BIGINT()),
+                Column.physical("f4", DataTypes.BIGINT()));
+
+        DeserializationSchema<RowData> inLongMsgDeserializationSchema =
+                
InLongMsgFormatFactoryTest.createDeserializationSchema(tableOptions, schema);
+        List<RowData> deData = new ArrayList<>();
+        ListCollector<RowData> out = new ListCollector<>(deData);
+
+        inLongMsgDeserializationSchema.deserialize(input, out);
+
+        assertEquals(exceptedOutput, deData);
+    }
+
     @Test
     public void testDeserializeInLongMsgWithError() throws Exception {
         // mock data

Reply via email to