This is an automated email from the ASF dual-hosted git repository.

pacinogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 396a6a5a60 [INLONG-8202][Manager] Optimize Provider code to extract 
public parseFormat class. (#8203)
396a6a5a60 is described below

commit 396a6a5a601972fbff3a12f52dc2b0286dd1c6a9
Author: chestnufang <65438734+chestnu...@users.noreply.github.com>
AuthorDate: Fri Jun 9 17:32:00 2023 +0800

    [INLONG-8202][Manager] Optimize Provider code to extract public parseFormat 
class. (#8203)
---
 .../pojo/sort/node/base/LoadNodeProvider.java      | 29 +++++++
 .../pojo/sort/node/provider/DorisProvider.java     | 22 +----
 .../sort/node/provider/ElasticsearchProvider.java  | 25 +-----
 .../pojo/sort/node/provider/KafkaProvider.java     | 96 +++++++++++++---------
 .../pojo/sort/node/provider/StarRocksProvider.java | 23 +-----
 5 files changed, 95 insertions(+), 100 deletions(-)

diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
index 2302c15943..2dd27f3416 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.pojo.sort.node.base;
 
+import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.manager.common.enums.FieldType;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
@@ -25,6 +26,9 @@ import org.apache.inlong.manager.pojo.stream.StreamNode;
 import org.apache.inlong.sort.formats.common.StringTypeInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.Format;
 import org.apache.inlong.sort.protocol.transformation.ConstantParam;
 import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.FunctionParam;
@@ -97,4 +101,29 @@ public interface LoadNodeProvider extends NodeProvider {
                     return new FieldRelation(inputField, outputField);
                 }).collect(Collectors.toList());
     }
+
+    /**
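+     * Parse the sink multiple format; a format is only created when multi-write is enabled and the format is not blank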
+     *
+     * @param multipleEnable whether to enable multi-write
+     * @param multipleFormat data serialization format
+     * @return the format for serialized content, or null when multi-write is not enabled or the format is blank
+     */
+    default Format parsingSinkMultipleFormat(Boolean multipleEnable, String 
multipleFormat) {
+        Format format = null;
+        if (Boolean.TRUE.equals(multipleEnable) && 
StringUtils.isNotBlank(multipleFormat)) {
+            DataTypeEnum dataType = DataTypeEnum.forType(multipleFormat);
+            switch (dataType) {
+                case CANAL:
+                    format = new CanalJsonFormat();
+                    break;
+                case DEBEZIUM_JSON:
+                    format = new DebeziumJsonFormat();
+                    break;
+                default:
+                    throw new 
IllegalArgumentException(String.format("Unsupported dataType=%s", dataType));
+            }
+        }
+        return format;
+    }
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/DorisProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/DorisProvider.java
index 2586895f8a..e4e71333c9 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/DorisProvider.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/DorisProvider.java
@@ -17,7 +17,6 @@
 
 package org.apache.inlong.manager.pojo.sort.node.provider;
 
-import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.pojo.sink.doris.DorisSink;
 import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
@@ -25,14 +24,10 @@ import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.manager.pojo.stream.StreamNode;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.node.LoadNode;
-import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
-import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
 import org.apache.inlong.sort.protocol.node.format.Format;
 import org.apache.inlong.sort.protocol.node.load.DorisLoadNode;
 import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 
-import org.apache.commons.lang3.StringUtils;
-
 import java.util.List;
 import java.util.Map;
 
@@ -52,21 +47,8 @@ public class DorisProvider implements LoadNodeProvider {
         Map<String, String> properties = 
parseProperties(dorisSink.getProperties());
         List<FieldInfo> fieldInfos = 
parseSinkFieldInfos(dorisSink.getSinkFieldList(), dorisSink.getSinkName());
         List<FieldRelation> fieldRelations = 
parseSinkFields(dorisSink.getSinkFieldList(), constantFieldMap);
-        Format format = null;
-        if (dorisSink.getSinkMultipleEnable() != null && 
dorisSink.getSinkMultipleEnable() && StringUtils.isNotBlank(
-                dorisSink.getSinkMultipleFormat())) {
-            DataTypeEnum dataType = 
DataTypeEnum.forType(dorisSink.getSinkMultipleFormat());
-            switch (dataType) {
-                case CANAL:
-                    format = new CanalJsonFormat();
-                    break;
-                case DEBEZIUM_JSON:
-                    format = new DebeziumJsonFormat();
-                    break;
-                default:
-                    throw new 
IllegalArgumentException(String.format("Unsupported dataType=%s for doris", 
dataType));
-            }
-        }
+        Format format = 
parsingSinkMultipleFormat(dorisSink.getSinkMultipleEnable(), 
dorisSink.getSinkMultipleFormat());
+
         return new DorisLoadNode(
                 dorisSink.getSinkName(),
                 dorisSink.getSinkName(),
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/ElasticsearchProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/ElasticsearchProvider.java
index 5072f9ac46..80861cbc59 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/ElasticsearchProvider.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/ElasticsearchProvider.java
@@ -17,7 +17,6 @@
 
 package org.apache.inlong.manager.pojo.sort.node.provider;
 
-import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSink;
@@ -26,14 +25,10 @@ import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.manager.pojo.stream.StreamNode;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.node.LoadNode;
-import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
-import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
 import org.apache.inlong.sort.protocol.node.format.Format;
 import org.apache.inlong.sort.protocol.node.load.ElasticsearchLoadNode;
 import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 
-import org.apache.commons.lang3.StringUtils;
-
 import java.util.List;
 import java.util.Map;
 
@@ -54,23 +49,9 @@ public class ElasticsearchProvider implements 
LoadNodeProvider {
         List<SinkField> sinkFieldList = elasticsearchSink.getSinkFieldList();
         List<FieldInfo> fieldInfos = parseSinkFieldInfos(sinkFieldList, 
elasticsearchSink.getSinkName());
         List<FieldRelation> fieldRelations = parseSinkFields(sinkFieldList, 
constantFieldMap);
-        Format format = null;
-        if (elasticsearchSink.getSinkMultipleEnable() != null && 
elasticsearchSink.getSinkMultipleEnable()
-                && StringUtils.isNotBlank(
-                        elasticsearchSink.getSinkMultipleFormat())) {
-            DataTypeEnum dataType = 
DataTypeEnum.forType(elasticsearchSink.getSinkMultipleFormat());
-            switch (dataType) {
-                case CANAL:
-                    format = new CanalJsonFormat();
-                    break;
-                case DEBEZIUM_JSON:
-                    format = new DebeziumJsonFormat();
-                    break;
-                default:
-                    throw new IllegalArgumentException(
-                            String.format("Unsupported dataType=%s for 
elasticsearch", dataType));
-            }
-        }
+        Format format = 
parsingSinkMultipleFormat(elasticsearchSink.getSinkMultipleEnable(),
+                elasticsearchSink.getSinkMultipleFormat());
+
         return new ElasticsearchLoadNode(
                 elasticsearchSink.getSinkName(),
                 elasticsearchSink.getSinkName(),
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
index adb1d911e1..bb355b30c6 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
@@ -63,31 +63,16 @@ public class KafkaProvider implements ExtractNodeProvider, 
LoadNodeProvider {
         List<FieldInfo> fieldInfos = 
parseStreamFieldInfos(kafkaSource.getFieldList(), kafkaSource.getSourceName());
         Map<String, String> properties = 
parseProperties(kafkaSource.getProperties());
 
-        String topic = kafkaSource.getTopic();
-        String bootstrapServers = kafkaSource.getBootstrapServers();
-
         Format format = parsingFormat(
                 kafkaSource.getSerializationType(),
                 kafkaSource.isWrapWithInlongMsg(),
                 kafkaSource.getDataSeparator(),
                 kafkaSource.isIgnoreParseErrors());
 
-        KafkaOffset kafkaOffset = 
KafkaOffset.forName(kafkaSource.getAutoOffsetReset());
-        KafkaScanStartupMode startupMode;
-        switch (kafkaOffset) {
-            case EARLIEST:
-                startupMode = KafkaScanStartupMode.EARLIEST_OFFSET;
-                break;
-            case SPECIFIC:
-                startupMode = KafkaScanStartupMode.SPECIFIC_OFFSETS;
-                break;
-            case TIMESTAMP_MILLIS:
-                startupMode = KafkaScanStartupMode.TIMESTAMP_MILLIS;
-                break;
-            case LATEST:
-            default:
-                startupMode = KafkaScanStartupMode.LATEST_OFFSET;
-        }
+        KafkaScanStartupMode startupMode = 
parseStartupMode(kafkaSource.getAutoOffsetReset());
+        String topic = kafkaSource.getTopic();
+        String bootstrapServers = kafkaSource.getBootstrapServers();
+
         final String primaryKey = kafkaSource.getPrimaryKey();
         String groupId = kafkaSource.getGroupId();
         String partitionOffset = kafkaSource.getPartitionOffsets();
@@ -113,11 +98,61 @@ public class KafkaProvider implements ExtractNodeProvider, 
LoadNodeProvider {
         Map<String, String> properties = 
parseProperties(kafkaSink.getProperties());
         List<FieldInfo> fieldInfos = 
parseSinkFieldInfos(kafkaSink.getSinkFieldList(), kafkaSink.getSinkName());
         List<FieldRelation> fieldRelations = 
parseSinkFields(kafkaSink.getSinkFieldList(), constantFieldMap);
-        Integer sinkParallelism = null;
-        if (StringUtils.isNotEmpty(kafkaSink.getPartitionNum())) {
-            sinkParallelism = Integer.parseInt(kafkaSink.getPartitionNum());
+
+        String partitionNum = kafkaSink.getPartitionNum();
+        Integer sinkParallelism = StringUtils.isNotBlank(partitionNum) ? 
Integer.parseInt(partitionNum) : null;
+        Format format = parseFormat(kafkaSink.getSerializationType());
+
+        return new KafkaLoadNode(
+                kafkaSink.getSinkName(),
+                kafkaSink.getSinkName(),
+                fieldInfos,
+                fieldRelations,
+                Lists.newArrayList(),
+                null,
+                kafkaSink.getTopicName(),
+                kafkaSink.getBootstrapServers(),
+                format,
+                sinkParallelism,
+                properties,
+                kafkaSink.getPrimaryKey());
+    }
+
+    /**
+     * parse Startup Mode
+     *
+     * @param autoOffsetReset The strategy of auto offset reset, resolved via KafkaOffset.forName: EARLIEST, SPECIFIC,
+     *         TIMESTAMP_MILLIS, or LATEST (any other resolved value also maps to the latest-offset mode)
+     * @return kafka scan startup mode
+     */
+    private KafkaScanStartupMode parseStartupMode(String autoOffsetReset) {
+        KafkaOffset kafkaOffset = KafkaOffset.forName(autoOffsetReset);
+        KafkaScanStartupMode startupMode;
+        switch (kafkaOffset) {
+            case EARLIEST:
+                startupMode = KafkaScanStartupMode.EARLIEST_OFFSET;
+                break;
+            case SPECIFIC:
+                startupMode = KafkaScanStartupMode.SPECIFIC_OFFSETS;
+                break;
+            case TIMESTAMP_MILLIS:
+                startupMode = KafkaScanStartupMode.TIMESTAMP_MILLIS;
+                break;
+            case LATEST:
+            default:
+                startupMode = KafkaScanStartupMode.LATEST_OFFSET;
         }
-        DataTypeEnum dataType = 
DataTypeEnum.forType(kafkaSink.getSerializationType());
+        return startupMode;
+    }
+
+    /**
+     * parse Format
+     *
+     * @param serializationType data serialization, support: json, canal, avro
+     * @return the format for serialized content
+     */
+    private Format parseFormat(String serializationType) {
+        DataTypeEnum dataType = DataTypeEnum.forType(serializationType);
         Format format;
         switch (dataType) {
             case CSV:
@@ -141,19 +176,6 @@ public class KafkaProvider implements ExtractNodeProvider, 
LoadNodeProvider {
             default:
                 throw new IllegalArgumentException(String.format("Unsupported 
dataType=%s for Kafka", dataType));
         }
-
-        return new KafkaLoadNode(
-                kafkaSink.getSinkName(),
-                kafkaSink.getSinkName(),
-                fieldInfos,
-                fieldRelations,
-                Lists.newArrayList(),
-                null,
-                kafkaSink.getTopicName(),
-                kafkaSink.getBootstrapServers(),
-                format,
-                sinkParallelism,
-                properties,
-                kafkaSink.getPrimaryKey());
+        return format;
     }
 }
\ No newline at end of file
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/StarRocksProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/StarRocksProvider.java
index 588671d370..acae2efeb3 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/StarRocksProvider.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/StarRocksProvider.java
@@ -17,7 +17,6 @@
 
 package org.apache.inlong.manager.pojo.sort.node.provider;
 
-import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksSink;
 import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
@@ -25,14 +24,10 @@ import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.manager.pojo.stream.StreamNode;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.node.LoadNode;
-import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
-import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
 import org.apache.inlong.sort.protocol.node.format.Format;
 import org.apache.inlong.sort.protocol.node.load.StarRocksLoadNode;
 import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 
-import org.apache.commons.lang3.StringUtils;
-
 import java.util.List;
 import java.util.Map;
 
@@ -52,23 +47,9 @@ public class StarRocksProvider implements LoadNodeProvider {
         Map<String, String> properties = 
parseProperties(starRocksSink.getProperties());
         List<FieldInfo> fieldInfos = 
parseSinkFieldInfos(starRocksSink.getSinkFieldList(), 
starRocksSink.getSinkName());
         List<FieldRelation> fieldRelations = 
parseSinkFields(starRocksSink.getSinkFieldList(), constantFieldMap);
+        Format format = 
parsingSinkMultipleFormat(starRocksSink.getSinkMultipleEnable(),
+                starRocksSink.getSinkMultipleFormat());
 
-        Format format = null;
-        if (Boolean.TRUE.equals(starRocksSink.getSinkMultipleEnable())
-                && 
StringUtils.isNotBlank(starRocksSink.getSinkMultipleFormat())) {
-            DataTypeEnum dataType = 
DataTypeEnum.forType(starRocksSink.getSinkMultipleFormat());
-            switch (dataType) {
-                case CANAL:
-                    format = new CanalJsonFormat();
-                    break;
-                case DEBEZIUM_JSON:
-                    format = new DebeziumJsonFormat();
-                    break;
-                default:
-                    throw new IllegalArgumentException(
-                            String.format("Unsupported dataType=%s for 
StarRocks", dataType));
-            }
-        }
         return new StarRocksLoadNode(
                 starRocksSink.getSinkName(),
                 starRocksSink.getSinkName(),

Reply via email to