This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c244ec7561 [INLONG-10323][Sort] Support KV deserialization format in sort module (#10349)
c244ec7561 is described below

commit c244ec75616591ab956e4610ecbc0cf73f6ee394
Author: ZiruiPeng <zpen...@connect.ust.hk>
AuthorDate: Thu Jun 6 19:03:39 2024 +0800

    [INLONG-10323][Sort] Support KV deserialization format in sort module (#10349)
---
 .../inlong/sort/protocol/node/format/Format.java   |   3 +-
 .../inlong/sort/protocol/node/format/KvFormat.java | 131 +++++++++++++++++++++
 .../sort/protocol/node/format/KvFormatTest.java    |  28 +++++
 inlong-sort/sort-dist/pom.xml                      |   2 +-
 .../inlong/sort/formats/base/TableFormatUtils.java |  16 +++
 .../inlong/sort/formats/kv/KvFormatFactory.java    |   6 +-
 6 files changed, 181 insertions(+), 5 deletions(-)

diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/Format.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/Format.java
index 82196600c1..69182d1df2 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/Format.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/Format.java
@@ -35,7 +35,8 @@ import java.util.Map;
         @JsonSubTypes.Type(value = CanalJsonFormat.class, name = 
"canalJsonFormat"),
         @JsonSubTypes.Type(value = CsvFormat.class, name = "csvFormat"),
         @JsonSubTypes.Type(value = InLongMsgFormat.class, name = 
"inLongMsgFormat"),
-        @JsonSubTypes.Type(value = RawFormat.class, name = "rawFormat")
+        @JsonSubTypes.Type(value = RawFormat.class, name = "rawFormat"),
+        @JsonSubTypes.Type(value = KvFormat.class, name = "kvFormat")
 })
 public interface Format extends Serializable {
 
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/KvFormat.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/KvFormat.java
new file mode 100644
index 0000000000..742d2423e4
--- /dev/null
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/KvFormat.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.format;
+
+import lombok.Data;
+import org.apache.commons.lang3.StringUtils;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_CHARSET;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ESCAPE_CHARACTER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_IGNORE_ERRORS;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_ENTRY_DELIMITER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_NULL_LITERAL;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_QUOTE_CHARACTER;
+
+/**
+ * {@link Format} description for the {@code inlong-kv} format.
+ *
+ * <p>Instances are (de)serialized by Jackson under the type name {@code kvFormat} and are turned
+ * into a flat option map by {@link #generateOptions()}.
+ */
+@JsonTypeName("kvFormat")
+@Data
+public class KvFormat implements Format {
+
+    private static final String IDENTIFIER = "inlong-kv";
+
+    /** Entry delimiter applied when none is supplied. */
+    private static final String DEFAULT_ENTRY_DELIMITER = "&";
+
+    /** Key/value delimiter applied when none is supplied. */
+    private static final String DEFAULT_KV_DELIMITER = "=";
+
+    /** Emitted under {@code FORMAT_KV_ENTRY_DELIMITER}; never {@code null}. */
+    @JsonProperty(value = "entryDelimiter", defaultValue = "&")
+    private final String entryDelimiter;
+
+    /** Emitted under {@code FORMAT_KV_DELIMITER}; never {@code null}. */
+    @JsonProperty(value = "kvDelimiter", defaultValue = "=")
+    private final String kvDelimiter;
+
+    /** Emitted under {@code FORMAT_IGNORE_ERRORS} whenever it is not {@code null}. */
+    @JsonProperty(value = "ignoreParseErrors", defaultValue = "false")
+    @Nullable
+    private final Boolean ignoreParseErrors;
+
+    /** Emitted under {@code FORMAT_ESCAPE_CHARACTER} when not blank. */
+    @JsonProperty(value = "escapeChar")
+    @Nullable
+    private final String escapeChar;
+
+    /** Emitted under {@code FORMAT_CHARSET} when not blank. */
+    @JsonProperty(value = "charset")
+    @Nullable
+    private final String charset;
+
+    /** Emitted under {@code FORMAT_NULL_LITERAL} when not blank. */
+    @JsonProperty(value = "nullLiteral")
+    @Nullable
+    private final String nullLiteral;
+
+    /** Emitted under {@code FORMAT_QUOTE_CHARACTER} when not blank. */
+    @JsonProperty(value = "quoteCharacter")
+    @Nullable
+    private final String quoteCharacter;
+
+    /**
+     * Creates a KV format description; this is also the constructor Jackson uses.
+     *
+     * <p>{@code @JsonProperty(defaultValue = ...)} is descriptive metadata only and is not applied
+     * by Jackson databind, so a payload that omits a delimiter delivers {@code null} here. A
+     * {@code null} delimiter therefore falls back to {@code "&"} (entries) or {@code "="}
+     * (key/value), the same values the no-argument constructor uses.
+     *
+     * @param entryDelimiter entry delimiter, or {@code null} for the default {@code "&"}
+     * @param kvDelimiter key/value delimiter, or {@code null} for the default {@code "="}
+     * @param escapeChar escape character, may be {@code null}
+     * @param ignoreParseErrors ignore-errors flag, may be {@code null} to leave the option unset
+     * @param charset charset name, may be {@code null}
+     * @param nullLiteral null literal, may be {@code null}
+     * @param quoteCharacter quote character, may be {@code null}
+     */
+    @JsonCreator
+    public KvFormat(@JsonProperty(value = "entryDelimiter") String entryDelimiter,
+            @JsonProperty(value = "kvDelimiter") String kvDelimiter,
+            @Nullable @JsonProperty(value = "escapeChar") String escapeChar,
+            @Nullable @JsonProperty(value = "ignoreParseErrors", defaultValue = "false") Boolean ignoreParseErrors,
+            @Nullable @JsonProperty(value = "charset") String charset,
+            @Nullable @JsonProperty(value = "nullLiteral") String nullLiteral,
+            @Nullable @JsonProperty(value = "quoteCharacter") String quoteCharacter) {
+        this.entryDelimiter = entryDelimiter != null ? entryDelimiter : DEFAULT_ENTRY_DELIMITER;
+        this.kvDelimiter = kvDelimiter != null ? kvDelimiter : DEFAULT_KV_DELIMITER;
+        this.escapeChar = escapeChar;
+        this.ignoreParseErrors = ignoreParseErrors;
+        this.charset = charset;
+        this.nullLiteral = nullLiteral;
+        this.quoteCharacter = quoteCharacter;
+    }
+
+    /**
+     * Creates a KV format with {@code "&"} as entry delimiter, {@code "="} as key/value delimiter,
+     * {@code ignoreParseErrors = false} and every other setting left {@code null}.
+     */
+    public KvFormat() {
+        this(DEFAULT_ENTRY_DELIMITER, DEFAULT_KV_DELIMITER, null, false, null, null, null);
+    }
+
+    /**
+     * Returns the format name, {@code inlong-kv}. Excluded from JSON via {@link JsonIgnore}.
+     *
+     * @return the constant identifier of this format
+     */
+    @Override
+    @JsonIgnore
+    public String getFormat() {
+        return IDENTIFIER;
+    }
+
+    /**
+     * Returns the identifier of this format, {@code inlong-kv}.
+     *
+     * @return the constant identifier of this format
+     */
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    /**
+     * Builds the option map for this format.
+     *
+     * <p>{@code "format"}, the key/value delimiter and the entry delimiter are always present.
+     * Charset, null literal, quote character and escape character are added only when not blank;
+     * the ignore-errors flag is added whenever it is not {@code null}.
+     *
+     * @return a new mutable map of option keys to string values
+     */
+    @Override
+    public Map<String, String> generateOptions() {
+        Map<String, String> options = new HashMap<>(16);
+
+        options.put("format", getFormat());
+        options.put(FORMAT_KV_DELIMITER, this.kvDelimiter);
+        options.put(FORMAT_KV_ENTRY_DELIMITER, this.entryDelimiter);
+
+        putIfNotBlank(options, FORMAT_CHARSET, this.charset);
+        putIfNotBlank(options, FORMAT_NULL_LITERAL, this.nullLiteral);
+        putIfNotBlank(options, FORMAT_QUOTE_CHARACTER, this.quoteCharacter);
+        putIfNotBlank(options, FORMAT_ESCAPE_CHARACTER, this.escapeChar);
+
+        if (ignoreParseErrors != null) {
+            options.put(FORMAT_IGNORE_ERRORS, String.valueOf(this.ignoreParseErrors));
+        }
+
+        return options;
+    }
+
+    /** Stores {@code value} under {@code key} unless the value is null, empty or whitespace-only. */
+    private static void putIfNotBlank(Map<String, String> options, String key, @Nullable String value) {
+        if (StringUtils.isNotBlank(value)) {
+            options.put(key, value);
+        }
+    }
+
+}
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/KvFormatTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/KvFormatTest.java
new file mode 100644
index 0000000000..fbb6407678
--- /dev/null
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/KvFormatTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.format;
+
+import org.apache.inlong.sort.SerializeBaseTest;
+
+/**
+ * Test case for {@link KvFormat} built on {@link SerializeBaseTest}.
+ *
+ * <p>NOTE(review): the actual checks live in {@code SerializeBaseTest}, which is not visible here;
+ * this class only supplies the object under test.
+ */
+public class KvFormatTest extends SerializeBaseTest<KvFormat> {
+
+    /**
+     * Supplies the instance under test, built with the no-argument constructor.
+     *
+     * @return a freshly constructed {@link KvFormat}
+     */
+    @Override
+    public KvFormat getTestObject() {
+        final KvFormat defaultFormat = new KvFormat();
+        return defaultFormat;
+    }
+}
diff --git a/inlong-sort/sort-dist/pom.xml b/inlong-sort/sort-dist/pom.xml
index 9bae28294c..9314c7f8d1 100644
--- a/inlong-sort/sort-dist/pom.xml
+++ b/inlong-sort/sort-dist/pom.xml
@@ -82,7 +82,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.inlong</groupId>
-            <artifactId>sort-format-kv</artifactId>
+            <artifactId>sort-format-rowdata-kv</artifactId>
             <version>${project.version}</version>
         </dependency>
     </dependencies>
diff --git 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
index 5cac263392..f68a0085f0 100644
--- 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
+++ 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
@@ -612,6 +612,22 @@ public class TableFormatUtils {
         return new RowFormatInfo(rowType.getFieldNames().toArray(new 
String[1]), fieldFormatInfos);
     }
 
+    /**
+     * Derives a {@link RowFormatInfo} from the row type carried by the given data type.
+     *
+     * <p>For every field, the format info comes from {@code deriveFormatInfo} applied to the
+     * field's logical type; field names are copied in index order.
+     *
+     * @param dataType data type whose logical type is cast to {@link RowType}
+     * @return row format info with one name and one format info per field
+     * @throws ClassCastException if the logical type of {@code dataType} is not a {@link RowType}
+     */
+    public static RowFormatInfo deriveRowFormatInfo(DataType dataType) {
+        RowType rowType = (RowType) dataType.getLogicalType();
+
+        // Fetch the names once up front instead of calling getFieldNames() on every iteration.
+        String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
+        FormatInfo[] fieldFormatInfos = new FormatInfo[fieldNames.length];
+
+        for (int i = 0; i < fieldNames.length; i++) {
+            fieldFormatInfos[i] = deriveFormatInfo(rowType.getTypeAt(i));
+        }
+
+        return new RowFormatInfo(fieldNames, fieldFormatInfos);
+    }
+
     public static RowFormatInfo deserializeRowFormatInfo(String 
rowFormatInfoStr) {
         try {
             FormatInfo formatInfo = FormatUtils.demarshall(rowFormatInfoStr);
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvFormatFactory.java
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvFormatFactory.java
index b9ca0d9599..773ad7e01d 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvFormatFactory.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvFormatFactory.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.formats.kv;
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
 import org.apache.inlong.sort.formats.base.TableFormatForRowDataUtils;
 import org.apache.inlong.sort.formats.base.TableFormatOptions;
+import org.apache.inlong.sort.formats.base.TableFormatUtils;
 import org.apache.inlong.sort.formats.base.TextFormatOptions;
 
 import org.apache.commons.lang3.StringEscapeUtils;
@@ -104,10 +105,9 @@ public class KvFormatFactory
             public DeserializationSchema<RowData> createRuntimeDecoder(
                     DynamicTableSource.Context context,
                     DataType dataType) {
-
                 KvRowDataDeserializationSchema.Builder schemaBuilder =
                         new KvRowDataDeserializationSchema.Builder(
-                                
deserializeRowFormatInfo(formatOptions.get(ROW_FORMAT_INFO)),
+                                TableFormatUtils.deriveRowFormatInfo(dataType),
                                 context.createTypeInformation(dataType));
                 configureDeserializationSchema(formatOptions, schemaBuilder);
                 return schemaBuilder.build();
@@ -128,7 +128,6 @@ public class KvFormatFactory
     @Override
     public Set<ConfigOption<?>> requiredOptions() {
         Set<ConfigOption<?>> options = new HashSet<>();
-        options.add(ROW_FORMAT_INFO);
         options.add(TextFormatOptions.KV_ENTRY_DELIMITER);
         options.add(TextFormatOptions.KV_DELIMITER);
         options.add(TextFormatOptions.CHARSET);
@@ -138,6 +137,7 @@ public class KvFormatFactory
     @Override
     public Set<ConfigOption<?>> optionalOptions() {
         Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(ROW_FORMAT_INFO);
         options.add(TextFormatOptions.ESCAPE_CHARACTER);
         options.add(TextFormatOptions.QUOTE_CHARACTER);
         options.add(TextFormatOptions.NULL_LITERAL);

Reply via email to