This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e9cbc3671f [INLONG-10370][Manager] Support configuration of KV data 
format (#10371)
e9cbc3671f is described below

commit e9cbc3671fc3ecad4b6ca70e628015d4fbb1e03b
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Fri Jun 7 10:08:23 2024 +0800

    [INLONG-10370][Manager] Support configuration of KV data format (#10371)
---
 .../org/apache/inlong/common/enums/DataTypeEnum.java |  1 +
 .../org/apache/inlong/common/util/StringUtil.java    |  9 +++++++++
 .../pojo/sort/node/base/ExtractNodeProvider.java     | 20 ++++++++++++++------
 .../pojo/sort/node/provider/KafkaProvider.java       |  2 ++
 .../pojo/sort/node/provider/PulsarProvider.java      |  2 ++
 .../pojo/sort/node/provider/TubeMqProvider.java      |  2 ++
 .../manager/pojo/source/kafka/KafkaSource.java       |  3 +++
 .../manager/pojo/source/kafka/KafkaSourceDTO.java    |  8 ++++++--
 .../pojo/source/kafka/KafkaSourceRequest.java        |  9 +++++----
 .../manager/pojo/source/pulsar/PulsarSource.java     |  3 +++
 .../manager/pojo/source/pulsar/PulsarSourceDTO.java  |  8 ++++++--
 .../pojo/source/pulsar/PulsarSourceRequest.java      |  9 +++++----
 .../manager/pojo/source/tubemq/TubeMQSource.java     |  9 +++++++++
 .../manager/pojo/source/tubemq/TubeMQSourceDTO.java  | 12 +++++++++++-
 .../pojo/source/tubemq/TubeMQSourceRequest.java      |  9 +++++++++
 15 files changed, 87 insertions(+), 19 deletions(-)

diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java 
b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
index 385c5ed71d..e0dc5c1251 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
@@ -25,6 +25,7 @@ import java.util.Locale;
 public enum DataTypeEnum {
 
     CSV("csv"),
+    KV("kv"),
     AVRO("avro"),
     JSON("json"),
     CANAL("canal"),
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/util/StringUtil.java 
b/inlong-common/src/main/java/org/apache/inlong/common/util/StringUtil.java
index cddef9dc8f..7df1eb6975 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/util/StringUtil.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/util/StringUtil.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.common.util;
 
+import org.apache.commons.lang3.StringUtils;
+
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
@@ -274,4 +276,11 @@ public class StringUtil {
         }
     }
 
+    public static String parseChar(String charStr) { // converts a numeric char-code string (e.g. "124") into that character ("|"); other input is returned as-is
+        if (StringUtils.isNumeric(charStr)) { // null, empty, or non-digit strings skip conversion and are returned unchanged
+            char numberChar = (char) Integer.parseInt(charStr); // NOTE(review): throws NumberFormatException past Integer.MAX_VALUE; the (char) cast keeps only the low 16 bits
+            charStr = Character.toString(numberChar);
+        }
+        return charStr;
+    }
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
index 81b9763d79..52c09062d7 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.pojo.sort.node.base;
 
 import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.common.enums.MessageWrapType;
+import org.apache.inlong.common.util.StringUtil;
 import 
org.apache.inlong.manager.common.fieldtype.strategy.FieldTypeMappingStrategy;
 import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
 import org.apache.inlong.manager.pojo.stream.StreamField;
@@ -32,10 +33,10 @@ import 
org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
 import org.apache.inlong.sort.protocol.node.format.Format;
 import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
 import org.apache.inlong.sort.protocol.node.format.JsonFormat;
+import org.apache.inlong.sort.protocol.node.format.KvFormat;
 import org.apache.inlong.sort.protocol.node.format.RawFormat;
 
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
 
 import java.util.List;
 import java.util.Objects;
@@ -90,8 +91,10 @@ public interface ExtractNodeProvider extends NodeProvider {
      * Parse format
      *
      * @param serializationType data serialization, support: csv, json, canal, 
avro, etc
-     * @param wrapWithInlongMsg whether wrap content with {@link 
InLongMsgFormat}
+     * @param wrapType whether wrap content with {@link InLongMsgFormat}
      * @param separatorStr the separator of data content
+     * @param kvSeparatorStr the kv separator
+     * @param escapeCharStr the escape char
      * @param ignoreParseErrors whether ignore deserialization error data
      * @return the format for serialized content
      */
@@ -99,15 +102,14 @@ public interface ExtractNodeProvider extends NodeProvider {
             String serializationType,
             String wrapType,
             String separatorStr,
+            String kvSeparatorStr,
+            String escapeCharStr,
             Boolean ignoreParseErrors) {
         Format format;
         DataTypeEnum dataType = DataTypeEnum.forType(serializationType);
         switch (dataType) {
             case CSV:
-                if (StringUtils.isNumeric(separatorStr)) {
-                    char dataSeparator = (char) Integer.parseInt(separatorStr);
-                    separatorStr = Character.toString(dataSeparator);
-                }
+                separatorStr = StringUtil.parseChar(separatorStr);
                 CsvFormat csvFormat = new CsvFormat(separatorStr);
                 csvFormat.setIgnoreParseErrors(ignoreParseErrors);
                 format = csvFormat;
@@ -131,6 +133,12 @@ public interface ExtractNodeProvider extends NodeProvider {
             case RAW:
                 format = new RawFormat();
                 break;
+            case KV:
+                separatorStr = StringUtil.parseChar(separatorStr);
+                kvSeparatorStr = StringUtil.parseChar(kvSeparatorStr);
+                escapeCharStr = StringUtil.parseChar(escapeCharStr);
+                format = new KvFormat(separatorStr, kvSeparatorStr, 
escapeCharStr, ignoreParseErrors, null, null, null);
+                break;
             default:
                 throw new IllegalArgumentException(String.format("Unsupported 
dataType=%s", dataType));
         }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
index 9e602c0293..e7ca76241e 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
@@ -68,6 +68,8 @@ public class KafkaProvider implements ExtractNodeProvider, 
LoadNodeProvider {
                 kafkaSource.getSerializationType(),
                 kafkaSource.getWrapType(),
                 kafkaSource.getDataSeparator(),
+                kafkaSource.getKvSeparator(),
+                kafkaSource.getDataEscapeChar(),
                 kafkaSource.getIgnoreParseError());
 
         KafkaScanStartupMode startupMode = 
parseStartupMode(kafkaSource.getAutoOffsetReset());
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
index 1d8ace1480..9493f78f41 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
@@ -59,6 +59,8 @@ public class PulsarProvider implements ExtractNodeProvider {
         Format format = parsingFormat(pulsarSource.getSerializationType(),
                 pulsarSource.getWrapType(),
                 pulsarSource.getDataSeparator(),
+                pulsarSource.getKvSeparator(),
+                pulsarSource.getDataEscapeChar(),
                 pulsarSource.getIgnoreParseError());
 
         PulsarScanStartupMode startupMode = 
PulsarScanStartupMode.forName(pulsarSource.getScanStartupMode());
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
index 6b80d4735e..d2553a76ab 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
@@ -55,6 +55,8 @@ public class TubeMqProvider implements ExtractNodeProvider {
                 source.getSerializationType(),
                 source.getWrapType(),
                 source.getDataSeparator(),
+                source.getKvSeparator(),
+                source.getDataEscapeChar(),
                 source.getIgnoreParseError());
         Map<String, String> properties = 
parseProperties(source.getProperties());
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSource.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSource.java
index 061e9176af..5c8afdbf5c 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSource.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSource.java
@@ -90,6 +90,9 @@ public class KafkaSource extends StreamSource {
     @ApiModelProperty(value = "Data separator")
     private String dataSeparator;
 
+    @ApiModelProperty(value = "KV separator")
+    private String kvSeparator;
+
     @ApiModelProperty(value = "Data field escape symbol")
     private String dataEscapeChar;
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceDTO.java
index 082b1adf7c..730f63f7b3 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceDTO.java
@@ -31,6 +31,7 @@ import org.apache.commons.lang3.StringUtils;
 
 import javax.validation.constraints.NotNull;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Map;
 
 /**
@@ -91,10 +92,13 @@ public class KafkaSourceDTO {
     private String primaryKey;
 
     @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
-    private String dataEncoding;
+    private String dataEncoding = StandardCharsets.UTF_8.toString();
 
     @ApiModelProperty(value = "Data separator")
-    private String dataSeparator;
+    private String dataSeparator = String.valueOf((int) '|');
+
+    @ApiModelProperty(value = "KV separator")
+    private String kvSeparator;
 
     @ApiModelProperty(value = "Data field escape symbol")
     private String dataEscapeChar;
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceRequest.java
index e9343253df..89da06d277 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceRequest.java
@@ -27,8 +27,6 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 
-import java.nio.charset.StandardCharsets;
-
 /**
  * Kafka source request
  */
@@ -80,10 +78,13 @@ public class KafkaSourceRequest extends SourceRequest {
     private String primaryKey;
 
     @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
-    private String dataEncoding = StandardCharsets.UTF_8.toString();
+    private String dataEncoding;
 
     @ApiModelProperty(value = "Data separator")
-    private String dataSeparator = String.valueOf((int) '|');
+    private String dataSeparator;
+
+    @ApiModelProperty(value = "KV separator")
+    private String kvSeparator;
 
     @ApiModelProperty(value = "Data field escape symbol")
     private String dataEscapeChar;
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
index 884100c988..a000e58147 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
@@ -73,6 +73,9 @@ public class PulsarSource extends StreamSource {
     @ApiModelProperty(value = "Data separator")
     private String dataSeparator;
 
+    @ApiModelProperty(value = "KV separator")
+    private String kvSeparator;
+
     @ApiModelProperty(value = "Data field escape symbol")
     private String dataEscapeChar;
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java
index 2f9c9b1ab1..5fb40984bc 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java
@@ -31,6 +31,7 @@ import org.apache.commons.lang3.StringUtils;
 
 import javax.validation.constraints.NotNull;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Map;
 
 /**
@@ -64,10 +65,13 @@ public class PulsarSourceDTO {
     private String primaryKey;
 
     @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
-    private String dataEncoding;
+    private String dataEncoding = StandardCharsets.UTF_8.toString();
 
     @ApiModelProperty(value = "Data separator")
-    private String dataSeparator;
+    private String dataSeparator = String.valueOf((int) '|');
+
+    @ApiModelProperty(value = "KV separator")
+    private String kvSeparator;
 
     @ApiModelProperty(value = "Data field escape symbol")
     private String dataEscapeChar;
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java
index 0c6946bc6e..6e7be5125b 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java
@@ -27,8 +27,6 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 
-import java.nio.charset.StandardCharsets;
-
 /**
  * Pulsar source request
  */
@@ -61,10 +59,13 @@ public class PulsarSourceRequest extends SourceRequest {
     private String primaryKey;
 
     @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
-    private String dataEncoding = StandardCharsets.UTF_8.toString();
+    private String dataEncoding;
 
     @ApiModelProperty(value = "Data separator")
-    private String dataSeparator = String.valueOf((int) '|');
+    private String dataSeparator;
+
+    @ApiModelProperty(value = "KV separator")
+    private String kvSeparator;
 
     @ApiModelProperty(value = "Data field escape symbol")
     private String dataEscapeChar;
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java
index a2539b4c98..786aef0259 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java
@@ -59,9 +59,18 @@ public class TubeMQSource extends StreamSource {
     @ApiModelProperty("Session key of the TubeMQ")
     private String sessionKey;
 
+    @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
+    private String dataEncoding;
+
     @ApiModelProperty(value = "Data separator")
     private String dataSeparator;
 
+    @ApiModelProperty(value = "KV separator")
+    private String kvSeparator;
+
+    @ApiModelProperty(value = "Data field escape symbol")
+    private String dataEscapeChar;
+
     /**
      * The TubeMQ consumers use this streamId set to filter records reading 
from server.
      */
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceDTO.java
index bae3b951bd..dbae1cbb0b 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceDTO.java
@@ -31,6 +31,7 @@ import org.apache.commons.lang3.StringUtils;
 
 import javax.validation.constraints.NotNull;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Map;
 import java.util.TreeSet;
 
@@ -55,8 +56,17 @@ public class TubeMQSourceDTO {
     @ApiModelProperty("Session key of the TubeMQ")
     private String sessionKey;
 
+    @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
+    private String dataEncoding = StandardCharsets.UTF_8.toString();
+
     @ApiModelProperty(value = "Data separator")
-    private String dataSeparator;
+    private String dataSeparator = String.valueOf((int) '|');
+
+    @ApiModelProperty(value = "KV separator")
+    private String kvSeparator;
+
+    @ApiModelProperty(value = "Data field escape symbol")
+    private String dataEscapeChar;
 
     @ApiModelProperty(value = "The message body wrap  wrap type, including: 
RAW, INLONG_MSG_V0, INLONG_MSG_V1, etc")
     private String wrapType;
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceRequest.java
index 7ef7ca4c34..85ea5c10da 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceRequest.java
@@ -51,9 +51,18 @@ public class TubeMQSourceRequest extends SourceRequest {
     @ApiModelProperty("Session key of the TubeMQ")
     private String sessionKey;
 
+    @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
+    private String dataEncoding;
+
     @ApiModelProperty(value = "Data separator")
     private String dataSeparator;
 
+    @ApiModelProperty(value = "KV separator")
+    private String kvSeparator;
+
+    @ApiModelProperty(value = "Data field escape symbol")
+    private String dataEscapeChar;
+
     @ApiModelProperty(value = "The message body wrap  wrap type, including: 
RAW, INLONG_MSG_V0, INLONG_MSG_V1, etc")
     private String wrapType;
 

Reply via email to