This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 8626f28761 [INLONG-9377][Sort] Fix init iceberg sink failed with upsert mode (#9379)
8626f28761 is described below

commit 8626f287613d674c05a38931d9d6974baa5d91d6
Author: vernedeng <verned...@apache.org>
AuthorDate: Tue Dec 5 10:02:24 2023 +0800

    [INLONG-9377][Sort] Fix init iceberg sink failed with upsert mode (#9379)
---
 .../apache/inlong/sort/protocol/constant/IcebergConstant.java  |  3 +--
 .../apache/inlong/sort/protocol/node/load/IcebergLoadNode.java | 10 ++++++++--
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
index ce9b81b8a9..cf12115080 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
@@ -35,8 +35,7 @@ public class IcebergConstant {
     public static final String START_SNAPSHOT_ID = "start-snapshot-id";
     public static final String STREAMING = "streaming";
     public static final String STARTING_STRATEGY_KEY = "starting-strategy";
-
-    public static final String APPEND_MODE_KEY = "appendMode";
+    public static final String UPSERT_ENABLED_KEY = "upsert-enabled";
 
     /**
      * Iceberg supported catalog type
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
index 9e3fb9e8dd..3b08fa490f 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
@@ -180,7 +180,13 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Metadata,
         options.put(IcebergConstant.DEFAULT_DATABASE_KEY, dbName);
         options.put(IcebergConstant.CATALOG_TYPE_KEY, catalogType.name());
         options.put(IcebergConstant.CATALOG_NAME_KEY, catalogType.name());
-        options.put(IcebergConstant.APPEND_MODE_KEY, appendMode);
+
+        if ("upsert".equals(appendMode)) {
+            options.put(IcebergConstant.UPSERT_ENABLED_KEY, Boolean.TRUE.toString());
+        } else {
+            options.put(IcebergConstant.UPSERT_ENABLED_KEY, Boolean.FALSE.toString());
+        }
+
         if (null != uri) {
             options.put(IcebergConstant.URI_KEY, uri);
         }
@@ -194,7 +200,7 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Metadata,
             options.put(SINK_MULTIPLE_DATABASE_PATTERN, databasePattern);
             options.put(SINK_MULTIPLE_TABLE_PATTERN, tablePattern);
         } else {
-            options.put(SINK_MULTIPLE_ENABLE, "false");
+            options.put(SINK_MULTIPLE_ENABLE, Boolean.FALSE.toString());
         }
 
         return options;

Reply via email to