This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 52498f1d1b [INLONG-9240][Manager][Sort] Add append mode for the Iceberg connector (#9242)
52498f1d1b is described below

commit 52498f1d1bc9038b3c567353fdf37685d2112142
Author: vernedeng <verned...@apache.org>
AuthorDate: Wed Nov 15 15:51:19 2023 +0800

    [INLONG-9240][Manager][Sort] Add append mode for the Iceberg connector (#9242)
---
 .../apache/inlong/manager/pojo/sink/iceberg/IcebergSink.java |  3 +++
 .../inlong/manager/pojo/sink/iceberg/IcebergSinkRequest.java |  6 ++++++
 .../manager/pojo/sort/node/provider/IcebergProvider.java     |  3 ++-
 .../inlong/sort/protocol/constant/IcebergConstant.java       |  2 ++
 .../inlong/sort/protocol/node/load/IcebergLoadNode.java      | 12 +++++++++---
 .../inlong/sort/protocol/node/load/IcebergLoadNodeTest.java  |  3 ++-
 .../apache/inlong/sort/parser/IcebergNodeSqlParserTest.java  |  6 ++++--
 7 files changed, 28 insertions(+), 7 deletions(-)

diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSink.java
index e7bb7325d0..19b988b79d 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSink.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSink.java
@@ -72,6 +72,9 @@ public class IcebergSink extends StreamSink {
     @ApiModelProperty("Primary key")
     private String primaryKey;
 
+    @ApiModelProperty("append mode, UPSERT or APPEND")
+    private String appendMode;
+
     public IcebergSink() {
         this.setSinkType(SinkType.ICEBERG);
     }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkRequest.java
index 129790f90c..aa3c606b3f 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkRequest.java
@@ -27,6 +27,8 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 
+import javax.validation.constraints.Pattern;
+
 /**
  * Iceberg sink request.
  */
@@ -64,4 +66,8 @@ public class IcebergSinkRequest extends SinkRequest {
     @ApiModelProperty("Primary key")
     private String primaryKey;
 
+    @ApiModelProperty("append mode, UPSERT or APPEND")
+    @Pattern(regexp = "(?i)(UPSERT|APPEND)", message = "Invalid append mode")
+    private String appendMode;
+
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
index 4af912f347..2147c3159a 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
@@ -97,7 +97,8 @@ public class IcebergProvider implements ExtractNodeProvider, LoadNodeProvider {
                 icebergSink.getPrimaryKey(),
                 catalogType,
                 icebergSink.getCatalogUri(),
-                icebergSink.getWarehouse());
+                icebergSink.getWarehouse(),
+                icebergSink.getAppendMode());
     }
 
     @Override
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
index 2cce35fb9b..ce9b81b8a9 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
@@ -36,6 +36,8 @@ public class IcebergConstant {
     public static final String STREAMING = "streaming";
     public static final String STARTING_STRATEGY_KEY = "starting-strategy";
 
+    public static final String APPEND_MODE_KEY = "appendMode";
+
     /**
      * Iceberg supported catalog type
      */
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
index 6ffaf5f2da..7f76891dfd 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
@@ -73,6 +73,9 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Metadata,
     @JsonProperty("warehouse")
     private String warehouse;
 
+    @JsonProperty("appendMode")
+    private String appendMode;
+
     @JsonCreator
     public IcebergLoadNode(@JsonProperty("id") String id,
             @JsonProperty("name") String name,
@@ -87,7 +90,8 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Metadata,
             @JsonProperty("primaryKey") String primaryKey,
             @JsonProperty("catalogType") IcebergConstant.CatalogType catalogType,
             @JsonProperty("uri") String uri,
-            @JsonProperty("warehouse") String warehouse) {
+            @JsonProperty("warehouse") String warehouse,
+            @JsonProperty("appendMode") String appendMode) {
         super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
         this.tableName = Preconditions.checkNotNull(tableName, "table name is null");
         this.dbName = Preconditions.checkNotNull(dbName, "db name is null");
@@ -95,6 +99,7 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Metadata,
         this.catalogType = catalogType == null ? CatalogType.HIVE : catalogType;
         this.uri = uri;
         this.warehouse = warehouse;
+        this.appendMode = appendMode;
     }
 
     @Override
@@ -108,11 +113,12 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Metadata,
         options.put(IcebergConstant.DEFAULT_DATABASE_KEY, dbName);
         options.put(IcebergConstant.CATALOG_TYPE_KEY, catalogType.name());
         options.put(IcebergConstant.CATALOG_NAME_KEY, catalogType.name());
+        options.put(IcebergConstant.APPEND_MODE_KEY, appendMode);
         if (null != uri) {
-            options.put("uri", uri);
+            options.put(IcebergConstant.URI_KEY, uri);
         }
         if (null != warehouse) {
-            options.put("warehouse", warehouse);
+            options.put(IcebergConstant.WAREHOUSE_KEY, warehouse);
         }
         return options;
     }
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNodeTest.java
index 69f2031235..24d5b39730 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNodeTest.java
@@ -45,6 +45,7 @@ public class IcebergLoadNodeTest extends SerializeBaseTest<IcebergLoadNode> {
                 "id",
                 CatalogType.HIVE,
                 "thrift://localhost:9083",
-                "hdfs://localhost:9000/user/iceberg/warehouse");
+                "hdfs://localhost:9000/user/iceberg/warehouse",
+                null);
     }
 }
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java
index 41d5701fdf..fc0b889db0 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java
@@ -100,7 +100,8 @@ public class IcebergNodeSqlParserTest extends AbstractTestBase {
                 null,
                 CatalogType.HADOOP,
                 null,
-                "hdfs://localhost:9000/iceberg/warehouse");
+                "hdfs://localhost:9000/iceberg/warehouse",
+                null);
     }
 
     private IcebergLoadNode buildIcebergLoadNodeWithHiveCatalog() {
@@ -139,7 +140,8 @@ public class IcebergNodeSqlParserTest extends AbstractTestBase {
                 null,
                 CatalogType.HIVE,
                 "thrift://localhost:9083",
-                "/hive/warehouse");
+                "/hive/warehouse",
+                null);
     }
 
     /**

Reply via email to