This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new ffb627a01c [INLONG-9362][Manager] Iceberg support config all migrate (#9363) ffb627a01c is described below commit ffb627a01ca5da9ea4e58a1d75d7a6b84bf1d2fd Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Thu Nov 30 15:30:07 2023 +0800 [INLONG-9362][Manager] Iceberg support config all migrate (#9363) --- .../manager/pojo/sink/iceberg/IcebergSink.java | 15 ++++++++ .../manager/pojo/sink/iceberg/IcebergSinkDTO.java | 18 ++++++++++ .../pojo/sink/iceberg/IcebergSinkRequest.java | 15 ++++++++ .../inlong/manager/pojo/sort/node/NodeFactory.java | 4 +++ .../pojo/sort/node/base/ExtractNodeProvider.java | 4 +++ .../pojo/sort/node/base/LoadNodeProvider.java | 13 +++++++ .../pojo/sort/node/provider/IcebergProvider.java | 41 ++++++++++++++++++++-- .../manager/pojo/stream/InlongStreamBriefInfo.java | 3 ++ .../manager/pojo/stream/InlongStreamExtParam.java | 3 ++ .../manager/pojo/stream/InlongStreamInfo.java | 3 ++ .../manager/pojo/stream/InlongStreamRequest.java | 3 ++ .../main/resources/h2/apache_inlong_manager.sql | 3 ++ .../manager-web/sql/apache_inlong_manager.sql | 3 ++ inlong-manager/manager-web/sql/changes-1.10.0.sql | 4 +-- 14 files changed, 128 insertions(+), 4 deletions(-) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSink.java index 19b988b79d..5fc2afc586 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSink.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSink.java @@ -75,6 +75,21 @@ public class IcebergSink extends StreamSink { @ApiModelProperty("append mode, UPSERT or APPEND") private String appendMode; + @ApiModelProperty("The multiple enable of sink") + private Boolean sinkMultipleEnable = false; + + @ApiModelProperty("The multiple format of sink") + private String sinkMultipleFormat; + + @ApiModelProperty("database pattern") + private String databasePattern; + + @ApiModelProperty("table pattern") + private String tablePattern; + + @ApiModelProperty("enable schema change") + private Boolean enableSchemaChange = false; + public IcebergSink() { this.setSinkType(SinkType.ICEBERG); } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java index 2c8dc3f7bf..379aaa0a75 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java @@ -68,6 +68,24 @@ public class IcebergSinkDTO { @ApiModelProperty("Partition type, like: H-hour, D-day, W-week, M-month, O-once, R-regulation") private String partitionType; + @ApiModelProperty("The multiple enable of sink") + private Boolean sinkMultipleEnable = false; + + @ApiModelProperty("The multiple format of sink") + private String sinkMultipleFormat; + + @ApiModelProperty("database pattern") + private String databasePattern; + + @ApiModelProperty("table pattern") + private String tablePattern; + + @ApiModelProperty("append mode, UPSERT or APPEND") + private String appendMode; + + @ApiModelProperty("enable schema change") + private Boolean enableSchemaChange = false; + @ApiModelProperty("Primary key") private String primaryKey; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkRequest.java index aa3c606b3f..68bb94cc68 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkRequest.java @@ -70,4 +70,19 @@ public class IcebergSinkRequest extends SinkRequest { @Pattern(regexp = "(?i)(UPSERT|APPEND)", message = "Invalid append mode") private String appendMode; + @ApiModelProperty("The multiple enable of sink") + private Boolean sinkMultipleEnable = false; + + @ApiModelProperty("The multiple format of sink") + private String sinkMultipleFormat; + + @ApiModelProperty("database pattern") + private String databasePattern; + + @ApiModelProperty("table pattern") + private String tablePattern; + + @ApiModelProperty("enable schema change") + private Boolean enableSchemaChange = false; + } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java index fdf68e0ce9..88a7992f78 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java @@ -103,6 +103,10 @@ public class NodeFactory { sourceInfo.getSourceType()); LoadNodeProvider loadNodeProvider = LoadNodeProviderFactory.getLoadNodeProvider(sinkInfo.getSinkType()); + if (loadNodeProvider.isSinkMultiple(sinkInfo)) { + sourceInfo.setFieldList(loadNodeProvider.addStreamFieldsForSinkMultiple(sourceInfo.getFieldList())); + sinkInfo.setSinkFieldList(loadNodeProvider.addSinkFieldsForSinkMultiple(sinkInfo.getSinkFieldList())); + } if (FieldInfoUtils.compareFields(extractNodeProvider.getMetaFields(), loadNodeProvider.getMetaFields())) { extractNodeProvider.addStreamMetaFields(sourceInfo.getFieldList()); if (CollectionUtils.isNotEmpty(transformResponses)) { diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java index 42b36b7591..d7e3c89648 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java @@ -34,6 +34,7 @@ import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat; import org.apache.inlong.sort.protocol.node.format.JsonFormat; import org.apache.inlong.sort.protocol.node.format.RawFormat; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import java.util.List; @@ -75,6 +76,9 @@ public interface ExtractNodeProvider extends NodeProvider { */ default List<FieldInfo> parseStreamFieldInfos(List<StreamField> streamFields, String nodeId, FieldTypeMappingStrategy fieldTypeMappingStrategy) { + if (CollectionUtils.isEmpty(streamFields)) { + return null; + } // Filter constant fields return streamFields.stream().filter(s -> Objects.isNull(s.getFieldValue())) .map(streamFieldInfo -> FieldInfoUtils diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java index 4af5e532d4..87f82e7185 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java @@ -40,6 +40,7 @@ import com.google.common.collect.Lists; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -145,4 +146,16 @@ public interface LoadNodeProvider extends NodeProvider { default List<SinkField> addSinkMetaFields(List<SinkField> sinkFields) { return sinkFields; } + + default Boolean isSinkMultiple(StreamNode nodeInfo) { + return false; + } + + default List<StreamField> addStreamFieldsForSinkMultiple(List<StreamField> streamFields) { + return new ArrayList<>(); + } + + default List<SinkField> addSinkFieldsForSinkMultiple(List<SinkField> sinkFields) { + return new ArrayList<>(); + } } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java index 2147c3159a..37302bc396 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java @@ -33,10 +33,12 @@ import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType; import org.apache.inlong.sort.protocol.node.ExtractNode; import org.apache.inlong.sort.protocol.node.LoadNode; import org.apache.inlong.sort.protocol.node.extract.IcebergExtractNode; +import org.apache.inlong.sort.protocol.node.format.Format; import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode; import org.apache.inlong.sort.protocol.transformation.FieldRelation; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; import java.util.ArrayList; import java.util.List; @@ -82,7 +84,8 @@ public class IcebergProvider implements ExtractNodeProvider, LoadNodeProvider { List<FieldInfo> fieldInfos = parseSinkFieldInfos(icebergSink.getSinkFieldList(), icebergSink.getSinkName()); List<FieldRelation> fieldRelations = parseSinkFields(icebergSink.getSinkFieldList(), constantFieldMap); IcebergConstant.CatalogType catalogType = CatalogType.forName(icebergSink.getCatalogType()); - + Format format = parsingSinkMultipleFormat(icebergSink.getSinkMultipleEnable(), + icebergSink.getSinkMultipleFormat()); return new IcebergLoadNode( icebergSink.getSinkName(), icebergSink.getSinkName(), @@ -98,7 +101,13 @@ public class IcebergProvider implements ExtractNodeProvider, LoadNodeProvider { catalogType, icebergSink.getCatalogUri(), icebergSink.getWarehouse(), - icebergSink.getAppendMode()); + icebergSink.getAppendMode(), + icebergSink.getSinkMultipleEnable(), + format, + icebergSink.getDatabasePattern(), + icebergSink.getTablePattern(), + icebergSink.getEnableSchemaChange(), + null); } @Override @@ -128,4 +137,32 @@ public class IcebergProvider implements ExtractNodeProvider, LoadNodeProvider { fieldInfos.add(0, new MetaFieldInfo(MetaField.AUDIT_DATA_TIME.name(), MetaField.AUDIT_DATA_TIME)); return fieldInfos; } + + @Override + public Boolean isSinkMultiple(StreamNode nodeInfo) { + IcebergSink icebergSink = (IcebergSink) nodeInfo; + return icebergSink.getSinkMultipleEnable(); + } + + @Override + public List<StreamField> addStreamFieldsForSinkMultiple(List<StreamField> streamFields) { + if (CollectionUtils.isEmpty(streamFields)) { + streamFields = new ArrayList<>(); + } + streamFields.add(0, + new StreamField(0, "varbinary", MetaField.DATA_BYTES_CANAL.name(), "meta.data_canal", null, 1, + MetaField.DATA_BYTES_CANAL.name())); + return streamFields; + } + + @Override + public List<SinkField> addSinkFieldsForSinkMultiple(List<SinkField> sinkFields) { + if (CollectionUtils.isEmpty(sinkFields)) { + sinkFields = new ArrayList<>(); + } + sinkFields.add(0, new SinkField(0, "varbinary", MetaField.DATA_BYTES_CANAL.name(), "meta.data_canal", + MetaField.DATA_BYTES_CANAL.name(), "varbinary", 0, MetaField.DATA_BYTES_CANAL.name(), null)); + return sinkFields; + } + } \ No newline at end of file diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java index b8d04fde85..2a3d91e3a4 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamBriefInfo.java @@ -88,6 +88,9 @@ public class InlongStreamBriefInfo { @ApiModelProperty(value = "Whether to ignore the parse errors of field value") private Boolean ignoreParseError; + @ApiModelProperty("The multiple enable of sink") + private Boolean sinkMultipleEnable = false; + @ApiModelProperty(value = "Status") private Integer status; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java index 2690576aad..9b357117c8 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java @@ -51,6 +51,9 @@ public class InlongStreamExtParam implements Serializable { @ApiModelProperty(value = "Predefined fields") private String predefinedFields; + @ApiModelProperty("The multiple enable of sink") + private Boolean sinkMultipleEnable = false; + @ApiModelProperty(value = "Extended field size") private Integer extendedFieldSize = 0; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java index a8cb701748..57f9bbeba2 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java @@ -139,6 +139,9 @@ public class InlongStreamInfo extends BaseInlongStream { @ApiModelProperty(value = "Extended field size") private Integer extendedFieldSize = 0; + @ApiModelProperty("The multiple enable of sink") + private Boolean sinkMultipleEnable; + @ApiModelProperty(value = "Whether to ignore the parse errors of field value") private Boolean ignoreParseError = true; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java index bb6a0d2964..fc2b012925 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java @@ -127,6 +127,9 @@ public class InlongStreamRequest extends BaseInlongStream { @ApiModelProperty(value = "Extended field size") private Integer extendedFieldSize = 0; + @ApiModelProperty("The multiple enable of sink") + private Boolean sinkMultipleEnable; + @ApiModelProperty(value = "The message body wrap type, including: RAW, INLONG_MSG_V0, INLONG_MSG_V1, PB, etc") private String wrapType; diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql index d27d70d5a3..168c2fc7ca 100644 --- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql +++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql @@ -302,6 +302,9 @@ CREATE TABLE IF NOT EXISTS `operation_log` ( `id` int(11) NOT NULL AUTO_INCREMENT, `authentication_type` varchar(64) DEFAULT NULL COMMENT 'Authentication type', + `inlong_group_id` varchar(256) DEFAULT NULL COMMENT 'Inlong group id', + `inlong_stream_id` varchar(256) DEFAULT NULL COMMENT 'Inlong stream id', + `operation_target` varchar(256) DEFAULT NULL COMMENT 'Operation target', `operation_type` varchar(256) DEFAULT NULL COMMENT 'Operation type', `http_method` varchar(64) DEFAULT NULL COMMENT 'Request method', `invoke_method` varchar(256) DEFAULT NULL COMMENT 'Invoke method', diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql index f1b5379e0a..f64c45b63d 100644 --- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql +++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql @@ -319,6 +319,9 @@ CREATE TABLE IF NOT EXISTS `operation_log` ( `id` int(11) NOT NULL AUTO_INCREMENT, `authentication_type` varchar(64) DEFAULT NULL COMMENT 'Authentication type', + `inlong_group_id` varchar(256) DEFAULT NULL COMMENT 'Inlong group id', + `inlong_stream_id` varchar(256) DEFAULT NULL COMMENT 'Inlong stream id', + `operation_target` varchar(256) DEFAULT NULL COMMENT 'Operation target', `operation_type` varchar(256) DEFAULT NULL COMMENT 'Operation type', `http_method` varchar(64) DEFAULT NULL COMMENT 'Request method', `invoke_method` varchar(256) DEFAULT NULL COMMENT 'Invoke method', diff --git a/inlong-manager/manager-web/sql/changes-1.10.0.sql b/inlong-manager/manager-web/sql/changes-1.10.0.sql index 3a1abeb04b..a09a83faf7 100644 --- a/inlong-manager/manager-web/sql/changes-1.10.0.sql +++ b/inlong-manager/manager-web/sql/changes-1.10.0.sql @@ -40,10 +40,10 @@ ALTER TABLE `operation_log` ADD COLUMN `inlong_group_id` varchar(256) DEFAULT NULL COMMENT 'Inlong group id'; ALTER TABLE `operation_log` - ADD COLUMN `inlong_stream_id` varchar(256) DEFAULT NULL COMMENT 'Inlong stream id', + ADD COLUMN `inlong_stream_id` varchar(256) DEFAULT NULL COMMENT 'Inlong stream id'; ALTER TABLE `operation_log` - ADD COLUMN `operation_target` varchar(256) DEFAULT NULL COMMENT 'Operation target', + ADD COLUMN `operation_target` varchar(256) DEFAULT NULL COMMENT 'Operation target'; CREATE INDEX operation_log_group_stream_index ON operation_log (`inlong_group_id`, `inlong_stream_id`);