This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 58188b7728 [core] Remove stale dv files in ClusteringCompactManager (#7538)
58188b7728 is described below
commit 58188b772880e4545c6f4e85bc79fbd7a3da2f3b
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Mar 26 19:49:07 2026 +0800
[core] Remove stale dv files in ClusteringCompactManager (#7538)
---
.../clustering/ClusteringCompactManager.java | 4 ++
.../paimon/separated/ClusteringTableTest.java | 56 ++++++++++++++++++++++
2 files changed, 60 insertions(+)
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
index 12acea3be6..3e288b22ca 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
@@ -228,6 +228,10 @@ public class ClusteringCompactManager extends CompactFutureManager {
for (DataFileMeta newFile : mergedFiles) {
keyIndex.rebuildIndex(newFile);
}
+ // Remove stale deletion vectors for merged-away files
+ for (DataFileMeta file : mergeGroup) {
+ dvMaintainer.removeDeletionVectorOf(file.fileName());
+ }
result.before().addAll(mergeGroup);
result.after().addAll(mergedFiles);
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
index 353dd6d704..e33afb7927 100644
--- a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
@@ -18,6 +18,7 @@
package org.apache.paimon.separated;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
@@ -26,6 +27,7 @@ import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.Schema;
@@ -34,7 +36,9 @@ import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.RawFile;
import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.CloseableIterator;
@@ -44,7 +48,10 @@ import org.junit.jupiter.api.io.TempDir;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
+import java.util.Optional;
+import java.util.Set;
import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.CLUSTERING_COLUMNS;
@@ -386,6 +393,55 @@ class ClusteringTableTest {
GenericRow.of(6, 600));
}
+ /**
+ * Test that deletion vectors for merged-away files are cleaned up during Phase 2 compaction.
+ */
+ @Test
+ public void testDeletionVectorCleanupAfterMerge() throws Exception {
+ // Write many rounds of overlapping keys to trigger both Phase 1 (DV creation)
+ // and Phase 2 (merge of sorted files, DV cleanup)
+ for (int round = 0; round < 10; round++) {
+ writeRows(
+ Arrays.asList(
+ GenericRow.of(1, round * 10),
+ GenericRow.of(2, round * 10 + 1),
+ GenericRow.of(3, round * 10 + 2)));
+ }
+
+ // After many rounds of compaction with merges, only the latest values should remain
+ assertThat(readRows())
+ .containsExactlyInAnyOrder(
+ GenericRow.of(1, 90), GenericRow.of(2, 91), GenericRow.of(3, 92));
+
+ // Assert DV files are aligned with data files:
+ // every file referenced in deletion vectors must exist as an active data file
+ Snapshot snapshot = table.latestSnapshot().orElse(null);
+
+ // Collect all active data file names from manifests
+ List<Split> splits = table.newReadBuilder().newScan().plan().splits();
+ Set<String> activeDataFiles = new HashSet<>();
+ for (Split split : splits) {
+ Optional<List<RawFile>> rawFiles = split.convertToRawFiles();
+ assertThat(rawFiles.isPresent()).isTrue();
+ rawFiles.get().stream()
+ .map(RawFile::path)
+ .map(path -> new Path(path).getName())
+ .forEach(activeDataFiles::add);
+ }
+
+ // Collect all file names referenced in DV index entries
+ Set<String> dvReferencedFiles = new HashSet<>();
+ for (IndexManifestEntry indexEntry :
+ table.indexManifestFileReader().read(snapshot.indexManifest())) {
+ dvReferencedFiles.addAll(indexEntry.indexFile().dvRanges().keySet());
+ }
+
+ // Every DV-referenced file must be an active data file
+ assertThat(activeDataFiles)
+ .as("DV references should be a subset of active data files")
+ .containsAll(dvReferencedFiles);
+ }
+
// ==================== Clustering Column Filter Tests ====================
/** Test that equality filter on clustering column skips irrelevant files in the scan plan. */