This is an automated email from the ASF dual-hosted git repository. luchunliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new f0cff8a9cf [INLONG-9563][Sort] Support rowdata way of sort message kv format (#9630) f0cff8a9cf is described below commit f0cff8a9cfae433071a37d45f9b9d4feb1ff9e1e Author: baomingyu <baomingy...@163.com> AuthorDate: Sun Feb 4 21:04:26 2024 +0800 [INLONG-9563][Sort] Support rowdata way of sort message kv format (#9630) Co-authored-by: Charles Zhang <dockerzh...@apache.org> --- .../sort/formats/csv/CsvFormatFactoryTest.java | 10 +- .../format-rowdata/format-rowdata-kv/pom.xml | 142 +++++++++ .../apache/inlong/sort/formats/kv/KvCommons.java | 67 +++++ .../inlong/sort/formats/kv/KvFormatBuilder.java | 66 +++++ .../inlong/sort/formats/kv/KvFormatFactory.java | 216 ++++++++++++++ .../formats/kv/KvRowDataDeserializationSchema.java | 245 ++++++++++++++++ .../formats/kv/KvRowDataSerializationSchema.java | 237 +++++++++++++++ .../org.apache.flink.table.factories.Factory | 16 + .../sort/formats/kv/KvFormatFactoryTest.java | 150 ++++++++++ .../kv/KvRowDataDeserializationSchemaTest.java | 322 +++++++++++++++++++++ .../kv/KvRowDataSerializationSchemaTest.java | 290 +++++++++++++++++++ .../apache/inlong/sort/formats/kv/KvUtilsTest.java | 275 ++++++++++++++++++ .../src/test/resources/log4j-test.properties | 27 ++ inlong-sort/sort-formats/format-rowdata/pom.xml | 2 + 14 files changed, 2060 insertions(+), 5 deletions(-) diff --git a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvFormatFactoryTest.java b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvFormatFactoryTest.java index 666340fdc9..8ce9024c2a 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvFormatFactoryTest.java +++ b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvFormatFactoryTest.java @@ 
-115,11 +115,11 @@ public class CsvFormatFactoryTest extends TestLogger { options.put("buffer-size", "1000"); options.put("format", CsvFormatFactory.IDENTIFIER); - options.put("InLong-CSV.row.format.info", FormatUtils.marshall(testFormatInfo)); - options.put("InLong-CSV.format.field-delimiter", ";"); - options.put("InLong-CSV.format.quote-character", "'"); - options.put("InLong-CSV.format.escape-character", "\\"); - options.put("InLong-CSV.format.null-literal", "n/a"); + options.put("inlong-csv.row.format.info", FormatUtils.marshall(testFormatInfo)); + options.put("inlong-csv.format.field-delimiter", ";"); + options.put("inlong-csv.format.quote-character", "'"); + options.put("inlong-csv.format.escape-character", "\\"); + options.put("inlong-csv.format.null-literal", "n/a"); return options; } diff --git a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/pom.xml b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/pom.xml new file mode 100644 index 0000000000..3254fe23a7 --- /dev/null +++ b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/pom.xml @@ -0,0 +1,142 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.inlong</groupId> + <artifactId>format-rowdata</artifactId> + <version>1.11.0-SNAPSHOT</version> + </parent> + + <artifactId>sort-format-rowdata-kv</artifactId> + <packaging>jar</packaging> + <name>Apache InLong - Sort Format-RowData-KV</name> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <maven.compiler.source>1.8</maven.compiler.source> + <maven.compiler.target>1.8</maven.compiler.target> + <inlong.root.dir>${project.parent.parent.parent.parent.basedir}</inlong.root.dir> + </properties> + + <dependencies> + + <!-- core dependencies --> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-format-rowdata-base</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-shaded-jackson</artifactId> + <version>${flink.jackson.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-common</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <!-- test dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-common</artifactId> + <version>${flink.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-runtime</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> + + 
<dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + <profiles> + <profile> + <id>japicmp-report</id> + <activation> + <property> + <name>japicmp-report</name> + </property> + </activation> + <build> + <plugins> + <plugin> + <groupId>com.github.siom79.japicmp</groupId> + <artifactId>japicmp-maven-plugin</artifactId> + <configuration> + <parameter> + <breakBuildOnBinaryIncompatibleModifications>false</breakBuildOnBinaryIncompatibleModifications> + <breakBuildOnSourceIncompatibleModifications>false</breakBuildOnSourceIncompatibleModifications> + </parameter> + </configuration> + </plugin> + </plugins> + </build> + </profile> + <profile> + <id>japicmp-check</id> + <activation> + <property> + <name>!japicmp-report</name> + </property> + </activation> + <build> + <plugins> + <plugin> + <groupId>com.github.siom79.japicmp</groupId> + <artifactId>japicmp-maven-plugin</artifactId> + <configuration> + <parameter> + <excludes> + <exclude>org.apache.inlong.sort.formats.kv.KvRowDataDeserializationSchema</exclude> + <exclude>org.apache.inlong.sort.formats.kv.KvRowDataSerializationSchema</exclude> + </excludes> + </parameter> + </configuration> + </plugin> + </plugins> + </build> + </profile> + </profiles> +</project> diff --git a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvCommons.java b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvCommons.java new file mode 100644 index 0000000000..1ef60e5691 --- /dev/null +++ b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvCommons.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.formats.kv; + +import org.apache.inlong.sort.formats.base.TextFormatOptions; + +import org.apache.commons.lang3.StringEscapeUtils; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.ValidationException; + +/** + * Common validation helpers for the KV format. + */ +public class KvCommons { + + /** + * Validates the character-valued options that are present in {@code tableOptions}. + * + * <p>The field, key-value and entry delimiters are Java-unescaped before their length is checked; + * the quote and escape characters are checked exactly as configured.</p> + * + * <p>NOTE(review): {@code FIELD_DELIMITER} is validated here although {@link KvFormatFactory} does + * not list it among its required or optional options - confirm that this check is intended.</p> + * + * @param tableOptions the format options to validate + * @throws ValidationException if a present option does not hold exactly one character + */ + static void validateFormatOptions(ReadableConfig tableOptions) { + + validateCharacterVal(tableOptions, TextFormatOptions.FIELD_DELIMITER, true); + validateCharacterVal(tableOptions, TextFormatOptions.KV_DELIMITER, true); + validateCharacterVal(tableOptions, TextFormatOptions.KV_ENTRY_DELIMITER, true); + validateCharacterVal(tableOptions, TextFormatOptions.QUOTE_CHARACTER); + validateCharacterVal(tableOptions, TextFormatOptions.ESCAPE_CHARACTER); + } + + /** + * Validates {@code option} without unescaping its value. + */ + private static void validateCharacterVal( + ReadableConfig tableOptions, + ConfigOption<String> option) { + validateCharacterVal(tableOptions, option, false); + } + + /** + * Checks that {@code option}, if present, holds exactly one character. + * + * @param tableOptions the format options to read from + * @param option the option to check; an absent option is accepted without further checks + * @param unescape whether {@code StringEscapeUtils.unescapeJava} is applied before the length check + * @throws ValidationException if the resulting value is not exactly one character long; the + * message reports the raw (not unescaped) configured value + */ + private static void validateCharacterVal( + ReadableConfig tableOptions, + ConfigOption<String> option, + boolean unescape) { + if (!tableOptions.getOptional(option).isPresent()) { + return; + } + + final String value = unescape + ?
StringEscapeUtils.unescapeJava(tableOptions.get(option)) + : tableOptions.get(option); + + if (value.length() != 1) { + throw new ValidationException( + String.format( + "Option '%s.%s' must be a String with single character, but was: %s", + KvFormatFactory.IDENTIFIER, option.key(), tableOptions.get(option))); + } + } + +} diff --git a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvFormatBuilder.java b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvFormatBuilder.java new file mode 100644 index 0000000000..7f42e43b6a --- /dev/null +++ b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvFormatBuilder.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.inlong.sort.formats.kv; + +import org.apache.inlong.sort.formats.base.TextFormatBuilder; +import org.apache.inlong.sort.formats.common.RowFormatInfo; + +import org.apache.flink.table.descriptors.DescriptorProperties; + +import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ENTRY_DELIMITER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_KV_DELIMITER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_ENTRY_DELIMITER; + +/** + * Base builder for the serializers and deserializers of the KV format. + * + * <p>Extends {@link TextFormatBuilder} with the entry delimiter and the key-value delimiter; both + * start at their {@code TableFormatConstants} defaults.</p> + * + * <p>NOTE(review): the bound {@code T extends KvFormatBuilder} uses the raw type, which is why the + * fluent setters need unchecked casts - confirm whether {@code KvFormatBuilder<T>} was intended.</p> + * + * @param <T> the concrete builder type returned by the fluent setters + */ +public abstract class KvFormatBuilder<T extends KvFormatBuilder> extends TextFormatBuilder<T> { + + /** Delimiter configured through {@link #setEntryDelimiter(char)}. */ + protected char entryDelimiter = DEFAULT_ENTRY_DELIMITER; + /** Delimiter configured through {@link #setKvDelimiter(char)}. */ + protected char kvDelimiter = DEFAULT_KV_DELIMITER; + + /** + * Creates a builder for the given row format, which is handed to the superclass constructor. + */ + public KvFormatBuilder(RowFormatInfo rowFormatInfo) { + super(rowFormatInfo); + } + + /** + * Sets the entry delimiter and returns this builder for chaining. + */ + @SuppressWarnings("unchecked") + public T setEntryDelimiter(char entryDelimiter) { + this.entryDelimiter = entryDelimiter; + return (T) this; + } + + /** + * Sets the key-value delimiter and returns this builder for chaining. + */ + @SuppressWarnings("unchecked") + public T setKvDelimiter(char kvDelimiter) { + this.kvDelimiter = kvDelimiter; + return (T) this; + } + + /** + * Applies the superclass configuration, then overrides the two delimiters with the characters + * stored under {@code FORMAT_KV_ENTRY_DELIMITER} and {@code FORMAT_KV_DELIMITER}, if present. + * Absent keys leave the current delimiter values untouched. + */ + @SuppressWarnings("unchecked") + @Override + public T configure(DescriptorProperties descriptorProperties) { + super.configure(descriptorProperties); + + descriptorProperties.getOptionalCharacter(FORMAT_KV_ENTRY_DELIMITER) + .ifPresent(this::setEntryDelimiter); + descriptorProperties.getOptionalCharacter(FORMAT_KV_DELIMITER) + .ifPresent(this::setKvDelimiter); + + return (T) this; + } +} diff --git a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvFormatFactory.java b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvFormatFactory.java new file mode 100644 index
0000000000..dff4adff04 --- /dev/null +++ b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvFormatFactory.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.formats.kv; + +import org.apache.inlong.sort.formats.base.TableFormatForRowDataUtils; +import org.apache.inlong.sort.formats.base.TableFormatOptions; +import org.apache.inlong.sort.formats.base.TextFormatOptions; +import org.apache.inlong.sort.formats.common.RowFormatInfo; + +import org.apache.commons.lang3.StringEscapeUtils; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.RowData; +import 
org.apache.flink.table.factories.DeserializationFormatFactory; +import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.SerializationFormatFactory; +import org.apache.flink.table.types.DataType; + +import java.util.HashSet; +import java.util.Set; + +import static org.apache.inlong.sort.formats.base.TableFormatOptions.IGNORE_ERRORS; +import static org.apache.inlong.sort.formats.base.TableFormatOptions.ROW_FORMAT_INFO; +import static org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeRowFormatInfo; +import static org.apache.inlong.sort.formats.base.TextFormatOptions.ESCAPE_CHARACTER; +import static org.apache.inlong.sort.formats.base.TextFormatOptions.NULL_LITERAL; +import static org.apache.inlong.sort.formats.base.TextFormatOptions.QUOTE_CHARACTER; + +/** + * Table format factory for providing configured instances of KV-to-RowData serializer and + * deserializer. + * + * <p>The factory is identified by {@link #IDENTIFIER}; both formats it creates are insert-only.</p> + */ +public class KvFormatFactory + implements + SerializationFormatFactory, + DeserializationFormatFactory { + + /** Identifier returned by {@link #factoryIdentifier()}. */ + public static final String IDENTIFIER = "inlong-kv"; + /** {@link #IDENTIFIER} followed by a dot. */ + public static final String KV_PREFIX = IDENTIFIER + "."; + + /** + * Validates {@code formatOptions} and returns an insert-only encoding format. + * + * <p>The runtime encoder deserializes the row format info stored under {@code ROW_FORMAT_INFO}, + * projects it onto the requested {@code dataType} and builds a + * {@link KvRowDataSerializationSchema} configured from the remaining options.</p> + */ + @Override + public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat( + DynamicTableFactory.Context context, + ReadableConfig formatOptions) { + + FactoryUtil.validateFactoryOptions(this, formatOptions); + KvCommons.validateFormatOptions(formatOptions); + return new EncodingFormat<SerializationSchema<RowData>>() { + + @Override + public SerializationSchema<RowData> createRuntimeEncoder( + DynamicTableSink.Context context, + DataType dataType) { + + final RowFormatInfo projectedRowFormatInfo = TableFormatForRowDataUtils.projectRowFormatInfo( + deserializeRowFormatInfo(formatOptions.get(ROW_FORMAT_INFO)), + dataType); + KvRowDataSerializationSchema.Builder schemaBuilder = + new KvRowDataSerializationSchema.Builder(projectedRowFormatInfo); +
configureSerializationSchema(formatOptions, schemaBuilder); + return schemaBuilder.build(); + } + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.insertOnly(); + } + }; + } + + /** + * Validates {@code formatOptions} and returns an insert-only decoding format. + * + * <p>The runtime decoder builds a {@link KvRowDataDeserializationSchema} from the row format info + * stored under {@code ROW_FORMAT_INFO} and the type information created for {@code dataType}.</p> + * + * <p>NOTE(review): unlike the encoder, the row format info is not projected onto {@code dataType} + * here - confirm that this asymmetry is intended.</p> + */ + @Override + public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat( + DynamicTableFactory.Context context, + ReadableConfig formatOptions) { + FactoryUtil.validateFactoryOptions(this, formatOptions); + KvCommons.validateFormatOptions(formatOptions); + + return new DecodingFormat<DeserializationSchema<RowData>>() { + + @Override + public DeserializationSchema<RowData> createRuntimeDecoder( + DynamicTableSource.Context context, + DataType dataType) { + + KvRowDataDeserializationSchema.Builder schemaBuilder = + new KvRowDataDeserializationSchema.Builder( + deserializeRowFormatInfo(formatOptions.get(ROW_FORMAT_INFO)), + context.createTypeInformation(dataType)); + configureDeserializationSchema(formatOptions, schemaBuilder); + return schemaBuilder.build(); + } + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.insertOnly(); + } + }; + } + + /** + * Returns {@link #IDENTIFIER}. + */ + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + /** + * Returns the options declared as required: the row format info, both delimiters and the charset. + * + * <p>NOTE(review): the delimiters and the charset are declared required here, yet the configure + * helpers below read them with {@code getOptional} - confirm whether they should be optional.</p> + */ + @Override + public Set<ConfigOption<?>> requiredOptions() { + Set<ConfigOption<?>> options = new HashSet<>(); + options.add(ROW_FORMAT_INFO); + options.add(TextFormatOptions.KV_ENTRY_DELIMITER); + options.add(TextFormatOptions.KV_DELIMITER); + options.add(TextFormatOptions.CHARSET); + return options; + } + + /** + * Returns the options declared as optional: escape character, quote character, null literal and + * the ignore-errors flag. + */ + @Override + public Set<ConfigOption<?>> optionalOptions() { + Set<ConfigOption<?>> options = new HashSet<>(); + options.add(TextFormatOptions.ESCAPE_CHARACTER); + options.add(TextFormatOptions.QUOTE_CHARACTER); + options.add(TextFormatOptions.NULL_LITERAL); + options.add(TableFormatOptions.IGNORE_ERRORS); + return options; + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + +
/** + * Copies every option that is present in {@code formatOptions} onto the serialization builder. + * + * <p>The two delimiters are Java-unescaped and reduced to their first character; the escape and + * quote options are reduced to their first character as configured.</p> + */ + private static void configureSerializationSchema( + ReadableConfig formatOptions, + KvRowDataSerializationSchema.Builder schemaBuilder) { + + formatOptions.getOptional(TextFormatOptions.CHARSET) + .ifPresent(schemaBuilder::setCharset); + + formatOptions.getOptional(TextFormatOptions.KV_ENTRY_DELIMITER) + .map(delimiter -> StringEscapeUtils.unescapeJava(delimiter).charAt(0)) + .ifPresent(schemaBuilder::setEntryDelimiter); + + formatOptions.getOptional(TextFormatOptions.KV_DELIMITER) + .map(delimiter -> StringEscapeUtils.unescapeJava(delimiter).charAt(0)) + .ifPresent(schemaBuilder::setKvDelimiter); + + formatOptions + .getOptional(ESCAPE_CHARACTER) + .map(escape -> escape.charAt(0)) + .ifPresent(schemaBuilder::setEscapeCharacter); + + formatOptions + .getOptional(QUOTE_CHARACTER) + .map(quote -> quote.charAt(0)) + .ifPresent(schemaBuilder::setQuoteCharacter); + + formatOptions.getOptional(NULL_LITERAL) + .ifPresent(schemaBuilder::setNullLiteral); + + formatOptions.getOptional(IGNORE_ERRORS) + .ifPresent(schemaBuilder::setIgnoreErrors); + + } + + /** + * Copies every option that is present in {@code formatOptions} onto the deserialization builder, + * using the same conversions as {@code configureSerializationSchema}. + * + * <p>NOTE(review): the body duplicates {@code configureSerializationSchema} statement for + * statement; only the builder type differs.</p> + */ + private static void configureDeserializationSchema( + ReadableConfig formatOptions, + KvRowDataDeserializationSchema.Builder schemaBuilder) { + + formatOptions.getOptional(TextFormatOptions.CHARSET) + .ifPresent(schemaBuilder::setCharset); + + formatOptions.getOptional(TextFormatOptions.KV_ENTRY_DELIMITER) + .map(delimiter -> StringEscapeUtils.unescapeJava(delimiter).charAt(0)) + .ifPresent(schemaBuilder::setEntryDelimiter); + + formatOptions.getOptional(TextFormatOptions.KV_DELIMITER) + .map(delimiter -> StringEscapeUtils.unescapeJava(delimiter).charAt(0)) + .ifPresent(schemaBuilder::setKvDelimiter); + + formatOptions + .getOptional(ESCAPE_CHARACTER) + .map(escape -> escape.charAt(0)) + .ifPresent(schemaBuilder::setEscapeCharacter); + + formatOptions + .getOptional(QUOTE_CHARACTER) + .map(quote -> quote.charAt(0)) + .ifPresent(schemaBuilder::setQuoteCharacter); + + formatOptions.getOptional(NULL_LITERAL) +
.ifPresent(schemaBuilder::setNullLiteral); + + formatOptions.getOptional(IGNORE_ERRORS) + .ifPresent(schemaBuilder::setIgnoreErrors); + } +} diff --git a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java new file mode 100644 index 0000000000..d0bb4f1735 --- /dev/null +++ b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.inlong.sort.formats.kv; + +import org.apache.inlong.sort.formats.base.DefaultDeserializationSchema; +import org.apache.inlong.sort.formats.base.FieldToRowDataConverters; +import org.apache.inlong.sort.formats.base.TableFormatForRowDataUtils; +import org.apache.inlong.sort.formats.common.FormatInfo; +import org.apache.inlong.sort.formats.common.RowFormatInfo; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Map; +import java.util.Objects; + +import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_CHARSET; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ENTRY_DELIMITER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ESCAPE_CHARACTER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_IGNORE_ERRORS; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_KV_DELIMITER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_NULL_LITERAL; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_QUOTE_CHARACTER; +import static org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField; +import static org.apache.inlong.sort.formats.util.StringUtils.splitKv; + +/** + * Deserialization schema from KV string bytes to Flink Table & SQL internal data structures. 
+ * + * <p>Deserializes a <code>byte[]</code> message as a Map and converts it to a {@link RowData}.</p> + * + * <p>Failures during deserialization are forwarded as wrapped {@link IOException}s.</p> + */ +public class KvRowDataDeserializationSchema extends DefaultDeserializationSchema<RowData> { + + private static final long serialVersionUID = 1L; + + @Nonnull + private final RowFormatInfo rowFormatInfo; + + @Nonnull + private final TypeInformation<RowData> producedTypeInfo; + + @Nonnull + private final String charset; + + @Nonnull + private final Character entryDelimiter; + + @Nonnull + private final Character kvDelimiter; + + @Nullable + private final Character escapeChar; + + @Nullable + private final Character quoteChar; + + @Nullable + private final String nullLiteral; + + /** One converter per field format of {@code rowFormatInfo}, in the same order. */ + private final FieldToRowDataConverters.FieldToRowDataConverter[] converters; + + /** + * Creates a schema that uses the default charset, delimiters, escape and quote characters and + * null literal from {@code TableFormatConstants}. + */ + public KvRowDataDeserializationSchema( + @Nonnull RowFormatInfo rowFormatInfo, + @Nonnull TypeInformation<RowData> producedTypeInfo) { + this( + rowFormatInfo, + producedTypeInfo, + DEFAULT_CHARSET, + DEFAULT_ENTRY_DELIMITER, + DEFAULT_KV_DELIMITER, + DEFAULT_ESCAPE_CHARACTER, + DEFAULT_QUOTE_CHARACTER, + DEFAULT_NULL_LITERAL); + } + + /** + * Creates a schema with explicit text settings; the ignore-errors flag is set to + * {@code DEFAULT_IGNORE_ERRORS}. + */ + public KvRowDataDeserializationSchema( + @Nonnull RowFormatInfo rowFormatInfo, + @Nonnull TypeInformation<RowData> producedTypeInfo, + @Nonnull String charset, + @Nonnull Character entryDelimiter, + @Nonnull Character kvDelimiter, + @Nullable Character escapeChar, + @Nullable Character quoteChar, + @Nullable String nullLiteral) { + this( + rowFormatInfo, + producedTypeInfo, + charset, + entryDelimiter, + kvDelimiter, + escapeChar, + quoteChar, + nullLiteral, + DEFAULT_IGNORE_ERRORS); + } + + /** + * Creates a fully specified schema and derives one field converter per field format of + * {@code rowFormatInfo}. + * + * <p>NOTE(review): {@code @Nullable} on the primitive {@code ignoreErrors} parameter has no + * effect, since a {@code boolean} can never be null.</p> + */ + public KvRowDataDeserializationSchema( + @Nonnull RowFormatInfo rowFormatInfo, + @Nonnull TypeInformation<RowData> producedTypeInfo, + @Nonnull String charset, + @Nonnull Character entryDelimiter, + @Nonnull Character kvDelimiter, + @Nullable Character escapeChar, + @Nullable Character
quoteChar, + @Nullable String nullLiteral, + @Nullable boolean ignoreErrors) { + super(ignoreErrors); + this.rowFormatInfo = rowFormatInfo; + this.producedTypeInfo = producedTypeInfo; + this.charset = charset; + this.entryDelimiter = entryDelimiter; + this.kvDelimiter = kvDelimiter; + this.escapeChar = escapeChar; + this.quoteChar = quoteChar; + this.nullLiteral = nullLiteral; + + converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos()) + .map(formatInfo -> FieldToRowDataConverters.createConverter( + TableFormatForRowDataUtils.deriveLogicalType(formatInfo))) + .toArray(FieldToRowDataConverters.FieldToRowDataConverter[]::new); + } + + /** + * Decodes {@code bytes} with the configured charset, splits the text into a key-to-text map and + * fills one row position per field of {@code rowFormatInfo}. + * + * <p>Each field text is looked up by field name, so a key that is missing from the message + * reaches {@code deserializeBasicField} as {@code null}. The charset lookup happens before the + * {@code try} block, so a failure there is not wrapped.</p> + * + * <p>NOTE(review): the handler catches {@code Throwable}, which also wraps {@code Error}s such as + * {@code OutOfMemoryError} - confirm that this breadth is intended.</p> + * + * @param bytes the raw KV message + * @return a {@link GenericRowData} with one position per field format + * @throws IOException wrapping anything thrown while splitting or converting; the message + * contains the decoded text + */ + @Override + public RowData deserializeInternal(byte[] bytes) throws IOException { + String text = new String(bytes, Charset.forName(charset)); + try { + Map<String, String> fieldTexts = + splitKv(text, entryDelimiter, kvDelimiter, escapeChar, quoteChar); + + String[] fieldNames = rowFormatInfo.getFieldNames(); + FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos(); + + GenericRowData rowData = new GenericRowData(fieldFormatInfos.length); + for (int i = 0; i < fieldFormatInfos.length; i++) { + String fieldName = fieldNames[i]; + FormatInfo fieldFormatInfo = fieldFormatInfos[i]; + + String fieldText = fieldTexts.get(fieldName); + + Object field = deserializeBasicField( + fieldName, + fieldFormatInfo, + fieldText, + nullLiteral); + rowData.setField(i, converters[i].convert(field)); + } + return rowData; + } catch (Throwable t) { + throw new IOException( + String.format("Could not properly deserialize kv. Text=[%s].", text), t); + } + } + + /** + * Always returns {@code false}. + */ + @Override + public boolean isEndOfStream(RowData rowData) { + return false; + } + + /** + * Returns the type information passed to the constructor. + */ + @Override + public TypeInformation<RowData> getProducedType() { + return producedTypeInfo; + } + + /** + * Builder for {@link KvRowDataDeserializationSchema}.
+ */ + public static class Builder extends KvFormatBuilder<Builder> { + + private final TypeInformation<RowData> producedTypeInfo; + public Builder(RowFormatInfo rowFormatInfo, TypeInformation<RowData> producedTypeInfo) { + super(rowFormatInfo); + this.producedTypeInfo = producedTypeInfo; + } + + /** + * Builds the schema from the builder's current settings. + * + * <p>NOTE(review): this calls the constructor overload without {@code ignoreErrors}, so the + * schema is created with {@code DEFAULT_IGNORE_ERRORS}; a value supplied through + * {@code setIgnoreErrors} (as {@link KvFormatFactory} does) does not appear to be forwarded - + * confirm against {@code TextFormatBuilder}.</p> + */ + public KvRowDataDeserializationSchema build() { + return new KvRowDataDeserializationSchema( + rowFormatInfo, + producedTypeInfo, + charset, + entryDelimiter, + kvDelimiter, + escapeChar, + quoteChar, + nullLiteral); + } + } + + /** + * Compares the superclass state, row format, charset, delimiters, escape and quote characters and + * null literal. {@code producedTypeInfo} and {@code converters} are not compared. + */ + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + KvRowDataDeserializationSchema that = (KvRowDataDeserializationSchema) o; + return rowFormatInfo.equals(that.rowFormatInfo) && + charset.equals(that.charset) && + entryDelimiter.equals(that.entryDelimiter) && + kvDelimiter.equals(that.kvDelimiter) && + Objects.equals(escapeChar, that.escapeChar) && + Objects.equals(quoteChar, that.quoteChar) && + Objects.equals(nullLiteral, that.nullLiteral); + } + + /** + * Hashes the same state that {@link #equals(Object)} compares. + */ + @Override + public int hashCode() { + return Objects.hash( + super.hashCode(), + rowFormatInfo, + charset, + entryDelimiter, + kvDelimiter, + escapeChar, + quoteChar, + nullLiteral); + } +} diff --git a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataSerializationSchema.java b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataSerializationSchema.java new file mode 100644 index 0000000000..2b684bbe7f --- /dev/null +++ b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataSerializationSchema.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.formats.kv; + +import org.apache.inlong.sort.formats.base.DefaultSerializationSchema; +import org.apache.inlong.sort.formats.base.RowDataToFieldConverters; +import org.apache.inlong.sort.formats.base.TableFormatForRowDataUtils; +import org.apache.inlong.sort.formats.common.FormatInfo; +import org.apache.inlong.sort.formats.common.RowFormatInfo; + +import org.apache.flink.table.data.RowData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.nio.charset.Charset; +import java.util.Objects; + +import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_CHARSET; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ENTRY_DELIMITER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ESCAPE_CHARACTER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_IGNORE_ERRORS; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_KV_DELIMITER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_NULL_LITERAL; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_QUOTE_CHARACTER; +import 
static org.apache.inlong.sort.formats.base.TableFormatUtils.serializeBasicField; +import static org.apache.inlong.sort.formats.util.StringUtils.concatKv; + +/** + * Serialization schema that serializes an object of Flink Table & SQL internal data structure into + * a KV format bytes. + * + * <p> Serialize the input row into a string in a form of "k1=v1&k2=v2" and converts it into + * bytes.</p> + * + * <p>Result <code>byte[]</code> messages can be deserialized using + * {@link KvRowDataDeserializationSchema}.</p> + */ +public class KvRowDataSerializationSchema extends DefaultSerializationSchema<RowData> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(KvRowDataSerializationSchema.class); + + @Nonnull + private final RowFormatInfo rowFormatInfo; + + @Nonnull + private final String charset; + + @Nonnull + private final Character entryDelimiter; + + @Nonnull + private final Character kvDelimiter; + + @Nullable + private final Character escapeChar; + + @Nullable + private final Character quoteChar; + + @Nullable + private final String nullLiteral; + + private final RowDataToFieldConverters.RowFieldConverter[] rowFieldConverters; + + public KvRowDataSerializationSchema( + @Nonnull RowFormatInfo rowFormatInfo) { + this( + rowFormatInfo, + DEFAULT_CHARSET, + DEFAULT_ENTRY_DELIMITER, + DEFAULT_KV_DELIMITER, + DEFAULT_ESCAPE_CHARACTER, + DEFAULT_QUOTE_CHARACTER, + DEFAULT_NULL_LITERAL); + } + + public KvRowDataSerializationSchema( + @Nonnull RowFormatInfo rowFormatInfo, + @Nonnull String charset, + @Nonnull Character entryDelimiter, + @Nonnull Character kvDelimiter, + @Nullable Character escapeChar, + @Nullable Character quoteChar, + @Nullable String nullLiteral) { + this(rowFormatInfo, charset, entryDelimiter, kvDelimiter, escapeChar, quoteChar, nullLiteral, + DEFAULT_IGNORE_ERRORS); + } + + public KvRowDataSerializationSchema( + @Nonnull RowFormatInfo rowFormatInfo, + @Nonnull String charset, + @Nonnull 
Character entryDelimiter, + @Nonnull Character kvDelimiter, + @Nullable Character escapeChar, + @Nullable Character quoteChar, + @Nullable String nullLiteral, + @Nullable boolean ignoreErrors) { + super(ignoreErrors); + this.rowFormatInfo = rowFormatInfo; + FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos(); + + rowFieldConverters = new RowDataToFieldConverters.RowFieldConverter[fieldFormatInfos.length]; + for (int i = 0; i < rowFieldConverters.length; i++) { + rowFieldConverters[i] = RowDataToFieldConverters.createNullableRowFieldConverter( + TableFormatForRowDataUtils.deriveLogicalType(fieldFormatInfos[i])); + } + + this.charset = charset; + this.entryDelimiter = entryDelimiter; + this.kvDelimiter = kvDelimiter; + this.escapeChar = escapeChar; + this.quoteChar = quoteChar; + this.nullLiteral = nullLiteral; + } + + @Override + public byte[] serializeInternal(RowData rowData) { + if (rowData == null) { + return null; + } + + try { + String[] fieldNames = rowFormatInfo.getFieldNames(); + FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos(); + + if (rowData.getArity() != fieldFormatInfos.length) { + LOG.warn("The number of fields mismatches: expected=[{}], actual=[{}]. Row=[{}].", + fieldNames.length, rowData.getArity(), rowData); + } + + String[] fieldTexts = new String[fieldNames.length]; + + // the extra fields will be dropped + for (int i = 0; i < fieldNames.length; i++) { + if (i >= rowData.getArity()) { + fieldTexts[i] = nullLiteral == null ? 
"" : nullLiteral; + } else { + Object field = rowFieldConverters[i].convert(rowData, i); + String fieldText = serializeBasicField( + fieldNames[i], + fieldFormatInfos[i], + field, + nullLiteral); + + fieldTexts[i] = fieldText; + } + } + + String text = concatKv( + fieldNames, + fieldTexts, + entryDelimiter, + kvDelimiter, + escapeChar, + quoteChar); + + return text.getBytes(Charset.forName(charset)); + } catch (Throwable t) { + throw new RuntimeException( + String.format("Could not properly serialize kv. Row=[%s]. FormatInfo=[%s].", + rowData, rowFormatInfo), + t); + } + } + + /** + * Builder for {@link KvRowDataSerializationSchema}. + */ + public static class Builder extends KvFormatBuilder<Builder> { + + public Builder(RowFormatInfo rowFormatInfo) { + super(rowFormatInfo); + } + + public KvRowDataSerializationSchema build() { + return new KvRowDataSerializationSchema( + rowFormatInfo, + charset, + entryDelimiter, + kvDelimiter, + escapeChar, + quoteChar, + nullLiteral, + ignoreErrors); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + if (!super.equals(o)) { + return false; + } + + KvRowDataSerializationSchema that = (KvRowDataSerializationSchema) o; + return Objects.equals(rowFormatInfo, that.rowFormatInfo) && + Objects.equals(charset, that.charset) && + Objects.equals(entryDelimiter, that.entryDelimiter) && + Objects.equals(kvDelimiter, that.kvDelimiter) && + Objects.equals(escapeChar, that.escapeChar) && + Objects.equals(quoteChar, that.quoteChar) && + Objects.equals(nullLiteral, that.nullLiteral); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), rowFormatInfo, charset, entryDelimiter, kvDelimiter, + escapeChar, quoteChar, nullLiteral); + } +} diff --git a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 0000000000..b72e47ef5f --- /dev/null +++ b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.inlong.sort.formats.kv.KvFormatFactory diff --git a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvFormatFactoryTest.java b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvFormatFactoryTest.java new file mode 100644 index 0000000000..13bc3a2c5f --- /dev/null +++ b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvFormatFactoryTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.formats.kv; + +import org.apache.inlong.sort.formats.base.TableFormatConstants; +import org.apache.inlong.sort.formats.base.TableFormatOptions; +import org.apache.inlong.sort.formats.base.TableFormatUtils; +import org.apache.inlong.sort.formats.common.BooleanFormatInfo; +import org.apache.inlong.sort.formats.common.FormatInfo; +import org.apache.inlong.sort.formats.common.FormatUtils; +import org.apache.inlong.sort.formats.common.IntFormatInfo; +import org.apache.inlong.sort.formats.common.RowFormatInfo; +import org.apache.inlong.sort.formats.common.StringFormatInfo; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.TestDynamicTableFactory; +import org.apache.flink.table.factories.utils.FactoryMocks; +import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.LogicalType; +import org.junit.Test; + +import java.nio.charset.StandardCharsets; +import 
java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; + +import static org.apache.flink.table.factories.utils.FactoryMocks.PHYSICAL_DATA_TYPE; +import static org.apache.flink.table.factories.utils.FactoryMocks.SCHEMA; +import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink; +import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource; +import static org.junit.Assert.assertEquals; + +/** Tests for the {@link KvFormatFactory}. */ +public class KvFormatFactoryTest { + + private static final RowFormatInfo TEST_FORMAT_SCHEMA = + new RowFormatInfo( + new String[]{"a", "b", "c"}, + new FormatInfo[]{ + StringFormatInfo.INSTANCE, + IntFormatInfo.INSTANCE, + BooleanFormatInfo.INSTANCE + }); + + @Test + public void testSerDeSchema() { + final Map<String, String> tableOptions = getAllOptions(); + testSerializationSchema(tableOptions); + testDeserializationSchema(tableOptions); + } + + private Map<String, String> getModifiedOptions( + Consumer<Map<String, String>> optionModifier) { + Map<String, String> options = getAllOptions(); + optionModifier.accept(options); + return options; + } + + private void testDeserializationSchema(Map<String, String> tableOptions) { + LogicalType rowType = TableFormatUtils.deriveLogicalType(TEST_FORMAT_SCHEMA); + TypeInformation<RowData> producedType = InternalTypeInfo.of(rowType); + final KvRowDataDeserializationSchema expectedDeser = + new KvRowDataDeserializationSchema.Builder(TEST_FORMAT_SCHEMA, producedType) + .setEntryDelimiter('&') + .setKvDelimiter('=') + .setCharset(StandardCharsets.ISO_8859_1.name()) + .setEscapeCharacter('\\') + .setQuoteCharacter('\"') + .setNullLiteral("n/a") + .setIgnoreErrors(true) + .build(); + + DeserializationSchema<RowData> actualDeser = createDeserializationSchema(tableOptions); + assertEquals(expectedDeser, actualDeser); + } + + private void testSerializationSchema(Map<String, String> tableOptions) { + final KvRowDataSerializationSchema 
expectedSer = + new KvRowDataSerializationSchema.Builder(TEST_FORMAT_SCHEMA) + .setEntryDelimiter('&') + .setKvDelimiter('=') + .setCharset(StandardCharsets.ISO_8859_1.name()) + .setEscapeCharacter('\\') + .setQuoteCharacter('\"') + .setNullLiteral("n/a") + .setIgnoreErrors(true) + .build(); + SerializationSchema<RowData> actualSer = createSerializationSchema(tableOptions); + assertEquals(expectedSer, actualSer); + } + + private DeserializationSchema<RowData> createDeserializationSchema( + Map<String, String> options) { + final DynamicTableSource actualSource = createTableSource(FactoryMocks.SCHEMA, options); + assert actualSource instanceof TestDynamicTableFactory.DynamicTableSourceMock; + TestDynamicTableFactory.DynamicTableSourceMock sourceMock = + (TestDynamicTableFactory.DynamicTableSourceMock) actualSource; + return sourceMock.valueFormat.createRuntimeDecoder( + ScanRuntimeProviderContext.INSTANCE, PHYSICAL_DATA_TYPE); + } + + private SerializationSchema<RowData> createSerializationSchema(Map<String, String> options) { + final DynamicTableSink actualSink = createTableSink(SCHEMA, options); + assert actualSink instanceof TestDynamicTableFactory.DynamicTableSinkMock; + TestDynamicTableFactory.DynamicTableSinkMock sinkMock = + (TestDynamicTableFactory.DynamicTableSinkMock) actualSink; + return sinkMock.valueFormat.createRuntimeEncoder(null, PHYSICAL_DATA_TYPE); + } + + private Map<String, String> getAllOptions() { + final Map<String, String> options = new HashMap<>(); + options.put("connector", TestDynamicTableFactory.IDENTIFIER); + options.put("target", "MyTarget"); + options.put("buffer-size", "1000"); + + options.put("format", KvFormatFactory.IDENTIFIER); + options.put(KvFormatFactory.KV_PREFIX + TableFormatOptions.ROW_FORMAT_INFO.key(), + FormatUtils.marshall(TEST_FORMAT_SCHEMA)); + options.put(KvFormatFactory.KV_PREFIX + TableFormatConstants.FORMAT_KV_ENTRY_DELIMITER, "&"); + options.put(KvFormatFactory.KV_PREFIX + TableFormatConstants.FORMAT_KV_DELIMITER, 
"="); + options.put(KvFormatFactory.KV_PREFIX + TableFormatConstants.FORMAT_CHARSET, "ISO-8859-1"); + options.put(KvFormatFactory.KV_PREFIX + TableFormatConstants.FORMAT_IGNORE_ERRORS, "true"); + options.put(KvFormatFactory.KV_PREFIX + TableFormatConstants.FORMAT_ESCAPE_CHARACTER, "\\"); + options.put(KvFormatFactory.KV_PREFIX + TableFormatConstants.FORMAT_QUOTE_CHARACTER, "\""); + options.put(KvFormatFactory.KV_PREFIX + TableFormatConstants.FORMAT_NULL_LITERAL, "n/a"); + + return options; + } +} diff --git a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchemaTest.java b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchemaTest.java new file mode 100644 index 0000000000..a0350905e3 --- /dev/null +++ b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchemaTest.java @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.formats.kv; + +import org.apache.inlong.sort.formats.base.TableFormatUtils; +import org.apache.inlong.sort.formats.common.BasicFormatInfo; +import org.apache.inlong.sort.formats.common.BooleanFormatInfo; +import org.apache.inlong.sort.formats.common.ByteFormatInfo; +import org.apache.inlong.sort.formats.common.DateFormatInfo; +import org.apache.inlong.sort.formats.common.DecimalFormatInfo; +import org.apache.inlong.sort.formats.common.DoubleFormatInfo; +import org.apache.inlong.sort.formats.common.FloatFormatInfo; +import org.apache.inlong.sort.formats.common.FormatInfo; +import org.apache.inlong.sort.formats.common.IntFormatInfo; +import org.apache.inlong.sort.formats.common.LongFormatInfo; +import org.apache.inlong.sort.formats.common.RowFormatInfo; +import org.apache.inlong.sort.formats.common.ShortFormatInfo; +import org.apache.inlong.sort.formats.common.StringFormatInfo; +import org.apache.inlong.sort.formats.common.TimeFormatInfo; +import org.apache.inlong.sort.formats.common.TimestampFormatInfo; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.LogicalType; +import org.junit.Test; + +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.function.Consumer; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link KvRowDataDeserializationSchema}. 
+ */ +public class KvRowDataDeserializationSchemaTest { + + private static final RowFormatInfo TEST_ROW_INFO = + new RowFormatInfo( + new String[]{"f1", "f2", "f3", "f4"}, + new FormatInfo[]{ + IntFormatInfo.INSTANCE, + StringFormatInfo.INSTANCE, + StringFormatInfo.INSTANCE, + StringFormatInfo.INSTANCE + }); + + @Test + public void testNormal() throws Exception { + Consumer<KvRowDataDeserializationSchema.Builder> config = builder -> { + }; + + testBasicDeserialization(config, StringFormatInfo.INSTANCE, StringData.fromString("hello"), "f=hello"); + testBasicDeserialization(config, BooleanFormatInfo.INSTANCE, true, "f=true"); + testBasicDeserialization(config, ByteFormatInfo.INSTANCE, (byte) 124, "f=124"); + testBasicDeserialization(config, ShortFormatInfo.INSTANCE, (short) 10000, "f=10000"); + testBasicDeserialization(config, IntFormatInfo.INSTANCE, 1234567, "f=1234567"); + testBasicDeserialization(config, LongFormatInfo.INSTANCE, 12345678910L, "f=12345678910"); + testBasicDeserialization(config, FloatFormatInfo.INSTANCE, 0.33333334f, "f=0.33333334"); + testBasicDeserialization(config, DoubleFormatInfo.INSTANCE, 0.33333333332, "f=0.33333333332"); + testBasicDeserialization(config, DecimalFormatInfo.INSTANCE, + DecimalData.fromBigDecimal(new BigDecimal("1234.0000000000000000000000001"), 10, 0), + "f=1234.0000000000000000000000001"); + testBasicDeserialization(config, new DateFormatInfo("dd/MM/yyyy"), + Date.valueOf("2020-03-22").toLocalDate().toEpochDay(), "f=22/03/2020"); + testBasicDeserialization(config, new TimeFormatInfo("ss/mm/hh"), + Time.valueOf("11:12:13").toLocalTime().toSecondOfDay() * 1000, "f=13/12/11"); + testBasicDeserialization(config, new TimestampFormatInfo("dd/MM/yyyy hh:mm:ss"), + TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 11:12:13")), "f=22/03/2020 11:12:13"); + } + + @Test + public void testNullIteral() throws Exception { + String nullLiteral = "n/a"; + String nullField = "f=n/a"; + Consumer<KvRowDataDeserializationSchema.Builder> 
config = builder -> builder.setNullLiteral(nullLiteral); + + testBasicDeserialization(config, StringFormatInfo.INSTANCE, null, nullField); + testBasicDeserialization(config, BooleanFormatInfo.INSTANCE, null, nullField); + testBasicDeserialization(config, ByteFormatInfo.INSTANCE, null, nullField); + testBasicDeserialization(config, ShortFormatInfo.INSTANCE, null, nullField); + testBasicDeserialization(config, IntFormatInfo.INSTANCE, null, nullField); + testBasicDeserialization(config, LongFormatInfo.INSTANCE, null, nullField); + testBasicDeserialization(config, FloatFormatInfo.INSTANCE, null, nullField); + testBasicDeserialization(config, DoubleFormatInfo.INSTANCE, null, nullField); + testBasicDeserialization(config, DecimalFormatInfo.INSTANCE, null, nullField); + testBasicDeserialization(config, new DateFormatInfo("dd/MM/yyyy"), null, nullField); + testBasicDeserialization(config, new TimeFormatInfo("ss/mm/hh"), null, nullField); + testBasicDeserialization(config, new TimestampFormatInfo("dd/MM/yyyy hh:mm:ss"), null, nullField); + } + + @Test + public void testDelimiter() throws Exception { + Consumer<KvRowDataDeserializationSchema.Builder> config = + builder -> builder.setEntryDelimiter('|').setKvDelimiter(','); + + GenericRowData rowData = new GenericRowData(4); + rowData.setField(0, 10); + rowData.setField(1, StringData.fromString("aa")); + rowData.setField(2, StringData.fromString("bb")); + rowData.setField(3, StringData.fromString("cc")); + + testRowDeserialization( + config, + rowData, + "f1,10|f2,aa|f3,bb|f4,cc".getBytes()); + } + + @Test + public void testEscape() throws Exception { + Consumer<KvRowDataDeserializationSchema.Builder> config = + builder -> builder.setEscapeCharacter('\\').setQuoteCharacter('\"'); + + GenericRowData rowData1 = new GenericRowData(4); + rowData1.setField(0, 10); + rowData1.setField(1, StringData.fromString("field1&field2")); + rowData1.setField(2, StringData.fromString("field3")); + rowData1.setField(3, 
StringData.fromString("field4")); + + testRowDeserialization( + config, + rowData1, + "f1=10&f2=field1\\&field2&f3=field3&f4=field4".getBytes()); + + GenericRowData rowData2 = new GenericRowData(4); + rowData2.setField(0, 10); + rowData2.setField(1, StringData.fromString("field1\\")); + rowData2.setField(2, StringData.fromString("field2")); + rowData2.setField(3, StringData.fromString("field3")); + + testRowDeserialization( + config, + rowData2, + "f1=10&f2=field1\\\\&f3=field2&f4=field3".getBytes()); + + GenericRowData rowData3 = new GenericRowData(4); + rowData3.setField(0, 10); + rowData3.setField(1, StringData.fromString("field1\"")); + rowData3.setField(2, StringData.fromString("field2")); + rowData3.setField(3, StringData.fromString("field3")); + + testRowDeserialization( + config, + rowData3, + "f1=10&f2=field1\\\"&f3=field2&f4=field3".getBytes()); + } + + @Test + public void testQuote() throws Exception { + Consumer<KvRowDataDeserializationSchema.Builder> config = + builder -> builder.setEscapeCharacter('\\').setQuoteCharacter('\"'); + + GenericRowData rowData1 = new GenericRowData(4); + rowData1.setField(0, 10); + rowData1.setField(1, StringData.fromString("field1&field2")); + rowData1.setField(2, StringData.fromString("field3")); + rowData1.setField(3, StringData.fromString("field4")); + + testRowDeserialization( + config, + rowData1, + "f1=10&f2=\"field1&field2\"&f3=field3&f4=field4".getBytes()); + + GenericRowData rowData2 = new GenericRowData(4); + rowData2.setField(0, 10); + rowData2.setField(1, StringData.fromString("field1\\")); + rowData2.setField(2, StringData.fromString("field2")); + rowData2.setField(3, StringData.fromString("field3")); + + testRowDeserialization( + config, + rowData2, + "f1=10&f2=\"field1\\\"&f3=field2&f4=field3".getBytes()); + } + + @Test + public void testExtractSpecificKeys() throws Exception { + Consumer<KvRowDataDeserializationSchema.Builder> config = + builder -> builder.setEscapeCharacter('\\').setQuoteCharacter('\"'); + 
+ GenericRowData rowData = new GenericRowData(4); + rowData.setField(0, 10); + rowData.setField(1, StringData.fromString("field1")); + rowData.setField(2, StringData.fromString("field2")); + rowData.setField(3, StringData.fromString("field3")); + + testRowDeserialization( + config, + rowData, + "f1=10&f2=field1&f3=field2&f4=field3&f5=field4".getBytes()); + } + + @Test + public void testCharset() throws Exception { + Consumer<KvRowDataDeserializationSchema.Builder> config = + builder -> builder.setCharset(StandardCharsets.UTF_16.name()); + + GenericRowData rowData = new GenericRowData(4); + rowData.setField(0, 10); + rowData.setField(1, StringData.fromString("aa")); + rowData.setField(2, StringData.fromString("bb")); + rowData.setField(3, StringData.fromString("cc")); + + testRowDeserialization( + config, + rowData, + "f1=10&f2=aa&f3=bb&f4=cc".getBytes(StandardCharsets.UTF_16)); + } + + @Test(expected = Exception.class) + public void testErrors() throws Exception { + Consumer<KvRowDataDeserializationSchema.Builder> config = builder -> { + }; + + GenericRowData rowData = new GenericRowData(4); + rowData.setField(0, null); + rowData.setField(1, StringData.fromString("field1")); + rowData.setField(2, StringData.fromString("field2")); + rowData.setField(3, StringData.fromString("field3")); + + testRowDeserialization( + config, + rowData, + "f1=na&f2=field1&f3=field2&f4=field3".getBytes()); + } + + @Test + public void testMissingField() throws Exception { + Consumer<KvRowDataDeserializationSchema.Builder> config = builder -> { + }; + + GenericRowData rowData = new GenericRowData(4); + rowData.setField(0, null); + rowData.setField(1, StringData.fromString("aa")); + rowData.setField(2, StringData.fromString("bb")); + rowData.setField(3, StringData.fromString("cc")); + + testRowDeserialization( + config, + rowData, + "f2=aa&f3=bb&f4=cc".getBytes()); + } + + @Test + public void testExtraField() throws Exception { + Consumer<KvRowDataDeserializationSchema.Builder> config = 
builder -> { + }; + + GenericRowData rowData = new GenericRowData(4); + rowData.setField(0, 10); + rowData.setField(1, StringData.fromString("aa")); + rowData.setField(2, StringData.fromString("bb")); + rowData.setField(3, StringData.fromString("cc")); + + testRowDeserialization( + config, + rowData, + "f1=10&f2=aa&f3=bb&f4=cc&f5=dd".getBytes()); + } + + private static <T> void testBasicDeserialization( + Consumer<KvRowDataDeserializationSchema.Builder> config, + BasicFormatInfo<T> basicFormatInfo, + Object expectedRecord, + String text) throws IOException { + RowFormatInfo rowFormatInfo = + new RowFormatInfo( + new String[]{"f"}, + new FormatInfo[]{basicFormatInfo}); + LogicalType logicalType = TableFormatUtils.deriveLogicalType(basicFormatInfo); + TypeInformation<RowData> typeInformation = InternalTypeInfo.of(logicalType); + KvRowDataDeserializationSchema.Builder builder = + new KvRowDataDeserializationSchema.Builder(rowFormatInfo, typeInformation); + config.accept(builder); + + KvRowDataDeserializationSchema deserializer = builder.build(); + + GenericRowData row = (GenericRowData) deserializer.deserialize(text.getBytes()); + assertEquals(1, row.getArity()); + assertEquals(expectedRecord, row.getField(0)); + } + + private void testRowDeserialization( + Consumer<KvRowDataDeserializationSchema.Builder> config, + RowData expectedRow, + byte[] bytes) throws Exception { + LogicalType rowType = TableFormatUtils.deriveLogicalType(TEST_ROW_INFO); + TypeInformation<RowData> producedTypeInfo = InternalTypeInfo.of(rowType); + KvRowDataDeserializationSchema.Builder builder = + new KvRowDataDeserializationSchema.Builder(TEST_ROW_INFO, producedTypeInfo); + config.accept(builder); + + KvRowDataDeserializationSchema deserializer = builder.build(); + + RowData row = deserializer.deserialize(bytes); + assertEquals(expectedRow, row); + } +} diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvRowDataSerializationSchemaTest.java b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvRowDataSerializationSchemaTest.java new file mode 100644 index 0000000000..02616b61a9 --- /dev/null +++ b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvRowDataSerializationSchemaTest.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.formats.kv; + +import org.apache.inlong.sort.formats.common.BasicFormatInfo; +import org.apache.inlong.sort.formats.common.BooleanFormatInfo; +import org.apache.inlong.sort.formats.common.ByteFormatInfo; +import org.apache.inlong.sort.formats.common.DateFormatInfo; +import org.apache.inlong.sort.formats.common.DecimalFormatInfo; +import org.apache.inlong.sort.formats.common.DoubleFormatInfo; +import org.apache.inlong.sort.formats.common.FloatFormatInfo; +import org.apache.inlong.sort.formats.common.FormatInfo; +import org.apache.inlong.sort.formats.common.IntFormatInfo; +import org.apache.inlong.sort.formats.common.LongFormatInfo; +import org.apache.inlong.sort.formats.common.RowFormatInfo; +import org.apache.inlong.sort.formats.common.ShortFormatInfo; +import org.apache.inlong.sort.formats.common.StringFormatInfo; +import org.apache.inlong.sort.formats.common.TimeFormatInfo; +import org.apache.inlong.sort.formats.common.TimestampFormatInfo; + +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.junit.Test; + +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.function.Consumer; + +import static org.apache.flink.table.data.DecimalData.fromBigDecimal; +import static org.apache.flink.table.data.StringData.fromString; +import static org.apache.flink.table.data.TimestampData.fromTimestamp; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link KvRowDataSerializationSchema}. 
+ */ +public class KvRowDataSerializationSchemaTest { + + private static final RowFormatInfo TEST_ROW_INFO = + new RowFormatInfo( + new String[]{"f1", "f2", "f3", "f4"}, + new FormatInfo[]{ + IntFormatInfo.INSTANCE, + StringFormatInfo.INSTANCE, + StringFormatInfo.INSTANCE, + StringFormatInfo.INSTANCE + }); + + @Test + public void testNormal() { + Consumer<KvRowDataSerializationSchema.Builder> config = builder -> { + }; + + testBasicSerialization(config, StringFormatInfo.INSTANCE, "hello", "f=hello"); + testBasicSerialization(config, BooleanFormatInfo.INSTANCE, true, "f=true"); + testBasicSerialization(config, ByteFormatInfo.INSTANCE, (byte) 124, "f=124"); + testBasicSerialization(config, ShortFormatInfo.INSTANCE, (short) 10000, "f=10000"); + testBasicSerialization(config, IntFormatInfo.INSTANCE, 1234567, "f=1234567"); + testBasicSerialization(config, LongFormatInfo.INSTANCE, 12345678910L, "f=12345678910"); + testBasicSerialization(config, FloatFormatInfo.INSTANCE, 0.33333334f, "f=0.33333334"); + testBasicSerialization(config, DoubleFormatInfo.INSTANCE, 0.33333333332, "f=0.33333333332"); + testBasicSerialization(config, DecimalFormatInfo.INSTANCE, new BigDecimal("1234.0000000000000000000000001"), + "f=1234.0000000000000000000000001"); + testBasicSerialization(config, new DateFormatInfo("dd/MM/yyyy"), Date.valueOf("2020-03-22"), "f=22/03/2020"); + testBasicSerialization(config, new TimeFormatInfo("ss/mm/hh"), Time.valueOf("11:12:13"), "f=13/12/11"); + testBasicSerialization(config, new TimestampFormatInfo("dd/MM/yyyy hh:mm:ss"), + Timestamp.valueOf("2020-03-22 11:12:13"), "f=22/03/2020 11:12:13"); + } + + @Test + public void testNullIteral() { + String nullLiteral = "n/a"; + String nullField = "f=n/a"; + Consumer<KvRowDataSerializationSchema.Builder> config = builder -> builder.setNullLiteral(nullLiteral); + + testBasicSerialization(config, StringFormatInfo.INSTANCE, null, nullField); + testBasicSerialization(config, BooleanFormatInfo.INSTANCE, null, nullField); + 
testBasicSerialization(config, ByteFormatInfo.INSTANCE, null, nullField); + testBasicSerialization(config, ShortFormatInfo.INSTANCE, null, nullField); + testBasicSerialization(config, IntFormatInfo.INSTANCE, null, nullField); + testBasicSerialization(config, LongFormatInfo.INSTANCE, null, nullField); + testBasicSerialization(config, FloatFormatInfo.INSTANCE, null, nullField); + testBasicSerialization(config, DoubleFormatInfo.INSTANCE, null, nullField); + testBasicSerialization(config, DecimalFormatInfo.INSTANCE, null, nullField); + testBasicSerialization(config, new DateFormatInfo("dd/MM/yyyy"), null, nullField); + testBasicSerialization(config, new TimeFormatInfo("ss/mm/hh"), null, nullField); + testBasicSerialization(config, new TimestampFormatInfo("dd/MM/yyyy hh:mm:ss"), null, nullField); + } + + @Test + public void testDelimiter() { + Consumer<KvRowDataSerializationSchema.Builder> config = + builder -> builder.setEntryDelimiter('|').setKvDelimiter(','); + + testRowSerialization( + config, + GenericRowData.of( + 10, + fromString("field1"), + fromString("field2"), + fromString("field3")), + "f1,10|f2,field1|f3,field2|f4,field3".getBytes()); + + } + + @Test + public void testEscape() { + Consumer<KvRowDataSerializationSchema.Builder> config = + builder -> builder.setEscapeCharacter('\\').setQuoteCharacter('\"'); + + testRowSerialization( + config, + GenericRowData.of( + 10, + fromString("field1&field2"), + fromString("field3"), + fromString("field4")), + "f1=10&f2=field1\\&field2&f3=field3&f4=field4".getBytes()); + testRowSerialization( + config, + GenericRowData.of( + 10, + fromString("field1\\"), + fromString("field2"), + fromString("field3")), + "f1=10&f2=field1\\\\&f3=field2&f4=field3".getBytes()); + testRowSerialization( + config, + GenericRowData.of( + 10, + fromString("field1\""), + fromString("field2"), + fromString("field3")), + "f1=10&f2=field1\\\"&f3=field2&f4=field3".getBytes()); + } + + @Test + public void testQuote() { + 
Consumer<KvRowDataSerializationSchema.Builder> config = builder -> builder.setQuoteCharacter('\"'); + + testRowSerialization( + config, + GenericRowData.of( + 10, + fromString("field1&field2"), + fromString("field3"), + fromString("field4")), + "f1=10&f2=field1\"&\"field2&f3=field3&f4=field4".getBytes()); + } + + @Test + public void testCharset() { + Consumer<KvRowDataSerializationSchema.Builder> config = + builder -> builder.setCharset(StandardCharsets.UTF_16.name()); + + testRowSerialization( + config, + GenericRowData.of( + 10, + fromString("field1"), + fromString("field2"), + fromString("field3")), + "f1=10&f2=field1&f3=field2&f4=field3".getBytes(StandardCharsets.UTF_16)); + } + + @Test + public void testNullFields() { + Consumer<KvRowDataSerializationSchema.Builder> config = builder -> { + }; + + testRowSerialization( + config, + GenericRowData.of( + 10, + fromString("field1"), + null, + fromString("field3")), + "f1=10&f2=field1&f3=&f4=field3".getBytes()); + } + + @Test + public void testMoreFields() { + Consumer<KvRowDataSerializationSchema.Builder> config = builder -> { + }; + + testRowSerialization( + config, + GenericRowData.of(10, + fromString("field1"), + fromString("field2"), + fromString("field3"), + fromString("field4")), + "f1=10&f2=field1&f3=field2&f4=field3".getBytes()); + } + + @Test + public void testLessFields() { + Consumer<KvRowDataSerializationSchema.Builder> config = builder -> { + }; + + testRowSerialization( + config, + GenericRowData.of( + 10, + fromString("field1"), + fromString("field2")), + "f1=10&f2=field1&f3=field2&f4=".getBytes()); + } + + @Test(expected = RuntimeException.class) + public void testErrors() { + Consumer<KvRowDataSerializationSchema.Builder> config = builder -> { + }; + testRowSerialization( + config, + GenericRowData.of( + fromString("na"), + fromString("field1"), + fromString("field2"), + fromString("field3")), + "f1=&f2=field1&f3=field2&f4=field3".getBytes()); + } + + private static <T> void testBasicSerialization( 
+ Consumer<KvRowDataSerializationSchema.Builder> config, + BasicFormatInfo<T> basicFormatInfo, + T record, + String expectedText) { + RowFormatInfo rowFormatInfo = + new RowFormatInfo( + new String[]{"f"}, + new FormatInfo[]{basicFormatInfo}); + + KvRowDataSerializationSchema.Builder builder = + new KvRowDataSerializationSchema.Builder(rowFormatInfo); + config.accept(builder); + + KvRowDataSerializationSchema serializer = builder.build(); + + GenericRowData rowData = new GenericRowData(1); + if (record instanceof String) { + rowData.setField(0, fromString((String) record)); + } else if (record instanceof BigDecimal) { + rowData.setField(0, fromBigDecimal((BigDecimal) record, 30, 25)); + } else if (record instanceof Timestamp) { + rowData.setField(0, fromTimestamp((Timestamp) record)); + } else if (record instanceof Date) { + rowData.setField(0, ((Date) record).toLocalDate().toEpochDay()); + } else if (record instanceof Time) { + rowData.setField(0, ((Time) record).toLocalTime().toSecondOfDay() * 1000); + } else { + rowData.setField(0, record); + } + + String text = new String(serializer.serialize(rowData)); + assertEquals(expectedText, text); + } + + private static void testRowSerialization( + Consumer<KvRowDataSerializationSchema.Builder> config, + RowData row, + byte[] expectedBytes) { + KvRowDataSerializationSchema.Builder builder = + new KvRowDataSerializationSchema.Builder(TEST_ROW_INFO); + config.accept(builder); + + KvRowDataSerializationSchema serializer = builder.build(); + byte[] bytes = serializer.serialize(row); + assertArrayEquals(expectedBytes, bytes); + } +} diff --git a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvUtilsTest.java b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvUtilsTest.java new file mode 100644 index 0000000000..4ad02f88e8 --- /dev/null +++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvUtilsTest.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.formats.kv; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.inlong.sort.formats.util.StringUtils.concatKv; +import static org.apache.inlong.sort.formats.util.StringUtils.splitKv; +import static org.junit.Assert.assertEquals; + +/** + * Unit tests for kv splitting and concatenating. 
+ */ +public class KvUtilsTest { + + @Test + public void testSplitNormal() { + List<Map<String, String>> list = + splitKv("f1=a\nf1=b", '&', '=', '\\', '\"', '\n'); + List<Map<String, String>> expectedList = new ArrayList<>(); + expectedList.add(new HashMap<String, String>() { + + { + put("f1", "a"); + } + }); + expectedList.add(new HashMap<String, String>() { + + { + put("f1", "b"); + } + }); + assertEquals(list, expectedList); + + assertEquals( + new HashMap<String, String>() { + + { + put("f1", "a"); + put("f2", "b"); + put("f3", "c"); + } + }, + splitKv("f1=a&f2=b&f3=c", '&', '=', null, null)); + + assertEquals( + new HashMap<String, String>() { + + { + put("f1", ""); + put("f2", "b"); + put("f3", "c"); + } + }, + splitKv("f1=&f2=b&f3=c", '&', '=', null, null)); + + assertEquals( + new HashMap<String, String>() { + + { + put("f1", "a"); + put("f2", "b"); + put("f3", ""); + } + }, + splitKv("f1=a&f2=b&f3=", '&', '=', null, null)); + + assertEquals( + new HashMap<String, String>() { + + { + put("=f1", "a"); + put("f2", "b"); + put("f3", "c"); + } + }, + splitKv("\\=f1=a&f2=b&f3=c", '&', '=', '\\', null)); + + assertEquals( + new HashMap<String, String>() { + + { + put("&f1", "a"); + put("f2", "b"); + put("f3", "c"); + } + }, + splitKv("\\&f1=a&f2=b&f3=c", '&', '=', '\\', null)); + + assertEquals( + new HashMap<String, String>() { + + { + put("&f1", "a"); + put("f2", "b"); + put("f3", "c"); + } + }, + splitKv("\"&f1\"=a&f2=b&f3=c", '&', '=', '\\', '\"')); + + assertEquals( + new HashMap<String, String>() { + + { + put("f1", "a&"); + put("f2", "b"); + put("f3", "c"); + } + }, + splitKv("f1=a\\&&f2=b&f3=c", '&', '=', '\\', null)); + + assertEquals( + new HashMap<String, String>() { + + { + put("f1", "a\\"); + put("f2", "b"); + put("f3", "c"); + } + }, + splitKv("f1=a\\\\&f2=b&f3=c", '&', '=', '\\', null)); + + assertEquals( + new HashMap<String, String>() { + + { + put("f1", "a&f2=b"); + put("f3", "c"); + put("f4", "d"); + } + }, + splitKv("f1=a\"&f2=\"b&f3=c&f4=d", 
'&', '=', '\\', '\"')); + + assertEquals( + new HashMap<String, String>() { + + { + put("f1", "atest\\test"); + put("f2", "b"); + put("f3", "c"); + } + }, + splitKv("f1=a\"test\\test\"&f2=b&f3=c", '&', '=', '\\', '\"')); + + assertEquals( + new HashMap<String, String>() { + + { + put("f1", "a"); + put("f2", "\"b"); + put("f3", "c\""); + put("f4", "d"); + } + }, + splitKv("f1=a&f2=\\\"b&f3=c\\\"&f4=d", '&', '=', '\\', '\"')); + + assertEquals( + new HashMap<String, String>() { + + { + put("f1", "b"); + } + }, + splitKv("f1=a&f1=b", '&', '=', '\\', '\"')); + + assertEquals( + new HashMap<String, String>() { + + { + put("", "a"); + put("f", ""); + } + }, + splitKv("=a&f=", '&', '=', '\\', '\"')); + } + + @Test(expected = RuntimeException.class) + public void testSplitNestedValue() { + splitKv("f1=a=a&f2=b&f3=c", '&', '=', '\\', '\"'); + } + + @Test(expected = RuntimeException.class) + public void testSplitUnclosedEscaping() { + splitKv("f1=a&f2=b\\", '&', '=', '\\', '\"'); + } + + @Test(expected = RuntimeException.class) + public void testSplitUnclosedQuoting() { + splitKv("f1=a&f2=b\"", '&', '=', '\\', '\"'); + } + + @Test(expected = RuntimeException.class) + public void testSplitDanglingKey1() { + splitKv("f1", '&', '=', null, null); + } + + @Test(expected = RuntimeException.class) + public void testSplitDanglingKey2() { + splitKv("f1&f2=3", '&', '=', null, null); + } + + @Test + public void testConcatNormal() { + assertEquals( + "f1=a&f2=b&f3=c&f4=d", + concatKv( + new String[]{"f1", "f2", "f3", "f4"}, + new String[]{"a", "b", "c", "d"}, + '&', '=', null, null)); + + assertEquals( + "f1\\&=a&f2=\\&b&f3=c&f4=d", + concatKv( + new String[]{"f1&", "f2", "f3", "f4"}, + new String[]{"a", "&b", "c", "d"}, + '&', '=', '\\', '\"')); + + assertEquals( + "f1=a&f2=\\\\b&f3=c&f4=d", + concatKv( + new String[]{"f1", "f2", "f3", "f4"}, + new String[]{"a", "\\b", "c", "d"}, + '&', '=', '\\', '\"')); + + assertEquals( + "f1=a&f2=\\\"b&f3=c&f4=d", + concatKv( + new String[]{"f1", 
"f2", "f3", "f4"}, + new String[]{"a", "\"b", "c", "d"}, + '&', '=', '\\', '\"')); + + assertEquals( + "f1\"&\"=a&f2=\"&\"b&f3=c&f4=d", + concatKv( + new String[]{"f1&", "f2", "f3", "f4"}, + new String[]{"a", "&b", "c", "d"}, + '&', '=', null, '\"')); + } + + @Test(expected = RuntimeException.class) + public void testConcatNoEscapingAndQuoting() { + concatKv( + new String[]{"f1", "f2", "f3", "f4"}, + new String[]{"&a", "&b", "&c", "&d"}, + '&', '=', null, null); + } + + @Test(expected = RuntimeException.class) + public void testConcatNoEscaping() { + concatKv( + new String[]{"f1", "f2", "f3", "f4"}, + new String[]{"a", "\"b", "c", "d"}, + '&', '=', null, '\"'); + } +} diff --git a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/resources/log4j-test.properties b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/resources/log4j-test.properties new file mode 100644 index 0000000000..881dc0609b --- /dev/null +++ b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/resources/log4j-test.properties @@ -0,0 +1,27 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +# Set root logger level to OFF and its only appender to A1. +log4j.rootLogger=OFF, A1 + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender + +# A1 uses PatternLayout. +log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n diff --git a/inlong-sort/sort-formats/format-rowdata/pom.xml b/inlong-sort/sort-formats/format-rowdata/pom.xml index 0334976986..83ee0e56d4 100644 --- a/inlong-sort/sort-formats/format-rowdata/pom.xml +++ b/inlong-sort/sort-formats/format-rowdata/pom.xml @@ -33,6 +33,8 @@ <modules> <module>format-rowdata-base</module> + <module>format-rowdata-csv</module> + <module>format-rowdata-kv</module> <module>format-rowdata-json</module> <module>format-inlongmsg-rowdata-base</module> <module>format-inlongmsg-rowdata-binlog</module>