This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/branch-1.9 by this push:
     new 4ef2b3d1e6 [INLONG-8953][Sort] Fix IcebergSource defaults the 
StartSnapshot to the latest (#8954)
4ef2b3d1e6 is described below

commit 4ef2b3d1e61f6e3195b11a4477d92db01c150528
Author: vernedeng <verned...@apache.org>
AuthorDate: Thu Sep 21 10:10:14 2023 +0800

    [INLONG-8953][Sort] Fix IcebergSource defaults the StartSnapshot to the 
latest (#8954)
---
 .../apache/inlong/sort/protocol/constant/IcebergConstant.java  | 10 ++++++++++
 .../inlong/sort/protocol/node/extract/IcebergExtractNode.java  |  3 +++
 2 files changed, 13 insertions(+)

diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
index 676f7f4435..2cce35fb9b 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
@@ -34,6 +34,7 @@ public class IcebergConstant {
     public static final String WAREHOUSE_KEY = "warehouse";
     public static final String START_SNAPSHOT_ID = "start-snapshot-id";
     public static final String STREAMING = "streaming";
+    public static final String STARTING_STRATEGY_KEY = "starting-strategy";
 
     /**
      * Iceberg supported catalog type
@@ -65,4 +66,13 @@ public class IcebergConstant {
             throw new IllegalArgumentException(String.format("Unsupport 
catalogType:%s", name));
         }
     }
+
+    public enum StreamingStartingStrategy {
+        TABLE_SCAN_THEN_INCREMENTAL,
+        INCREMENTAL_FROM_LATEST_SNAPSHOT,
+        INCREMENTAL_FROM_EARLIEST_SNAPSHOT,
+        INCREMENTAL_FROM_SNAPSHOT_ID,
+        INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP;
+
+    }
 }
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtractNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtractNode.java
index cbceb5485a..e87c111743 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtractNode.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtractNode.java
@@ -115,6 +115,7 @@ public class IcebergExtractNode extends ExtractNode 
implements InlongMetric, Met
         options.put(IcebergConstant.TABLE_KEY, tableName);
         options.put(IcebergConstant.CATALOG_TYPE_KEY, catalogType.name());
         options.put(IcebergConstant.CATALOG_NAME_KEY, catalogName);
+        // support streaming only
         options.put(IcebergConstant.STREAMING, "true");
         options.put(IcebergConstant.STARTING_STRATEGY_KEY,
                 
IcebergConstant.StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL.name());
@@ -126,6 +127,8 @@ public class IcebergExtractNode extends ExtractNode 
implements InlongMetric, Met
         }
         if (null != startSnapShotId) {
             options.put(IcebergConstant.START_SNAPSHOT_ID, 
startSnapShotId.toString());
+            options.put(IcebergConstant.STARTING_STRATEGY_KEY,
+                    
IcebergConstant.StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID.name());
         }
         return options;
     }

Reply via email to