This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e129dd5e5 [core] Add row range index to btree index build (#7510)
6e129dd5e5 is described below

commit 6e129dd5e51836c85b04bfa8d039e8d5b48d68df
Author: YeJunHao <[email protected]>
AuthorDate: Tue Mar 24 15:10:20 2026 +0800

    [core] Add row range index to btree index build (#7510)
---
 .../paimon/globalindex/DataEvolutionBatchScan.java |  2 +-
 .../globalindex/btree/BTreeGlobalIndexBuilder.java | 97 ++++++++++++++++------
 .../org/apache/paimon/table/KnownSplitsTable.java  | 10 +--
 .../test/java/org/apache/paimon/JavaPyE2ETest.java | 30 ++++++-
 .../btree/BTreeGlobalIndexBuilderTest.java         | 35 ++++++--
 .../paimon/table/BtreeGlobalIndexTableTest.java    |  9 +-
 .../paimon/flink/btree/BTreeIndexTopoBuilder.java  | 38 +++++----
 .../globalindex/btree/BTreeIndexTopoBuilder.java   | 25 ++++--
 .../apache/paimon/spark/read/PaimonSplitScan.scala |  4 +-
 .../apache/paimon/spark/util/ScanPlanHelper.scala  | 10 +--
 10 files changed, 187 insertions(+), 73 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
index f2b2c70b1b..84f40243f2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
@@ -299,7 +299,7 @@ public class DataEvolutionBatchScan implements 
DataTableScan {
     }
 
     @VisibleForTesting
-    static Plan wrapToIndexSplits(
+    public static Plan wrapToIndexSplits(
             List<Split> splits, RowRangeIndex rowRangeIndex, ScoreGetter 
scoreGetter) {
         List<Split> indexedSplits = new ArrayList<>();
         Function<Split, List<IndexedSplit>> process =
diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
index c60ec2bc27..cfeaa99918 100644
--- a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
@@ -25,8 +25,10 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.InternalRow.FieldGetter;
 import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.globalindex.DataEvolutionBatchScan;
 import org.apache.paimon.globalindex.GlobalIndexParallelWriter;
 import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.IndexedSplit;
 import org.apache.paimon.globalindex.ResultEntry;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.CompactIncrement;
@@ -53,12 +55,14 @@ import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.Range;
 import org.apache.paimon.utils.RangeHelper;
+import org.apache.paimon.utils.RowRangeIndex;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -66,6 +70,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.function.Supplier;
 import java.util.stream.IntStream;
 
@@ -90,6 +95,7 @@ public class BTreeGlobalIndexBuilder implements Serializable {
 
     // readRowType is composed by partition fields, indexed field and _ROW_ID 
field
     private RowType readRowType;
+    @Nullable private Snapshot snapshot;
 
     @Nullable private PartitionPredicate partitionPredicate;
 
@@ -121,36 +127,58 @@ public class BTreeGlobalIndexBuilder implements 
Serializable {
         return this;
     }
 
-    public List<DataSplit> scan() {
+    public BTreeGlobalIndexBuilder withSnapshot(Snapshot snapshot) {
+        this.snapshot = snapshot;
+        return this;
+    }
+
+    public Optional<Pair<RowRangeIndex, List<DataSplit>>> scan() {
         SnapshotReader snapshotReader = table.newSnapshotReader();
         if (partitionPredicate != null) {
             snapshotReader = 
snapshotReader.withPartitionFilter(partitionPredicate);
         }
+        Snapshot snapshot =
+                this.snapshot != null
+                        ? this.snapshot
+                        : snapshotReader.snapshotManager().latestSnapshot();
+        if (snapshot == null) {
+            return Optional.empty();
+        }
+        snapshotReader = snapshotReader.withSnapshot(snapshot);
+        Range dataRange = new Range(0, snapshot.nextRowId() - 1);
 
-        return snapshotReader.read().dataSplits();
+        return Optional.of(
+                Pair.of(
+                        
RowRangeIndex.create(Collections.singletonList(dataRange)),
+                        snapshotReader.read().dataSplits()));
     }
 
-    public List<DataSplit> incrementalScan() {
+    public Optional<Pair<RowRangeIndex, List<DataSplit>>> incrementalScan() {
         SnapshotReader snapshotReader = table.newSnapshotReader();
         if (partitionPredicate != null) {
             snapshotReader = 
snapshotReader.withPartitionFilter(partitionPredicate);
         }
-        Snapshot latestSnapshot = 
snapshotReader.snapshotManager().latestSnapshot();
-        if (latestSnapshot == null) {
-            return Collections.emptyList();
+        Snapshot snapshot =
+                this.snapshot != null
+                        ? this.snapshot
+                        : snapshotReader.snapshotManager().latestSnapshot();
+        if (snapshot == null) {
+            return Optional.empty();
         }
-        snapshotReader = snapshotReader.withSnapshot(latestSnapshot);
+        snapshotReader = snapshotReader.withSnapshot(snapshot);
 
         Preconditions.checkArgument(indexField != null, "indexField must be 
set before scan.");
-
-        Range dataRange = new Range(0, latestSnapshot.nextRowId() - 1);
-        List<Range> indexedRanges = indexedRowRanges(latestSnapshot);
+        Range dataRange = new Range(0, snapshot.nextRowId() - 1);
+        List<Range> indexedRanges = indexedRowRanges(snapshot);
         List<Range> nonIndexedRanges = dataRange.exclude(indexedRanges);
         if (nonIndexedRanges.isEmpty()) {
-            return Collections.emptyList();
+            return Optional.empty();
         }
         snapshotReader = snapshotReader.withRowRanges(nonIndexedRanges);
-        return snapshotReader.read().dataSplits();
+        return Optional.of(
+                Pair.of(
+                        RowRangeIndex.create(nonIndexedRanges),
+                        snapshotReader.read().dataSplits()));
     }
 
     private List<Range> indexedRowRanges(Snapshot snapshot) {
@@ -278,6 +306,25 @@ public class BTreeGlobalIndexBuilder implements 
Serializable {
                 partition, 0, null, dataIncrement, 
CompactIncrement.emptyIncrement());
     }
 
+    public static Pair<Range, Split> calcRowRangeWithRowIndex(
+            RowRangeIndex rowRangeIndex, DataSplit dataSplit) {
+        if (rowRangeIndex != null) {
+            IndexedSplit indexedSplit =
+                    (IndexedSplit)
+                            DataEvolutionBatchScan.wrapToIndexSplits(
+                                            Arrays.asList(dataSplit), 
rowRangeIndex, null)
+                                    .splits()
+                                    .get(0);
+            checkArgument(
+                    indexedSplit.rowRanges().size() == 1,
+                    "Expected exactly one row range for the split, but found: 
%s",
+                    indexedSplit.rowRanges());
+            return Pair.of(indexedSplit.rowRanges().get(0), indexedSplit);
+        }
+
+        return Pair.of(calcRowRange(dataSplit), dataSplit);
+    }
+
     public static Range calcRowRange(DataSplit dataSplit) {
         List<Range> ranges = calcRowRanges(singletonList(dataSplit));
         if (ranges.isEmpty()) {
@@ -304,32 +351,34 @@ public class BTreeGlobalIndexBuilder implements 
Serializable {
         return result;
     }
 
-    public static Map<BinaryRow, Map<Range, List<DataSplit>>> 
groupSplitsByRange(
-            List<DataSplit> splits) {
-        Map<BinaryRow, List<Pair<Range, DataSplit>>> partitionSplitRanges = 
new HashMap<>();
+    public static Map<BinaryRow, Map<Range, List<Split>>> groupSplitsByRange(
+            RowRangeIndex rowRangeIndex, List<DataSplit> splits) {
+        Map<BinaryRow, List<Pair<Range, Split>>> partitionSplitRanges = new 
HashMap<>();
         for (DataSplit split : splits) {
-            Range splitRange = calcRowRange(split);
+            Pair<Range, Split> keyPair = 
calcRowRangeWithRowIndex(rowRangeIndex, split);
+            Range splitRange = keyPair.getKey();
+            Split splitWithRange = keyPair.getValue();
             if (splitRange == null) {
                 continue;
             }
             BinaryRow partition = split.partition();
             partitionSplitRanges
                     .computeIfAbsent(partition, p -> new ArrayList<>())
-                    .add(Pair.of(splitRange, split));
+                    .add(Pair.of(splitRange, splitWithRange));
         }
 
-        Map<BinaryRow, Map<Range, List<DataSplit>>> result = new HashMap<>();
-        for (Map.Entry<BinaryRow, List<Pair<Range, DataSplit>>> partitionEntry 
:
+        Map<BinaryRow, Map<Range, List<Split>>> result = new HashMap<>();
+        for (Map.Entry<BinaryRow, List<Pair<Range, Split>>> partitionEntry :
                 partitionSplitRanges.entrySet()) {
-            List<Pair<Range, DataSplit>> splitRanges = 
partitionEntry.getValue();
+            List<Pair<Range, Split>> splitRanges = partitionEntry.getValue();
             splitRanges.sort(
-                    Comparator.comparingLong((Pair<Range, DataSplit> e) -> 
e.getKey().from)
+                    Comparator.comparingLong((Pair<Range, Split> e) -> 
e.getKey().from)
                             .thenComparingLong(e -> e.getKey().to));
 
-            Map<Range, List<DataSplit>> partitionRanges = new 
LinkedHashMap<>();
+            Map<Range, List<Split>> partitionRanges = new LinkedHashMap<>();
             Range current = null;
-            List<DataSplit> currentSplits = new ArrayList<>();
-            for (Map.Entry<Range, DataSplit> entry : splitRanges) {
+            List<Split> currentSplits = new ArrayList<>();
+            for (Map.Entry<Range, Split> entry : splitRanges) {
                 Range splitRange = entry.getKey();
                 if (current == null) {
                     current = splitRange;
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/KnownSplitsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/KnownSplitsTable.java
index 66947261c5..d76a6d700c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/KnownSplitsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/KnownSplitsTable.java
@@ -19,9 +19,9 @@
 package org.apache.paimon.table;
 
 import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.Split;
 import org.apache.paimon.types.RowType;
 
 import java.util.List;
@@ -33,18 +33,18 @@ import java.util.Map;
 public class KnownSplitsTable implements ReadonlyTable {
 
     private final InnerTable origin;
-    private final DataSplit[] splits;
+    private final Split[] splits;
 
-    KnownSplitsTable(InnerTable origin, DataSplit[] splits) {
+    KnownSplitsTable(InnerTable origin, Split[] splits) {
         this.origin = origin;
         this.splits = splits;
     }
 
-    public static KnownSplitsTable create(InnerTable origin, DataSplit[] 
splits) {
+    public static KnownSplitsTable create(InnerTable origin, Split[] splits) {
         return new KnownSplitsTable(origin, splits);
     }
 
-    public DataSplit[] splits() {
+    public Split[] splits() {
         return splits;
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
index 22283fdd11..fad9507067 100644
--- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
@@ -532,7 +532,15 @@ public class JavaPyE2ETest {
         BTreeGlobalIndexBuilder builder = new 
BTreeGlobalIndexBuilder(table).withIndexField("k");
         try (BatchTableCommit commit = writeBuilder.newCommit()) {
             commit.commit(
-                    builder.build(builder.scan().get(0), 
IOManager.create(warehouse.toString())));
+                    builder.build(
+                            builder.scan()
+                                    
.map(org.apache.paimon.utils.Pair::getValue)
+                                    .orElseThrow(
+                                            () ->
+                                                    new IllegalStateException(
+                                                            "Expected scan 
result when building index."))
+                                    .get(0),
+                            IOManager.create(warehouse.toString())));
         }
 
         // assert index
@@ -600,7 +608,15 @@ public class JavaPyE2ETest {
         BTreeGlobalIndexBuilder builder = new 
BTreeGlobalIndexBuilder(table).withIndexField("k");
         try (BatchTableCommit commit = writeBuilder.newCommit()) {
             commit.commit(
-                    builder.build(builder.scan().get(0), 
IOManager.create(warehouse.toString())));
+                    builder.build(
+                            builder.scan()
+                                    
.map(org.apache.paimon.utils.Pair::getValue)
+                                    .orElseThrow(
+                                            () ->
+                                                    new IllegalStateException(
+                                                            "Expected scan 
result when building index."))
+                                    .get(0),
+                            IOManager.create(warehouse.toString())));
         }
 
         // assert index
@@ -670,7 +686,15 @@ public class JavaPyE2ETest {
         BTreeGlobalIndexBuilder builder = new 
BTreeGlobalIndexBuilder(table).withIndexField("k");
         try (BatchTableCommit commit = writeBuilder.newCommit()) {
             commit.commit(
-                    builder.build(builder.scan().get(0), 
IOManager.create(warehouse.toString())));
+                    builder.build(
+                            builder.scan()
+                                    
.map(org.apache.paimon.utils.Pair::getValue)
+                                    .orElseThrow(
+                                            () ->
+                                                    new IllegalStateException(
+                                                            "Expected scan 
result when building index."))
+                                    .get(0),
+                            IOManager.create(warehouse.toString())));
         }
 
         // assert index
diff --git a/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderTest.java b/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderTest.java
index 97a47bcce2..1010b8fc79 100644
--- a/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderTest.java
@@ -51,6 +51,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /** Test class for {@link BTreeGlobalIndexBuilder}. */
 public class BTreeGlobalIndexBuilderTest extends TableTestBase {
@@ -107,7 +108,13 @@ public class BTreeGlobalIndexBuilderTest extends 
TableTestBase {
         BTreeGlobalIndexBuilder builder = new BTreeGlobalIndexBuilder(table);
         builder.withIndexField("f0");
         builder.withPartitionPredicate(partitionPredicate);
-        List<DataSplit> dataSplits = builder.scan();
+        List<DataSplit> dataSplits =
+                builder.scan()
+                        .map(Pair::getRight)
+                        .orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "Expected scan result when 
building index."));
         List<CommitMessage> commitMessages = new ArrayList<>();
         for (DataSplit dataSplit : dataSplits) {
             commitMessages.addAll(builder.build(dataSplit, ioManager));
@@ -160,9 +167,9 @@ public class BTreeGlobalIndexBuilderTest extends 
TableTestBase {
         BTreeGlobalIndexBuilder builder = new BTreeGlobalIndexBuilder(table);
         builder.withIndexField("f0");
 
-        List<DataSplit> splits = builder.incrementalScan();
-        Assertions.assertTrue(
-                splits.isEmpty(), "incrementalScan on empty table should 
return empty");
+        Assertions.assertFalse(
+                builder.incrementalScan().isPresent(),
+                "incrementalScan on empty table should return empty");
     }
 
     @Test
@@ -174,9 +181,8 @@ public class BTreeGlobalIndexBuilderTest extends 
TableTestBase {
         BTreeGlobalIndexBuilder builder = new BTreeGlobalIndexBuilder(table);
         builder.withIndexField("f0");
 
-        List<DataSplit> splits = builder.incrementalScan();
-        Assertions.assertTrue(
-                splits.isEmpty(),
+        Assertions.assertFalse(
+                builder.incrementalScan().isPresent(),
                 "incrementalScan should return empty when all data is already 
indexed");
     }
 
@@ -205,7 +211,12 @@ public class BTreeGlobalIndexBuilderTest extends 
TableTestBase {
         BTreeGlobalIndexBuilder builder = new BTreeGlobalIndexBuilder(table);
         builder.withIndexField("f0");
 
-        List<DataSplit> splits = builder.incrementalScan();
+        Optional<Pair<org.apache.paimon.utils.RowRangeIndex, List<DataSplit>>> 
incrementalScan =
+                builder.incrementalScan();
+        Assertions.assertTrue(
+                incrementalScan.isPresent(),
+                "incrementalScan should return non-empty splits for newly 
written data");
+        List<DataSplit> splits = incrementalScan.get().getRight();
         Assertions.assertFalse(
                 splits.isEmpty(),
                 "incrementalScan should return non-empty splits for newly 
written data");
@@ -259,7 +270,13 @@ public class BTreeGlobalIndexBuilderTest extends 
TableTestBase {
         builder.withIndexField("f0");
         
builder.withPartitionPredicate(PartitionPredicate.fromPredicate(partType, 
predicate));
 
-        List<DataSplit> splits = builder.incrementalScan();
+        List<DataSplit> splits =
+                builder.incrementalScan()
+                        .map(Pair::getRight)
+                        .orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "Expected incremental scan 
result for new data."));
         Assertions.assertFalse(splits.isEmpty());
 
         for (DataSplit split : splits) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
index f48fcfd0b1..f524e07e73 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
@@ -218,8 +218,15 @@ public class BtreeGlobalIndexTableTest extends 
DataEvolutionTestBase {
         FileStoreTable table = (FileStoreTable) catalog.getTable(identifier());
         BTreeGlobalIndexBuilder builder =
                 new BTreeGlobalIndexBuilder(table).withIndexField(fieldName);
+        List<DataSplit> dataSplits =
+                builder.scan()
+                        .map(org.apache.paimon.utils.Pair::getRight)
+                        .orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "Expected scan result when 
building index."));
         List<CommitMessage> commitMessages = new ArrayList<>();
-        for (DataSplit dataSplit : indexSplits(table, rowRanges, 
builder.scan())) {
+        for (DataSplit dataSplit : indexSplits(table, rowRanges, dataSplits)) {
             commitMessages.addAll(builder.build(dataSplit, ioManager));
         }
         try (BatchTableCommit commit = 
table.newBatchWriteBuilder().newCommit()) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
index c98f7452f7..35830faac1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
@@ -46,10 +46,13 @@ import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RowRangeIndex;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -63,6 +66,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.function.Supplier;
 
@@ -88,12 +92,19 @@ public class BTreeIndexTopoBuilder {
                 indexBuilder = 
indexBuilder.withPartitionPredicate(partitionPredicate);
             }
 
-            List<DataSplit> splits = 
splitByContiguousRowRange(indexBuilder.scan());
+            Optional<Pair<RowRangeIndex, List<DataSplit>>> indexRangeAndSplits 
=
+                    indexBuilder.scan();
+            if (!indexRangeAndSplits.isPresent()) {
+                return false;
+            }
+
+            Pair<RowRangeIndex, List<DataSplit>> scanResult = 
indexRangeAndSplits.get();
+            List<DataSplit> splits = 
splitByContiguousRowRange(scanResult.getRight());
             if (splits.isEmpty()) {
                 return false;
             }
-            Map<BinaryRow, Map<Range, List<DataSplit>>> partitionRangeSplits =
-                    groupSplitsByRange(splits);
+            Map<BinaryRow, Map<Range, List<Split>>> partitionRangeSplits =
+                    groupSplitsByRange(scanResult.getLeft(), splits);
             if (partitionRangeSplits.isEmpty()) {
                 return false;
             }
@@ -120,13 +131,12 @@ public class BTreeIndexTopoBuilder {
             sortColumns.add(indexColumn);
             int partitionFieldSize = table.partitionKeys().size();
             BinaryRowSerializer binaryRowSerializer = new 
BinaryRowSerializer(partitionFieldSize);
-            for (Map.Entry<BinaryRow, Map<Range, List<DataSplit>>> 
partitionEntry :
+            for (Map.Entry<BinaryRow, Map<Range, List<Split>>> partitionEntry :
                     partitionRangeSplits.entrySet()) {
                 BinaryRow partition = partitionEntry.getKey();
-                for (Map.Entry<Range, List<DataSplit>> entry :
-                        partitionEntry.getValue().entrySet()) {
+                for (Map.Entry<Range, List<Split>> entry : 
partitionEntry.getValue().entrySet()) {
                     Range range = entry.getKey();
-                    List<DataSplit> rangeSplits = entry.getValue();
+                    List<Split> rangeSplits = entry.getValue();
                     if (rangeSplits.isEmpty()) {
                         continue;
                     }
@@ -184,7 +194,7 @@ public class BTreeIndexTopoBuilder {
     protected static DataStream<Committable> executeForPartitionRange(
             StreamExecutionEnvironment env,
             Range range,
-            List<DataSplit> rangeSplits,
+            List<Split> rangeSplits,
             ReadBuilder readBuilder,
             BTreeGlobalIndexBuilder indexBuilder,
             int partitionFieldSize,
@@ -200,10 +210,8 @@ public class BTreeIndexTopoBuilder {
         int parallelism = Math.max((int) (range.count() / recordsPerRange), 1);
         parallelism = Math.min(parallelism, maxParallelism);
 
-        DataStream<DataSplit> sourceStream =
-                env.fromData(
-                                new JavaTypeInfo<>(DataSplit.class),
-                                rangeSplits.toArray(new DataSplit[0]))
+        DataStream<Split> sourceStream =
+                env.fromData(new JavaTypeInfo<>(Split.class), 
rangeSplits.toArray(new Split[0]))
                         .name("Global Index Source " + " range=" + range)
                         .setParallelism(1);
 
@@ -264,7 +272,7 @@ public class BTreeIndexTopoBuilder {
     private static class ReadDataOperator
             extends 
org.apache.flink.table.runtime.operators.TableStreamOperator<RowData>
             implements 
org.apache.flink.streaming.api.operators.OneInputStreamOperator<
-                    DataSplit, RowData> {
+                    Split, RowData> {
 
         private static final long serialVersionUID = 1L;
 
@@ -283,8 +291,8 @@ public class BTreeIndexTopoBuilder {
         }
 
         @Override
-        public void processElement(StreamRecord<DataSplit> element) throws 
Exception {
-            DataSplit split = element.getValue();
+        public void processElement(StreamRecord<Split> element) throws 
Exception {
+            Split split = element.getValue();
             try (RecordReader<InternalRow> reader = 
tableRead.createReader(split)) {
                 reader.forEachRemaining(
                         row -> output.collect(new StreamRecord<>(new 
FlinkRowData(row))));
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
index cf9402d0d5..5f7bfa453e 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
@@ -32,10 +32,13 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageSerializer;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.Split;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.InstantiationUtil;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RowRangeIndex;
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.FlatMapFunction;
@@ -53,6 +56,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static 
org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder.groupSplitsByRange;
 import static 
org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder.splitByContiguousRowRange;
@@ -83,12 +87,19 @@ public class BTreeIndexTopoBuilder implements 
GlobalIndexTopologyBuilder {
             indexBuilder = 
indexBuilder.withPartitionPredicate(partitionPredicate);
         }
 
-        List<DataSplit> splits = 
splitByContiguousRowRange(indexBuilder.scan());
+        Optional<Pair<RowRangeIndex, List<DataSplit>>> indexRangeAndSplits = 
indexBuilder.scan();
+        if (!indexRangeAndSplits.isPresent()) {
+            return Collections.emptyList();
+        }
+
+        Pair<RowRangeIndex, List<DataSplit>> scanResult = 
indexRangeAndSplits.get();
+        List<DataSplit> splits = 
splitByContiguousRowRange(scanResult.getRight());
         if (splits.isEmpty()) {
             return Collections.emptyList();
         }
-        Map<BinaryRow, Map<Range, List<DataSplit>>> partitionRangeSplits =
-                groupSplitsByRange(splits);
+
+        Map<BinaryRow, Map<Range, List<Split>>> partitionRangeSplits =
+                groupSplitsByRange(scanResult.getKey(), splits);
         if (partitionRangeSplits.isEmpty()) {
             return Collections.emptyList();
         }
@@ -104,11 +115,11 @@ public class BTreeIndexTopoBuilder implements 
GlobalIndexTopologyBuilder {
         sortColumns.add(indexField.name());
         final int partitionKeyNum = table.partitionKeys().size();
         BinaryRowSerializer binaryRowSerializer = new 
BinaryRowSerializer(partitionKeyNum);
-        for (Map.Entry<BinaryRow, Map<Range, List<DataSplit>>> partitionEntry :
+        for (Map.Entry<BinaryRow, Map<Range, List<Split>>> partitionEntry :
                 partitionRangeSplits.entrySet()) {
-            for (Map.Entry<Range, List<DataSplit>> entry : 
partitionEntry.getValue().entrySet()) {
+            for (Map.Entry<Range, List<Split>> entry : 
partitionEntry.getValue().entrySet()) {
                 Range range = entry.getKey();
-                List<DataSplit> rangeSplits = entry.getValue();
+                List<Split> rangeSplits = entry.getValue();
                 if (rangeSplits.isEmpty()) {
                     continue;
                 }
@@ -119,7 +130,7 @@ public class BTreeIndexTopoBuilder implements 
GlobalIndexTopologyBuilder {
                         PaimonUtils.createDataset(
                                 spark,
                                 ScanPlanHelper$.MODULE$.createNewScanPlan(
-                                        rangeSplits.toArray(new DataSplit[0]), 
relation));
+                                        rangeSplits.toArray(new Split[0]), 
relation));
 
                 Dataset<Row> selected =
                         source.select(
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/PaimonSplitScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/PaimonSplitScan.scala
index 887a330539..6d514443db 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/PaimonSplitScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/PaimonSplitScan.scala
@@ -42,11 +42,11 @@ class PaimonSplitScanBuilder(val table: KnownSplitsTable) 
extends PaimonBaseScan
 /** For internal use only. */
 case class PaimonSplitScan(
     table: InnerTable,
-    dataSplits: Array[DataSplit],
+    dataSplits: Array[Split],
     requiredSchema: StructType,
     pushedPartitionFilters: Seq[PartitionPredicate],
     pushedDataFilters: Seq[Predicate])
   extends BaseScan {
 
-  protected def getInputSplits: Array[Split] = 
dataSplits.asInstanceOf[Array[Split]]
+  protected def getInputSplits: Array[Split] = dataSplits
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala
index 1b466987e3..832291e379 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala
@@ -21,7 +21,7 @@ package org.apache.paimon.spark.util
 import org.apache.paimon.spark.SparkTable
 import 
org.apache.paimon.spark.schema.PaimonMetadataColumn.{PATH_AND_INDEX_META_COLUMNS,
 ROW_TRACKING_META_COLUMNS}
 import org.apache.paimon.table.{InnerTable, KnownSplitsTable}
-import org.apache.paimon.table.source.DataSplit
+import org.apache.paimon.table.source.Split
 
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.catalyst.SQLConfHelper
@@ -35,7 +35,7 @@ trait ScanPlanHelper extends SQLConfHelper {
 
   /** Create a new scan plan from a relation with the given data splits, 
condition(optional). */
   def createNewScanPlan(
-      dataSplits: Seq[DataSplit],
+      dataSplits: Seq[Split],
       relation: DataSourceV2Relation,
       condition: Option[Expression]): LogicalPlan = {
     val newRelation = createNewScanPlan(dataSplits, relation)
@@ -46,7 +46,7 @@ trait ScanPlanHelper extends SQLConfHelper {
   }
 
   def createNewScanPlan(
-      dataSplits: Seq[DataSplit],
+      dataSplits: Seq[Split],
       relation: DataSourceV2Relation): DataSourceV2Relation = {
     relation.table match {
       case sparkTable @ SparkTable(table: InnerTable) =>
@@ -73,9 +73,7 @@ trait ScanPlanHelper extends SQLConfHelper {
 
 /** This wrapper is only used in java code, e.g. Procedure. */
 object ScanPlanHelper extends ScanPlanHelper {
-  def createNewScanPlan(
-      dataSplits: Array[DataSplit],
-      relation: DataSourceV2Relation): LogicalPlan = {
+  def createNewScanPlan(dataSplits: Array[Split], relation: 
DataSourceV2Relation): LogicalPlan = {
     ScanPlanHelper.createNewScanPlan(dataSplits.toSeq, relation)
   }
 }

Reply via email to