This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new f3963ba3ee [INLONG-9491][Sort] CSV format support ignore trailing unmappable fields (#9492) f3963ba3ee is described below commit f3963ba3eeeb2ad94782ea2f3a96ec0f7c9e4be2 Author: vernedeng <verned...@apache.org> AuthorDate: Tue Jan 9 10:11:01 2024 +0800 [INLONG-9491][Sort] CSV format support ignore trailing unmappable fields (#9492) --- .../sort-formats/format-inlongmsg-base/pom.xml | 2 +- .../formats/inlongmsg/InLongMsgDecodingFormat.java | 64 ++++++++++- .../formats/inlongmsg/InLongMsgFormatFactory.java | 12 +- .../sort/formats/inlongmsg/InLongMsgOptions.java | 26 +++++ .../inlongmsg/InLongMsgRowDataSerDeTest.java | 123 +++++++++++++++++++++ 5 files changed, 220 insertions(+), 7 deletions(-) diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml b/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml index 8a3f720721..8846aa0446 100644 --- a/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml +++ b/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml @@ -94,7 +94,7 @@ <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-csv</artifactId> - <scope>test</scope> + <scope>provided</scope> </dependency> </dependencies> diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java index 0f67bbc072..93e970aa85 100644 --- a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java +++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java @@ -19,8 +19,17 @@ package org.apache.inlong.sort.formats.inlongmsg; import org.apache.inlong.sort.formats.inlongmsg.InLongMsgDeserializationSchema.MetadataConverter; +import com.google.common.annotations.VisibleForTesting; +import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.formats.csv.CsvRowDataDeserializationSchema; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.format.DecodingFormat; @@ -31,6 +40,7 @@ import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.utils.DataTypeUtils; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -40,6 +50,12 @@ import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.CSV_EMPTY_STRING_AS_NULL; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.CSV_IGNORE_TRAILING_UNMAPPABLE; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.CSV_INSERT_NULLS_FOR_MISSING_COLUMNS; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.IGNORE_PARSE_ERRORS; + +@Slf4j public class InLongMsgDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> { private final String innerFormatMetaPrefix; @@ -50,14 +66,23 @@ public class InLongMsgDecodingFormat implements DecodingFormat<DeserializationSc private final boolean ignoreErrors; + private final boolean ignoreTrailingUnmappable; + + private final boolean insertNullsForMissingColumns; + + private final boolean emptyStringAsNull; + public InLongMsgDecodingFormat( DecodingFormat<DeserializationSchema<RowData>> innerDecodingFormat, String innerFormatMetaPrefix, - boolean ignoreErrors) { + ReadableConfig formatOptions) { this.innerDecodingFormat = innerDecodingFormat; this.innerFormatMetaPrefix = innerFormatMetaPrefix; this.metadataKeys = Collections.emptyList(); - this.ignoreErrors = ignoreErrors; + this.ignoreErrors = formatOptions.get(IGNORE_PARSE_ERRORS); + this.ignoreTrailingUnmappable = formatOptions.get(CSV_IGNORE_TRAILING_UNMAPPABLE); + this.insertNullsForMissingColumns = formatOptions.get(CSV_INSERT_NULLS_FOR_MISSING_COLUMNS); + this.emptyStringAsNull = formatOptions.get(CSV_EMPTY_STRING_AS_NULL); } @Override @@ -83,8 +108,15 @@ public class InLongMsgDecodingFormat implements DecodingFormat<DeserializationSc final TypeInformation<RowData> producedTypeInfo = context.createTypeInformation(producedDataType); + DeserializationSchema<RowData> innerSchema = + innerDecodingFormat.createRuntimeDecoder(context, physicalDataType); + if (innerSchema instanceof CsvRowDataDeserializationSchema) { + configCsvInnerFormat(innerSchema, ignoreTrailingUnmappable, + insertNullsForMissingColumns, emptyStringAsNull); + } + return new InLongMsgDeserializationSchema( - innerDecodingFormat.createRuntimeDecoder(context, physicalDataType), + innerSchema, metadataConverters, producedTypeInfo, ignoreErrors); @@ -190,4 +222,30 @@ public class InLongMsgDecodingFormat implements DecodingFormat<DeserializationSc this.converter = converter; } } + + @VisibleForTesting + static void configCsvInnerFormat( + DeserializationSchema<RowData> innerSchema, + boolean ignoreTrailingUnmappable, + boolean insertNullsForMissingColumns, + boolean emptyStringAsNull) { + try { + Field readerField = CsvRowDataDeserializationSchema.class.getDeclaredField("objectReader"); + readerField.setAccessible(true); + ObjectReader oldReader = (ObjectReader) readerField.get(innerSchema); + + Field schemaField = ObjectReader.class.getDeclaredField("_schema"); + schemaField.setAccessible(true); + CsvSchema oldSchema = (CsvSchema) schemaField.get(oldReader); + ObjectReader newReader = new CsvMapper() + .configure(CsvParser.Feature.IGNORE_TRAILING_UNMAPPABLE, ignoreTrailingUnmappable) + .configure(CsvParser.Feature.INSERT_NULLS_FOR_MISSING_COLUMNS, insertNullsForMissingColumns) + .configure(CsvParser.Feature.EMPTY_STRING_AS_NULL, emptyStringAsNull) + .readerFor(JsonNode.class) + .with(oldSchema); + readerField.set(innerSchema, newReader); + } catch (Throwable t) { + log.error("failed to make csv inner format to ignore trailing unmappable, ex is ", t); + } + } } diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactory.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactory.java index c9b368a364..c7caa43290 100644 --- a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactory.java +++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactory.java @@ -34,6 +34,10 @@ import org.apache.flink.table.factories.SerializationFormatFactory; import java.util.HashSet; import java.util.Set; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.CSV_EMPTY_STRING_AS_NULL; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.CSV_IGNORE_PARSE_ERRORS; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.CSV_IGNORE_TRAILING_UNMAPPABLE; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.CSV_INSERT_NULLS_FOR_MISSING_COLUMNS; import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.IGNORE_PARSE_ERRORS; import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.INNER_FORMAT; import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.validateDecodingFormatOptions; @@ -64,9 +68,7 @@ public final class InLongMsgFormatFactory String innerFormatPrefix = INLONG_PREFIX + innerFormatMetaPrefix; DecodingFormat<DeserializationSchema<RowData>> innerFormat = innerFactory.createDecodingFormat(context, new DelegatingConfiguration(allOptions, innerFormatPrefix)); - boolean ignoreErrors = formatOptions.get(IGNORE_PARSE_ERRORS); - - return new InLongMsgDecodingFormat(innerFormat, innerFormatMetaPrefix, ignoreErrors); + return new InLongMsgDecodingFormat(innerFormat, innerFormatMetaPrefix, formatOptions); } @Override @@ -91,6 +93,10 @@ public final class InLongMsgFormatFactory public Set<ConfigOption<?>> optionalOptions() { Set<ConfigOption<?>> options = new HashSet<>(); options.add(IGNORE_PARSE_ERRORS); + options.add(CSV_IGNORE_TRAILING_UNMAPPABLE); + options.add(CSV_INSERT_NULLS_FOR_MISSING_COLUMNS); + options.add(CSV_EMPTY_STRING_AS_NULL); + options.add(CSV_IGNORE_PARSE_ERRORS); return options; } } diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgOptions.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgOptions.java index 92a2ebc0b7..be0a9354fd 100644 --- a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgOptions.java +++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgOptions.java @@ -41,6 +41,32 @@ public class InLongMsgOptions { .withDescription("Optional flag to skip fields and rows with parse errors instead of failing;\n" + "fields are set to null in case of errors"); + public static final ConfigOption<Boolean> CSV_IGNORE_PARSE_ERRORS = + ConfigOptions.key("csv.ignore-parse-errors") + .booleanType() + .defaultValue(false) + .withDescription("Allows the case that real size exceeds the expected size.\n " + + "The extra column will be skipped"); + + public static final ConfigOption<Boolean> CSV_IGNORE_TRAILING_UNMAPPABLE = + ConfigOptions.key("csv.ignore-trailing-unmappable") + .booleanType() + .defaultValue(false) + .withDescription("Allows the case that real size exceeds the expected size.\n " + + "The extra column will be skipped"); + + public static final ConfigOption<Boolean> CSV_INSERT_NULLS_FOR_MISSING_COLUMNS = + ConfigOptions.key("csv.insert-nulls-for-missing-columns") + .booleanType() + .defaultValue(false) + .withDescription("For missing columns, insert null."); + + public static final ConfigOption<Boolean> CSV_EMPTY_STRING_AS_NULL = + ConfigOptions.key("csv.empty-string-as-null") + .booleanType() + .defaultValue(false) + .withDescription("if the string value is empty, make it as null"); + public static void validateDecodingFormatOptions(ReadableConfig config) { String innerFormat = config.get(INNER_FORMAT); if (innerFormat == null) { diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgRowDataSerDeTest.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgRowDataSerDeTest.java index 9f8ddbdaf8..64985985ab 100644 --- a/inlong-sort/sort-formats/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgRowDataSerDeTest.java +++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgRowDataSerDeTest.java @@ -19,6 +19,7 @@ package org.apache.inlong.sort.formats.inlongmsg; import org.apache.inlong.common.msg.InLongMsg; +import com.google.common.collect.ImmutableList; import org.apache.flink.api.common.functions.util.ListCollector; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.configuration.Configuration; @@ -34,6 +35,7 @@ import org.apache.flink.table.factories.utils.FactoryMocks; import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext; import org.junit.Test; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.sql.Timestamp; import java.util.ArrayList; @@ -90,6 +92,127 @@ public class InLongMsgRowDataSerDeTest { assertEquals(exceptedOutput, deData); } + @Test + public void testIgnoreTrailing() throws IOException { + // mock data + InLongMsg inLongMsg = InLongMsg.newInLongMsg(); + inLongMsg.addMsg("streamId=HAHA&t=202201011112", + "1,asdqw,heihei,2".getBytes(StandardCharsets.UTF_8)); + byte[] input = inLongMsg.buildArray(); + List<RowData> exceptedOutput = ImmutableList.of( + GenericRowData.of(1L, BinaryStringData.fromString("asdqw"), BinaryStringData.fromString("heihei"))); + final Map<String, String> tableOptions = + InLongMsgFormatFactoryTest.getModifiedOptions(opts -> { + opts.put("inlong-msg.inner.format", "csv"); + opts.put("inlong-msg.csv.ignore-trailing-unmappable", "true"); + }); + ResolvedSchema schema = ResolvedSchema.of( + Column.physical("id", DataTypes.BIGINT()), + Column.physical("f1", DataTypes.STRING()), + Column.physical("f2", DataTypes.STRING())); + + DeserializationSchema<RowData> inLongMsgDeserializationSchema = + InLongMsgFormatFactoryTest.createDeserializationSchema(tableOptions, schema); + List<RowData> deData = new ArrayList<>(); + ListCollector<RowData> out = new ListCollector<>(deData); + + inLongMsgDeserializationSchema.deserialize(input, out); + + assertEquals(exceptedOutput, deData); + } + + @Test + public void testEmptyFieldValueAsNull() throws IOException { + // mock data + InLongMsg inLongMsg = InLongMsg.newInLongMsg(); + inLongMsg.addMsg("streamId=HAHA&t=202201011112", + "1,asdqw,,2".getBytes(StandardCharsets.UTF_8)); + byte[] input = inLongMsg.buildArray(); + List<RowData> exceptedOutput = ImmutableList.of( + GenericRowData.of(1L, BinaryStringData.fromString("asdqw"), null, 2L)); + final Map<String, String> tableOptions = + InLongMsgFormatFactoryTest.getModifiedOptions(opts -> { + opts.put("inlong-msg.inner.format", "csv"); + opts.put("inlong-msg.csv.empty-string-as-null", "true"); + }); + ResolvedSchema schema = ResolvedSchema.of( + Column.physical("id", DataTypes.BIGINT()), + Column.physical("f1", DataTypes.STRING()), + Column.physical("f2", DataTypes.STRING()), + Column.physical("f3", DataTypes.BIGINT())); + + DeserializationSchema<RowData> inLongMsgDeserializationSchema = + InLongMsgFormatFactoryTest.createDeserializationSchema(tableOptions, schema); + List<RowData> deData = new ArrayList<>(); + ListCollector<RowData> out = new ListCollector<>(deData); + + inLongMsgDeserializationSchema.deserialize(input, out); + + assertEquals(exceptedOutput, deData); + } + + @Test + public void testWrongTypeFieldAsNull() throws IOException { + // mock data + InLongMsg inLongMsg = InLongMsg.newInLongMsg(); + inLongMsg.addMsg("streamId=HAHA&t=202201011112", + "adf,asdqw,zdf,2".getBytes(StandardCharsets.UTF_8)); + byte[] input = inLongMsg.buildArray(); + List<RowData> exceptedOutput = ImmutableList.of( + GenericRowData.of(null, BinaryStringData.fromString("asdqw"), BinaryStringData.fromString("zdf"), 2L)); + final Map<String, String> tableOptions = + InLongMsgFormatFactoryTest.getModifiedOptions(opts -> { + opts.put("inlong-msg.inner.format", "csv"); + opts.put("inlong-msg.csv.ignore-parse-errors", "true"); + }); + ResolvedSchema schema = ResolvedSchema.of( + Column.physical("id", DataTypes.BIGINT()), + Column.physical("f1", DataTypes.STRING()), + Column.physical("f2", DataTypes.STRING()), + Column.physical("f3", DataTypes.BIGINT())); + + DeserializationSchema<RowData> inLongMsgDeserializationSchema = + InLongMsgFormatFactoryTest.createDeserializationSchema(tableOptions, schema); + List<RowData> deData = new ArrayList<>(); + ListCollector<RowData> out = new ListCollector<>(deData); + + inLongMsgDeserializationSchema.deserialize(input, out); + + assertEquals(exceptedOutput, deData); + } + + @Test + public void testInserNullForMissingColumn() throws IOException { + // mock data + InLongMsg inLongMsg = InLongMsg.newInLongMsg(); + inLongMsg.addMsg("streamId=HAHA&t=202201011112", + "1,asdqw,123,2".getBytes(StandardCharsets.UTF_8)); + byte[] input = inLongMsg.buildArray(); + List<RowData> exceptedOutput = ImmutableList.of( + GenericRowData.of(1L, BinaryStringData.fromString("asdqw"), + BinaryStringData.fromString("123"), 2L, null)); + final Map<String, String> tableOptions = + InLongMsgFormatFactoryTest.getModifiedOptions(opts -> { + opts.put("inlong-msg.inner.format", "csv"); + opts.put("inlong-msg.csv.insert-nulls-for-missing-columns", "true"); + }); + ResolvedSchema schema = ResolvedSchema.of( + Column.physical("id", DataTypes.BIGINT()), + Column.physical("f1", DataTypes.STRING()), + Column.physical("f2", DataTypes.STRING()), + Column.physical("f3", DataTypes.BIGINT()), + Column.physical("f4", DataTypes.BIGINT())); + + DeserializationSchema<RowData> inLongMsgDeserializationSchema = + InLongMsgFormatFactoryTest.createDeserializationSchema(tableOptions, schema); + List<RowData> deData = new ArrayList<>(); + ListCollector<RowData> out = new ListCollector<>(deData); + + inLongMsgDeserializationSchema.deserialize(input, out); + + assertEquals(exceptedOutput, deData); + } + @Test public void testDeserializeInLongMsgWithError() throws Exception { // mock data