This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 52a6269831 [index] lumina index build support append model (#7631)
52a6269831 is described below
commit 52a626983147ab11a3500bd2566a11dd560a1ec2
Author: jerry <[email protected]>
AuthorDate: Mon Apr 13 13:56:36 2026 +0800
[index] lumina index build support append mode (#7631)
---
.../flink/globalindex/GenericIndexTopoBuilder.java | 60 +++++++++++++++++++++-
.../globalindex/GenericIndexTopoBuilderTest.java | 59 +++++++++++++++++++++
2 files changed, 118 insertions(+), 1 deletion(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
index afa68624ee..e37970723c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
@@ -38,6 +38,7 @@ import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.table.sink.BatchWriteBuilder;
@@ -64,6 +65,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -219,6 +221,10 @@ public class GenericIndexTopoBuilder {
indexColumn,
maxIndexedRowId);
+ long minNonIndexableRowId =
+ findMinNonIndexableRowId(table.schemaManager(), entries,
indexColumn);
+ entries = filterEntriesBefore(entries, minNonIndexableRowId);
+
RowType rowType = table.rowType();
DataField indexField = rowType.getField(indexColumn);
// Project indexColumn + _ROW_ID so we can read the actual row ID from
data
@@ -292,6 +298,49 @@ public class GenericIndexTopoBuilder {
return true;
}
+ /**
+ * Find the minimum firstRowId among files whose schema does not contain
the index column. Files
+ * at or beyond this rowId cannot be indexed because the column was added
later via ALTER TABLE.
+ *
+ * @return the boundary rowId, or {@link Long#MAX_VALUE} if all files
contain the column
+ */
+ static long findMinNonIndexableRowId(
+ SchemaManager schemaManager, List<ManifestEntry> entries, String
indexColumn) {
+ Map<Long, Boolean> schemaContainsColumn = new HashMap<>();
+ long minRowId = Long.MAX_VALUE;
+ for (ManifestEntry entry : entries) {
+ long sid = entry.file().schemaId();
+ boolean contains =
+ schemaContainsColumn.computeIfAbsent(
+ sid, id ->
schemaManager.schema(id).fieldNames().contains(indexColumn));
+ if (!contains && entry.file().firstRowId() != null) {
+ minRowId = Math.min(minRowId,
entry.file().nonNullFirstRowId());
+ }
+ }
+ return minRowId;
+ }
+
+ /** Keep only entries whose firstRowId is strictly less than the given
boundary. */
+ static List<ManifestEntry> filterEntriesBefore(
+ List<ManifestEntry> entries, long boundaryRowId) {
+ if (boundaryRowId == Long.MAX_VALUE) {
+ return entries;
+ }
+ List<ManifestEntry> result = new ArrayList<>();
+ for (ManifestEntry entry : entries) {
+ if (entry.file().firstRowId() != null
+ && entry.file().nonNullFirstRowId() < boundaryRowId) {
+ result.add(entry);
+ }
+ }
+ LOG.info(
+ "Filtered {} files at or beyond rowId {}, {} files remain.",
+ entries.size() - result.size(),
+ boundaryRowId,
+ result.size());
+ return result;
+ }
+
/**
* Compute shard tasks for a full build (no rows to skip).
*
@@ -577,7 +626,16 @@ public class GenericIndexTopoBuilder {
}
// Only write rows within this shard's range
if (currentRowId >= task.shardRange.from) {
-
indexWriter.write(indexFieldGetter.getFieldOrNull(row));
+ Object fieldData =
indexFieldGetter.getFieldOrNull(row);
+ if (fieldData == null) {
+ LOG.info(
+ "Null vector at rowId={}, stopping
shard [{}, {}].",
+ currentRowId,
+ task.shardRange.from,
+ task.shardRange.to);
+ break;
+ }
+ indexWriter.write(fieldData);
rowsWritten++;
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilderTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilderTest.java
index d8ea36e9d5..0de57077b2 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilderTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilderTest.java
@@ -26,6 +26,8 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.io.PojoDataFileMeta;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.FileStorePathFactory;
@@ -36,6 +38,7 @@ import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
@@ -450,8 +453,64 @@ class GenericIndexTopoBuilderTest {
assertThat(tasks.get(1).shardRange).isEqualTo(new Range(400, 499));
}
+ @Test
+ void testAppendFilterOldFilesBeforeNewFiles() {
+ // Typical append: write file0[0,99](schema1), file1[100,199](schema1),
+ // then file2[200,299](schema0) arrives (old schema).
+ // Boundary = 200, keep files with firstRowId < 200.
+ SchemaManager schemaManager = mock(SchemaManager.class);
+ TableSchema oldSchema = mock(TableSchema.class);
+ TableSchema newSchema = mock(TableSchema.class);
+ when(schemaManager.schema(0L)).thenReturn(oldSchema);
+ when(schemaManager.schema(1L)).thenReturn(newSchema);
+ when(oldSchema.fieldNames()).thenReturn(Arrays.asList("id", "name"));
+ when(newSchema.fieldNames()).thenReturn(Arrays.asList("id", "name",
"vec"));
+
+ List<ManifestEntry> entries = new ArrayList<>();
+ entries.add(createEntryWithSchemaId(BinaryRow.EMPTY_ROW, 0L, 100, 1L));
+ entries.add(createEntryWithSchemaId(BinaryRow.EMPTY_ROW, 100L, 100,
1L));
+ entries.add(createEntryWithSchemaId(BinaryRow.EMPTY_ROW, 200L, 100,
0L));
+
+ List<ManifestEntry> result =
+ GenericIndexTopoBuilder.filterEntriesBefore(
+ entries,
+ GenericIndexTopoBuilder.findMinNonIndexableRowId(
+ schemaManager, entries, "vec"));
+
+ assertThat(result).hasSize(2);
+ assertThat(result.get(0).file().nonNullFirstRowId()).isEqualTo(0L);
+ assertThat(result.get(1).file().nonNullFirstRowId()).isEqualTo(100L);
+ }
+
// -- Helpers --
/**
 * Creates an ADD manifest entry wrapping a {@link PojoDataFileMeta} with the given firstRowId,
 * row count and schemaId; all other file-meta fields use empty/zero/null placeholders.
 */
private static ManifestEntry createEntryWithSchemaId(
        BinaryRow partition, Long firstRowId, long rowCount, long schemaId) {
    // NOTE(review): the constructor arguments below are purely positional — the only
    // meaningful values for these tests are rowCount, schemaId and firstRowId. The inline
    // labels on other arguments are assumptions; confirm against the PojoDataFileMeta ctor.
    PojoDataFileMeta file =
            new PojoDataFileMeta(
                    "test-file-" + UUID.randomUUID(),
                    1024L, // presumably file size in bytes — placeholder
                    rowCount,
                    BinaryRow.EMPTY_ROW,
                    BinaryRow.EMPTY_ROW,
                    SimpleStats.EMPTY_STATS,
                    SimpleStats.EMPTY_STATS,
                    0L,
                    0L,
                    schemaId,
                    0,
                    Collections.emptyList(),
                    null,
                    null,
                    null,
                    null,
                    null,
                    null,
                    firstRowId,
                    null);
    return ManifestEntry.create(FileKind.ADD, partition, 0, 1, file);
}
+
private static ManifestEntry createEntry(BinaryRow partition, Long
firstRowId, long rowCount) {
PojoDataFileMeta file =
new PojoDataFileMeta(