This is an automated email from the ASF dual-hosted git repository.

hope pushed a commit to branch release-1.4 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 6cf1b374a9fd9086465b240c0ad03cf4392f2c54 Author: Jingsong Lee <[email protected]> AuthorDate: Thu Mar 26 19:49:07 2026 +0800 [core] Remove stale dv files in ClusteringCompactManager (#7538) --- .../clustering/ClusteringCompactManager.java | 4 ++ .../paimon/separated/ClusteringTableTest.java | 56 ++++++++++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java index 12acea3be6..3e288b22ca 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java @@ -228,6 +228,10 @@ public class ClusteringCompactManager extends CompactFutureManager { for (DataFileMeta newFile : mergedFiles) { keyIndex.rebuildIndex(newFile); } + // Remove stale deletion vectors for merged-away files + for (DataFileMeta file : mergeGroup) { + dvMaintainer.removeDeletionVectorOf(file.fileName()); + } result.before().addAll(mergeGroup); result.after().addAll(mergedFiles); } diff --git a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java index 353dd6d704..e33afb7927 100644 --- a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java @@ -18,6 +18,7 @@ package org.apache.paimon.separated; +import org.apache.paimon.Snapshot; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; @@ -26,6 +27,7 @@ import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; import 
org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.schema.Schema; @@ -34,7 +36,9 @@ import org.apache.paimon.table.sink.BatchTableCommit; import org.apache.paimon.table.sink.BatchTableWrite; import org.apache.paimon.table.sink.BatchWriteBuilder; import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.RawFile; import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.table.source.Split; import org.apache.paimon.types.DataTypes; import org.apache.paimon.utils.CloseableIterator; @@ -44,7 +48,10 @@ import org.junit.jupiter.api.io.TempDir; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Optional; +import java.util.Set; import static org.apache.paimon.CoreOptions.BUCKET; import static org.apache.paimon.CoreOptions.CLUSTERING_COLUMNS; @@ -386,6 +393,55 @@ class ClusteringTableTest { GenericRow.of(6, 600)); } + /** + * Test that deletion vectors for merged-away files are cleaned up during Phase 2 compaction. 
+ */ + @Test + public void testDeletionVectorCleanupAfterMerge() throws Exception { + // Write many rounds of overlapping keys to trigger both Phase 1 (DV creation) + // and Phase 2 (merge of sorted files, DV cleanup) + for (int round = 0; round < 10; round++) { + writeRows( + Arrays.asList( + GenericRow.of(1, round * 10), + GenericRow.of(2, round * 10 + 1), + GenericRow.of(3, round * 10 + 2))); + } + + // After many rounds of compaction with merges, only the latest values should remain + assertThat(readRows()) + .containsExactlyInAnyOrder( + GenericRow.of(1, 90), GenericRow.of(2, 91), GenericRow.of(3, 92)); + + // Assert DV files are aligned with data files: + // every file referenced in deletion vectors must exist as an active data file + Snapshot snapshot = table.latestSnapshot().orElse(null); + + // Collect all active data file names from manifests + List<Split> splits = table.newReadBuilder().newScan().plan().splits(); + Set<String> activeDataFiles = new HashSet<>(); + for (Split split : splits) { + Optional<List<RawFile>> rawFiles = split.convertToRawFiles(); + assertThat(rawFiles.isPresent()).isTrue(); + rawFiles.get().stream() + .map(RawFile::path) + .map(path -> new Path(path).getName()) + .forEach(activeDataFiles::add); + } + + // Collect all file names referenced in DV index entries + Set<String> dvReferencedFiles = new HashSet<>(); + for (IndexManifestEntry indexEntry : + table.indexManifestFileReader().read(snapshot.indexManifest())) { + dvReferencedFiles.addAll(indexEntry.indexFile().dvRanges().keySet()); + } + + // Every DV-referenced file must be an active data file + assertThat(activeDataFiles) + .as("DV references should be a subset of active data files") + .containsAll(dvReferencedFiles); + } + // ==================== Clustering Column Filter Tests ==================== /** Test that equality filter on clustering column skips irrelevant files in the scan plan. */
