This is an automated email from the ASF dual-hosted git repository.
jerryjing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1e1c9f4ed8 [index] lumina index build in flink support increment
(#7611)
1e1c9f4ed8 is described below
commit 1e1c9f4ed8c28420e8248862b15b03fce9b7edca
Author: jerry <[email protected]>
AuthorDate: Fri Apr 10 09:32:14 2026 +0800
[index] lumina index build in flink support increment (#7611)
---
.../flink/globalindex/GenericIndexTopoBuilder.java | 131 +++++++++++-
.../globalindex/GenericIndexTopoBuilderTest.java | 232 +++++++++++++++++++++
2 files changed, 352 insertions(+), 11 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
index bcbabac793..afa68624ee 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
@@ -88,6 +88,7 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
public class GenericIndexTopoBuilder {
private static final Logger LOG =
LoggerFactory.getLogger(GenericIndexTopoBuilder.class);
+ /**
+ * Sentinel value for {@code maxIndexedRowId} meaning that no rows are treated as already
+ * indexed, i.e. a full build. Rows are only skipped when {@code maxIndexedRowId >= 0}.
+ */
+ public static final long NO_MAX_INDEXED_ROW_ID = -1L;
public static void buildIndexAndExecute(
StreamExecutionEnvironment env,
@@ -97,6 +98,25 @@ public class GenericIndexTopoBuilder {
PartitionPredicate partitionPredicate,
Options userOptions)
throws Exception {
+ buildIndexAndExecute(
+ env,
+ table,
+ indexColumn,
+ indexType,
+ partitionPredicate,
+ userOptions,
+ NO_MAX_INDEXED_ROW_ID);
+ }
+
+ public static void buildIndexAndExecute(
+ StreamExecutionEnvironment env,
+ FileStoreTable table,
+ String indexColumn,
+ String indexType,
+ PartitionPredicate partitionPredicate,
+ Options userOptions,
+ long maxIndexedRowId)
+ throws Exception {
boolean hasIndexToBuild =
buildIndex(
env,
@@ -105,7 +125,8 @@ public class GenericIndexTopoBuilder {
indexColumn,
indexType,
partitionPredicate,
- userOptions);
+ userOptions,
+ maxIndexedRowId);
if (hasIndexToBuild) {
env.execute("Create " + indexType + " global index for table: " +
table.name());
} else {
@@ -113,6 +134,26 @@ public class GenericIndexTopoBuilder {
}
}
+    /**
+     * Builds a generic global index topology for a full (non-incremental) build.
+     *
+     * <p>Equivalent to calling the overload that additionally takes {@code maxIndexedRowId} with
+     * {@link #NO_MAX_INDEXED_ROW_ID}, so no already-indexed rows are skipped.
+     *
+     * @return {@code true} if a Flink topology was built, {@code false} if there is nothing to
+     *     index
+     */
+    public static boolean buildIndex(
+            StreamExecutionEnvironment env,
+            Supplier<GenericGlobalIndexBuilder> indexBuilderSupplier,
+            FileStoreTable table,
+            String indexColumn,
+            String indexType,
+            PartitionPredicate partitionPredicate,
+            Options userOptions)
+            throws Exception {
+        return buildIndex(
+                env,
+                indexBuilderSupplier,
+                table,
+                indexColumn,
+                indexType,
+                partitionPredicate,
+                userOptions,
+                NO_MAX_INDEXED_ROW_ID);
+    }
+
/**
* Builds a generic global index topology using a {@link
GenericGlobalIndexBuilder} supplier.
*
@@ -126,7 +167,8 @@ public class GenericIndexTopoBuilder {
String indexColumn,
String indexType,
PartitionPredicate partitionPredicate,
- Options userOptions)
+ Options userOptions,
+ long maxIndexedRowId)
throws Exception {
GenericGlobalIndexBuilder indexBuilder = indexBuilderSupplier.get();
if (partitionPredicate != null) {
@@ -135,14 +177,47 @@ public class GenericIndexTopoBuilder {
List<ManifestEntry> entries = indexBuilder.scan();
List<IndexManifestEntry> deletedIndexEntries =
indexBuilder.deletedIndexEntries();
+
+ return buildTopology(
+ env,
+ table,
+ indexColumn,
+ indexType,
+ userOptions,
+ entries,
+ deletedIndexEntries,
+ maxIndexedRowId);
+ }
+
+ /**
+ * Builds the Flink topology for global index creation from pre-scanned
entries. Supports both
+ * full builds ({@code maxIndexedRowId = NO_MAX_INDEXED_ROW_ID}) and
incremental builds where
+ * rows up to {@code maxIndexedRowId} are skipped.
+ *
+ * @param maxIndexedRowId the maximum row ID already indexed; use {@link
#NO_MAX_INDEXED_ROW_ID}
+ * for a full build
+ * @return {@code true} if a Flink topology was built, {@code false} if
nothing to index
+ */
+ private static boolean buildTopology(
+ StreamExecutionEnvironment env,
+ FileStoreTable table,
+ String indexColumn,
+ String indexType,
+ Options userOptions,
+ List<ManifestEntry> entries,
+ List<IndexManifestEntry> deletedIndexEntries,
+ long maxIndexedRowId)
+ throws Exception {
long totalRowCount = entries.stream().mapToLong(e ->
e.file().rowCount()).sum();
LOG.info(
- "Scanned {} files ({} rows) across {} partitions for {} index
on column '{}'.",
+ "Scanned {} files ({} rows) across {} partitions for {} index
on column '{}'"
+ + (maxIndexedRowId >= 0 ? ", maxIndexedRowId={}." :
"."),
entries.size(),
totalRowCount,
entries.stream().map(ManifestEntry::partition).distinct().count(),
indexType,
- indexColumn);
+ indexColumn,
+ maxIndexedRowId);
RowType rowType = table.rowType();
DataField indexField = rowType.getField(indexColumn);
@@ -160,7 +235,8 @@ public class GenericIndexTopoBuilder {
"Option 'global-index.row-count-per-shard' must be greater
than 0.");
// Compute shard tasks at file level from the provided entries
- List<ShardTask> shardTasks = computeShardTasks(table, entries,
rowsPerShard);
+ List<ShardTask> shardTasks =
+ computeShardTasks(table, entries, rowsPerShard,
maxIndexedRowId);
if (shardTasks.isEmpty()) {
LOG.info("No shard tasks generated, nothing to index.");
return false;
@@ -216,13 +292,33 @@ public class GenericIndexTopoBuilder {
return true;
}
+    /**
+     * Full-build variant of shard task computation: no row is treated as already indexed.
+     *
+     * <p>Delegates to {@link #computeShardTasks(FileStoreTable, List, long, long)} with {@link
+     * #NO_MAX_INDEXED_ROW_ID}.
+     */
+    static List<ShardTask> computeShardTasks(
+            FileStoreTable table, List<ManifestEntry> entries, long rowsPerShard) {
+        final long fullBuild = NO_MAX_INDEXED_ROW_ID;
+        return computeShardTasks(table, entries, rowsPerShard, fullBuild);
+    }
+
/**
* Compute shard tasks at file level from the given manifest entries. Each
shard only contains
* the files whose row ID ranges overlap with its shard range. A file
spanning multiple shard
* boundaries is included in each overlapping shard.
+ *
+ * <p>When {@code maxIndexedRowId >= 0}, each shard's effective start is
advanced past {@code
+ * maxIndexedRowId}, skipping fully-indexed shards entirely. This enables
incremental index
+ * building where only new (un-indexed) rows are processed.
+ *
+ * @param maxIndexedRowId the maximum row ID already indexed; use {@link
#NO_MAX_INDEXED_ROW_ID}
+ * for a full build
*/
static List<ShardTask> computeShardTasks(
- FileStoreTable table, List<ManifestEntry> entries, long
rowsPerShard) {
+ FileStoreTable table,
+ List<ManifestEntry> entries,
+ long rowsPerShard,
+ long maxIndexedRowId) {
// Group by partition (bucket is always 0 for unaware-bucket tables)
Map<BinaryRow, List<ManifestEntry>> entriesByPartition =
entries.stream().collect(Collectors.groupingBy(ManifestEntry::partition));
@@ -266,6 +362,15 @@ public class GenericIndexTopoBuilder {
continue;
}
+ // For incremental builds, advance past already-indexed rows
+ long effectiveStart =
+ maxIndexedRowId >= 0
+ ? Math.max(shardStart, maxIndexedRowId + 1)
+ : shardStart;
+ if (effectiveStart > shardEnd) {
+ continue; // entire shard already indexed
+ }
+
shardFiles.sort(Comparator.comparingLong(DataFileMeta::nonNullFirstRowId));
// Group contiguous files; gaps produce separate tasks
@@ -286,7 +391,7 @@ public class GenericIndexTopoBuilder {
tasks.add(
createShardTask(
currentGroup,
- shardStart,
+ effectiveStart,
shardEnd,
partition,
partBucketPath));
@@ -298,7 +403,11 @@ public class GenericIndexTopoBuilder {
if (!currentGroup.isEmpty()) {
tasks.add(
createShardTask(
- currentGroup, shardStart, shardEnd,
partition, partBucketPath));
+ currentGroup,
+ effectiveStart,
+ shardEnd,
+ partition,
+ partBucketPath));
}
}
}
@@ -307,7 +416,7 @@ public class GenericIndexTopoBuilder {
private static ShardTask createShardTask(
List<DataFileMeta> files,
- long shardStart,
+ long effectiveStart,
long shardEnd,
BinaryRow partition,
String bucketPath) {
@@ -315,8 +424,8 @@ public class GenericIndexTopoBuilder {
long groupMaxRowId =
files.stream().mapToLong(f ->
f.nonNullRowIdRange().to).max().getAsLong();
- // Clamp to shard boundaries
- long rangeFrom = Math.max(groupMinRowId, shardStart);
+ // Clamp to effective boundaries
+ long rangeFrom = Math.max(groupMinRowId, effectiveStart);
long rangeTo = Math.min(groupMaxRowId, shardEnd);
DataSplit dataSplit =
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilderTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilderTest.java
index 8a01a8f7e5..d8ea36e9d5 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilderTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilderTest.java
@@ -218,6 +218,238 @@ class GenericIndexTopoBuilderTest {
assertThat(tasks.get(1).shardRange).isEqualTo(new Range(100, 149));
}
+ // ========== Incremental build scenarios (maxIndexedRowId) ==========
+
+    @Test
+    void testIncrementalFirstBuildNoIndex() {
+        // First build: nothing is indexed yet and there are two contiguous files [0,99] and
+        // [100,199]. With NO_MAX_INDEXED_ROW_ID every shard starts at its natural boundary.
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(createEntry(BinaryRow.EMPTY_ROW, 0L, 100));
+        entries.add(createEntry(BinaryRow.EMPTY_ROW, 100L, 100));
+
+        List<GenericIndexTopoBuilder.ShardTask> tasks =
+                GenericIndexTopoBuilder.computeShardTasks(
+                        table, entries, 200, GenericIndexTopoBuilder.NO_MAX_INDEXED_ROW_ID);
+
+        assertThat(tasks).hasSize(1);
+        assertThat(tasks.get(0).shardRange).isEqualTo(new Range(0, 199));
+        assertThat(tasks.get(0).split.dataFiles()).hasSize(2);
+    }
+
+    @Test
+    void testIncrementalNormalNoCompaction() {
+        // Rows [0,199] are already indexed; a fresh, uncompacted file covers [200,399].
+        long maxIndexedRowId = 199;
+        List<ManifestEntry> newFiles = new ArrayList<>();
+        newFiles.add(createEntry(BinaryRow.EMPTY_ROW, 200L, 200));
+
+        List<GenericIndexTopoBuilder.ShardTask> shardTasks =
+                GenericIndexTopoBuilder.computeShardTasks(table, newFiles, 200, maxIndexedRowId);
+
+        assertThat(shardTasks).hasSize(1);
+        assertThat(shardTasks.get(0).shardRange).isEqualTo(new Range(200, 399));
+    }
+
+    @Test
+    void testIncrementalNoNewDataAllIndexed() {
+        // One file [0,399] whose rows are all covered by the existing index: every shard is
+        // already fully indexed, so no task may be produced.
+        long maxIndexedRowId = 399;
+        List<ManifestEntry> files = new ArrayList<>();
+        files.add(createEntry(BinaryRow.EMPTY_ROW, 0L, 400));
+
+        assertThat(GenericIndexTopoBuilder.computeShardTasks(table, files, 200, maxIndexedRowId))
+                .isEmpty();
+    }
+
+    @Test
+    void testIncrementalCompactMergesIndexedAndUnindexed() {
+        // History: A[0,99] and B[100,199] are indexed, C[200,299] is new; B and C are then
+        // compacted into D[100,299], which is the only file passed in.
+        //   shard 0 [0,199] -> effectiveStart 200 > 199, skipped
+        //   shard 1 (from 200) -> effectiveStart 200, task clamped to the file: [200,299]
+        List<ManifestEntry> live = new ArrayList<>();
+        live.add(createEntry(BinaryRow.EMPTY_ROW, 100L, 200)); // D[100,299]
+
+        List<GenericIndexTopoBuilder.ShardTask> result =
+                GenericIndexTopoBuilder.computeShardTasks(table, live, 200, 199);
+
+        assertThat(result).hasSize(1);
+        assertThat(result.get(0).shardRange).isEqualTo(new Range(200, 299));
+    }
+
+    @Test
+    void testIncrementalCompactOnlyIndexedFiles() {
+        // Compacting only already-indexed files leaves nothing new to scan, hence no tasks.
+        List<ManifestEntry> nothingNew = Collections.emptyList();
+
+        assertThat(GenericIndexTopoBuilder.computeShardTasks(table, nothingNew, 200, 199))
+                .isEmpty();
+    }
+
+    @Test
+    void testIncrementalCompactPartialWithUntouchedFiles() {
+        // Rows [0,399] are indexed. [200,399] and the new [400,599] are compacted into
+        // D[200,599]; only D is passed to computeShardTasks.
+        //   shard 1 (from 200) -> effectiveStart 400 is past its end, skipped
+        //   shard 2 (from 400) -> task [400,599]
+        List<ManifestEntry> scanned = new ArrayList<>();
+        scanned.add(createEntry(BinaryRow.EMPTY_ROW, 200L, 400)); // D[200,599]
+
+        List<GenericIndexTopoBuilder.ShardTask> shardTasks =
+                GenericIndexTopoBuilder.computeShardTasks(table, scanned, 200, 399);
+
+        assertThat(shardTasks).hasSize(1);
+        assertThat(shardTasks.get(0).shardRange).isEqualTo(new Range(400, 599));
+    }
+
+    @Test
+    void testIncrementalMultipleWritesThenCompact() {
+        // 200 rows written and indexed, 200 more written, then everything compacted into a
+        // single file [0,399].
+        //   shard 0 [0,199] -> effectiveStart 200 > 199, skipped
+        //   shard 1 (from 200) -> task [200,399]
+        List<ManifestEntry> compacted = new ArrayList<>();
+        compacted.add(createEntry(BinaryRow.EMPTY_ROW, 0L, 400));
+
+        List<GenericIndexTopoBuilder.ShardTask> result =
+                GenericIndexTopoBuilder.computeShardTasks(table, compacted, 200, 199);
+
+        assertThat(result).hasSize(1);
+        assertThat(result.get(0).shardRange).isEqualTo(new Range(200, 399));
+    }
+
+    @Test
+    void testIncrementalMergeAllSmallShards() {
+        // Every existing index shard was small and has been deleted by a merge, so nothing counts
+        // as indexed (NO_MAX_INDEXED_ROW_ID) and the file [0,249] is rebuilt from scratch:
+        //   shard 0 -> [0,199], shard 1 -> [200,249]
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(createEntry(BinaryRow.EMPTY_ROW, 0L, 250));
+
+        List<GenericIndexTopoBuilder.ShardTask> tasks =
+                GenericIndexTopoBuilder.computeShardTasks(
+                        table, entries, 200, GenericIndexTopoBuilder.NO_MAX_INDEXED_ROW_ID);
+
+        assertThat(tasks).hasSize(2);
+        assertThat(tasks.get(0).shardRange).isEqualTo(new Range(0, 199));
+        assertThat(tasks.get(1).shardRange).isEqualTo(new Range(200, 249));
+    }
+
+    @Test
+    void testIncrementalMergePartialKeepLargeShard() {
+        // The index entry covering [0,199] is kept while the small shards after it were merged,
+        // so maxIndexedRowId=199. The live data is a single file [0,599].
+        //   shard 0 [0,199]   -> effectiveStart 200 > 199, skipped
+        //   shard 1 [200,399] -> effectiveStart 200, task [200,399]
+        //   shard 2 [400,599] -> effectiveStart 400, task [400,599]
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(createEntry(BinaryRow.EMPTY_ROW, 0L, 600));
+
+        List<GenericIndexTopoBuilder.ShardTask> tasks =
+                GenericIndexTopoBuilder.computeShardTasks(table, entries, 200, 199);
+
+        assertThat(tasks).hasSize(2);
+        assertThat(tasks.get(0).shardRange).isEqualTo(new Range(200, 399));
+        assertThat(tasks.get(1).shardRange).isEqualTo(new Range(400, 599));
+    }
+
+    @Test
+    void testIncrementalShardBoundaryExactAlign() {
+        // maxIndexedRowId=199 ends exactly on a shard boundary. The indexed file [0,199] is still
+        // live and the new file [200,399] starts exactly at the next shard.
+        //   shard 0 [0,199]   -> effectiveStart 200 > 199, skipped (no partial re-index)
+        //   shard 1 [200,399] -> task [200,399] containing only the new file
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(createEntry(BinaryRow.EMPTY_ROW, 0L, 200));
+        entries.add(createEntry(BinaryRow.EMPTY_ROW, 200L, 200));
+
+        List<GenericIndexTopoBuilder.ShardTask> tasks =
+                GenericIndexTopoBuilder.computeShardTasks(table, entries, 200, 199);
+
+        assertThat(tasks).hasSize(1);
+        assertThat(tasks.get(0).shardRange).isEqualTo(new Range(200, 399));
+        assertThat(tasks.get(0).split.dataFiles()).hasSize(1);
+    }
+
+    @Test
+    void testIncrementalShardBoundaryNotAligned() {
+        // Index stops mid-shard at 149; compacted file [0,349].
+        //   shard 0 -> effectiveStart 150, task [150,199]
+        //   shard 1 (from 200) -> task clamped to the file end: [200,349]
+        long maxIndexedRowId = 149;
+        List<ManifestEntry> compacted = new ArrayList<>();
+        compacted.add(createEntry(BinaryRow.EMPTY_ROW, 0L, 350));
+
+        List<GenericIndexTopoBuilder.ShardTask> shardTasks =
+                GenericIndexTopoBuilder.computeShardTasks(
+                        table, compacted, 200, maxIndexedRowId);
+
+        assertThat(shardTasks).hasSize(2);
+        assertThat(shardTasks.get(0).shardRange).isEqualTo(new Range(150, 199));
+        assertThat(shardTasks.get(1).shardRange).isEqualTo(new Range(200, 349));
+    }
+
+    @Test
+    void testIncrementalFileSpansMultipleShards() {
+        // One large file [0,599] spanning three shards, with [0,199] already indexed.
+        // Shard 0 is skipped; shards 1 and 2 each produce a task that references the same
+        // spanning file, since a file is included in every shard it overlaps.
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(createEntry(BinaryRow.EMPTY_ROW, 0L, 600));
+
+        List<GenericIndexTopoBuilder.ShardTask> tasks =
+                GenericIndexTopoBuilder.computeShardTasks(table, entries, 200, 199);
+
+        assertThat(tasks).hasSize(2);
+        assertThat(tasks.get(0).shardRange).isEqualTo(new Range(200, 399));
+        assertThat(tasks.get(1).shardRange).isEqualTo(new Range(400, 599));
+        assertThat(tasks.get(0).split.dataFiles()).hasSize(1);
+        assertThat(tasks.get(1).split.dataFiles()).hasSize(1);
+    }
+
+    @Test
+    void testIncrementalNullFirstRowIdFileSkipped() {
+        // A file without a first row ID (null) is expected to be ignored; only the new file
+        // [200,299] past maxIndexedRowId=199 yields a task.
+        List<ManifestEntry> mixed = new ArrayList<>();
+        mixed.add(createEntry(BinaryRow.EMPTY_ROW, null, 100));
+        mixed.add(createEntry(BinaryRow.EMPTY_ROW, 200L, 100));
+
+        List<GenericIndexTopoBuilder.ShardTask> shardTasks =
+                GenericIndexTopoBuilder.computeShardTasks(table, mixed, 200, 199);
+
+        assertThat(shardTasks).hasSize(1);
+        GenericIndexTopoBuilder.ShardTask only = shardTasks.get(0);
+        assertThat(only.shardRange).isEqualTo(new Range(200, 299));
+        assertThat(only.split.dataFiles()).hasSize(1);
+    }
+
+    @Test
+    void testIncrementalMultipleFilesInOneShard() {
+        // With 400 rows per shard, shard 0 starts at 0; the indexed prefix [0,199] moves its
+        // effective start to 200, and the contiguous new files [200,299] and [300,399] form a
+        // single task.
+        List<ManifestEntry> newFiles = new ArrayList<>();
+        newFiles.add(createEntry(BinaryRow.EMPTY_ROW, 200L, 100));
+        newFiles.add(createEntry(BinaryRow.EMPTY_ROW, 300L, 100));
+
+        List<GenericIndexTopoBuilder.ShardTask> result =
+                GenericIndexTopoBuilder.computeShardTasks(table, newFiles, 400, 199);
+
+        assertThat(result).hasSize(1);
+        GenericIndexTopoBuilder.ShardTask task = result.get(0);
+        assertThat(task.shardRange).isEqualTo(new Range(200, 399));
+        assertThat(task.split.dataFiles()).hasSize(2);
+    }
+
+    @Test
+    void testIncrementalGapBetweenFilesProducesSeparateTasks() {
+        // Full build (nothing indexed) of two files [0,49] and [150,199] in the same shard:
+        // the row-ID gap between them splits the shard into two tasks.
+        List<ManifestEntry> gapped = new ArrayList<>();
+        gapped.add(createEntry(BinaryRow.EMPTY_ROW, 0L, 50));
+        gapped.add(createEntry(BinaryRow.EMPTY_ROW, 150L, 50));
+
+        List<GenericIndexTopoBuilder.ShardTask> result =
+                GenericIndexTopoBuilder.computeShardTasks(
+                        table, gapped, 200, GenericIndexTopoBuilder.NO_MAX_INDEXED_ROW_ID);
+
+        assertThat(result).hasSize(2);
+        assertThat(result.get(0).shardRange).isEqualTo(new Range(0, 49));
+        assertThat(result.get(1).shardRange).isEqualTo(new Range(150, 199));
+    }
+
+    @Test
+    void testIncrementalFileStartsAfterEffectiveStart() {
+        // maxIndexedRowId=250, new file [300,499].
+        //   shard 1 (from 200) -> effectiveStart 251, but the file starts at 300 -> [300,399]
+        //   shard 2 (from 400) -> effectiveStart 400 -> [400,499]
+        // The single file spans both shards, so each task references it.
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(createEntry(BinaryRow.EMPTY_ROW, 300L, 200));
+
+        List<GenericIndexTopoBuilder.ShardTask> tasks =
+                GenericIndexTopoBuilder.computeShardTasks(table, entries, 200, 250);
+
+        assertThat(tasks).hasSize(2);
+        assertThat(tasks.get(0).shardRange).isEqualTo(new Range(300, 399));
+        assertThat(tasks.get(1).shardRange).isEqualTo(new Range(400, 499));
+        assertThat(tasks.get(0).split.dataFiles()).hasSize(1);
+        assertThat(tasks.get(1).split.dataFiles()).hasSize(1);
+    }
+
// -- Helpers --
private static ManifestEntry createEntry(BinaryRow partition, Long
firstRowId, long rowCount) {