This is an automated email from the ASF dual-hosted git repository.

hope pushed a commit to branch release-1.4
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 6cf1b374a9fd9086465b240c0ad03cf4392f2c54
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Mar 26 19:49:07 2026 +0800

    [core] Remove stale dv files in ClusteringCompactManager (#7538)
---
 .../clustering/ClusteringCompactManager.java       |  4 ++
 .../paimon/separated/ClusteringTableTest.java      | 56 ++++++++++++++++++++++
 2 files changed, 60 insertions(+)

diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
index 12acea3be6..3e288b22ca 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
@@ -228,6 +228,10 @@ public class ClusteringCompactManager extends CompactFutureManager {
                 for (DataFileMeta newFile : mergedFiles) {
                     keyIndex.rebuildIndex(newFile);
                 }
+                // Remove stale deletion vectors for merged-away files
+                for (DataFileMeta file : mergeGroup) {
+                    dvMaintainer.removeDeletionVectorOf(file.fileName());
+                }
                 result.before().addAll(mergeGroup);
                 result.after().addAll(mergedFiles);
             }
diff --git a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
index 353dd6d704..e33afb7927 100644
--- a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.separated;
 
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
@@ -26,6 +27,7 @@ import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.schema.Schema;
@@ -34,7 +36,9 @@ import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.RawFile;
 import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.CloseableIterator;
 
@@ -44,7 +48,10 @@ import org.junit.jupiter.api.io.TempDir;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
+import java.util.Set;
 
 import static org.apache.paimon.CoreOptions.BUCKET;
 import static org.apache.paimon.CoreOptions.CLUSTERING_COLUMNS;
@@ -386,6 +393,55 @@ class ClusteringTableTest {
                         GenericRow.of(6, 600));
     }
 
+    /**
+     * Test that deletion vectors for merged-away files are cleaned up during Phase 2 compaction.
+     */
+    @Test
+    public void testDeletionVectorCleanupAfterMerge() throws Exception {
+        // Write many rounds of overlapping keys to trigger both Phase 1 (DV creation)
+        // and Phase 2 (merge of sorted files, DV cleanup)
+        for (int round = 0; round < 10; round++) {
+            writeRows(
+                    Arrays.asList(
+                            GenericRow.of(1, round * 10),
+                            GenericRow.of(2, round * 10 + 1),
+                            GenericRow.of(3, round * 10 + 2)));
+        }
+
+        // After many rounds of compaction with merges, only the latest values should remain
+        assertThat(readRows())
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(1, 90), GenericRow.of(2, 91), GenericRow.of(3, 92));
+
+        // Assert DV files are aligned with data files:
+        // every file referenced in deletion vectors must exist as an active data file
+        Snapshot snapshot = table.latestSnapshot().orElse(null);
+
+        // Collect all active data file names from manifests
+        List<Split> splits = table.newReadBuilder().newScan().plan().splits();
+        Set<String> activeDataFiles = new HashSet<>();
+        for (Split split : splits) {
+            Optional<List<RawFile>> rawFiles = split.convertToRawFiles();
+            assertThat(rawFiles.isPresent()).isTrue();
+            rawFiles.get().stream()
+                    .map(RawFile::path)
+                    .map(path -> new Path(path).getName())
+                    .forEach(activeDataFiles::add);
+        }
+
+        // Collect all file names referenced in DV index entries
+        Set<String> dvReferencedFiles = new HashSet<>();
+        for (IndexManifestEntry indexEntry :
+                table.indexManifestFileReader().read(snapshot.indexManifest())) {
+            dvReferencedFiles.addAll(indexEntry.indexFile().dvRanges().keySet());
+        }
+
+        // Every DV-referenced file must be an active data file
+        assertThat(activeDataFiles)
+                .as("DV references should be a subset of active data files")
+                .containsAll(dvReferencedFiles);
+    }
+
     // ==================== Clustering Column Filter Tests ====================
 
    /** Test that equality filter on clustering column skips irrelevant files in the scan plan. */

Reply via email to