This is an automated email from the ASF dual-hosted git repository. pacinogong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 396a6a5a60 [INLONG-8202][Manager] Optimize Provider code to extract public parseFormat class. (#8203) 396a6a5a60 is described below commit 396a6a5a601972fbff3a12f52dc2b0286dd1c6a9 Author: chestnufang <65438734+chestnu...@users.noreply.github.com> AuthorDate: Fri Jun 9 17:32:00 2023 +0800 [INLONG-8202][Manager] Optimize Provider code to extract public parseFormat class. (#8203) --- .../pojo/sort/node/base/LoadNodeProvider.java | 29 +++++++ .../pojo/sort/node/provider/DorisProvider.java | 22 +---- .../sort/node/provider/ElasticsearchProvider.java | 25 +----- .../pojo/sort/node/provider/KafkaProvider.java | 96 +++++++++++++--------- .../pojo/sort/node/provider/StarRocksProvider.java | 23 +----- 5 files changed, 95 insertions(+), 100 deletions(-) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java index 2302c15943..2dd27f3416 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.pojo.sort.node.base; +import org.apache.inlong.common.enums.DataTypeEnum; import org.apache.inlong.manager.common.enums.FieldType; import org.apache.inlong.manager.pojo.sink.SinkField; import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils; @@ -25,6 +26,9 @@ import org.apache.inlong.manager.pojo.stream.StreamNode; import org.apache.inlong.sort.formats.common.StringTypeInfo; import org.apache.inlong.sort.protocol.FieldInfo; import org.apache.inlong.sort.protocol.node.LoadNode; +import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat; +import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat; +import org.apache.inlong.sort.protocol.node.format.Format; import org.apache.inlong.sort.protocol.transformation.ConstantParam; import org.apache.inlong.sort.protocol.transformation.FieldRelation; import org.apache.inlong.sort.protocol.transformation.FunctionParam; @@ -97,4 +101,29 @@ public interface LoadNodeProvider extends NodeProvider { return new FieldRelation(inputField, outputField); }).collect(Collectors.toList()); } + + /** + * Parse format + * + * @param multipleEnable whether to enable multi-write + * @param multipleFormat data serialization format + * @return the format for serialized content + */ + default Format parsingSinkMultipleFormat(Boolean multipleEnable, String multipleFormat) { + Format format = null; + if (Boolean.TRUE.equals(multipleEnable) && StringUtils.isNotBlank(multipleFormat)) { + DataTypeEnum dataType = DataTypeEnum.forType(multipleFormat); + switch (dataType) { + case CANAL: + format = new CanalJsonFormat(); + break; + case DEBEZIUM_JSON: + format = new DebeziumJsonFormat(); + break; + default: + throw new IllegalArgumentException(String.format("Unsupported dataType=%s", dataType)); + } + } + return format; + } } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/DorisProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/DorisProvider.java index 2586895f8a..e4e71333c9 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/DorisProvider.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/DorisProvider.java @@ -17,7 +17,6 @@ package org.apache.inlong.manager.pojo.sort.node.provider; -import org.apache.inlong.common.enums.DataTypeEnum; import org.apache.inlong.manager.common.consts.SinkType; import org.apache.inlong.manager.pojo.sink.doris.DorisSink; import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider; @@ -25,14 +24,10 @@ import org.apache.inlong.manager.pojo.stream.StreamField; import org.apache.inlong.manager.pojo.stream.StreamNode; import org.apache.inlong.sort.protocol.FieldInfo; import org.apache.inlong.sort.protocol.node.LoadNode; -import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat; -import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat; import org.apache.inlong.sort.protocol.node.format.Format; import org.apache.inlong.sort.protocol.node.load.DorisLoadNode; import org.apache.inlong.sort.protocol.transformation.FieldRelation; -import org.apache.commons.lang3.StringUtils; - import java.util.List; import java.util.Map; @@ -52,21 +47,8 @@ public class DorisProvider implements LoadNodeProvider { Map<String, String> properties = parseProperties(dorisSink.getProperties()); List<FieldInfo> fieldInfos = parseSinkFieldInfos(dorisSink.getSinkFieldList(), dorisSink.getSinkName()); List<FieldRelation> fieldRelations = parseSinkFields(dorisSink.getSinkFieldList(), constantFieldMap); - Format format = null; - if (dorisSink.getSinkMultipleEnable() != null && dorisSink.getSinkMultipleEnable() && StringUtils.isNotBlank( - dorisSink.getSinkMultipleFormat())) { - DataTypeEnum dataType = DataTypeEnum.forType(dorisSink.getSinkMultipleFormat()); - switch (dataType) { - case CANAL: - format = new CanalJsonFormat(); - break; - case DEBEZIUM_JSON: - format = new DebeziumJsonFormat(); - break; - default: - throw new IllegalArgumentException(String.format("Unsupported dataType=%s for doris", dataType)); - } - } + Format format = parsingSinkMultipleFormat(dorisSink.getSinkMultipleEnable(), dorisSink.getSinkMultipleFormat()); + return new DorisLoadNode( dorisSink.getSinkName(), dorisSink.getSinkName(), diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/ElasticsearchProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/ElasticsearchProvider.java index 5072f9ac46..80861cbc59 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/ElasticsearchProvider.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/ElasticsearchProvider.java @@ -17,7 +17,6 @@ package org.apache.inlong.manager.pojo.sort.node.provider; -import org.apache.inlong.common.enums.DataTypeEnum; import org.apache.inlong.manager.common.consts.SinkType; import org.apache.inlong.manager.pojo.sink.SinkField; import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSink; @@ -26,14 +25,10 @@ import org.apache.inlong.manager.pojo.stream.StreamField; import org.apache.inlong.manager.pojo.stream.StreamNode; import org.apache.inlong.sort.protocol.FieldInfo; import org.apache.inlong.sort.protocol.node.LoadNode; -import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat; -import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat; import org.apache.inlong.sort.protocol.node.format.Format; import org.apache.inlong.sort.protocol.node.load.ElasticsearchLoadNode; import org.apache.inlong.sort.protocol.transformation.FieldRelation; -import org.apache.commons.lang3.StringUtils; - import java.util.List; import java.util.Map; @@ -54,23 +49,9 @@ public class ElasticsearchProvider implements LoadNodeProvider { List<SinkField> sinkFieldList = elasticsearchSink.getSinkFieldList(); List<FieldInfo> fieldInfos = parseSinkFieldInfos(sinkFieldList, elasticsearchSink.getSinkName()); List<FieldRelation> fieldRelations = parseSinkFields(sinkFieldList, constantFieldMap); - Format format = null; - if (elasticsearchSink.getSinkMultipleEnable() != null && elasticsearchSink.getSinkMultipleEnable() - && StringUtils.isNotBlank( - elasticsearchSink.getSinkMultipleFormat())) { - DataTypeEnum dataType = DataTypeEnum.forType(elasticsearchSink.getSinkMultipleFormat()); - switch (dataType) { - case CANAL: - format = new CanalJsonFormat(); - break; - case DEBEZIUM_JSON: - format = new DebeziumJsonFormat(); - break; - default: - throw new IllegalArgumentException( - String.format("Unsupported dataType=%s for elasticsearch", dataType)); - } - } + Format format = parsingSinkMultipleFormat(elasticsearchSink.getSinkMultipleEnable(), + elasticsearchSink.getSinkMultipleFormat()); + return new ElasticsearchLoadNode( elasticsearchSink.getSinkName(), elasticsearchSink.getSinkName(), diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java index adb1d911e1..bb355b30c6 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java @@ -63,31 +63,16 @@ public class KafkaProvider implements ExtractNodeProvider, LoadNodeProvider { List<FieldInfo> fieldInfos = parseStreamFieldInfos(kafkaSource.getFieldList(), kafkaSource.getSourceName()); Map<String, String> properties = parseProperties(kafkaSource.getProperties()); - String topic = kafkaSource.getTopic(); - String bootstrapServers = kafkaSource.getBootstrapServers(); - Format format = parsingFormat( kafkaSource.getSerializationType(), kafkaSource.isWrapWithInlongMsg(), kafkaSource.getDataSeparator(), kafkaSource.isIgnoreParseErrors()); - KafkaOffset kafkaOffset = KafkaOffset.forName(kafkaSource.getAutoOffsetReset()); - KafkaScanStartupMode startupMode; - switch (kafkaOffset) { - case EARLIEST: - startupMode = KafkaScanStartupMode.EARLIEST_OFFSET; - break; - case SPECIFIC: - startupMode = KafkaScanStartupMode.SPECIFIC_OFFSETS; - break; - case TIMESTAMP_MILLIS: - startupMode = KafkaScanStartupMode.TIMESTAMP_MILLIS; - break; - case LATEST: - default: - startupMode = KafkaScanStartupMode.LATEST_OFFSET; - } + KafkaScanStartupMode startupMode = parseStartupMode(kafkaSource.getAutoOffsetReset()); + String topic = kafkaSource.getTopic(); + String bootstrapServers = kafkaSource.getBootstrapServers(); + final String primaryKey = kafkaSource.getPrimaryKey(); String groupId = kafkaSource.getGroupId(); String partitionOffset = kafkaSource.getPartitionOffsets(); @@ -113,11 +98,61 @@ public class KafkaProvider implements ExtractNodeProvider, LoadNodeProvider { Map<String, String> properties = parseProperties(kafkaSink.getProperties()); List<FieldInfo> fieldInfos = parseSinkFieldInfos(kafkaSink.getSinkFieldList(), kafkaSink.getSinkName()); List<FieldRelation> fieldRelations = parseSinkFields(kafkaSink.getSinkFieldList(), constantFieldMap); - Integer sinkParallelism = null; - if (StringUtils.isNotEmpty(kafkaSink.getPartitionNum())) { - sinkParallelism = Integer.parseInt(kafkaSink.getPartitionNum()); + + String partitionNum = kafkaSink.getPartitionNum(); + Integer sinkParallelism = StringUtils.isNotBlank(partitionNum) ? Integer.parseInt(partitionNum) : null; + Format format = parseFormat(kafkaSink.getSerializationType()); + + return new KafkaLoadNode( + kafkaSink.getSinkName(), + kafkaSink.getSinkName(), + fieldInfos, + fieldRelations, + Lists.newArrayList(), + null, + kafkaSink.getTopicName(), + kafkaSink.getBootstrapServers(), + format, + sinkParallelism, + properties, + kafkaSink.getPrimaryKey()); + } + + /** + * parse Startup Mode + * + * @param autoOffsetReset The strategy of auto offset reset, including earliest, specific, latest (the + * default), none + * @return kafka scan startup mode + */ + private KafkaScanStartupMode parseStartupMode(String autoOffsetReset) { + KafkaOffset kafkaOffset = KafkaOffset.forName(autoOffsetReset); + KafkaScanStartupMode startupMode; + switch (kafkaOffset) { + case EARLIEST: + startupMode = KafkaScanStartupMode.EARLIEST_OFFSET; + break; + case SPECIFIC: + startupMode = KafkaScanStartupMode.SPECIFIC_OFFSETS; + break; + case TIMESTAMP_MILLIS: + startupMode = KafkaScanStartupMode.TIMESTAMP_MILLIS; + break; + case LATEST: + default: + startupMode = KafkaScanStartupMode.LATEST_OFFSET; } - DataTypeEnum dataType = DataTypeEnum.forType(kafkaSink.getSerializationType()); + return startupMode; + } + + /** + * parse Format + * + * @param serializationType data serialization, support: json, canal, avro + * @return the format for serialized content + */ + private Format parseFormat(String serializationType) { + DataTypeEnum dataType = DataTypeEnum.forType(serializationType); Format format; switch (dataType) { case CSV: @@ -141,19 +176,6 @@ public class KafkaProvider implements ExtractNodeProvider, LoadNodeProvider { default: throw new IllegalArgumentException(String.format("Unsupported dataType=%s for Kafka", dataType)); } - - return new KafkaLoadNode( - kafkaSink.getSinkName(), - kafkaSink.getSinkName(), - fieldInfos, - fieldRelations, - Lists.newArrayList(), - null, - kafkaSink.getTopicName(), - kafkaSink.getBootstrapServers(), - format, - sinkParallelism, - properties, - kafkaSink.getPrimaryKey()); + return format; } } \ No newline at end of file diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/StarRocksProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/StarRocksProvider.java index 588671d370..acae2efeb3 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/StarRocksProvider.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/StarRocksProvider.java @@ -17,7 +17,6 @@ package org.apache.inlong.manager.pojo.sort.node.provider; -import org.apache.inlong.common.enums.DataTypeEnum; import org.apache.inlong.manager.common.consts.SinkType; import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksSink; import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider; @@ -25,14 +24,10 @@ import org.apache.inlong.manager.pojo.stream.StreamField; import org.apache.inlong.manager.pojo.stream.StreamNode; import org.apache.inlong.sort.protocol.FieldInfo; import org.apache.inlong.sort.protocol.node.LoadNode; -import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat; -import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat; import org.apache.inlong.sort.protocol.node.format.Format; import org.apache.inlong.sort.protocol.node.load.StarRocksLoadNode; import org.apache.inlong.sort.protocol.transformation.FieldRelation; -import org.apache.commons.lang3.StringUtils; - import java.util.List; import java.util.Map; @@ -52,23 +47,9 @@ public class StarRocksProvider implements LoadNodeProvider { Map<String, String> properties = parseProperties(starRocksSink.getProperties()); List<FieldInfo> fieldInfos = parseSinkFieldInfos(starRocksSink.getSinkFieldList(), starRocksSink.getSinkName()); List<FieldRelation> fieldRelations = parseSinkFields(starRocksSink.getSinkFieldList(), constantFieldMap); + Format format = parsingSinkMultipleFormat(starRocksSink.getSinkMultipleEnable(), + starRocksSink.getSinkMultipleFormat()); - Format format = null; - if (Boolean.TRUE.equals(starRocksSink.getSinkMultipleEnable()) - && StringUtils.isNotBlank(starRocksSink.getSinkMultipleFormat())) { - DataTypeEnum dataType = DataTypeEnum.forType(starRocksSink.getSinkMultipleFormat()); - switch (dataType) { - case CANAL: - format = new CanalJsonFormat(); - break; - case DEBEZIUM_JSON: - format = new DebeziumJsonFormat(); - break; - default: - throw new IllegalArgumentException( - String.format("Unsupported dataType=%s for StarRocks", dataType)); - } - } return new StarRocksLoadNode( starRocksSink.getSinkName(), starRocksSink.getSinkName(),