This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 30c7adafda0 branch-3.1: [feat](paimon)support paimon incr read #51661
(#52922)
30c7adafda0 is described below
commit 30c7adafda0d3631656993bb9059e600a5921c0f
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Tue Jul 8 19:39:31 2025 -0700
branch-3.1: [feat](paimon)support paimon incr read #51661 (#52922)
bp #51661
Co-authored-by: wlong <[email protected]>
Co-authored-by: wanglong16 <[email protected]>
---
.../create_preinstalled_scripts/paimon/run04.sql | 16 +
.../datasource/paimon/source/PaimonScanNode.java | 232 +++++++++++++-
.../paimon/source/PaimonScanNodeTest.java | 340 +++++++++++++++++----
.../external_table_p0/paimon/paimon_incr_read.out | Bin 0 -> 1097 bytes
.../paimon/paimon_incr_read.groovy | 102 +++++++
5 files changed, 626 insertions(+), 64 deletions(-)
diff --git
a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run04.sql
b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run04.sql
new file mode 100644
index 00000000000..5c4f7a3fea2
--- /dev/null
+++
b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run04.sql
@@ -0,0 +1,16 @@
+use paimon;
+
+create database if not exists test_paimon_incr_read_db;
+
+use test_paimon_incr_read_db;
+
+CREATE TABLE paimon_incr (
+ id INT,
+ name STRING,
+ age INT
+) USING paimon;
+
+INSERT INTO paimon_incr (id, name, age) VALUES (1, 'Alice', 30);
+INSERT INTO paimon_incr (id, name, age) VALUES (2, 'Bob', 25);
+INSERT INTO paimon_incr (id, name, age) VALUES (3, 'Charlie', 28);
+
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 87680686986..ea1f8b8e72d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -49,6 +49,7 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.table.source.RawFile;
@@ -69,6 +70,23 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
public class PaimonScanNode extends FileQueryScanNode {
+ private static final Logger LOG =
LogManager.getLogger(PaimonScanNode.class);
+
+ private static final long COUNT_WITH_PARALLEL_SPLITS = 10000;
+ // The keys of incremental read params for Paimon SDK
+ private static final String PAIMON_SCAN_SNAPSHOT_ID = "scan.snapshot-id";
+ private static final String PAIMON_SCAN_MODE = "scan.mode";
+ private static final String PAIMON_INCREMENTAL_BETWEEN =
"incremental-between";
+ private static final String PAIMON_INCREMENTAL_BETWEEN_SCAN_MODE =
"incremental-between-scan-mode";
+ private static final String PAIMON_INCREMENTAL_BETWEEN_TIMESTAMP =
"incremental-between-timestamp";
+ // The keys of incremental read params for Doris Statement
+ private static final String DORIS_START_SNAPSHOT_ID = "startSnapshotId";
+ private static final String DORIS_END_SNAPSHOT_ID = "endSnapshotId";
+ private static final String DORIS_START_TIMESTAMP = "startTimestamp";
+ private static final String DORIS_END_TIMESTAMP = "endTimestamp";
+ private static final String DORIS_INCREMENTAL_BETWEEN_SCAN_MODE =
"incrementalBetweenScanMode";
+ private static final String DEFAULT_INCREMENTAL_BETWEEN_SCAN_MODE = "auto";
+
private enum SplitReadType {
JNI,
NATIVE,
@@ -111,14 +129,12 @@ public class PaimonScanNode extends FileQueryScanNode {
}
}
- private static final Logger LOG =
LogManager.getLogger(PaimonScanNode.class);
private PaimonSource source = null;
private List<Predicate> predicates;
private int rawFileSplitNum = 0;
private int paimonSplitNum = 0;
private List<SplitStat> splitStats = new ArrayList<>();
private String serializedTable;
- private static final long COUNT_WITH_PARALLEL_SPLITS = 10000;
public PaimonScanNode(PlanNodeId id,
TupleDescriptor desc,
@@ -344,20 +360,33 @@ public class PaimonScanNode extends FileQueryScanNode {
}
@VisibleForTesting
- public List<org.apache.paimon.table.source.Split> getPaimonSplitFromAPI() {
+ public Map<String, String> getIncrReadParams() throws UserException {
+ Map<String, String> paimonScanParams = new HashMap<>();
+ if (scanParams != null && scanParams.incrementalRead()) {
+ // Validate parameter combinations and get the result map
+ paimonScanParams =
validateIncrementalReadParams(scanParams.getMapParams());
+ }
+ return paimonScanParams;
+ }
+
+ @VisibleForTesting
+ public List<org.apache.paimon.table.source.Split> getPaimonSplitFromAPI()
throws UserException {
if
(!source.getPaimonTable().options().containsKey(CoreOptions.SCAN_SNAPSHOT_ID.key()))
{
// an empty table in PaimonSnapshotCacheValue
return Collections.emptyList();
}
int[] projected = desc.getSlots().stream().mapToInt(
- slot -> source.getPaimonTable().rowType()
- .getFieldNames()
- .stream()
- .map(String::toLowerCase)
- .collect(Collectors.toList())
- .indexOf(slot.getColumn().getName()))
- .toArray();
- ReadBuilder readBuilder = source.getPaimonTable().newReadBuilder();
+ slot -> source.getPaimonTable().rowType()
+ .getFieldNames()
+ .stream()
+ .map(String::toLowerCase)
+ .collect(Collectors.toList())
+ .indexOf(slot.getColumn().getName()))
+ .toArray();
+ Table paimonTable = source.getPaimonTable();
+ Map<String, String> incrReadParams = getIncrReadParams();
+ paimonTable = paimonTable.copy(incrReadParams);
+ ReadBuilder readBuilder = paimonTable.newReadBuilder();
return readBuilder.withFilter(predicates)
.withProjection(projected)
.newScan().plan().splits();
@@ -456,4 +485,185 @@ public class PaimonScanNode extends FileQueryScanNode {
}
((PaimonSplit) splits.get(size - 1)).setRowCount(countPerSplit +
totalCount % size);
}
+
+ @VisibleForTesting
+ public static Map<String, String>
validateIncrementalReadParams(Map<String, String> params) throws UserException {
+ // Check if snapshot-based parameters exist
+ boolean hasStartSnapshotId =
params.containsKey(DORIS_START_SNAPSHOT_ID)
+ && params.get(DORIS_START_SNAPSHOT_ID) != null;
+ boolean hasEndSnapshotId = params.containsKey(DORIS_END_SNAPSHOT_ID)
+ && params.get(DORIS_END_SNAPSHOT_ID) != null;
+ boolean hasIncrementalBetweenScanMode =
params.containsKey(DORIS_INCREMENTAL_BETWEEN_SCAN_MODE)
+ && params.get(DORIS_INCREMENTAL_BETWEEN_SCAN_MODE) != null;
+
+ // Check if timestamp-based parameters exist
+ boolean hasStartTimestamp = params.containsKey(DORIS_START_TIMESTAMP)
+ && params.get(DORIS_START_TIMESTAMP) != null;
+ boolean hasEndTimestamp = params.containsKey(DORIS_END_TIMESTAMP) &&
params.get(DORIS_END_TIMESTAMP) != null;
+
+ // Check if any snapshot-based parameters are present
+ boolean hasSnapshotParams = hasStartSnapshotId || hasEndSnapshotId ||
hasIncrementalBetweenScanMode;
+
+ // Check if any timestamp-based parameters are present
+ boolean hasTimestampParams = hasStartTimestamp || hasEndTimestamp;
+
+ // Rule 2: The two groups are mutually exclusive
+ if (hasSnapshotParams && hasTimestampParams) {
+ throw new UserException(
+ "Cannot specify both snapshot-based parameters"
+ + "(startSnapshotId, endSnapshotId,
incrementalBetweenScanMode) "
+ + "and timestamp-based parameters (startTimestamp,
endTimestamp) at the same time");
+ }
+
+ // Validate snapshot-based parameters group
+ if (hasSnapshotParams) {
+ // Rule 3.1 & 3.2: DORIS_START_SNAPSHOT_ID is required
+ if (!hasStartSnapshotId) {
+ throw new UserException("startSnapshotId is required when
using snapshot-based incremental read");
+ }
+
+ // Rule 3.3: DORIS_INCREMENTAL_BETWEEN_SCAN_MODE can only appear
+ // when both start and end snapshot IDs are specified
+ if (hasIncrementalBetweenScanMode && (!hasStartSnapshotId ||
!hasEndSnapshotId)) {
+ throw new UserException(
+ "incrementalBetweenScanMode can only be specified when"
+ + " both startSnapshotId and endSnapshotId are
provided");
+ }
+
+ // Validate snapshot ID values
+ if (hasStartSnapshotId) {
+ try {
+ long startSId =
Long.parseLong(params.get(DORIS_START_SNAPSHOT_ID));
+ if (startSId <= 0) {
+ throw new UserException("startSnapshotId must be
greater than 0");
+ }
+ } catch (NumberFormatException e) {
+ throw new UserException("Invalid startSnapshotId format: "
+ e.getMessage());
+ }
+ }
+
+ if (hasEndSnapshotId) {
+ try {
+ long endSId =
Long.parseLong(params.get(DORIS_END_SNAPSHOT_ID));
+ if (endSId <= 0) {
+ throw new UserException("endSnapshotId must be greater
than 0");
+ }
+ } catch (NumberFormatException e) {
+ throw new UserException("Invalid endSnapshotId format: " +
e.getMessage());
+ }
+ }
+
+ // Check if both snapshot IDs are present and validate their
relationship
+ if (hasStartSnapshotId && hasEndSnapshotId) {
+ try {
+ long startSId =
Long.parseLong(params.get(DORIS_START_SNAPSHOT_ID));
+ long endSId =
Long.parseLong(params.get(DORIS_END_SNAPSHOT_ID));
+ if (startSId >= endSId) {
+ throw new UserException("startSnapshotId must be less
than endSnapshotId");
+ }
+ } catch (NumberFormatException e) {
+ throw new UserException("Invalid snapshot ID format: " +
e.getMessage());
+ }
+ }
+
+ // Validate DORIS_INCREMENTAL_BETWEEN_SCAN_MODE
+ if (hasIncrementalBetweenScanMode) {
+ String scanMode =
params.get(DORIS_INCREMENTAL_BETWEEN_SCAN_MODE).toLowerCase();
+ if (!scanMode.equals("auto") && !scanMode.equals("diff")
+ && !scanMode.equals("delta") &&
!scanMode.equals("changelog")) {
+ throw new UserException("incrementalBetweenScanMode must
be one of: auto, diff, delta, changelog");
+ }
+ }
+ }
+
+ // Validate timestamp-based parameters group
+ if (hasTimestampParams) {
+ // Rule 4.1 & 4.2: DORIS_START_TIMESTAMP is required
+ if (!hasStartTimestamp) {
+ throw new UserException("startTimestamp is required when using
timestamp-based incremental read");
+ }
+
+ // Validate timestamp values
+ if (hasStartTimestamp) {
+ try {
+ long startTS =
Long.parseLong(params.get(DORIS_START_TIMESTAMP));
+ if (startTS < 0) {
+ throw new UserException("startTimestamp must be
greater than or equal to 0");
+ }
+ } catch (NumberFormatException e) {
+ throw new UserException("Invalid startTimestamp format: "
+ e.getMessage());
+ }
+ }
+
+ if (hasEndTimestamp) {
+ try {
+ long endTS =
Long.parseLong(params.get(DORIS_END_TIMESTAMP));
+ if (endTS <= 0) {
+ throw new UserException("endTimestamp must be greater
than 0");
+ }
+ } catch (NumberFormatException e) {
+ throw new UserException("Invalid endTimestamp format: " +
e.getMessage());
+ }
+ }
+
+ // Check if both timestamps are present and validate their
relationship
+ if (hasStartTimestamp && hasEndTimestamp) {
+ try {
+ long startTS =
Long.parseLong(params.get(DORIS_START_TIMESTAMP));
+ long endTS =
Long.parseLong(params.get(DORIS_END_TIMESTAMP));
+ if (startTS >= endTS) {
+ throw new UserException("startTimestamp must be less
than endTimestamp");
+ }
+ } catch (NumberFormatException e) {
+ throw new UserException("Invalid timestamp format: " +
e.getMessage());
+ }
+ }
+ }
+
+ // If no incremental parameters are provided at all, that's also
invalid in this context
+ if (!hasSnapshotParams && !hasTimestampParams) {
+ throw new UserException(
+ "Invalid paimon incremental read params: at least one
valid parameter group must be specified");
+ }
+
+ // Fill the result map based on parameter combinations
+ Map<String, String> paimonScanParams = new HashMap<>();
+ paimonScanParams.put(PAIMON_SCAN_SNAPSHOT_ID, null);
+ paimonScanParams.put(PAIMON_SCAN_MODE, null);
+
+ if (hasSnapshotParams) {
+ paimonScanParams.put(PAIMON_SCAN_MODE, null);
+ if (hasStartSnapshotId && !hasEndSnapshotId) {
+ // Only startSnapshotId is specified
+ paimonScanParams.put(PAIMON_SCAN_SNAPSHOT_ID,
params.get(DORIS_START_SNAPSHOT_ID));
+ } else if (hasStartSnapshotId && hasEndSnapshotId) {
+ // Both start and end snapshot IDs are specified
+ String startSId = params.get(DORIS_START_SNAPSHOT_ID);
+ String endSId = params.get(DORIS_END_SNAPSHOT_ID);
+ paimonScanParams.put(PAIMON_INCREMENTAL_BETWEEN, startSId +
"," + endSId);
+ }
+
+ // Add incremental between scan mode if present
+ if (hasIncrementalBetweenScanMode) {
+ paimonScanParams.put(PAIMON_INCREMENTAL_BETWEEN_SCAN_MODE,
+ params.get(DORIS_INCREMENTAL_BETWEEN_SCAN_MODE));
+ }
+ }
+
+ if (hasTimestampParams) {
+ String startTS = params.get(DORIS_START_TIMESTAMP);
+ String endTS = params.get(DORIS_END_TIMESTAMP);
+
+ if (hasStartTimestamp && !hasEndTimestamp) {
+ // Only startTimestamp is specified
+ paimonScanParams.put(PAIMON_INCREMENTAL_BETWEEN_TIMESTAMP,
startTS + "," + Long.MAX_VALUE);
+ } else if (hasStartTimestamp && hasEndTimestamp) {
+ // Both start and end timestamps are specified
+ paimonScanParams.put(PAIMON_INCREMENTAL_BETWEEN_TIMESTAMP,
startTS + "," + endTS);
+ }
+ }
+
+ return paimonScanParams;
+ }
}
+
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
index 963178c3882..2e712969c1e 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
@@ -20,34 +20,35 @@ package org.apache.doris.datasource.paimon.source;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.common.UserException;
-import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonFileExternalCatalog;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.SessionVariable;
-import mockit.Expectations;
-import mockit.Mock;
-import mockit.MockUp;
-import mockit.Mocked;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.RawFile;
import org.junit.Assert;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+@RunWith(MockitoJUnitRunner.class)
public class PaimonScanNodeTest {
- @Mocked
+ @Mock
private SessionVariable sv;
- @Mocked
+ @Mock
private PaimonFileExternalCatalog paimonFileExternalCatalog;
@Test
@@ -81,41 +82,30 @@ public class PaimonScanNodeTest {
.build();
- new MockUp<PaimonScanNode>() {
- @Mock
- public List<org.apache.paimon.table.source.Split>
getPaimonSplitFromAPI() {
- return new ArrayList<org.apache.paimon.table.source.Split>() {{
- add(ds1);
- add(ds2);
- }};
+ // Mock PaimonScanNode to return test data splits
+ PaimonScanNode spyPaimonScanNode = Mockito.spy(paimonScanNode);
+ Mockito.doReturn(new ArrayList<org.apache.paimon.table.source.Split>()
{
+ {
+ add(ds1);
+ add(ds2);
}
- };
+ }).when(spyPaimonScanNode).getPaimonSplitFromAPI();
- new MockUp<PaimonSource>() {
- @Mock
- public ExternalCatalog getCatalog() {
- return paimonFileExternalCatalog;
- }
- };
-
- new MockUp<ExternalCatalog>() {
- @Mock
- public Map<String, String> getProperties() {
- return Collections.emptyMap();
- }
- };
+ // Mock PaimonSource to return catalog
+ PaimonSource mockPaimonSource = Mockito.mock(PaimonSource.class);
+
Mockito.when(mockPaimonSource.getCatalog()).thenReturn(paimonFileExternalCatalog);
+ spyPaimonScanNode.setSource(mockPaimonSource);
- new Expectations() {{
- sv.isForceJniScanner();
- result = false;
+ // Mock ExternalCatalog properties
+
Mockito.when(paimonFileExternalCatalog.getProperties()).thenReturn(Collections.emptyMap());
- sv.getIgnoreSplitType();
- result = "NONE";
- }};
+ // Mock SessionVariable behavior
+ Mockito.when(sv.isForceJniScanner()).thenReturn(false);
+ Mockito.when(sv.getIgnoreSplitType()).thenReturn("NONE");
// native
- mockNativeReader();
- List<org.apache.doris.spi.Split> s1 = paimonScanNode.getSplits(1);
+ mockNativeReader(spyPaimonScanNode);
+ List<org.apache.doris.spi.Split> s1 = spyPaimonScanNode.getSplits(1);
PaimonSplit s11 = (PaimonSplit) s1.get(0);
PaimonSplit s12 = (PaimonSplit) s1.get(1);
Assert.assertEquals(2, s1.size());
@@ -125,8 +115,8 @@ public class PaimonScanNodeTest {
Assert.assertNull(s12.getSplit());
// jni
- mockJniReader();
- List<org.apache.doris.spi.Split> s2 = paimonScanNode.getSplits(1);
+ mockJniReader(spyPaimonScanNode);
+ List<org.apache.doris.spi.Split> s2 = spyPaimonScanNode.getSplits(1);
PaimonSplit s21 = (PaimonSplit) s2.get(0);
PaimonSplit s22 = (PaimonSplit) s2.get(1);
Assert.assertEquals(2, s2.size());
@@ -136,21 +126,265 @@ public class PaimonScanNodeTest {
Assert.assertEquals(50, s22.getSplitWeight().getRawValue());
}
- private void mockJniReader() {
- new MockUp<PaimonScanNode>() {
- @Mock
- public boolean supportNativeReader(Optional<List<RawFile>>
optRawFiles) {
- return false;
- }
- };
+ @Test
+ public void testValidateIncrementalReadParams() throws UserException {
+ // Test valid parameter combinations
+
+ // 1. Only startSnapshotId
+ Map<String, String> params = new HashMap<>();
+ params.put("startSnapshotId", "5");
+ Map<String, String> result =
PaimonScanNode.validateIncrementalReadParams(params);
+ Assert.assertEquals("5", result.get("scan.snapshot-id"));
+ Assert.assertNull(result.get("scan.mode"));
+ Assert.assertEquals(2, result.size());
+
+ // 2. Both startSnapshotId and endSnapshotId
+ params.clear();
+ params.put("startSnapshotId", "1");
+ params.put("endSnapshotId", "5");
+ result = PaimonScanNode.validateIncrementalReadParams(params);
+ Assert.assertEquals("1,5", result.get("incremental-between"));
+ Assert.assertTrue(result.containsKey("scan.mode") &&
result.get("scan.mode") == null);
+ Assert.assertEquals(3, result.size());
+
+ // 3. startSnapshotId + endSnapshotId + incrementalBetweenScanMode
+ params.clear();
+ params.put("startSnapshotId", "2");
+ params.put("endSnapshotId", "8");
+ params.put("incrementalBetweenScanMode", "diff");
+ result = PaimonScanNode.validateIncrementalReadParams(params);
+ Assert.assertEquals("2,8", result.get("incremental-between"));
+ Assert.assertEquals("diff",
result.get("incremental-between-scan-mode"));
+ Assert.assertTrue(result.containsKey("scan.mode") &&
result.get("scan.mode") == null);
+ Assert.assertEquals(4, result.size());
+
+ // 4. Only startTimestamp
+ params.clear();
+ params.put("startTimestamp", "1000");
+ result = PaimonScanNode.validateIncrementalReadParams(params);
+ Assert.assertEquals("1000," + Long.MAX_VALUE,
result.get("incremental-between-timestamp"));
+ Assert.assertTrue(result.containsKey("scan.mode") &&
result.get("scan.mode") == null);
+ Assert.assertTrue(result.containsKey("scan.snapshot-id") &&
result.get("scan.snapshot-id") == null);
+ Assert.assertEquals(3, result.size());
+
+ // 5. Both startTimestamp and endTimestamp
+ params.clear();
+ params.put("startTimestamp", "1000");
+ params.put("endTimestamp", "2000");
+ result = PaimonScanNode.validateIncrementalReadParams(params);
+ Assert.assertEquals("1000,2000",
result.get("incremental-between-timestamp"));
+ Assert.assertTrue(result.containsKey("scan.mode") &&
result.get("scan.mode") == null);
+ Assert.assertTrue(result.containsKey("scan.snapshot-id") &&
result.get("scan.snapshot-id") == null);
+ Assert.assertEquals(3, result.size());
+
+ // Test invalid parameter combinations
+
+ // 6. Test mutual exclusivity - both snapshot and timestamp params
+ params.clear();
+ params.put("startSnapshotId", "1");
+ params.put("startTimestamp", "1000");
+ try {
+ PaimonScanNode.validateIncrementalReadParams(params);
+ Assert.fail("Should throw exception for mutual exclusivity");
+ } catch (UserException e) {
+ Assert.assertTrue(e.getMessage().contains("Cannot specify both
snapshot-based parameters"));
+ }
+
+ // 7. Test snapshot params without required startSnapshotId
+ params.clear();
+ params.put("endSnapshotId", "5");
+ try {
+ PaimonScanNode.validateIncrementalReadParams(params);
+ Assert.fail("Should throw exception when startSnapshotId is
missing");
+ } catch (UserException e) {
+ Assert.assertTrue(e.getMessage().contains("startSnapshotId is
required"));
+ }
+
+ // 8. Test timestamp params without required startTimestamp
+ params.clear();
+ params.put("endTimestamp", "2000");
+ try {
+ PaimonScanNode.validateIncrementalReadParams(params);
+ Assert.fail("Should throw exception when startTimestamp is
missing");
+ } catch (UserException e) {
+ Assert.assertTrue(e.getMessage().contains("startTimestamp is
required"));
+ }
+
+ // 9. Test incrementalBetweenScanMode without endSnapshotId
+ params.clear();
+ params.put("startSnapshotId", "1");
+ params.put("incrementalBetweenScanMode", "auto");
+ try {
+ PaimonScanNode.validateIncrementalReadParams(params);
+ Assert.fail("Should throw exception when
incrementalBetweenScanMode appears without endSnapshotId");
+ } catch (UserException e) {
+
Assert.assertTrue(e.getMessage().contains("incrementalBetweenScanMode can only
be specified when both"));
+ }
+
+ // 10. Test incrementalBetweenScanMode alone
+ params.clear();
+ params.put("incrementalBetweenScanMode", "auto");
+ try {
+ PaimonScanNode.validateIncrementalReadParams(params);
+ Assert.fail("Should throw exception when
incrementalBetweenScanMode appears alone");
+ } catch (UserException e) {
+ Assert.assertTrue(
+ e.getMessage().contains("startSnapshotId is required when
using snapshot-based incremental read"));
+ }
+
+ // 11. Test invalid snapshot ID values (≤ 0)
+ params.clear();
+ params.put("startSnapshotId", "0");
+ try {
+ PaimonScanNode.validateIncrementalReadParams(params);
+ Assert.fail("Should throw exception for startSnapshotId ≤ 0");
+ } catch (UserException e) {
+ Assert.assertTrue(e.getMessage().contains("startSnapshotId must be
greater than 0"));
+ }
+
+ params.clear();
+ params.put("startSnapshotId", "-1");
+ try {
+ PaimonScanNode.validateIncrementalReadParams(params);
+ Assert.fail("Should throw exception for negative startSnapshotId");
+ } catch (UserException e) {
+ Assert.assertTrue(e.getMessage().contains("startSnapshotId must be
greater than 0"));
+ }
+
+ params.clear();
+ params.put("startSnapshotId", "1");
+ params.put("endSnapshotId", "0");
+ try {
+ PaimonScanNode.validateIncrementalReadParams(params);
+ Assert.fail("Should throw exception for endSnapshotId ≤ 0");
+ } catch (UserException e) {
+ Assert.assertTrue(e.getMessage().contains("endSnapshotId must be
greater than 0"));
+ }
+
+ // 12. Test start ≥ end for snapshot IDs
+ params.clear();
+ params.put("startSnapshotId", "5");
+ params.put("endSnapshotId", "5");
+ try {
+ PaimonScanNode.validateIncrementalReadParams(params);
+ Assert.fail("Should throw exception when startSnapshotId =
endSnapshotId");
+ } catch (UserException e) {
+ Assert.assertTrue(e.getMessage().contains("startSnapshotId must be
less than endSnapshotId"));
+ }
+
+ params.clear();
+ params.put("startSnapshotId", "6");
+ params.put("endSnapshotId", "5");
+ try {
+ PaimonScanNode.validateIncrementalReadParams(params);
+ Assert.fail("Should throw exception when startSnapshotId >
endSnapshotId");
+ } catch (UserException e) {
+ Assert.assertTrue(e.getMessage().contains("startSnapshotId must be
less than endSnapshotId"));
+ }
+
+ // 13. Test invalid timestamp values (≤ 0)
+ params.clear();
+ params.put("startTimestamp", "-1");
+ try {
+ PaimonScanNode.validateIncrementalReadParams(params);
+ Assert.fail("Should throw exception for startTimestamp < 0");
+ } catch (UserException e) {
+ Assert.assertTrue(e.getMessage().contains("startTimestamp must be
greater than or equal to 0"));
+ }
+
+ params.clear();
+ params.put("startTimestamp", "1000");
+ params.put("endTimestamp", "0");
+ try {
+ PaimonScanNode.validateIncrementalReadParams(params);
+ Assert.fail("Should throw exception for endTimestamp ≤ 0");
+ } catch (UserException e) {
+ Assert.assertTrue(e.getMessage().contains("endTimestamp must be
greater than 0"));
+ }
+
+ // 14. Test start ≥ end for timestamps
+ params.clear();
+ params.put("startTimestamp", "2000");
+ params.put("endTimestamp", "2000");
+ try {
+ PaimonScanNode.validateIncrementalReadParams(params);
+ Assert.fail("Should throw exception when startTimestamp =
endTimestamp");
+ } catch (UserException e) {
+ Assert.assertTrue(e.getMessage().contains("startTimestamp must be
less than endTimestamp"));
+ }
+
+ params.clear();
+ params.put("startTimestamp", "3000");
+ params.put("endTimestamp", "2000");
+ try {
+ PaimonScanNode.validateIncrementalReadParams(params);
+ Assert.fail("Should throw exception when startTimestamp >
endTimestamp");
+ } catch (UserException e) {
+ Assert.assertTrue(e.getMessage().contains("startTimestamp must be
less than endTimestamp"));
+ }
+
+ // 15. Test invalid number format
+ params.clear();
+ params.put("startSnapshotId", "invalid");
+ try {
+ PaimonScanNode.validateIncrementalReadParams(params);
+ Assert.fail("Should throw exception for invalid number format");
+ } catch (UserException e) {
+ Assert.assertTrue(e.getMessage().contains("Invalid startSnapshotId
format"));
+ }
+
+ params.clear();
+ params.put("startTimestamp", "invalid");
+ try {
+ PaimonScanNode.validateIncrementalReadParams(params);
+ Assert.fail("Should throw exception for invalid timestamp format");
+ } catch (UserException e) {
+ Assert.assertTrue(e.getMessage().contains("Invalid startTimestamp
format"));
+ }
+
+ // 16. Test invalid incrementalBetweenScanMode values
+ params.clear();
+ params.put("startSnapshotId", "1");
+ params.put("endSnapshotId", "5");
+ params.put("incrementalBetweenScanMode", "invalid");
+ try {
+ PaimonScanNode.validateIncrementalReadParams(params);
+ Assert.fail("Should throw exception for invalid scan mode");
+ } catch (UserException e) {
+ Assert.assertTrue(
+ e.getMessage().contains("incrementalBetweenScanMode must
be one of: auto, diff, delta, changelog"));
+ }
+
+ // 17. Test valid incrementalBetweenScanMode values (case insensitive)
+ String[] validModes = {"auto", "AUTO", "diff", "DIFF", "delta",
"DELTA", "changelog", "CHANGELOG"};
+ for (String mode : validModes) {
+ params.clear();
+ params.put("startSnapshotId", "1");
+ params.put("endSnapshotId", "5");
+ params.put("incrementalBetweenScanMode", mode);
+ result = PaimonScanNode.validateIncrementalReadParams(params);
+ Assert.assertEquals("1,5", result.get("incremental-between"));
+ Assert.assertEquals(mode,
result.get("incremental-between-scan-mode"));
+ Assert.assertTrue(result.containsKey("scan.mode") &&
result.get("scan.mode") == null);
+ Assert.assertTrue(result.containsKey("scan.snapshot-id") &&
result.get("scan.snapshot-id") == null);
+ Assert.assertEquals(4, result.size());
+ }
+
+ // 18. Test no parameters at all
+ params.clear();
+ try {
+ PaimonScanNode.validateIncrementalReadParams(params);
+ Assert.fail("Should throw exception when no parameters provided");
+ } catch (UserException e) {
+ Assert.assertTrue(e.getMessage().contains("at least one valid
parameter group must be specified"));
+ }
}
- private void mockNativeReader() {
- new MockUp<PaimonScanNode>() {
- @Mock
- public boolean supportNativeReader(Optional<List<RawFile>>
optRawFiles) {
- return true;
- }
- };
+ private void mockJniReader(PaimonScanNode spyNode) {
+
Mockito.doReturn(false).when(spyNode).supportNativeReader(ArgumentMatchers.any(Optional.class));
+ }
+
+ private void mockNativeReader(PaimonScanNode spyNode) {
+
Mockito.doReturn(true).when(spyNode).supportNativeReader(ArgumentMatchers.any(Optional.class));
}
}
diff --git a/regression-test/data/external_table_p0/paimon/paimon_incr_read.out
b/regression-test/data/external_table_p0/paimon/paimon_incr_read.out
new file mode 100644
index 00000000000..6472ab41b64
Binary files /dev/null and
b/regression-test/data/external_table_p0/paimon/paimon_incr_read.out differ
diff --git
a/regression-test/suites/external_table_p0/paimon/paimon_incr_read.groovy
b/regression-test/suites/external_table_p0/paimon/paimon_incr_read.groovy
new file mode 100644
index 00000000000..cfe9fbf8960
--- /dev/null
+++ b/regression-test/suites/external_table_p0/paimon/paimon_incr_read.groovy
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_paimon_incr_read",
"p0,external,doris,external_docker,external_docker_doris") {
+ logger.info("start paimon test")
+ String enabled = context.config.otherConfigs.get("enablePaimonTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("disable paimon test.")
+ return
+ }
+ String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+ String catalog_name = "test_paimon_incr_read_catalog"
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ try {
+ sql """drop catalog if exists ${catalog_name}"""
+
+ sql """
+ CREATE CATALOG ${catalog_name} PROPERTIES (
+ 'type' = 'paimon',
+ 'warehouse' = 's3://warehouse/wh',
+ 's3.endpoint' =
'http://${externalEnvIp}:${minio_port}',
+ 's3.access_key' = 'admin',
+ 's3.secret_key' = 'password',
+ 's3.path.style.access' = 'true'
+ );
+ """
+ sql """switch `${catalog_name}`"""
+ sql """use test_paimon_incr_read_db"""
+
+ def test_incr_read = { String force ->
+ sql """ set force_jni_scanner=${force} """
+ order_qt_snapshot_incr1 """select * from
paimon_incr@incr('startSnapshotId'=1)"""
+ order_qt_snapshot_incr2 """select * from
paimon_incr@incr('startSnapshotId'=2)"""
+ order_qt_snapshot_incr3 """select * from
paimon_incr@incr('startSnapshotId'=1, 'endSnapshotId'=2)"""
+ order_qt_snapshot_incr4 """select * from
paimon_incr@incr('startSnapshotId'=1, 'endSnapshotId'=3)"""
+ order_qt_snapshot_incr5 """select * from
paimon_incr@incr('startSnapshotId'=2, 'endSnapshotId'=3)"""
+ order_qt_timestamp_incr1 """select * from
paimon_incr@incr('startTimestamp'=0)"""
+ order_qt_timestamp_incr2 """select * from
paimon_incr@incr('startTimestamp'=0, 'endTimestamp' = 1)"""
+ order_qt_timestamp_incr3 """select * from
paimon_incr@incr('startTimestamp'=0, 'endTimestamp' = 999999999999999)"""
+
+ order_qt_scan_mode1 """select * from
paimon_incr@incr('startSnapshotId'=1, 'endSnapshotId'=2,
'incrementalBetweenScanMode' = 'auto');"""
+ order_qt_scan_mode2 """select * from
paimon_incr@incr('startSnapshotId'=1, 'endSnapshotId'=2,
'incrementalBetweenScanMode' = 'diff');"""
+ order_qt_scan_mode3 """select * from
paimon_incr@incr('startSnapshotId'=1, 'endSnapshotId'=2,
'incrementalBetweenScanMode' = 'delta');"""
+ order_qt_scan_mode4 """select * from
paimon_incr@incr('startSnapshotId'=1, 'endSnapshotId'=2,
'incrementalBetweenScanMode' = 'changelog');"""
+
+
+ // complex query
+ qt_cte """with cte1 as (select * from
paimon_incr@incr('startTimestamp'=0)) select name, age from cte1 order by
age;"""
+ qt_join """select * from paimon_incr@incr('startSnapshotId'=1,
'endSnapshotId'=2) t1 join paimon_incr@incr('startTimestamp'=0) t2 on t1.id =
t2.id order by id;"""
+
+ test {
+ sql """select * from paimon_incr@incr('startTimestamp'=-1);"""
+ exception "startTimestamp must be greater than or equal to 0"
+ }
+ test {
+ sql """select * from paimon_incr@incr('startTimestam'=-1)"""
+ exception "at least one valid parameter group must be
specified"
+ }
+ test {
+ sql """select * from
paimon_incr@incr('endTimestamp'=999999999999999)"""
+ exception "startTimestamp is required when using
timestamp-based incremental read"
+ }
+ test {
+ sql """select * from paimon_incr@incr()"""
+ exception "at least one valid parameter group must be
specified"
+ }
+ test {
+ sql """select * from
paimon_incr@incr('incrementalBetweenScanMode' = 'auto');"""
+ exception "startSnapshotId is required when using
snapshot-based incremental read"
+ }
+ test {
+ sql """select * from paimon_incr@incr('startSnapshotId'=1,
'endSnapshotId'=2, 'incrementalBetweenScanMode' = 'error');"""
+ exception "incrementalBetweenScanMode must be one of"
+ }
+ test {
+ sql """select * from paimon_incr@incr('startSnapshotId'=1,
'endSnapshotId'=1)"""
+ exception "startSnapshotId must be less than endSnapshotId"
+ }
+ }
+
+ test_incr_read("false")
+ test_incr_read("true")
+ } finally {
+ // sql """drop catalog if exists ${catalog_name}"""
+ }
+}
+
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]