This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6e129dd5e5 [core] Add row range index to btree index build (#7510)
6e129dd5e5 is described below
commit 6e129dd5e51836c85b04bfa8d039e8d5b48d68df
Author: YeJunHao <[email protected]>
AuthorDate: Tue Mar 24 15:10:20 2026 +0800
[core] Add row range index to btree index build (#7510)
---
.../paimon/globalindex/DataEvolutionBatchScan.java | 2 +-
.../globalindex/btree/BTreeGlobalIndexBuilder.java | 97 ++++++++++++++++------
.../org/apache/paimon/table/KnownSplitsTable.java | 10 +--
.../test/java/org/apache/paimon/JavaPyE2ETest.java | 30 ++++++-
.../btree/BTreeGlobalIndexBuilderTest.java | 35 ++++++--
.../paimon/table/BtreeGlobalIndexTableTest.java | 9 +-
.../paimon/flink/btree/BTreeIndexTopoBuilder.java | 38 +++++----
.../globalindex/btree/BTreeIndexTopoBuilder.java | 25 ++++--
.../apache/paimon/spark/read/PaimonSplitScan.scala | 4 +-
.../apache/paimon/spark/util/ScanPlanHelper.scala | 10 +--
10 files changed, 187 insertions(+), 73 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
index f2b2c70b1b..84f40243f2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
@@ -299,7 +299,7 @@ public class DataEvolutionBatchScan implements
DataTableScan {
}
@VisibleForTesting
- static Plan wrapToIndexSplits(
+ public static Plan wrapToIndexSplits(
List<Split> splits, RowRangeIndex rowRangeIndex, ScoreGetter
scoreGetter) {
List<Split> indexedSplits = new ArrayList<>();
Function<Split, List<IndexedSplit>> process =
diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
index c60ec2bc27..cfeaa99918 100644
--- a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
@@ -25,8 +25,10 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.InternalRow.FieldGetter;
import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.globalindex.DataEvolutionBatchScan;
import org.apache.paimon.globalindex.GlobalIndexParallelWriter;
import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.IndexedSplit;
import org.apache.paimon.globalindex.ResultEntry;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.CompactIncrement;
@@ -53,12 +55,14 @@ import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.Range;
import org.apache.paimon.utils.RangeHelper;
+import org.apache.paimon.utils.RowRangeIndex;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -66,6 +70,7 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.IntStream;
@@ -90,6 +95,7 @@ public class BTreeGlobalIndexBuilder implements Serializable {
// readRowType is composed by partition fields, indexed field and _ROW_ID
field
private RowType readRowType;
+ @Nullable private Snapshot snapshot;
@Nullable private PartitionPredicate partitionPredicate;
@@ -121,36 +127,58 @@ public class BTreeGlobalIndexBuilder implements
Serializable {
return this;
}
- public List<DataSplit> scan() {
+ public BTreeGlobalIndexBuilder withSnapshot(Snapshot snapshot) {
+ this.snapshot = snapshot;
+ return this;
+ }
+
+ public Optional<Pair<RowRangeIndex, List<DataSplit>>> scan() {
SnapshotReader snapshotReader = table.newSnapshotReader();
if (partitionPredicate != null) {
snapshotReader =
snapshotReader.withPartitionFilter(partitionPredicate);
}
+ Snapshot snapshot =
+ this.snapshot != null
+ ? this.snapshot
+ : snapshotReader.snapshotManager().latestSnapshot();
+ if (snapshot == null) {
+ return Optional.empty();
+ }
+ snapshotReader = snapshotReader.withSnapshot(snapshot);
+ Range dataRange = new Range(0, snapshot.nextRowId() - 1);
- return snapshotReader.read().dataSplits();
+ return Optional.of(
+ Pair.of(
+ RowRangeIndex.create(Collections.singletonList(dataRange)),
+ snapshotReader.read().dataSplits()));
}
- public List<DataSplit> incrementalScan() {
+ public Optional<Pair<RowRangeIndex, List<DataSplit>>> incrementalScan() {
SnapshotReader snapshotReader = table.newSnapshotReader();
if (partitionPredicate != null) {
snapshotReader =
snapshotReader.withPartitionFilter(partitionPredicate);
}
- Snapshot latestSnapshot =
snapshotReader.snapshotManager().latestSnapshot();
- if (latestSnapshot == null) {
- return Collections.emptyList();
+ Snapshot snapshot =
+ this.snapshot != null
+ ? this.snapshot
+ : snapshotReader.snapshotManager().latestSnapshot();
+ if (snapshot == null) {
+ return Optional.empty();
}
- snapshotReader = snapshotReader.withSnapshot(latestSnapshot);
+ snapshotReader = snapshotReader.withSnapshot(snapshot);
Preconditions.checkArgument(indexField != null, "indexField must be
set before scan.");
-
- Range dataRange = new Range(0, latestSnapshot.nextRowId() - 1);
- List<Range> indexedRanges = indexedRowRanges(latestSnapshot);
+ Range dataRange = new Range(0, snapshot.nextRowId() - 1);
+ List<Range> indexedRanges = indexedRowRanges(snapshot);
List<Range> nonIndexedRanges = dataRange.exclude(indexedRanges);
if (nonIndexedRanges.isEmpty()) {
- return Collections.emptyList();
+ return Optional.empty();
}
snapshotReader = snapshotReader.withRowRanges(nonIndexedRanges);
- return snapshotReader.read().dataSplits();
+ return Optional.of(
+ Pair.of(
+ RowRangeIndex.create(nonIndexedRanges),
+ snapshotReader.read().dataSplits()));
}
private List<Range> indexedRowRanges(Snapshot snapshot) {
@@ -278,6 +306,25 @@ public class BTreeGlobalIndexBuilder implements
Serializable {
partition, 0, null, dataIncrement,
CompactIncrement.emptyIncrement());
}
+ public static Pair<Range, Split> calcRowRangeWithRowIndex(
+ RowRangeIndex rowRangeIndex, DataSplit dataSplit) {
+ if (rowRangeIndex != null) {
+ IndexedSplit indexedSplit =
+ (IndexedSplit)
+ DataEvolutionBatchScan.wrapToIndexSplits(
+ Arrays.asList(dataSplit),
rowRangeIndex, null)
+ .splits()
+ .get(0);
+ checkArgument(
+ indexedSplit.rowRanges().size() == 1,
+ "Expected exactly one row range for the split, but found: %s",
+ indexedSplit.rowRanges());
+ return Pair.of(indexedSplit.rowRanges().get(0), indexedSplit);
+ }
+
+ return Pair.of(calcRowRange(dataSplit), dataSplit);
+ }
+
public static Range calcRowRange(DataSplit dataSplit) {
List<Range> ranges = calcRowRanges(singletonList(dataSplit));
if (ranges.isEmpty()) {
@@ -304,32 +351,34 @@ public class BTreeGlobalIndexBuilder implements
Serializable {
return result;
}
- public static Map<BinaryRow, Map<Range, List<DataSplit>>>
groupSplitsByRange(
- List<DataSplit> splits) {
- Map<BinaryRow, List<Pair<Range, DataSplit>>> partitionSplitRanges =
new HashMap<>();
+ public static Map<BinaryRow, Map<Range, List<Split>>> groupSplitsByRange(
+ RowRangeIndex rowRangeIndex, List<DataSplit> splits) {
+ Map<BinaryRow, List<Pair<Range, Split>>> partitionSplitRanges = new
HashMap<>();
for (DataSplit split : splits) {
- Range splitRange = calcRowRange(split);
+ Pair<Range, Split> keyPair =
calcRowRangeWithRowIndex(rowRangeIndex, split);
+ Range splitRange = keyPair.getKey();
+ Split splitWithRange = keyPair.getValue();
if (splitRange == null) {
continue;
}
BinaryRow partition = split.partition();
partitionSplitRanges
.computeIfAbsent(partition, p -> new ArrayList<>())
- .add(Pair.of(splitRange, split));
+ .add(Pair.of(splitRange, splitWithRange));
}
- Map<BinaryRow, Map<Range, List<DataSplit>>> result = new HashMap<>();
- for (Map.Entry<BinaryRow, List<Pair<Range, DataSplit>>> partitionEntry
:
+ Map<BinaryRow, Map<Range, List<Split>>> result = new HashMap<>();
+ for (Map.Entry<BinaryRow, List<Pair<Range, Split>>> partitionEntry :
partitionSplitRanges.entrySet()) {
- List<Pair<Range, DataSplit>> splitRanges =
partitionEntry.getValue();
+ List<Pair<Range, Split>> splitRanges = partitionEntry.getValue();
splitRanges.sort(
- Comparator.comparingLong((Pair<Range, DataSplit> e) ->
e.getKey().from)
+ Comparator.comparingLong((Pair<Range, Split> e) ->
e.getKey().from)
.thenComparingLong(e -> e.getKey().to));
- Map<Range, List<DataSplit>> partitionRanges = new
LinkedHashMap<>();
+ Map<Range, List<Split>> partitionRanges = new LinkedHashMap<>();
Range current = null;
- List<DataSplit> currentSplits = new ArrayList<>();
- for (Map.Entry<Range, DataSplit> entry : splitRanges) {
+ List<Split> currentSplits = new ArrayList<>();
+ for (Map.Entry<Range, Split> entry : splitRanges) {
Range splitRange = entry.getKey();
if (current == null) {
current = splitRange;
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/KnownSplitsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/KnownSplitsTable.java
index 66947261c5..d76a6d700c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/KnownSplitsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/KnownSplitsTable.java
@@ -19,9 +19,9 @@
package org.apache.paimon.table;
import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.RowType;
import java.util.List;
@@ -33,18 +33,18 @@ import java.util.Map;
public class KnownSplitsTable implements ReadonlyTable {
private final InnerTable origin;
- private final DataSplit[] splits;
+ private final Split[] splits;
- KnownSplitsTable(InnerTable origin, DataSplit[] splits) {
+ KnownSplitsTable(InnerTable origin, Split[] splits) {
this.origin = origin;
this.splits = splits;
}
- public static KnownSplitsTable create(InnerTable origin, DataSplit[]
splits) {
+ public static KnownSplitsTable create(InnerTable origin, Split[] splits) {
return new KnownSplitsTable(origin, splits);
}
- public DataSplit[] splits() {
+ public Split[] splits() {
return splits;
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
index 22283fdd11..fad9507067 100644
--- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
@@ -532,7 +532,15 @@ public class JavaPyE2ETest {
BTreeGlobalIndexBuilder builder = new
BTreeGlobalIndexBuilder(table).withIndexField("k");
try (BatchTableCommit commit = writeBuilder.newCommit()) {
commit.commit(
- builder.build(builder.scan().get(0),
IOManager.create(warehouse.toString())));
+ builder.build(
+ builder.scan()
+ .map(org.apache.paimon.utils.Pair::getValue)
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ "Expected scan result when building index."))
+ .get(0),
+ IOManager.create(warehouse.toString())));
}
// assert index
@@ -600,7 +608,15 @@ public class JavaPyE2ETest {
BTreeGlobalIndexBuilder builder = new
BTreeGlobalIndexBuilder(table).withIndexField("k");
try (BatchTableCommit commit = writeBuilder.newCommit()) {
commit.commit(
- builder.build(builder.scan().get(0),
IOManager.create(warehouse.toString())));
+ builder.build(
+ builder.scan()
+ .map(org.apache.paimon.utils.Pair::getValue)
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ "Expected scan result when building index."))
+ .get(0),
+ IOManager.create(warehouse.toString())));
}
// assert index
@@ -670,7 +686,15 @@ public class JavaPyE2ETest {
BTreeGlobalIndexBuilder builder = new
BTreeGlobalIndexBuilder(table).withIndexField("k");
try (BatchTableCommit commit = writeBuilder.newCommit()) {
commit.commit(
- builder.build(builder.scan().get(0),
IOManager.create(warehouse.toString())));
+ builder.build(
+ builder.scan()
+ .map(org.apache.paimon.utils.Pair::getValue)
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ "Expected scan result when building index."))
+ .get(0),
+ IOManager.create(warehouse.toString())));
}
// assert index
diff --git a/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderTest.java b/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderTest.java
index 97a47bcce2..1010b8fc79 100644
--- a/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderTest.java
@@ -51,6 +51,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
/** Test class for {@link BTreeGlobalIndexBuilder}. */
public class BTreeGlobalIndexBuilderTest extends TableTestBase {
@@ -107,7 +108,13 @@ public class BTreeGlobalIndexBuilderTest extends
TableTestBase {
BTreeGlobalIndexBuilder builder = new BTreeGlobalIndexBuilder(table);
builder.withIndexField("f0");
builder.withPartitionPredicate(partitionPredicate);
- List<DataSplit> dataSplits = builder.scan();
+ List<DataSplit> dataSplits =
+ builder.scan()
+ .map(Pair::getRight)
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ "Expected scan result when building index."));
List<CommitMessage> commitMessages = new ArrayList<>();
for (DataSplit dataSplit : dataSplits) {
commitMessages.addAll(builder.build(dataSplit, ioManager));
@@ -160,9 +167,9 @@ public class BTreeGlobalIndexBuilderTest extends
TableTestBase {
BTreeGlobalIndexBuilder builder = new BTreeGlobalIndexBuilder(table);
builder.withIndexField("f0");
- List<DataSplit> splits = builder.incrementalScan();
- Assertions.assertTrue(
- splits.isEmpty(), "incrementalScan on empty table should
return empty");
+ Assertions.assertFalse(
+ builder.incrementalScan().isPresent(),
+ "incrementalScan on empty table should return empty");
}
@Test
@@ -174,9 +181,8 @@ public class BTreeGlobalIndexBuilderTest extends
TableTestBase {
BTreeGlobalIndexBuilder builder = new BTreeGlobalIndexBuilder(table);
builder.withIndexField("f0");
- List<DataSplit> splits = builder.incrementalScan();
- Assertions.assertTrue(
- splits.isEmpty(),
+ Assertions.assertFalse(
+ builder.incrementalScan().isPresent(),
"incrementalScan should return empty when all data is already
indexed");
}
@@ -205,7 +211,12 @@ public class BTreeGlobalIndexBuilderTest extends
TableTestBase {
BTreeGlobalIndexBuilder builder = new BTreeGlobalIndexBuilder(table);
builder.withIndexField("f0");
- List<DataSplit> splits = builder.incrementalScan();
+ Optional<Pair<org.apache.paimon.utils.RowRangeIndex, List<DataSplit>>>
incrementalScan =
+ builder.incrementalScan();
+ Assertions.assertTrue(
+ incrementalScan.isPresent(),
+ "incrementalScan should return non-empty splits for newly written data");
+ List<DataSplit> splits = incrementalScan.get().getRight();
Assertions.assertFalse(
splits.isEmpty(),
"incrementalScan should return non-empty splits for newly
written data");
@@ -259,7 +270,13 @@ public class BTreeGlobalIndexBuilderTest extends
TableTestBase {
builder.withIndexField("f0");
builder.withPartitionPredicate(PartitionPredicate.fromPredicate(partType,
predicate));
- List<DataSplit> splits = builder.incrementalScan();
+ List<DataSplit> splits =
+ builder.incrementalScan()
+ .map(Pair::getRight)
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ "Expected incremental scan result for new data."));
Assertions.assertFalse(splits.isEmpty());
for (DataSplit split : splits) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
index f48fcfd0b1..f524e07e73 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
@@ -218,8 +218,15 @@ public class BtreeGlobalIndexTableTest extends
DataEvolutionTestBase {
FileStoreTable table = (FileStoreTable) catalog.getTable(identifier());
BTreeGlobalIndexBuilder builder =
new BTreeGlobalIndexBuilder(table).withIndexField(fieldName);
+ List<DataSplit> dataSplits =
+ builder.scan()
+ .map(org.apache.paimon.utils.Pair::getRight)
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ "Expected scan result when building index."));
List<CommitMessage> commitMessages = new ArrayList<>();
- for (DataSplit dataSplit : indexSplits(table, rowRanges,
builder.scan())) {
+ for (DataSplit dataSplit : indexSplits(table, rowRanges, dataSplits)) {
commitMessages.addAll(builder.build(dataSplit, ioManager));
}
try (BatchTableCommit commit =
table.newBatchWriteBuilder().newCommit()) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
index c98f7452f7..35830faac1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
@@ -46,10 +46,13 @@ import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RowRangeIndex;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -63,6 +66,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.UUID;
import java.util.function.Supplier;
@@ -88,12 +92,19 @@ public class BTreeIndexTopoBuilder {
indexBuilder =
indexBuilder.withPartitionPredicate(partitionPredicate);
}
- List<DataSplit> splits =
splitByContiguousRowRange(indexBuilder.scan());
+ Optional<Pair<RowRangeIndex, List<DataSplit>>> indexRangeAndSplits
=
+ indexBuilder.scan();
+ if (!indexRangeAndSplits.isPresent()) {
+ return false;
+ }
+
+ Pair<RowRangeIndex, List<DataSplit>> scanResult =
indexRangeAndSplits.get();
+ List<DataSplit> splits =
splitByContiguousRowRange(scanResult.getRight());
if (splits.isEmpty()) {
return false;
}
- Map<BinaryRow, Map<Range, List<DataSplit>>> partitionRangeSplits =
- groupSplitsByRange(splits);
+ Map<BinaryRow, Map<Range, List<Split>>> partitionRangeSplits =
+ groupSplitsByRange(scanResult.getLeft(), splits);
if (partitionRangeSplits.isEmpty()) {
return false;
}
@@ -120,13 +131,12 @@ public class BTreeIndexTopoBuilder {
sortColumns.add(indexColumn);
int partitionFieldSize = table.partitionKeys().size();
BinaryRowSerializer binaryRowSerializer = new
BinaryRowSerializer(partitionFieldSize);
- for (Map.Entry<BinaryRow, Map<Range, List<DataSplit>>>
partitionEntry :
+ for (Map.Entry<BinaryRow, Map<Range, List<Split>>> partitionEntry :
partitionRangeSplits.entrySet()) {
BinaryRow partition = partitionEntry.getKey();
- for (Map.Entry<Range, List<DataSplit>> entry :
- partitionEntry.getValue().entrySet()) {
+ for (Map.Entry<Range, List<Split>> entry :
partitionEntry.getValue().entrySet()) {
Range range = entry.getKey();
- List<DataSplit> rangeSplits = entry.getValue();
+ List<Split> rangeSplits = entry.getValue();
if (rangeSplits.isEmpty()) {
continue;
}
@@ -184,7 +194,7 @@ public class BTreeIndexTopoBuilder {
protected static DataStream<Committable> executeForPartitionRange(
StreamExecutionEnvironment env,
Range range,
- List<DataSplit> rangeSplits,
+ List<Split> rangeSplits,
ReadBuilder readBuilder,
BTreeGlobalIndexBuilder indexBuilder,
int partitionFieldSize,
@@ -200,10 +210,8 @@ public class BTreeIndexTopoBuilder {
int parallelism = Math.max((int) (range.count() / recordsPerRange), 1);
parallelism = Math.min(parallelism, maxParallelism);
- DataStream<DataSplit> sourceStream =
- env.fromData(
- new JavaTypeInfo<>(DataSplit.class),
- rangeSplits.toArray(new DataSplit[0]))
+ DataStream<Split> sourceStream =
+ env.fromData(new JavaTypeInfo<>(Split.class),
rangeSplits.toArray(new Split[0]))
.name("Global Index Source " + " range=" + range)
.setParallelism(1);
@@ -264,7 +272,7 @@ public class BTreeIndexTopoBuilder {
private static class ReadDataOperator
extends
org.apache.flink.table.runtime.operators.TableStreamOperator<RowData>
implements
org.apache.flink.streaming.api.operators.OneInputStreamOperator<
- DataSplit, RowData> {
+ Split, RowData> {
private static final long serialVersionUID = 1L;
@@ -283,8 +291,8 @@ public class BTreeIndexTopoBuilder {
}
@Override
- public void processElement(StreamRecord<DataSplit> element) throws
Exception {
- DataSplit split = element.getValue();
+ public void processElement(StreamRecord<Split> element) throws
Exception {
+ Split split = element.getValue();
try (RecordReader<InternalRow> reader =
tableRead.createReader(split)) {
reader.forEachRemaining(
row -> output.collect(new StreamRecord<>(new
FlinkRowData(row))));
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
index cf9402d0d5..5f7bfa453e 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
@@ -32,10 +32,13 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageSerializer;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.InstantiationUtil;
+import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RowRangeIndex;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
@@ -53,6 +56,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import static
org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder.groupSplitsByRange;
import static
org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder.splitByContiguousRowRange;
@@ -83,12 +87,19 @@ public class BTreeIndexTopoBuilder implements
GlobalIndexTopologyBuilder {
indexBuilder =
indexBuilder.withPartitionPredicate(partitionPredicate);
}
- List<DataSplit> splits =
splitByContiguousRowRange(indexBuilder.scan());
+ Optional<Pair<RowRangeIndex, List<DataSplit>>> indexRangeAndSplits =
indexBuilder.scan();
+ if (!indexRangeAndSplits.isPresent()) {
+ return Collections.emptyList();
+ }
+
+ Pair<RowRangeIndex, List<DataSplit>> scanResult =
indexRangeAndSplits.get();
+ List<DataSplit> splits =
splitByContiguousRowRange(scanResult.getRight());
if (splits.isEmpty()) {
return Collections.emptyList();
}
- Map<BinaryRow, Map<Range, List<DataSplit>>> partitionRangeSplits =
- groupSplitsByRange(splits);
+
+ Map<BinaryRow, Map<Range, List<Split>>> partitionRangeSplits =
+ groupSplitsByRange(scanResult.getKey(), splits);
if (partitionRangeSplits.isEmpty()) {
return Collections.emptyList();
}
@@ -104,11 +115,11 @@ public class BTreeIndexTopoBuilder implements
GlobalIndexTopologyBuilder {
sortColumns.add(indexField.name());
final int partitionKeyNum = table.partitionKeys().size();
BinaryRowSerializer binaryRowSerializer = new
BinaryRowSerializer(partitionKeyNum);
- for (Map.Entry<BinaryRow, Map<Range, List<DataSplit>>> partitionEntry :
+ for (Map.Entry<BinaryRow, Map<Range, List<Split>>> partitionEntry :
partitionRangeSplits.entrySet()) {
- for (Map.Entry<Range, List<DataSplit>> entry :
partitionEntry.getValue().entrySet()) {
+ for (Map.Entry<Range, List<Split>> entry :
partitionEntry.getValue().entrySet()) {
Range range = entry.getKey();
- List<DataSplit> rangeSplits = entry.getValue();
+ List<Split> rangeSplits = entry.getValue();
if (rangeSplits.isEmpty()) {
continue;
}
@@ -119,7 +130,7 @@ public class BTreeIndexTopoBuilder implements
GlobalIndexTopologyBuilder {
PaimonUtils.createDataset(
spark,
ScanPlanHelper$.MODULE$.createNewScanPlan(
- rangeSplits.toArray(new DataSplit[0]),
relation));
+ rangeSplits.toArray(new Split[0]),
relation));
Dataset<Row> selected =
source.select(
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/PaimonSplitScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/PaimonSplitScan.scala
index 887a330539..6d514443db 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/PaimonSplitScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/PaimonSplitScan.scala
@@ -42,11 +42,11 @@ class PaimonSplitScanBuilder(val table: KnownSplitsTable)
extends PaimonBaseScan
/** For internal use only. */
case class PaimonSplitScan(
table: InnerTable,
- dataSplits: Array[DataSplit],
+ dataSplits: Array[Split],
requiredSchema: StructType,
pushedPartitionFilters: Seq[PartitionPredicate],
pushedDataFilters: Seq[Predicate])
extends BaseScan {
- protected def getInputSplits: Array[Split] =
dataSplits.asInstanceOf[Array[Split]]
+ protected def getInputSplits: Array[Split] = dataSplits
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala
index 1b466987e3..832291e379 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala
@@ -21,7 +21,7 @@ package org.apache.paimon.spark.util
import org.apache.paimon.spark.SparkTable
import
org.apache.paimon.spark.schema.PaimonMetadataColumn.{PATH_AND_INDEX_META_COLUMNS,
ROW_TRACKING_META_COLUMNS}
import org.apache.paimon.table.{InnerTable, KnownSplitsTable}
-import org.apache.paimon.table.source.DataSplit
+import org.apache.paimon.table.source.Split
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.SQLConfHelper
@@ -35,7 +35,7 @@ trait ScanPlanHelper extends SQLConfHelper {
/** Create a new scan plan from a relation with the given data splits,
condition(optional). */
def createNewScanPlan(
- dataSplits: Seq[DataSplit],
+ dataSplits: Seq[Split],
relation: DataSourceV2Relation,
condition: Option[Expression]): LogicalPlan = {
val newRelation = createNewScanPlan(dataSplits, relation)
@@ -46,7 +46,7 @@ trait ScanPlanHelper extends SQLConfHelper {
}
def createNewScanPlan(
- dataSplits: Seq[DataSplit],
+ dataSplits: Seq[Split],
relation: DataSourceV2Relation): DataSourceV2Relation = {
relation.table match {
case sparkTable @ SparkTable(table: InnerTable) =>
@@ -73,9 +73,7 @@ trait ScanPlanHelper extends SQLConfHelper {
/** This wrapper is only used in java code, e.g. Procedure. */
object ScanPlanHelper extends ScanPlanHelper {
- def createNewScanPlan(
- dataSplits: Array[DataSplit],
- relation: DataSourceV2Relation): LogicalPlan = {
+ def createNewScanPlan(dataSplits: Array[Split], relation:
DataSourceV2Relation): LogicalPlan = {
ScanPlanHelper.createNewScanPlan(dataSplits.toSeq, relation)
}
}