This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 30c7adafda0 branch-3.1: [feat](paimon)support paimon incr read #51661 
(#52922)
30c7adafda0 is described below

commit 30c7adafda0d3631656993bb9059e600a5921c0f
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Tue Jul 8 19:39:31 2025 -0700

    branch-3.1: [feat](paimon)support paimon incr read #51661 (#52922)
    
    bp #51661
    
    Co-authored-by: wlong <[email protected]>
    Co-authored-by: wanglong16 <[email protected]>
---
 .../create_preinstalled_scripts/paimon/run04.sql   |  16 +
 .../datasource/paimon/source/PaimonScanNode.java   | 232 +++++++++++++-
 .../paimon/source/PaimonScanNodeTest.java          | 340 +++++++++++++++++----
 .../external_table_p0/paimon/paimon_incr_read.out  | Bin 0 -> 1097 bytes
 .../paimon/paimon_incr_read.groovy                 | 102 +++++++
 5 files changed, 626 insertions(+), 64 deletions(-)

diff --git 
a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run04.sql
 
b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run04.sql
new file mode 100644
index 00000000000..5c4f7a3fea2
--- /dev/null
+++ 
b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run04.sql
@@ -0,0 +1,16 @@
+use paimon;
+
+-- Fixture database/table for the paimon_incr_read regression suite
+-- (regression-test/suites/external_table_p0/paimon/paimon_incr_read.groovy).
+create database if not exists test_paimon_incr_read_db;
+
+use test_paimon_incr_read_db;
+
+CREATE TABLE paimon_incr (
+    id INT,
+    name STRING,
+    age INT
+) USING paimon;
+
+-- One row per INSERT on purpose: each statement is a separate write, so the table presumably
+-- ends up with one snapshot per row, which the incremental-read suite reads between.
+-- NOTE(review): do not merge these into a multi-row INSERT; confirm expected snapshot ids
+-- against paimon_incr_read.out.
+INSERT INTO paimon_incr (id, name, age) VALUES (1, 'Alice', 30);
+INSERT INTO paimon_incr (id, name, age) VALUES (2, 'Bob', 25);
+INSERT INTO paimon_incr (id, name, age) VALUES (3, 'Charlie', 28);
+
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 87680686986..ea1f8b8e72d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -49,6 +49,7 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.DeletionFile;
 import org.apache.paimon.table.source.RawFile;
@@ -69,6 +70,23 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 public class PaimonScanNode extends FileQueryScanNode {
+    private static final Logger LOG = 
LogManager.getLogger(PaimonScanNode.class);
+
+    private static final long COUNT_WITH_PARALLEL_SPLITS = 10000;
+    // Paimon dynamic-option keys produced by validateIncrementalReadParams; the resulting map is
+    // applied to the Paimon table via Table.copy(...) before planning splits.
+    private static final String PAIMON_SCAN_SNAPSHOT_ID = "scan.snapshot-id";
+    private static final String PAIMON_SCAN_MODE = "scan.mode";
+    private static final String PAIMON_INCREMENTAL_BETWEEN = 
"incremental-between";
+    private static final String PAIMON_INCREMENTAL_BETWEEN_SCAN_MODE = 
"incremental-between-scan-mode";
+    private static final String PAIMON_INCREMENTAL_BETWEEN_TIMESTAMP = 
"incremental-between-timestamp";
+    // Parameter keys accepted from the Doris statement's incremental read params
+    // (scanParams.getMapParams()); snapshot-based and timestamp-based keys are mutually exclusive.
+    private static final String DORIS_START_SNAPSHOT_ID = "startSnapshotId";
+    private static final String DORIS_END_SNAPSHOT_ID = "endSnapshotId";
+    private static final String DORIS_START_TIMESTAMP = "startTimestamp";
+    private static final String DORIS_END_TIMESTAMP = "endTimestamp";
+    private static final String DORIS_INCREMENTAL_BETWEEN_SCAN_MODE = 
"incrementalBetweenScanMode";
+    // NOTE(review): not referenced by any code visible in this change; confirm it is used or drop it.
+    private static final String DEFAULT_INCREMENTAL_BETWEEN_SCAN_MODE = "auto";
+
     private enum SplitReadType {
         JNI,
         NATIVE,
@@ -111,14 +129,12 @@ public class PaimonScanNode extends FileQueryScanNode {
         }
     }
 
-    private static final Logger LOG = 
LogManager.getLogger(PaimonScanNode.class);
     private PaimonSource source = null;
     private List<Predicate> predicates;
     private int rawFileSplitNum = 0;
     private int paimonSplitNum = 0;
     private List<SplitStat> splitStats = new ArrayList<>();
     private String serializedTable;
-    private static final long COUNT_WITH_PARALLEL_SPLITS = 10000;
 
     public PaimonScanNode(PlanNodeId id,
             TupleDescriptor desc,
@@ -344,20 +360,33 @@ public class PaimonScanNode extends FileQueryScanNode {
     }
 
     @VisibleForTesting
-    public List<org.apache.paimon.table.source.Split> getPaimonSplitFromAPI() {
+    public Map<String, String> getIncrReadParams() throws UserException {
+        Map<String, String> paimonScanParams = new HashMap<>();
+        if (scanParams != null && scanParams.incrementalRead()) {
+            // Validate parameter combinations and get the result map
+            paimonScanParams = 
validateIncrementalReadParams(scanParams.getMapParams());
+        }
+        return paimonScanParams;
+    }
+
+    @VisibleForTesting
+    public List<org.apache.paimon.table.source.Split> getPaimonSplitFromAPI() 
throws UserException {
         if 
(!source.getPaimonTable().options().containsKey(CoreOptions.SCAN_SNAPSHOT_ID.key()))
 {
             // an empty table in PaimonSnapshotCacheValue
             return Collections.emptyList();
         }
         int[] projected = desc.getSlots().stream().mapToInt(
-                slot -> source.getPaimonTable().rowType()
-                        .getFieldNames()
-                        .stream()
-                        .map(String::toLowerCase)
-                        .collect(Collectors.toList())
-                        .indexOf(slot.getColumn().getName()))
-                .toArray();
-        ReadBuilder readBuilder = source.getPaimonTable().newReadBuilder();
+            slot -> source.getPaimonTable().rowType()
+                    .getFieldNames()
+                    .stream()
+                    .map(String::toLowerCase)
+                    .collect(Collectors.toList())
+                    .indexOf(slot.getColumn().getName()))
+                    .toArray();
+        Table paimonTable = source.getPaimonTable();
+        Map<String, String> incrReadParams = getIncrReadParams();
+        paimonTable = paimonTable.copy(incrReadParams);
+        ReadBuilder readBuilder = paimonTable.newReadBuilder();
         return readBuilder.withFilter(predicates)
                 .withProjection(projected)
                 .newScan().plan().splits();
@@ -456,4 +485,185 @@ public class PaimonScanNode extends FileQueryScanNode {
         }
         ((PaimonSplit) splits.get(size - 1)).setRowCount(countPerSplit + 
totalCount % size);
     }
+
+    @VisibleForTesting
+    public static Map<String, String> 
validateIncrementalReadParams(Map<String, String> params) throws UserException {
+        // Check if snapshot-based parameters exist
+        boolean hasStartSnapshotId = 
params.containsKey(DORIS_START_SNAPSHOT_ID)
+                && params.get(DORIS_START_SNAPSHOT_ID) != null;
+        boolean hasEndSnapshotId = params.containsKey(DORIS_END_SNAPSHOT_ID)
+                && params.get(DORIS_END_SNAPSHOT_ID) != null;
+        boolean hasIncrementalBetweenScanMode = 
params.containsKey(DORIS_INCREMENTAL_BETWEEN_SCAN_MODE)
+                && params.get(DORIS_INCREMENTAL_BETWEEN_SCAN_MODE) != null;
+
+        // Check if timestamp-based parameters exist
+        boolean hasStartTimestamp = params.containsKey(DORIS_START_TIMESTAMP)
+                && params.get(DORIS_START_TIMESTAMP) != null;
+        boolean hasEndTimestamp = params.containsKey(DORIS_END_TIMESTAMP) && 
params.get(DORIS_END_TIMESTAMP) != null;
+
+        // Check if any snapshot-based parameters are present
+        boolean hasSnapshotParams = hasStartSnapshotId || hasEndSnapshotId || 
hasIncrementalBetweenScanMode;
+
+        // Check if any timestamp-based parameters are present
+        boolean hasTimestampParams = hasStartTimestamp || hasEndTimestamp;
+
+        // Rule 2: The two groups are mutually exclusive
+        if (hasSnapshotParams && hasTimestampParams) {
+            throw new UserException(
+                    "Cannot specify both snapshot-based parameters"
+                            + "(startSnapshotId, endSnapshotId, 
incrementalBetweenScanMode) "
+                            + "and timestamp-based parameters (startTimestamp, 
endTimestamp) at the same time");
+        }
+
+        // Validate snapshot-based parameters group
+        if (hasSnapshotParams) {
+            // Rule 3.1 & 3.2: DORIS_START_SNAPSHOT_ID is required
+            if (!hasStartSnapshotId) {
+                throw new UserException("startSnapshotId is required when 
using snapshot-based incremental read");
+            }
+
+            // Rule 3.3: DORIS_INCREMENTAL_BETWEEN_SCAN_MODE can only appear
+            // when both start and end snapshot IDs are specified
+            if (hasIncrementalBetweenScanMode && (!hasStartSnapshotId || 
!hasEndSnapshotId)) {
+                throw new UserException(
+                        "incrementalBetweenScanMode can only be specified when"
+                                + " both startSnapshotId and endSnapshotId are 
provided");
+            }
+
+            // Validate snapshot ID values
+            if (hasStartSnapshotId) {
+                try {
+                    long startSId = 
Long.parseLong(params.get(DORIS_START_SNAPSHOT_ID));
+                    if (startSId <= 0) {
+                        throw new UserException("startSnapshotId must be 
greater than 0");
+                    }
+                } catch (NumberFormatException e) {
+                    throw new UserException("Invalid startSnapshotId format: " 
+ e.getMessage());
+                }
+            }
+
+            if (hasEndSnapshotId) {
+                try {
+                    long endSId = 
Long.parseLong(params.get(DORIS_END_SNAPSHOT_ID));
+                    if (endSId <= 0) {
+                        throw new UserException("endSnapshotId must be greater 
than 0");
+                    }
+                } catch (NumberFormatException e) {
+                    throw new UserException("Invalid endSnapshotId format: " + 
e.getMessage());
+                }
+            }
+
+            // Check if both snapshot IDs are present and validate their 
relationship
+            if (hasStartSnapshotId && hasEndSnapshotId) {
+                try {
+                    long startSId = 
Long.parseLong(params.get(DORIS_START_SNAPSHOT_ID));
+                    long endSId = 
Long.parseLong(params.get(DORIS_END_SNAPSHOT_ID));
+                    if (startSId >= endSId) {
+                        throw new UserException("startSnapshotId must be less 
than endSnapshotId");
+                    }
+                } catch (NumberFormatException e) {
+                    throw new UserException("Invalid snapshot ID format: " + 
e.getMessage());
+                }
+            }
+
+            // Validate DORIS_INCREMENTAL_BETWEEN_SCAN_MODE
+            if (hasIncrementalBetweenScanMode) {
+                String scanMode = 
params.get(DORIS_INCREMENTAL_BETWEEN_SCAN_MODE).toLowerCase();
+                if (!scanMode.equals("auto") && !scanMode.equals("diff")
+                        && !scanMode.equals("delta") && 
!scanMode.equals("changelog")) {
+                    throw new UserException("incrementalBetweenScanMode must 
be one of: auto, diff, delta, changelog");
+                }
+            }
+        }
+
+        // Validate timestamp-based parameters group
+        if (hasTimestampParams) {
+            // Rule 4.1 & 4.2: DORIS_START_TIMESTAMP is required
+            if (!hasStartTimestamp) {
+                throw new UserException("startTimestamp is required when using 
timestamp-based incremental read");
+            }
+
+            // Validate timestamp values
+            if (hasStartTimestamp) {
+                try {
+                    long startTS = 
Long.parseLong(params.get(DORIS_START_TIMESTAMP));
+                    if (startTS < 0) {
+                        throw new UserException("startTimestamp must be 
greater than or equal to 0");
+                    }
+                } catch (NumberFormatException e) {
+                    throw new UserException("Invalid startTimestamp format: " 
+ e.getMessage());
+                }
+            }
+
+            if (hasEndTimestamp) {
+                try {
+                    long endTS = 
Long.parseLong(params.get(DORIS_END_TIMESTAMP));
+                    if (endTS <= 0) {
+                        throw new UserException("endTimestamp must be greater 
than 0");
+                    }
+                } catch (NumberFormatException e) {
+                    throw new UserException("Invalid endTimestamp format: " + 
e.getMessage());
+                }
+            }
+
+            // Check if both timestamps are present and validate their 
relationship
+            if (hasStartTimestamp && hasEndTimestamp) {
+                try {
+                    long startTS = 
Long.parseLong(params.get(DORIS_START_TIMESTAMP));
+                    long endTS = 
Long.parseLong(params.get(DORIS_END_TIMESTAMP));
+                    if (startTS >= endTS) {
+                        throw new UserException("startTimestamp must be less 
than endTimestamp");
+                    }
+                } catch (NumberFormatException e) {
+                    throw new UserException("Invalid timestamp format: " + 
e.getMessage());
+                }
+            }
+        }
+
+        // If no incremental parameters are provided at all, that's also 
invalid in this context
+        if (!hasSnapshotParams && !hasTimestampParams) {
+            throw new UserException(
+                    "Invalid paimon incremental read params: at least one 
valid parameter group must be specified");
+        }
+
+        // Fill the result map based on parameter combinations
+        Map<String, String> paimonScanParams = new HashMap<>();
+        paimonScanParams.put(PAIMON_SCAN_SNAPSHOT_ID, null);
+        paimonScanParams.put(PAIMON_SCAN_MODE, null);
+
+        if (hasSnapshotParams) {
+            paimonScanParams.put(PAIMON_SCAN_MODE, null);
+            if (hasStartSnapshotId && !hasEndSnapshotId) {
+                // Only startSnapshotId is specified
+                paimonScanParams.put(PAIMON_SCAN_SNAPSHOT_ID, 
params.get(DORIS_START_SNAPSHOT_ID));
+            } else if (hasStartSnapshotId && hasEndSnapshotId) {
+                // Both start and end snapshot IDs are specified
+                String startSId = params.get(DORIS_START_SNAPSHOT_ID);
+                String endSId = params.get(DORIS_END_SNAPSHOT_ID);
+                paimonScanParams.put(PAIMON_INCREMENTAL_BETWEEN, startSId + 
"," + endSId);
+            }
+
+            // Add incremental between scan mode if present
+            if (hasIncrementalBetweenScanMode) {
+                paimonScanParams.put(PAIMON_INCREMENTAL_BETWEEN_SCAN_MODE,
+                        params.get(DORIS_INCREMENTAL_BETWEEN_SCAN_MODE));
+            }
+        }
+
+        if (hasTimestampParams) {
+            String startTS = params.get(DORIS_START_TIMESTAMP);
+            String endTS = params.get(DORIS_END_TIMESTAMP);
+
+            if (hasStartTimestamp && !hasEndTimestamp) {
+                // Only startTimestamp is specified
+                paimonScanParams.put(PAIMON_INCREMENTAL_BETWEEN_TIMESTAMP, 
startTS + "," + Long.MAX_VALUE);
+            } else if (hasStartTimestamp && hasEndTimestamp) {
+                // Both start and end timestamps are specified
+                paimonScanParams.put(PAIMON_INCREMENTAL_BETWEEN_TIMESTAMP, 
startTS + "," + endTS);
+            }
+        }
+
+        return paimonScanParams;
+    }
 }
+
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
index 963178c3882..2e712969c1e 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
@@ -20,34 +20,35 @@ package org.apache.doris.datasource.paimon.source;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.analysis.TupleId;
 import org.apache.doris.common.UserException;
-import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.paimon.PaimonFileExternalCatalog;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.qe.SessionVariable;
 
-import mockit.Expectations;
-import mockit.Mock;
-import mockit.MockUp;
-import mockit.Mocked;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.stats.SimpleStats;
 import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.RawFile;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
+@RunWith(MockitoJUnitRunner.class)
 public class PaimonScanNodeTest {
-    @Mocked
+    @Mock
     private SessionVariable sv;
 
-    @Mocked
+    @Mock
     private PaimonFileExternalCatalog paimonFileExternalCatalog;
 
     @Test
@@ -81,41 +82,30 @@ public class PaimonScanNodeTest {
                 .build();
 
 
-        new MockUp<PaimonScanNode>() {
-            @Mock
-            public List<org.apache.paimon.table.source.Split> 
getPaimonSplitFromAPI() {
-                return new ArrayList<org.apache.paimon.table.source.Split>() {{
-                        add(ds1);
-                        add(ds2);
-                    }};
+        // Mock PaimonScanNode to return test data splits
+        PaimonScanNode spyPaimonScanNode = Mockito.spy(paimonScanNode);
+        Mockito.doReturn(new ArrayList<org.apache.paimon.table.source.Split>() 
{
+            {
+                add(ds1);
+                add(ds2);
             }
-        };
+        }).when(spyPaimonScanNode).getPaimonSplitFromAPI();
 
-        new MockUp<PaimonSource>() {
-            @Mock
-            public ExternalCatalog getCatalog() {
-                return paimonFileExternalCatalog;
-            }
-        };
-
-        new MockUp<ExternalCatalog>() {
-            @Mock
-            public Map<String, String> getProperties() {
-                return Collections.emptyMap();
-            }
-        };
+        // Mock PaimonSource to return catalog
+        PaimonSource mockPaimonSource = Mockito.mock(PaimonSource.class);
+        
Mockito.when(mockPaimonSource.getCatalog()).thenReturn(paimonFileExternalCatalog);
+        spyPaimonScanNode.setSource(mockPaimonSource);
 
-        new Expectations() {{
-                sv.isForceJniScanner();
-                result = false;
+        // Mock ExternalCatalog properties
+        
Mockito.when(paimonFileExternalCatalog.getProperties()).thenReturn(Collections.emptyMap());
 
-                sv.getIgnoreSplitType();
-                result = "NONE";
-            }};
+        // Mock SessionVariable behavior
+        Mockito.when(sv.isForceJniScanner()).thenReturn(false);
+        Mockito.when(sv.getIgnoreSplitType()).thenReturn("NONE");
 
         // native
-        mockNativeReader();
-        List<org.apache.doris.spi.Split> s1 = paimonScanNode.getSplits(1);
+        mockNativeReader(spyPaimonScanNode);
+        List<org.apache.doris.spi.Split> s1 = spyPaimonScanNode.getSplits(1);
         PaimonSplit s11 = (PaimonSplit) s1.get(0);
         PaimonSplit s12 = (PaimonSplit) s1.get(1);
         Assert.assertEquals(2, s1.size());
@@ -125,8 +115,8 @@ public class PaimonScanNodeTest {
         Assert.assertNull(s12.getSplit());
 
         // jni
-        mockJniReader();
-        List<org.apache.doris.spi.Split> s2 = paimonScanNode.getSplits(1);
+        mockJniReader(spyPaimonScanNode);
+        List<org.apache.doris.spi.Split> s2 = spyPaimonScanNode.getSplits(1);
         PaimonSplit s21 = (PaimonSplit) s2.get(0);
         PaimonSplit s22 = (PaimonSplit) s2.get(1);
         Assert.assertEquals(2, s2.size());
@@ -136,21 +126,265 @@ public class PaimonScanNodeTest {
         Assert.assertEquals(50, s22.getSplitWeight().getRawValue());
     }
 
-    private void mockJniReader() {
-        new MockUp<PaimonScanNode>() {
-            @Mock
-            public boolean supportNativeReader(Optional<List<RawFile>> 
optRawFiles) {
-                return false;
-            }
-        };
+    @Test
+    public void testValidateIncrementalReadParams() throws UserException {
+        // Test valid parameter combinations
+
+        // 1. Only startSnapshotId
+        Map<String, String> params = new HashMap<>();
+        params.put("startSnapshotId", "5");
+        Map<String, String> result = PaimonScanNode.validateIncrementalReadParams(params);
+        Assert.assertEquals("5", result.get("scan.snapshot-id"));
+        Assert.assertNull(result.get("scan.mode"));
+        Assert.assertEquals(2, result.size());
+
+        // 2. Both startSnapshotId and endSnapshotId
+        params.clear();
+        params.put("startSnapshotId", "1");
+        params.put("endSnapshotId", "5");
+        result = PaimonScanNode.validateIncrementalReadParams(params);
+        Assert.assertEquals("1,5", result.get("incremental-between"));
+        Assert.assertTrue(result.containsKey("scan.mode") && result.get("scan.mode") == null);
+        Assert.assertTrue(result.containsKey("scan.snapshot-id") && result.get("scan.snapshot-id") == null);
+        Assert.assertEquals(3, result.size());
+
+        // 3. startSnapshotId + endSnapshotId + incrementalBetweenScanMode
+        params.clear();
+        params.put("startSnapshotId", "2");
+        params.put("endSnapshotId", "8");
+        params.put("incrementalBetweenScanMode", "diff");
+        result = PaimonScanNode.validateIncrementalReadParams(params);
+        Assert.assertEquals("2,8", result.get("incremental-between"));
+        Assert.assertEquals("diff", result.get("incremental-between-scan-mode"));
+        Assert.assertTrue(result.containsKey("scan.mode") && result.get("scan.mode") == null);
+        Assert.assertTrue(result.containsKey("scan.snapshot-id") && result.get("scan.snapshot-id") == null);
+        Assert.assertEquals(4, result.size());
+
+        // 4. Only startTimestamp
+        params.clear();
+        params.put("startTimestamp", "1000");
+        result = PaimonScanNode.validateIncrementalReadParams(params);
+        Assert.assertEquals("1000," + Long.MAX_VALUE, result.get("incremental-between-timestamp"));
+        Assert.assertTrue(result.containsKey("scan.mode") && result.get("scan.mode") == null);
+        Assert.assertTrue(result.containsKey("scan.snapshot-id") && result.get("scan.snapshot-id") == null);
+        Assert.assertEquals(3, result.size());
+
+        // 5. Both startTimestamp and endTimestamp
+        params.clear();
+        params.put("startTimestamp", "1000");
+        params.put("endTimestamp", "2000");
+        result = PaimonScanNode.validateIncrementalReadParams(params);
+        Assert.assertEquals("1000,2000", result.get("incremental-between-timestamp"));
+        Assert.assertTrue(result.containsKey("scan.mode") && result.get("scan.mode") == null);
+        Assert.assertTrue(result.containsKey("scan.snapshot-id") && result.get("scan.snapshot-id") == null);
+        Assert.assertEquals(3, result.size());
+
+        // Test invalid parameter combinations
+
+        // 6. Test mutual exclusivity - both snapshot and timestamp params
+        params.clear();
+        params.put("startSnapshotId", "1");
+        params.put("startTimestamp", "1000");
+        try {
+            PaimonScanNode.validateIncrementalReadParams(params);
+            Assert.fail("Should throw exception for mutual exclusivity");
+        } catch (UserException e) {
+            Assert.assertTrue(e.getMessage().contains("Cannot specify both snapshot-based parameters"));
+        }
+
+        // 7. Test snapshot params without required startSnapshotId
+        params.clear();
+        params.put("endSnapshotId", "5");
+        try {
+            PaimonScanNode.validateIncrementalReadParams(params);
+            Assert.fail("Should throw exception when startSnapshotId is missing");
+        } catch (UserException e) {
+            Assert.assertTrue(e.getMessage().contains("startSnapshotId is required"));
+        }
+
+        // 8. Test timestamp params without required startTimestamp
+        params.clear();
+        params.put("endTimestamp", "2000");
+        try {
+            PaimonScanNode.validateIncrementalReadParams(params);
+            Assert.fail("Should throw exception when startTimestamp is missing");
+        } catch (UserException e) {
+            Assert.assertTrue(e.getMessage().contains("startTimestamp is required"));
+        }
+
+        // 9. Test incrementalBetweenScanMode without endSnapshotId
+        params.clear();
+        params.put("startSnapshotId", "1");
+        params.put("incrementalBetweenScanMode", "auto");
+        try {
+            PaimonScanNode.validateIncrementalReadParams(params);
+            Assert.fail("Should throw exception when incrementalBetweenScanMode appears without endSnapshotId");
+        } catch (UserException e) {
+            Assert.assertTrue(e.getMessage().contains("incrementalBetweenScanMode can only be specified when both"));
+        }
+
+        // 10. Test incrementalBetweenScanMode alone
+        params.clear();
+        params.put("incrementalBetweenScanMode", "auto");
+        try {
+            PaimonScanNode.validateIncrementalReadParams(params);
+            Assert.fail("Should throw exception when incrementalBetweenScanMode appears alone");
+        } catch (UserException e) {
+            Assert.assertTrue(
+                    e.getMessage().contains("startSnapshotId is required when using snapshot-based incremental read"));
+        }
+
+        // 11. Test invalid snapshot ID values (≤ 0)
+        params.clear();
+        params.put("startSnapshotId", "0");
+        try {
+            PaimonScanNode.validateIncrementalReadParams(params);
+            Assert.fail("Should throw exception for startSnapshotId ≤ 0");
+        } catch (UserException e) {
+            Assert.assertTrue(e.getMessage().contains("startSnapshotId must be greater than 0"));
+        }
+
+        params.clear();
+        params.put("startSnapshotId", "-1");
+        try {
+            PaimonScanNode.validateIncrementalReadParams(params);
+            Assert.fail("Should throw exception for negative startSnapshotId");
+        } catch (UserException e) {
+            Assert.assertTrue(e.getMessage().contains("startSnapshotId must be greater than 0"));
+        }
+
+        params.clear();
+        params.put("startSnapshotId", "1");
+        params.put("endSnapshotId", "0");
+        try {
+            PaimonScanNode.validateIncrementalReadParams(params);
+            Assert.fail("Should throw exception for endSnapshotId ≤ 0");
+        } catch (UserException e) {
+            Assert.assertTrue(e.getMessage().contains("endSnapshotId must be greater than 0"));
+        }
+
+        // 12. Test start ≥ end for snapshot IDs
+        params.clear();
+        params.put("startSnapshotId", "5");
+        params.put("endSnapshotId", "5");
+        try {
+            PaimonScanNode.validateIncrementalReadParams(params);
+            Assert.fail("Should throw exception when startSnapshotId = endSnapshotId");
+        } catch (UserException e) {
+            Assert.assertTrue(e.getMessage().contains("startSnapshotId must be less than endSnapshotId"));
+        }
+
+        params.clear();
+        params.put("startSnapshotId", "6");
+        params.put("endSnapshotId", "5");
+        try {
+            PaimonScanNode.validateIncrementalReadParams(params);
+            Assert.fail("Should throw exception when startSnapshotId > endSnapshotId");
+        } catch (UserException e) {
+            Assert.assertTrue(e.getMessage().contains("startSnapshotId must be less than endSnapshotId"));
+        }
+
+        // 13. Test invalid timestamp values (startTimestamp < 0, endTimestamp ≤ 0)
+        params.clear();
+        params.put("startTimestamp", "-1");
+        try {
+            PaimonScanNode.validateIncrementalReadParams(params);
+            Assert.fail("Should throw exception for startTimestamp < 0");
+        } catch (UserException e) {
+            Assert.assertTrue(e.getMessage().contains("startTimestamp must be greater than or equal to 0"));
+        }
+
+        params.clear();
+        params.put("startTimestamp", "1000");
+        params.put("endTimestamp", "0");
+        try {
+            PaimonScanNode.validateIncrementalReadParams(params);
+            Assert.fail("Should throw exception for endTimestamp ≤ 0");
+        } catch (UserException e) {
+            Assert.assertTrue(e.getMessage().contains("endTimestamp must be greater than 0"));
+        }
+
+        // 14. Test start ≥ end for timestamps
+        params.clear();
+        params.put("startTimestamp", "2000");
+        params.put("endTimestamp", "2000");
+        try {
+            PaimonScanNode.validateIncrementalReadParams(params);
+            Assert.fail("Should throw exception when startTimestamp = endTimestamp");
+        } catch (UserException e) {
+            Assert.assertTrue(e.getMessage().contains("startTimestamp must be less than endTimestamp"));
+        }
+
+        params.clear();
+        params.put("startTimestamp", "3000");
+        params.put("endTimestamp", "2000");
+        try {
+            PaimonScanNode.validateIncrementalReadParams(params);
+            Assert.fail("Should throw exception when startTimestamp > endTimestamp");
+        } catch (UserException e) {
+            Assert.assertTrue(e.getMessage().contains("startTimestamp must be less than endTimestamp"));
+        }
+
+        // 15. Test invalid number format
+        params.clear();
+        params.put("startSnapshotId", "invalid");
+        try {
+            PaimonScanNode.validateIncrementalReadParams(params);
+            Assert.fail("Should throw exception for invalid number format");
+        } catch (UserException e) {
+            Assert.assertTrue(e.getMessage().contains("Invalid startSnapshotId format"));
+        }
+
+        params.clear();
+        params.put("startTimestamp", "invalid");
+        try {
+            PaimonScanNode.validateIncrementalReadParams(params);
+            Assert.fail("Should throw exception for invalid timestamp format");
+        } catch (UserException e) {
+            Assert.assertTrue(e.getMessage().contains("Invalid startTimestamp format"));
+        }
+
+        // 16. Test invalid incrementalBetweenScanMode values
+        params.clear();
+        params.put("startSnapshotId", "1");
+        params.put("endSnapshotId", "5");
+        params.put("incrementalBetweenScanMode", "invalid");
+        try {
+            PaimonScanNode.validateIncrementalReadParams(params);
+            Assert.fail("Should throw exception for invalid scan mode");
+        } catch (UserException e) {
+            Assert.assertTrue(
+                    e.getMessage().contains("incrementalBetweenScanMode must be one of: auto, diff, delta, changelog"));
+        }
+
+        // 17. Test valid incrementalBetweenScanMode values (case insensitive, forwarded verbatim)
+        String[] validModes = {"auto", "AUTO", "diff", "DIFF", "delta", "DELTA", "changelog", "CHANGELOG"};
+        for (String mode : validModes) {
+            params.clear();
+            params.put("startSnapshotId", "1");
+            params.put("endSnapshotId", "5");
+            params.put("incrementalBetweenScanMode", mode);
+            result = PaimonScanNode.validateIncrementalReadParams(params);
+            Assert.assertEquals("1,5", result.get("incremental-between"));
+            Assert.assertEquals(mode, result.get("incremental-between-scan-mode"));
+            Assert.assertTrue(result.containsKey("scan.mode") && result.get("scan.mode") == null);
+            Assert.assertTrue(result.containsKey("scan.snapshot-id") && result.get("scan.snapshot-id") == null);
+            Assert.assertEquals(4, result.size());
+        }
+
+        // 18. Test no parameters at all
+        params.clear();
+        try {
+            PaimonScanNode.validateIncrementalReadParams(params);
+            Assert.fail("Should throw exception when no parameters provided");
+        } catch (UserException e) {
+            Assert.assertTrue(e.getMessage().contains("at least one valid parameter group must be specified"));
+        }
     }
 
-    private void mockNativeReader() {
-        new MockUp<PaimonScanNode>() {
-            @Mock
-            public boolean supportNativeReader(Optional<List<RawFile>> 
optRawFiles) {
-                return true;
-            }
-        };
+    // Makes the spy report that no split can use the native reader (forces the JNI path).
+    private void mockJniReader(PaimonScanNode spyNode) {
+        stubSupportNativeReader(spyNode, false);
+    }
+
+    // Stubs supportNativeReader(...) on the spy to return the given answer for any argument.
+    private void stubSupportNativeReader(PaimonScanNode spyNode, boolean supported) {
+        Mockito.doReturn(supported).when(spyNode).supportNativeReader(ArgumentMatchers.any(Optional.class));
+    }
+
+    // Makes the spy report that every split can use the native reader.
+    private void mockNativeReader(PaimonScanNode spyNode) {
+        stubSupportNativeReader(spyNode, true);
     }
 }
diff --git a/regression-test/data/external_table_p0/paimon/paimon_incr_read.out 
b/regression-test/data/external_table_p0/paimon/paimon_incr_read.out
new file mode 100644
index 00000000000..6472ab41b64
Binary files /dev/null and 
b/regression-test/data/external_table_p0/paimon/paimon_incr_read.out differ
diff --git 
a/regression-test/suites/external_table_p0/paimon/paimon_incr_read.groovy 
b/regression-test/suites/external_table_p0/paimon/paimon_incr_read.groovy
new file mode 100644
index 00000000000..cfe9fbf8960
--- /dev/null
+++ b/regression-test/suites/external_table_p0/paimon/paimon_incr_read.groovy
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_paimon_incr_read", 
"p0,external,doris,external_docker,external_docker_doris") {
+    logger.info("start paimon test")
+    String enabled = context.config.otherConfigs.get("enablePaimonTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable paimon test.")
+        return
+    }
+    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+    String catalog_name = "test_paimon_incr_read_catalog"
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    try {
+        sql """drop catalog if exists ${catalog_name}"""
+        // Recreate the Paimon catalog on the S3-compatible (MinIO) warehouse of the external test env
+        sql """
+                CREATE CATALOG ${catalog_name} PROPERTIES (
+                        'type' = 'paimon',
+                        'warehouse' = 's3://warehouse/wh',
+                        's3.endpoint' = 
'http://${externalEnvIp}:${minio_port}',
+                        's3.access_key' = 'admin',
+                        's3.secret_key' = 'password',
+                        's3.path.style.access' = 'true'
+                );
+            """
+        sql """switch `${catalog_name}`"""
+        sql """use test_paimon_incr_read_db"""
+        // Runs every @incr case with the session variable force_jni_scanner set to `force` ("true"/"false")
+        def test_incr_read = { String force ->
+            sql """ set force_jni_scanner=${force} """
+            order_qt_snapshot_incr1  """select * from 
paimon_incr@incr('startSnapshotId'=1)"""
+            order_qt_snapshot_incr2  """select * from 
paimon_incr@incr('startSnapshotId'=2)"""
+            order_qt_snapshot_incr3  """select * from 
paimon_incr@incr('startSnapshotId'=1, 'endSnapshotId'=2)"""
+            order_qt_snapshot_incr4  """select * from 
paimon_incr@incr('startSnapshotId'=1, 'endSnapshotId'=3)"""
+            order_qt_snapshot_incr5  """select * from 
paimon_incr@incr('startSnapshotId'=2, 'endSnapshotId'=3)"""
+            order_qt_timestamp_incr1  """select * from 
paimon_incr@incr('startTimestamp'=0)"""
+            order_qt_timestamp_incr2  """select * from 
paimon_incr@incr('startTimestamp'=0, 'endTimestamp' = 1)"""
+            order_qt_timestamp_incr3  """select * from 
paimon_incr@incr('startTimestamp'=0, 'endtimestamp' = 999999999999999)"""  // NOTE(review): lowercase 'endtimestamp' differs from 'endTimestamp' above; confirm this is meant to test case-insensitive keys
+            // Same snapshot range (1 to 2) under each incrementalBetweenScanMode value
+            order_qt_scan_mode1 """select * from 
paimon_incr@incr('startSnapshotId'=1, 'endSnapshotId'=2, 
'incrementalBetweenScanMode' = 'auto');"""
+            order_qt_scan_mode2 """select * from 
paimon_incr@incr('startSnapshotId'=1, 'endSnapshotId'=2, 
'incrementalBetweenScanMode' = 'diff');"""
+            order_qt_scan_mode3 """select * from 
paimon_incr@incr('startSnapshotId'=1, 'endSnapshotId'=2, 
'incrementalBetweenScanMode' = 'delta');"""
+            order_qt_scan_mode4 """select * from 
paimon_incr@incr('startSnapshotId'=1, 'endSnapshotId'=2, 
'incrementalBetweenScanMode' = 'changelog');"""
+            
+
+            // complex queries: an incremental read inside a CTE and on both sides of a join
+            qt_cte """with cte1 as (select * from 
paimon_incr@incr('startTimestamp'=0)) select name, age from cte1 order by 
age;"""
+            qt_join """select * from paimon_incr@incr('startSnapshotId'=1, 
'endSnapshotId'=2) t1 join paimon_incr@incr('startTimestamp'=0) t2 on t1.id = 
t2.id order by id;"""
+            // Invalid or incomplete @incr parameters must be rejected with the expected error message
+            test {
+                sql """select * from paimon_incr@incr('startTimestamp'=-1);"""
+                exception "startTimestamp must be greater than or equal to 0"
+            }
+            test {
+                sql """select * from paimon_incr@incr('startTimestam'=-1)"""  // deliberately misspelled key: it must not count as a valid parameter
+                exception "at least one valid parameter group must be 
specified"
+            }
+            test {
+                sql """select * from 
paimon_incr@incr('endTimestamp'=999999999999999)"""
+                exception "startTimestamp is required when using 
timestamp-based incremental read"
+            }
+            test {
+                sql """select * from paimon_incr@incr()"""
+                exception "at least one valid parameter group must be 
specified"
+            }
+            test {
+                sql """select * from 
paimon_incr@incr('incrementalBetweenScanMode' = 'auto');"""
+                exception "startSnapshotId is required when using 
snapshot-based incremental read"
+            }
+            test {
+                sql """select * from paimon_incr@incr('startSnapshotId'=1, 
'endSnapshotId'=2, 'incrementalBetweenScanMode' = 'error');"""
+                exception "incrementalBetweenScanMode must be one of"
+            }
+            test {
+                sql """select * from paimon_incr@incr('startSnapshotId'=1, 
'endSnapshotId'=1)"""
+                exception "startSnapshotId must be less than endSnapshotId"
+            }
+        }
+        // Run all cases twice: once without and once with force_jni_scanner
+        test_incr_read("false")
+        test_incr_read("true")
+    } finally {
+        // NOTE(review): cleanup is disabled, so the catalog outlives the suite: sql """drop catalog if exists ${catalog_name}"""
+    }
+}
+
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to