This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new 8626f28761 [INLONG-9377][Sort] Fix init iceberg sink failed with upsert mode (#9379)
8626f28761 is described below

commit 8626f287613d674c05a38931d9d6974baa5d91d6
Author: vernedeng <verned...@apache.org>
AuthorDate: Tue Dec 5 10:02:24 2023 +0800

    [INLONG-9377][Sort] Fix init iceberg sink failed with upsert mode (#9379)
---
 .../apache/inlong/sort/protocol/constant/IcebergConstant.java  |  3 +--
 .../apache/inlong/sort/protocol/node/load/IcebergLoadNode.java | 10 ++++++++--
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
index ce9b81b8a9..cf12115080 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
@@ -35,8 +35,7 @@ public class IcebergConstant {
     public static final String START_SNAPSHOT_ID = "start-snapshot-id";
     public static final String STREAMING = "streaming";
     public static final String STARTING_STRATEGY_KEY = "starting-strategy";
-
-    public static final String APPEND_MODE_KEY = "appendMode";
+    public static final String UPSERT_ENABLED_KEY = "upsert-enabled";
 
     /**
      * Iceberg supported catalog type
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
index 9e3fb9e8dd..3b08fa490f 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
@@ -180,7 +180,13 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Metadata,
         options.put(IcebergConstant.DEFAULT_DATABASE_KEY, dbName);
         options.put(IcebergConstant.CATALOG_TYPE_KEY, catalogType.name());
         options.put(IcebergConstant.CATALOG_NAME_KEY, catalogType.name());
-        options.put(IcebergConstant.APPEND_MODE_KEY, appendMode);
+
+        if ("upsert".equals(appendMode)) {
+            options.put(IcebergConstant.UPSERT_ENABLED_KEY, Boolean.TRUE.toString());
+        } else {
+            options.put(IcebergConstant.UPSERT_ENABLED_KEY, Boolean.FALSE.toString());
+        }
+
         if (null != uri) {
             options.put(IcebergConstant.URI_KEY, uri);
         }
@@ -194,7 +200,7 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Metadata,
             options.put(SINK_MULTIPLE_DATABASE_PATTERN, databasePattern);
             options.put(SINK_MULTIPLE_TABLE_PATTERN, tablePattern);
         } else {
-            options.put(SINK_MULTIPLE_ENABLE, "false");
+            options.put(SINK_MULTIPLE_ENABLE, Boolean.FALSE.toString());
         }
         return options;