This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ffb627a01c [INLONG-9362][Manager] Iceberg support  config all migrate (#9363)
ffb627a01c is described below

commit ffb627a01ca5da9ea4e58a1d75d7a6b84bf1d2fd
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Thu Nov 30 15:30:07 2023 +0800

    [INLONG-9362][Manager] Iceberg support  config all migrate (#9363)
---
 .../manager/pojo/sink/iceberg/IcebergSink.java     | 15 ++++++++
 .../manager/pojo/sink/iceberg/IcebergSinkDTO.java  | 18 ++++++++++
 .../pojo/sink/iceberg/IcebergSinkRequest.java      | 15 ++++++++
 .../inlong/manager/pojo/sort/node/NodeFactory.java |  4 +++
 .../pojo/sort/node/base/ExtractNodeProvider.java   |  4 +++
 .../pojo/sort/node/base/LoadNodeProvider.java      | 13 +++++++
 .../pojo/sort/node/provider/IcebergProvider.java   | 41 ++++++++++++++++++++--
 .../manager/pojo/stream/InlongStreamBriefInfo.java |  3 ++
 .../manager/pojo/stream/InlongStreamExtParam.java  |  3 ++
 .../manager/pojo/stream/InlongStreamInfo.java      |  3 ++
 .../manager/pojo/stream/InlongStreamRequest.java   |  3 ++
 .../main/resources/h2/apache_inlong_manager.sql    |  3 ++
 .../manager-web/sql/apache_inlong_manager.sql      |  3 ++
 inlong-manager/manager-web/sql/changes-1.10.0.sql  |  4 +--
 14 files changed, 128 insertions(+), 4 deletions(-)

diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSink.java
index 19b988b79d..5fc2afc586 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSink.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSink.java
@@ -75,6 +75,21 @@ public class IcebergSink extends StreamSink {
     @ApiModelProperty("append mode, UPSERT or APPEND")
     private String appendMode;
 
+    @ApiModelProperty("The multiple enable of sink")
+    private Boolean sinkMultipleEnable = false;
+
+    @ApiModelProperty("The multiple format of sink")
+    private String sinkMultipleFormat;
+
+    @ApiModelProperty("database pattern")
+    private String databasePattern;
+
+    @ApiModelProperty("table pattern")
+    private String tablePattern;
+
+    @ApiModelProperty("enable schema change")
+    private Boolean enableSchemaChange = false;
+
     public IcebergSink() {
         this.setSinkType(SinkType.ICEBERG);
     }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java
index 2c8dc3f7bf..379aaa0a75 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java
@@ -68,6 +68,24 @@ public class IcebergSinkDTO {
     @ApiModelProperty("Partition type, like: H-hour, D-day, W-week, M-month, O-once, R-regulation")
     private String partitionType;
 
+    @ApiModelProperty("The multiple enable of sink")
+    private Boolean sinkMultipleEnable = false;
+
+    @ApiModelProperty("The multiple format of sink")
+    private String sinkMultipleFormat;
+
+    @ApiModelProperty("database pattern")
+    private String databasePattern;
+
+    @ApiModelProperty("table pattern")
+    private String tablePattern;
+
+    @ApiModelProperty("append mode, UPSERT or APPEND")
+    private String appendMode;
+
+    @ApiModelProperty("enable schema change")
+    private Boolean enableSchemaChange = false;
+
     @ApiModelProperty("Primary key")
     private String primaryKey;
 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkRequest.java
index aa3c606b3f..68bb94cc68 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkRequest.java
@@ -70,4 +70,19 @@ public class IcebergSinkRequest extends SinkRequest {
     @Pattern(regexp = "(?i)(UPSERT|APPEND)", message = "Invalid append mode")
     private String appendMode;
 
+    @ApiModelProperty("The multiple enable of sink")
+    private Boolean sinkMultipleEnable = false;
+
+    @ApiModelProperty("The multiple format of sink")
+    private String sinkMultipleFormat;
+
+    @ApiModelProperty("database pattern")
+    private String databasePattern;
+
+    @ApiModelProperty("table pattern")
+    private String tablePattern;
+
+    @ApiModelProperty("enable schema change")
+    private Boolean enableSchemaChange = false;
+
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java
index fdf68e0ce9..88a7992f78 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java
@@ -103,6 +103,10 @@ public class NodeFactory {
                 sourceInfo.getSourceType());
         LoadNodeProvider loadNodeProvider = LoadNodeProviderFactory.getLoadNodeProvider(sinkInfo.getSinkType());
 
+        if (loadNodeProvider.isSinkMultiple(sinkInfo)) {
+            sourceInfo.setFieldList(loadNodeProvider.addStreamFieldsForSinkMultiple(sourceInfo.getFieldList()));
+            sinkInfo.setSinkFieldList(loadNodeProvider.addSinkFieldsForSinkMultiple(sinkInfo.getSinkFieldList()));
+        }
         if (FieldInfoUtils.compareFields(extractNodeProvider.getMetaFields(), loadNodeProvider.getMetaFields())) {
             extractNodeProvider.addStreamMetaFields(sourceInfo.getFieldList());
             if (CollectionUtils.isNotEmpty(transformResponses)) {
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
index 42b36b7591..d7e3c89648 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
@@ -34,6 +34,7 @@ import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
 import org.apache.inlong.sort.protocol.node.format.JsonFormat;
 import org.apache.inlong.sort.protocol.node.format.RawFormat;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 
 import java.util.List;
@@ -75,6 +76,9 @@ public interface ExtractNodeProvider extends NodeProvider {
      */
     default List<FieldInfo> parseStreamFieldInfos(List<StreamField> streamFields, String nodeId,
             FieldTypeMappingStrategy fieldTypeMappingStrategy) {
+        if (CollectionUtils.isEmpty(streamFields)) {
+            return null;
+        }
         // Filter constant fields
         return streamFields.stream().filter(s -> Objects.isNull(s.getFieldValue()))
                 .map(streamFieldInfo -> FieldInfoUtils
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
index 4af5e532d4..87f82e7185 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
@@ -40,6 +40,7 @@ import com.google.common.collect.Lists;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -145,4 +146,16 @@ public interface LoadNodeProvider extends NodeProvider {
     default List<SinkField> addSinkMetaFields(List<SinkField> sinkFields) {
         return sinkFields;
     }
+
+    default Boolean isSinkMultiple(StreamNode nodeInfo) {
+        return false;
+    }
+
+    default List<StreamField> addStreamFieldsForSinkMultiple(List<StreamField> streamFields) {
+        return new ArrayList<>();
+    }
+
+    default List<SinkField> addSinkFieldsForSinkMultiple(List<SinkField> sinkFields) {
+        return new ArrayList<>();
+    }
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
index 2147c3159a..37302bc396 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
@@ -33,10 +33,12 @@ import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
 import org.apache.inlong.sort.protocol.node.ExtractNode;
 import org.apache.inlong.sort.protocol.node.LoadNode;
 import org.apache.inlong.sort.protocol.node.extract.IcebergExtractNode;
+import org.apache.inlong.sort.protocol.node.format.Format;
 import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode;
 import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -82,7 +84,8 @@ public class IcebergProvider implements ExtractNodeProvider, LoadNodeProvider {
         List<FieldInfo> fieldInfos = parseSinkFieldInfos(icebergSink.getSinkFieldList(), icebergSink.getSinkName());
         List<FieldRelation> fieldRelations = parseSinkFields(icebergSink.getSinkFieldList(), constantFieldMap);
         IcebergConstant.CatalogType catalogType = CatalogType.forName(icebergSink.getCatalogType());
-
+        Format format = parsingSinkMultipleFormat(icebergSink.getSinkMultipleEnable(),
+                icebergSink.getSinkMultipleFormat());
         return new IcebergLoadNode(
                 icebergSink.getSinkName(),
                 icebergSink.getSinkName(),
@@ -98,7 +101,13 @@ public class IcebergProvider implements ExtractNodeProvider, LoadNodeProvider {
                 catalogType,
                 icebergSink.getCatalogUri(),
                 icebergSink.getWarehouse(),
-                icebergSink.getAppendMode());
+                icebergSink.getAppendMode(),
+                icebergSink.getSinkMultipleEnable(),
+                format,
+                icebergSink.getDatabasePattern(),
+                icebergSink.getTablePattern(),
+                icebergSink.getEnableSchemaChange(),
+                null);
     }
 
     @Override
@@ -128,4 +137,32 @@ public class IcebergProvider implements ExtractNodeProvider, LoadNodeProvider {
         fieldInfos.add(0, new MetaFieldInfo(MetaField.AUDIT_DATA_TIME.name(), MetaField.AUDIT_DATA_TIME));
         return fieldInfos;
     }
+
+    @Override
+    public Boolean isSinkMultiple(StreamNode nodeInfo) {
+        IcebergSink icebergSink = (IcebergSink) nodeInfo;
+        return icebergSink.getSinkMultipleEnable();
+    }
+
+    @Override
+    public List<StreamField> addStreamFieldsForSinkMultiple(List<StreamField> streamFields) {
+        if (CollectionUtils.isEmpty(streamFields)) {
+            streamFields = new ArrayList<>();
+        }
+        streamFields.add(0,
+                new StreamField(0, "varbinary", MetaField.DATA_BYTES_CANAL.name(), "meta.data_canal", null, 1,
+                        MetaField.DATA_BYTES_CANAL.name()));
+        return streamFields;
+    }
+
+    @Override
+    public List<SinkField> addSinkFieldsForSinkMultiple(List<SinkField> sinkFields) {
+        if (CollectionUtils.isEmpty(sinkFields)) {
+            sinkFields = new ArrayList<>();
+        }
+        sinkFields.add(0, new SinkField(0, "varbinary", MetaField.DATA_BYTES_CANAL.name(), "meta.data_canal",
+                MetaField.DATA_BYTES_CANAL.name(), "varbinary", 0, MetaField.DATA_BYTES_CANAL.name(), null));
+        return sinkFields;
+    }
+
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java
index b8d04fde85..2a3d91e3a4 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java
@@ -88,6 +88,9 @@ public class InlongStreamBriefInfo {
     @ApiModelProperty(value = "Whether to ignore the parse errors of field value")
     private Boolean ignoreParseError;
 
+    @ApiModelProperty("The multiple enable of sink")
+    private Boolean sinkMultipleEnable = false;
+
     @ApiModelProperty(value = "Status")
     private Integer status;
 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
index 2690576aad..9b357117c8 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
@@ -51,6 +51,9 @@ public class InlongStreamExtParam implements Serializable {
     @ApiModelProperty(value = "Predefined fields")
     private String predefinedFields;
 
+    @ApiModelProperty("The multiple enable of sink")
+    private Boolean sinkMultipleEnable = false;
+
     @ApiModelProperty(value = "Extended field size")
     private Integer extendedFieldSize = 0;
 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
index a8cb701748..57f9bbeba2 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
@@ -139,6 +139,9 @@ public class InlongStreamInfo extends BaseInlongStream {
     @ApiModelProperty(value = "Extended field size")
     private Integer extendedFieldSize = 0;
 
+    @ApiModelProperty("The multiple enable of sink")
+    private Boolean sinkMultipleEnable;
+
     @ApiModelProperty(value = "Whether to ignore the parse errors of field value")
     private Boolean ignoreParseError = true;
 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
index bb6a0d2964..fc2b012925 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
@@ -127,6 +127,9 @@ public class InlongStreamRequest extends BaseInlongStream {
     @ApiModelProperty(value = "Extended field size")
     private Integer extendedFieldSize = 0;
 
+    @ApiModelProperty("The multiple enable of sink")
+    private Boolean sinkMultipleEnable;
+
     @ApiModelProperty(value = "The message body  wrap type, including: RAW, INLONG_MSG_V0, INLONG_MSG_V1, PB, etc")
     private String wrapType;
 
diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index d27d70d5a3..168c2fc7ca 100644
--- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -302,6 +302,9 @@ CREATE TABLE IF NOT EXISTS `operation_log`
 (
     `id`                  int(11)   NOT NULL AUTO_INCREMENT,
     `authentication_type` varchar(64)        DEFAULT NULL COMMENT 'Authentication type',
+    `inlong_group_id`     varchar(256)       DEFAULT NULL COMMENT 'Inlong group id',
+    `inlong_stream_id`    varchar(256)       DEFAULT NULL COMMENT 'Inlong stream id',
+    `operation_target`    varchar(256)       DEFAULT NULL COMMENT 'Operation target',
     `operation_type`      varchar(256)       DEFAULT NULL COMMENT 'Operation type',
     `http_method`         varchar(64)        DEFAULT NULL COMMENT 'Request method',
     `invoke_method`       varchar(256)       DEFAULT NULL COMMENT 'Invoke method',
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index f1b5379e0a..f64c45b63d 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -319,6 +319,9 @@ CREATE TABLE IF NOT EXISTS `operation_log`
 (
     `id`                  int(11)   NOT NULL AUTO_INCREMENT,
     `authentication_type` varchar(64)        DEFAULT NULL COMMENT 'Authentication type',
+    `inlong_group_id`     varchar(256)       DEFAULT NULL COMMENT 'Inlong group id',
+    `inlong_stream_id`    varchar(256)       DEFAULT NULL COMMENT 'Inlong stream id',
+    `operation_target`    varchar(256)       DEFAULT NULL COMMENT 'Operation target',
     `operation_type`      varchar(256)       DEFAULT NULL COMMENT 'Operation type',
     `http_method`         varchar(64)        DEFAULT NULL COMMENT 'Request method',
     `invoke_method`       varchar(256)       DEFAULT NULL COMMENT 'Invoke method',
diff --git a/inlong-manager/manager-web/sql/changes-1.10.0.sql b/inlong-manager/manager-web/sql/changes-1.10.0.sql
index 3a1abeb04b..a09a83faf7 100644
--- a/inlong-manager/manager-web/sql/changes-1.10.0.sql
+++ b/inlong-manager/manager-web/sql/changes-1.10.0.sql
@@ -40,10 +40,10 @@ ALTER TABLE `operation_log`
     ADD COLUMN  `inlong_group_id`  varchar(256) DEFAULT NULL COMMENT 'Inlong group id';
 
 ALTER TABLE `operation_log`
-    ADD COLUMN  `inlong_stream_id` varchar(256) DEFAULT NULL COMMENT 'Inlong stream id',
+    ADD COLUMN  `inlong_stream_id` varchar(256) DEFAULT NULL COMMENT 'Inlong stream id';
 
 ALTER TABLE `operation_log`
-    ADD COLUMN `operation_target` varchar(256) DEFAULT NULL COMMENT 'Operation target',
+    ADD COLUMN `operation_target` varchar(256) DEFAULT NULL COMMENT 'Operation target';
 
 CREATE INDEX operation_log_group_stream_index ON operation_log (`inlong_group_id`, `inlong_stream_id`);
 

Reply via email to