This is an automated email from the ASF dual-hosted git repository. luchunliang pushed a commit to branch branch-1.9 in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/branch-1.9 by this push: new 4ef2b3d1e6 [INLONG-8953][Sort] Fix IcebergSource defaults the StartSnapshot to the latest (#8954) 4ef2b3d1e6 is described below commit 4ef2b3d1e61f6e3195b11a4477d92db01c150528 Author: vernedeng <verned...@apache.org> AuthorDate: Thu Sep 21 10:10:14 2023 +0800 [INLONG-8953][Sort] Fix IcebergSource defaults the StartSnapshot to the latest (#8954) --- .../apache/inlong/sort/protocol/constant/IcebergConstant.java | 10 ++++++++++ .../inlong/sort/protocol/node/extract/IcebergExtractNode.java | 3 +++ 2 files changed, 13 insertions(+) diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java index 676f7f4435..2cce35fb9b 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java @@ -34,6 +34,7 @@ public class IcebergConstant { public static final String WAREHOUSE_KEY = "warehouse"; public static final String START_SNAPSHOT_ID = "start-snapshot-id"; public static final String STREAMING = "streaming"; + public static final String STARTING_STRATEGY_KEY = "starting-strategy"; /** * Iceberg supported catalog type @@ -65,4 +66,13 @@ public class IcebergConstant { throw new IllegalArgumentException(String.format("Unsupport catalogType:%s", name)); } } + + public enum StreamingStartingStrategy { + TABLE_SCAN_THEN_INCREMENTAL, + INCREMENTAL_FROM_LATEST_SNAPSHOT, + INCREMENTAL_FROM_EARLIEST_SNAPSHOT, + INCREMENTAL_FROM_SNAPSHOT_ID, + INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP; + + } } diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtractNode.java index cbceb5485a..e87c111743 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtractNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtractNode.java @@ -115,6 +115,7 @@ public class IcebergExtractNode extends ExtractNode implements InlongMetric, Met options.put(IcebergConstant.TABLE_KEY, tableName); options.put(IcebergConstant.CATALOG_TYPE_KEY, catalogType.name()); options.put(IcebergConstant.CATALOG_NAME_KEY, catalogName); + // support streaming only options.put(IcebergConstant.STREAMING, "true"); options.put(IcebergConstant.STARTING_STRATEGY_KEY, IcebergConstant.StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL.name()); @@ -126,6 +127,8 @@ public class IcebergExtractNode extends ExtractNode implements InlongMetric, Met } if (null != startSnapShotId) { options.put(IcebergConstant.START_SNAPSHOT_ID, startSnapShotId.toString()); + options.put(IcebergConstant.STARTING_STRATEGY_KEY, + IcebergConstant.StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID.name()); } return options; }