This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new e9cbc3671f [INLONG-10370][Manager] Support configuration of kV data format (#10371) e9cbc3671f is described below commit e9cbc3671fc3ecad4b6ca70e628015d4fbb1e03b Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Fri Jun 7 10:08:23 2024 +0800 [INLONG-10370][Manager] Support configuration of kV data format (#10371) --- .../org/apache/inlong/common/enums/DataTypeEnum.java | 1 + .../org/apache/inlong/common/util/StringUtil.java | 9 +++++++++ .../pojo/sort/node/base/ExtractNodeProvider.java | 20 ++++++++++++++------ .../pojo/sort/node/provider/KafkaProvider.java | 2 ++ .../pojo/sort/node/provider/PulsarProvider.java | 2 ++ .../pojo/sort/node/provider/TubeMqProvider.java | 2 ++ .../manager/pojo/source/kafka/KafkaSource.java | 3 +++ .../manager/pojo/source/kafka/KafkaSourceDTO.java | 8 ++++++-- .../pojo/source/kafka/KafkaSourceRequest.java | 9 +++++---- .../manager/pojo/source/pulsar/PulsarSource.java | 3 +++ .../manager/pojo/source/pulsar/PulsarSourceDTO.java | 8 ++++++-- .../pojo/source/pulsar/PulsarSourceRequest.java | 9 +++++---- .../manager/pojo/source/tubemq/TubeMQSource.java | 9 +++++++++ .../manager/pojo/source/tubemq/TubeMQSourceDTO.java | 12 +++++++++++- .../pojo/source/tubemq/TubeMQSourceRequest.java | 9 +++++++++ 15 files changed, 87 insertions(+), 19 deletions(-) diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java index 385c5ed71d..e0dc5c1251 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java @@ -25,6 +25,7 @@ import java.util.Locale; public enum DataTypeEnum { CSV("csv"), + KV("kv"), AVRO("avro"), JSON("json"), CANAL("canal"), diff --git a/inlong-common/src/main/java/org/apache/inlong/common/util/StringUtil.java b/inlong-common/src/main/java/org/apache/inlong/common/util/StringUtil.java index cddef9dc8f..7df1eb6975 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/util/StringUtil.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/util/StringUtil.java @@ -17,6 +17,8 @@ package org.apache.inlong.common.util; +import org.apache.commons.lang3.StringUtils; + import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -274,4 +276,11 @@ public class StringUtil { } } + public static String parseChar(String charStr) { + if (StringUtils.isNumeric(charStr)) { + char numberChar = (char) Integer.parseInt(charStr); + charStr = Character.toString(numberChar); + } + return charStr; + } } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java index 81b9763d79..52c09062d7 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java @@ -19,6 +19,7 @@ package org.apache.inlong.manager.pojo.sort.node.base; import org.apache.inlong.common.enums.DataTypeEnum; import org.apache.inlong.common.enums.MessageWrapType; +import org.apache.inlong.common.util.StringUtil; import org.apache.inlong.manager.common.fieldtype.strategy.FieldTypeMappingStrategy; import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils; import org.apache.inlong.manager.pojo.stream.StreamField; @@ -32,10 +33,10 @@ import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat; import org.apache.inlong.sort.protocol.node.format.Format; import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat; import org.apache.inlong.sort.protocol.node.format.JsonFormat; +import org.apache.inlong.sort.protocol.node.format.KvFormat; import org.apache.inlong.sort.protocol.node.format.RawFormat; import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.StringUtils; import java.util.List; import java.util.Objects; @@ -90,8 +91,10 @@ public interface ExtractNodeProvider extends NodeProvider { * Parse format * * @param serializationType data serialization, support: csv, json, canal, avro, etc - * @param wrapWithInlongMsg whether wrap content with {@link InLongMsgFormat} + * @param wrapType whether wrap content with {@link InLongMsgFormat} * @param separatorStr the separator of data content + * @param kvSeparatorStr the kv separator + * @param escapeCharStr the escape char * @param ignoreParseErrors whether ignore deserialization error data * @return the format for serialized content */ @@ -99,15 +102,14 @@ public interface ExtractNodeProvider extends NodeProvider { String serializationType, String wrapType, String separatorStr, + String kvSeparatorStr, + String escapeCharStr, Boolean ignoreParseErrors) { Format format; DataTypeEnum dataType = DataTypeEnum.forType(serializationType); switch (dataType) { case CSV: - if (StringUtils.isNumeric(separatorStr)) { - char dataSeparator = (char) Integer.parseInt(separatorStr); - separatorStr = Character.toString(dataSeparator); - } + separatorStr = StringUtil.parseChar(separatorStr); CsvFormat csvFormat = new CsvFormat(separatorStr); csvFormat.setIgnoreParseErrors(ignoreParseErrors); format = csvFormat; @@ -131,6 +133,12 @@ public interface ExtractNodeProvider extends NodeProvider { case RAW: format = new RawFormat(); break; + case KV: + separatorStr = StringUtil.parseChar(separatorStr); + kvSeparatorStr = StringUtil.parseChar(kvSeparatorStr); + escapeCharStr = StringUtil.parseChar(escapeCharStr); + format = new KvFormat(separatorStr, kvSeparatorStr, escapeCharStr, ignoreParseErrors, null, null, null); + break; default: throw new IllegalArgumentException(String.format("Unsupported dataType=%s", dataType)); } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java index 9e602c0293..e7ca76241e 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java @@ -68,6 +68,8 @@ public class KafkaProvider implements ExtractNodeProvider, LoadNodeProvider { kafkaSource.getSerializationType(), kafkaSource.getWrapType(), kafkaSource.getDataSeparator(), + kafkaSource.getKvSeparator(), + kafkaSource.getDataEscapeChar(), kafkaSource.getIgnoreParseError()); KafkaScanStartupMode startupMode = parseStartupMode(kafkaSource.getAutoOffsetReset()); diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java index 1d8ace1480..9493f78f41 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java @@ -59,6 +59,8 @@ public class PulsarProvider implements ExtractNodeProvider { Format format = parsingFormat(pulsarSource.getSerializationType(), pulsarSource.getWrapType(), pulsarSource.getDataSeparator(), + pulsarSource.getKvSeparator(), + pulsarSource.getDataEscapeChar(), pulsarSource.getIgnoreParseError()); PulsarScanStartupMode startupMode = PulsarScanStartupMode.forName(pulsarSource.getScanStartupMode()); diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java index 6b80d4735e..d2553a76ab 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java @@ -55,6 +55,8 @@ public class TubeMqProvider implements ExtractNodeProvider { source.getSerializationType(), source.getWrapType(), source.getDataSeparator(), + source.getKvSeparator(), + source.getDataEscapeChar(), source.getIgnoreParseError()); Map<String, String> properties = parseProperties(source.getProperties()); diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSource.java index 061e9176af..5c8afdbf5c 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSource.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSource.java @@ -90,6 +90,9 @@ public class KafkaSource extends StreamSource { @ApiModelProperty(value = "Data separator") private String dataSeparator; + @ApiModelProperty(value = "KV separator") + private String kvSeparator; + @ApiModelProperty(value = "Data field escape symbol") private String dataEscapeChar; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceDTO.java index 082b1adf7c..730f63f7b3 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceDTO.java @@ -31,6 +31,7 @@ import org.apache.commons.lang3.StringUtils; import javax.validation.constraints.NotNull; +import java.nio.charset.StandardCharsets; import java.util.Map; /** @@ -91,10 +92,13 @@ public class KafkaSourceDTO { private String primaryKey; @ApiModelProperty(value = "Data encoding format: UTF-8, GBK") - private String dataEncoding; + private String dataEncoding = StandardCharsets.UTF_8.toString(); @ApiModelProperty(value = "Data separator") - private String dataSeparator; + private String dataSeparator = String.valueOf((int) '|'); + + @ApiModelProperty(value = "KV separator") + private String kvSeparator; @ApiModelProperty(value = "Data field escape symbol") private String dataEscapeChar; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceRequest.java index e9343253df..89da06d277 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceRequest.java @@ -27,8 +27,6 @@ import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; -import java.nio.charset.StandardCharsets; - /** * Kafka source request */ @@ -80,10 +78,13 @@ public class KafkaSourceRequest extends SourceRequest { private String primaryKey; @ApiModelProperty(value = "Data encoding format: UTF-8, GBK") - private String dataEncoding = StandardCharsets.UTF_8.toString(); + private String dataEncoding; @ApiModelProperty(value = "Data separator") - private String dataSeparator = String.valueOf((int) '|'); + private String dataSeparator; + + @ApiModelProperty(value = "KV separator") + private String kvSeparator; @ApiModelProperty(value = "Data field escape symbol") private String dataEscapeChar; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java index 884100c988..a000e58147 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java @@ -73,6 +73,9 @@ public class PulsarSource extends StreamSource { @ApiModelProperty(value = "Data separator") private String dataSeparator; + @ApiModelProperty(value = "KV separator") + private String kvSeparator; + @ApiModelProperty(value = "Data field escape symbol") private String dataEscapeChar; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java index 2f9c9b1ab1..5fb40984bc 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java @@ -31,6 +31,7 @@ import org.apache.commons.lang3.StringUtils; import javax.validation.constraints.NotNull; +import java.nio.charset.StandardCharsets; import java.util.Map; /** @@ -64,10 +65,13 @@ public class PulsarSourceDTO { private String primaryKey; @ApiModelProperty(value = "Data encoding format: UTF-8, GBK") - private String dataEncoding; + private String dataEncoding = StandardCharsets.UTF_8.toString(); @ApiModelProperty(value = "Data separator") - private String dataSeparator; + private String dataSeparator = String.valueOf((int) '|'); + + @ApiModelProperty(value = "KV separator") + private String kvSeparator; @ApiModelProperty(value = "Data field escape symbol") private String dataEscapeChar; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java index 0c6946bc6e..6e7be5125b 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java @@ -27,8 +27,6 @@ import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; -import java.nio.charset.StandardCharsets; - /** * Pulsar source request */ @@ -61,10 +59,13 @@ public class PulsarSourceRequest extends SourceRequest { private String primaryKey; @ApiModelProperty(value = "Data encoding format: UTF-8, GBK") - private String dataEncoding = StandardCharsets.UTF_8.toString(); + private String dataEncoding; @ApiModelProperty(value = "Data separator") - private String dataSeparator = String.valueOf((int) '|'); + private String dataSeparator; + + @ApiModelProperty(value = "KV separator") + private String kvSeparator; @ApiModelProperty(value = "Data field escape symbol") private String dataEscapeChar; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java index a2539b4c98..786aef0259 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java @@ -59,9 +59,18 @@ public class TubeMQSource extends StreamSource { @ApiModelProperty("Session key of the TubeMQ") private String sessionKey; + @ApiModelProperty(value = "Data encoding format: UTF-8, GBK") + private String dataEncoding; + @ApiModelProperty(value = "Data separator") private String dataSeparator; + @ApiModelProperty(value = "KV separator") + private String kvSeparator; + + @ApiModelProperty(value = "Data field escape symbol") + private String dataEscapeChar; + /** * The TubeMQ consumers use this streamId set to filter records reading from server. */ diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceDTO.java index bae3b951bd..dbae1cbb0b 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceDTO.java @@ -31,6 +31,7 @@ import org.apache.commons.lang3.StringUtils; import javax.validation.constraints.NotNull; +import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.TreeSet; @@ -55,8 +56,17 @@ public class TubeMQSourceDTO { @ApiModelProperty("Session key of the TubeMQ") private String sessionKey; + @ApiModelProperty(value = "Data encoding format: UTF-8, GBK") + private String dataEncoding = StandardCharsets.UTF_8.toString(); + @ApiModelProperty(value = "Data separator") - private String dataSeparator; + private String dataSeparator = String.valueOf((int) '|'); + + @ApiModelProperty(value = "KV separator") + private String kvSeparator; + + @ApiModelProperty(value = "Data field escape symbol") + private String dataEscapeChar; @ApiModelProperty(value = "The message body wrap wrap type, including: RAW, INLONG_MSG_V0, INLONG_MSG_V1, etc") private String wrapType; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceRequest.java index 7ef7ca4c34..85ea5c10da 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceRequest.java @@ -51,9 +51,18 @@ public class TubeMQSourceRequest extends SourceRequest { @ApiModelProperty("Session key of the TubeMQ") private String sessionKey; + @ApiModelProperty(value = "Data encoding format: UTF-8, GBK") + private String dataEncoding; + @ApiModelProperty(value = "Data separator") private String dataSeparator; + @ApiModelProperty(value = "KV separator") + private String kvSeparator; + + @ApiModelProperty(value = "Data field escape symbol") + private String dataEscapeChar; + @ApiModelProperty(value = "The message body wrap wrap type, including: RAW, INLONG_MSG_V0, INLONG_MSG_V1, etc") private String wrapType;