This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new c244ec7561 [INLONG-10323][Sort] Support KV deserialization format in sort module (#10349) c244ec7561 is described below commit c244ec75616591ab956e4610ecbc0cf73f6ee394 Author: ZiruiPeng <zpen...@connect.ust.hk> AuthorDate: Thu Jun 6 19:03:39 2024 +0800 [INLONG-10323][Sort] Support KV deserialization format in sort module (#10349) --- .../inlong/sort/protocol/node/format/Format.java | 3 +- .../inlong/sort/protocol/node/format/KvFormat.java | 131 +++++++++++++++++++++ .../sort/protocol/node/format/KvFormatTest.java | 28 +++++ inlong-sort/sort-dist/pom.xml | 2 +- .../inlong/sort/formats/base/TableFormatUtils.java | 16 +++ .../inlong/sort/formats/kv/KvFormatFactory.java | 6 +- 6 files changed, 181 insertions(+), 5 deletions(-) diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/Format.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/Format.java index 82196600c1..69182d1df2 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/Format.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/Format.java @@ -35,7 +35,8 @@ import java.util.Map; @JsonSubTypes.Type(value = CanalJsonFormat.class, name = "canalJsonFormat"), @JsonSubTypes.Type(value = CsvFormat.class, name = "csvFormat"), @JsonSubTypes.Type(value = InLongMsgFormat.class, name = "inLongMsgFormat"), - @JsonSubTypes.Type(value = RawFormat.class, name = "rawFormat") + @JsonSubTypes.Type(value = RawFormat.class, name = "rawFormat"), + @JsonSubTypes.Type(value = KvFormat.class, name = "kvFormat") }) public interface Format extends Serializable { diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/KvFormat.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/KvFormat.java new file mode 100644 index 0000000000..742d2423e4 --- /dev/null +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/KvFormat.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.protocol.node.format; + +import lombok.Data; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName; + +import javax.annotation.Nullable; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_CHARSET; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ESCAPE_CHARACTER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_IGNORE_ERRORS; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_ENTRY_DELIMITER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_NULL_LITERAL; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_QUOTE_CHARACTER; + +@JsonTypeName("kvFormat") +@Data +public class KvFormat implements Format { + + private static final String IDENTIFIER = "inlong-kv"; + + @JsonProperty(value = "entryDelimiter", defaultValue = "&") + private final String entryDelimiter; + + @JsonProperty(value = "kvDelimiter", defaultValue = "=") + private final String kvDelimiter; + + @JsonProperty(value = "ignoreParseErrors", defaultValue = "false") + @Nullable + private final Boolean ignoreParseErrors; + + @JsonProperty(value = "escapeChar") + @Nullable + private final String escapeChar; + + @JsonProperty(value = "charset") + @Nullable + private final String charset; + + @JsonProperty(value = "nullLiteral") + @Nullable + private final String nullLiteral; + + @JsonProperty(value = "quoteCharacter") + @Nullable + private final String quoteCharacter; + + @JsonCreator + public KvFormat(@JsonProperty(value = "entryDelimiter") String entryDelimiter, + @JsonProperty(value = "kvDelimiter") String kvDelimiter, + @Nullable @JsonProperty(value = "escapeChar") String escapeChar, + @Nullable @JsonProperty(value = "ignoreParseErrors", defaultValue = "false") Boolean ignoreParseErrors, + @Nullable @JsonProperty(value = "charset") String charset, + @Nullable @JsonProperty(value = "nullLiteral") String nullLiteral, + @Nullable @JsonProperty(value = "quoteCharacter") String quoteCharacter) { + this.entryDelimiter = entryDelimiter; + this.kvDelimiter = kvDelimiter; + this.escapeChar = escapeChar; + this.ignoreParseErrors = ignoreParseErrors; + this.charset = charset; + this.nullLiteral = nullLiteral; + this.quoteCharacter = quoteCharacter; + } + + public KvFormat() { + this("&", "=", null, false, null, null, null); + } + + @Override + @JsonIgnore + public String getFormat() { + return IDENTIFIER; + } + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public Map<String, String> generateOptions() { + Map<String, String> options = new HashMap<>(16); + + options.put("format", getFormat()); + options.put(FORMAT_KV_DELIMITER, this.kvDelimiter); + options.put(FORMAT_KV_ENTRY_DELIMITER, this.entryDelimiter); + + if (StringUtils.isNotBlank(charset)) { + options.put(FORMAT_CHARSET, this.charset); + } + if (StringUtils.isNotBlank(nullLiteral)) { + options.put(FORMAT_NULL_LITERAL, this.nullLiteral); + } + if (StringUtils.isNotBlank(quoteCharacter)) { + options.put(FORMAT_QUOTE_CHARACTER, this.quoteCharacter); + } + if (StringUtils.isNotBlank(escapeChar)) { + options.put(FORMAT_ESCAPE_CHARACTER, this.escapeChar); + } + if (ignoreParseErrors != null) { + options.put(FORMAT_IGNORE_ERRORS, String.valueOf(this.ignoreParseErrors)); + } + + return options; + } + +} diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/KvFormatTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/KvFormatTest.java new file mode 100644 index 0000000000..fbb6407678 --- /dev/null +++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/KvFormatTest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.protocol.node.format; + +import org.apache.inlong.sort.SerializeBaseTest; + +public class KvFormatTest extends SerializeBaseTest<KvFormat> { + + @Override + public KvFormat getTestObject() { + return new KvFormat(); + } +} diff --git a/inlong-sort/sort-dist/pom.xml b/inlong-sort/sort-dist/pom.xml index 9bae28294c..9314c7f8d1 100644 --- a/inlong-sort/sort-dist/pom.xml +++ b/inlong-sort/sort-dist/pom.xml @@ -82,7 +82,7 @@ </dependency> <dependency> <groupId>org.apache.inlong</groupId> - <artifactId>sort-format-kv</artifactId> + <artifactId>sort-format-rowdata-kv</artifactId> <version>${project.version}</version> </dependency> </dependencies> diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java index 5cac263392..f68a0085f0 100644 --- a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java +++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java @@ -612,6 +612,22 @@ public class TableFormatUtils { return new RowFormatInfo(rowType.getFieldNames().toArray(new String[1]), fieldFormatInfos); } + public static RowFormatInfo deriveRowFormatInfo(DataType dataType) { + + RowType rowType = (RowType) dataType.getLogicalType(); + int size = rowType.getFields().size(); + FormatInfo[] fieldFormatInfos = new FormatInfo[size]; + String[] fieldNames = new String[size]; + + for (int i = 0; i < size; i++) { + LogicalType fieldType = rowType.getTypeAt(i); + fieldFormatInfos[i] = deriveFormatInfo(fieldType); + fieldNames[i] = rowType.getFieldNames().get(i); + } + + return new RowFormatInfo(fieldNames, fieldFormatInfos); + } + public static RowFormatInfo deserializeRowFormatInfo(String rowFormatInfoStr) { try { FormatInfo formatInfo = FormatUtils.demarshall(rowFormatInfoStr); diff --git a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvFormatFactory.java b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvFormatFactory.java index b9ca0d9599..773ad7e01d 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvFormatFactory.java +++ b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvFormatFactory.java @@ -20,6 +20,7 @@ package org.apache.inlong.sort.formats.kv; import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo; import org.apache.inlong.sort.formats.base.TableFormatForRowDataUtils; import org.apache.inlong.sort.formats.base.TableFormatOptions; +import org.apache.inlong.sort.formats.base.TableFormatUtils; import org.apache.inlong.sort.formats.base.TextFormatOptions; import org.apache.commons.lang3.StringEscapeUtils; @@ -104,10 +105,9 @@ public class KvFormatFactory public DeserializationSchema<RowData> createRuntimeDecoder( DynamicTableSource.Context context, DataType dataType) { - KvRowDataDeserializationSchema.Builder schemaBuilder = new KvRowDataDeserializationSchema.Builder( - deserializeRowFormatInfo(formatOptions.get(ROW_FORMAT_INFO)), + TableFormatUtils.deriveRowFormatInfo(dataType), context.createTypeInformation(dataType)); configureDeserializationSchema(formatOptions, schemaBuilder); return schemaBuilder.build(); @@ -128,7 +128,6 @@ public class KvFormatFactory @Override public Set<ConfigOption<?>> requiredOptions() { Set<ConfigOption<?>> options = new HashSet<>(); - options.add(ROW_FORMAT_INFO); options.add(TextFormatOptions.KV_ENTRY_DELIMITER); options.add(TextFormatOptions.KV_DELIMITER); options.add(TextFormatOptions.CHARSET); @@ -138,6 +137,7 @@ public class KvFormatFactory @Override public Set<ConfigOption<?>> optionalOptions() { Set<ConfigOption<?>> options = new HashSet<>(); + options.add(ROW_FORMAT_INFO); options.add(TextFormatOptions.ESCAPE_CHARACTER); options.add(TextFormatOptions.QUOTE_CHARACTER); options.add(TextFormatOptions.NULL_LITERAL);