This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a1a8dcb165 [core] Allow pk-clustering-override without explicit DV for 
first-row merge engine (#7622)
a1a8dcb165 is described below

commit a1a8dcb1652222ee87a7555e0044e1d10f3942a4
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Apr 13 14:48:03 2026 +0800

    [core] Allow pk-clustering-override without explicit DV for first-row merge 
engine (#7622)
    
    First-row merge engine already has built-in deletion vector semantics,
    so requiring users to explicitly enable deletion-vectors is unnecessary.
---
 .../primary-key-table/pk-clustering-override.md    | 19 ++++-
 .../clustering/ClusteringCompactManager.java       | 28 ++++---
 .../compact/clustering/ClusteringFileRewriter.java | 43 ++++++++++-
 .../compact/clustering/ClusteringKeyIndex.java     | 84 +++++++++------------
 .../org/apache/paimon/schema/SchemaValidation.java |  5 +-
 .../paimon/separated/ClusteringTableTest.java      | 86 ++++++++++++++++++++++
 6 files changed, 199 insertions(+), 66 deletions(-)

diff --git a/docs/content/primary-key-table/pk-clustering-override.md 
b/docs/content/primary-key-table/pk-clustering-override.md
index 3348bc0482..91b3fa14e8 100644
--- a/docs/content/primary-key-table/pk-clustering-override.md
+++ b/docs/content/primary-key-table/pk-clustering-override.md
@@ -50,6 +50,23 @@ CREATE TABLE my_table (
 );
 ```
 
+For the `first-row` merge engine, deletion vectors are already built in, so you 
don't need to enable them explicitly:
+
+```sql
+CREATE TABLE my_table (
+    id BIGINT,
+    dt STRING,
+    city STRING,
+    amount DOUBLE,
+    PRIMARY KEY (id) NOT ENFORCED
+) WITH (
+    'pk-clustering-override' = 'true',
+    'clustering.columns' = 'city',
+    'merge-engine' = 'first-row',
+    'bucket' = '4'
+);
+```
+
 After this, data files within each bucket will be physically sorted by `city` 
instead of `id`. Queries like
 `SELECT * FROM my_table WHERE city = 'Beijing'` can skip irrelevant data files 
by checking their min/max statistics
 on the clustering column.
@@ -60,7 +77,7 @@ on the clustering column.
 |--------|-------------|
 | `pk-clustering-override` | `true` |
 | `clustering.columns` | Must be set (one or more non-primary-key columns) |
-| `deletion-vectors.enabled` | Must be `true` |
+| `deletion-vectors.enabled` | Must be `true` (not required for the `first-row` 
merge engine) |
 | `merge-engine` | `deduplicate` (default) or `first-row` only |
 
 ## When to Use
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
index 1e9b02b32b..9aac4ba08e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
@@ -43,8 +43,10 @@ import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.stream.IntStream;
@@ -204,10 +206,14 @@ public class ClusteringCompactManager extends 
CompactFutureManager {
         // Snapshot sorted files before Phase 1 to avoid including newly 
created files in Phase 2
         List<DataFileMeta> existingSortedFiles = fileLevels.sortedFiles();
         for (DataFileMeta file : unsortedFiles) {
+            Set<String> originalFileNames = 
Collections.singleton(file.fileName());
             List<DataFileMeta> sortedFiles =
                     fileRewriter.sortAndRewriteFiles(
-                            singletonList(file), kvSerializer, kvSchemaType);
-            keyIndex.updateIndex(file, sortedFiles);
+                            singletonList(file),
+                            kvSerializer,
+                            kvSchemaType,
+                            keyIndex,
+                            originalFileNames);
             result.before().add(file);
             result.after().addAll(sortedFiles);
         }
@@ -232,19 +238,23 @@ public class ClusteringCompactManager extends 
CompactFutureManager {
                     keyIndex.rebuildIndex(newFile);
                 }
                 // Remove stale deletion vectors for merged-away files
-                for (DataFileMeta file : mergeGroup) {
-                    dvMaintainer.removeDeletionVectorOf(file.fileName());
+                if (dvMaintainer != null) {
+                    for (DataFileMeta file : mergeGroup) {
+                        dvMaintainer.removeDeletionVectorOf(file.fileName());
+                    }
                 }
                 result.before().addAll(mergeGroup);
                 result.after().addAll(mergedFiles);
             }
         }
 
-        CompactDeletionFile deletionFile =
-                lazyGenDeletionFile
-                        ? CompactDeletionFile.lazyGeneration(dvMaintainer)
-                        : CompactDeletionFile.generateFiles(dvMaintainer);
-        result.setDeletionFile(deletionFile);
+        if (dvMaintainer != null) {
+            CompactDeletionFile deletionFile =
+                    lazyGenDeletionFile
+                            ? CompactDeletionFile.lazyGeneration(dvMaintainer)
+                            : CompactDeletionFile.generateFiles(dvMaintainer);
+            result.setDeletionFile(deletionFile);
+        }
         return result;
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java
index ece4813764..3689e70483 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java
@@ -27,6 +27,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.BinaryRowSerializer;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.data.serializer.RowCompactedSerializer;
 import org.apache.paimon.disk.ChannelReaderInputView;
 import org.apache.paimon.disk.ChannelReaderInputViewIterator;
 import org.apache.paimon.disk.ChannelWithMeta;
@@ -47,6 +48,8 @@ import org.apache.paimon.utils.CloseableIterator;
 import org.apache.paimon.utils.KeyValueWithLevelNoReusingSerializer;
 import org.apache.paimon.utils.MutableObjectIterator;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -54,6 +57,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.PriorityQueue;
+import java.util.Set;
 
 /**
  * Handles file rewriting for clustering compaction, including sorting 
unsorted files (Phase 1) and
@@ -112,10 +116,19 @@ public class ClusteringFileRewriter {
 
     /**
      * Sort and rewrite unsorted files by clustering columns. Reads all 
KeyValue records, sorts them
-     * using an external sort buffer, and writes to new level-1 files.
+     * using an external sort buffer, and writes to new level-1 files. Checks 
the key index inline
+     * during writing to handle deduplication (FIRST_ROW skips duplicates, 
DEDUPLICATE marks old
+     * positions in DV) and updates the index without re-reading the output 
files.
+     *
+     * @param keyIndex the key index for inline checking and batch update, or 
null to skip
+     * @param originalFileNames file names of the original files being 
replaced (for index check)
      */
     public List<DataFileMeta> sortAndRewriteFiles(
-            List<DataFileMeta> inputFiles, KeyValueSerializer kvSerializer, 
RowType kvSchemaType)
+            List<DataFileMeta> inputFiles,
+            KeyValueSerializer kvSerializer,
+            RowType kvSchemaType,
+            @Nullable ClusteringKeyIndex keyIndex,
+            Set<String> originalFileNames)
             throws Exception {
         int[] sortFieldsInKeyValue =
                 Arrays.stream(clusteringColumns)
@@ -145,6 +158,10 @@ public class ClusteringFileRewriter {
             }
         }
 
+        RowCompactedSerializer keySerializer =
+                keyIndex != null ? new RowCompactedSerializer(keyType) : null;
+        List<byte[]> collectedKeys = keyIndex != null ? new ArrayList<>() : 
null;
+
         RollingFileWriter<KeyValue, DataFileMeta> writer =
                 writerFactory.createRollingClusteringFileWriter();
         try {
@@ -152,10 +169,18 @@ public class ClusteringFileRewriter {
             BinaryRow binaryRow = new BinaryRow(kvSchemaType.getFieldCount());
             while ((binaryRow = sortedIterator.next(binaryRow)) != null) {
                 KeyValue kv = kvSerializer.fromRow(binaryRow);
-                writer.write(
+                KeyValue copied =
                         kv.copy(
                                 new InternalRowSerializer(keyType),
-                                new InternalRowSerializer(valueType)));
+                                new InternalRowSerializer(valueType));
+                if (keyIndex != null) {
+                    byte[] keyBytes = 
keySerializer.serializeToBytes(copied.key());
+                    if (!keyIndex.checkKey(keyBytes, originalFileNames)) {
+                        continue;
+                    }
+                    collectedKeys.add(keyBytes);
+                }
+                writer.write(copied);
             }
         } finally {
             sortBuffer.clear();
@@ -170,6 +195,16 @@ public class ClusteringFileRewriter {
             fileLevels.addNewFile(newFile);
         }
 
+        // Batch update index using collected keys, split by file rowCount
+        if (keyIndex != null) {
+            int offset = 0;
+            for (DataFileMeta newFile : newFiles) {
+                int count = (int) newFile.rowCount();
+                keyIndex.batchPutIndex(newFile, collectedKeys.subList(offset, 
offset + count));
+                offset += count;
+            }
+        }
+
         return newFiles;
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java
index d7234345f6..2b4e412727 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java
@@ -45,8 +45,6 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.AbstractMap;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -209,61 +207,47 @@ public class ClusteringKeyIndex implements Closeable {
     }
 
     /**
-     * Update the key index after a single original file is replaced by new 
sorted files.
+     * Check a key against the index during sort-and-rewrite writing.
      *
-     * <p>For DEDUPLICATE mode: mark the old position in deletion vectors, 
keep the new position.
+     * <p>For FIRST_ROW mode: if key exists pointing to a non-original file, 
return false (skip
+     * writing this record — it's a duplicate).
      *
-     * <p>For FIRST_ROW mode: if key exists, mark the new position in deletion 
vectors (keep the
-     * first/old one); if key is new, store the new position.
+     * <p>For DEDUPLICATE mode: if key exists pointing to a non-original file, 
mark the old position
+     * in deletion vectors, return true (write the new record).
+     *
+     * @param keyBytes serialized key bytes
+     * @param originalFileNames file names of the original unsorted files 
being replaced
+     * @return true if the record should be written, false to skip (FIRST_ROW 
dedup)
      */
-    public void updateIndex(DataFileMeta originalFile, List<DataFileMeta> 
newSortedFiles)
-            throws Exception {
-        updateIndex(Collections.singletonList(originalFile), newSortedFiles);
+    public boolean checkKey(byte[] keyBytes, Set<String> originalFileNames) 
throws Exception {
+        byte[] oldValue = kvDb.get(keyBytes);
+        if (oldValue != null) {
+            ByteArrayInputStream valueIn = new ByteArrayInputStream(oldValue);
+            int oldFileId = decodeInt(valueIn);
+            int oldPosition = decodeInt(valueIn);
+            DataFileMeta oldFile = fileLevels.getFileById(oldFileId);
+            if (oldFile != null && 
!originalFileNames.contains(oldFile.fileName())) {
+                if (firstRow) {
+                    return false;
+                } else {
+                    dvMaintainer.notifyNewDeletion(oldFile.fileName(), 
oldPosition);
+                }
+            }
+        }
+        return true;
     }
 
     /**
-     * Update the key index after multiple original files are replaced by new 
sorted files.
-     *
-     * @see #updateIndex(DataFileMeta, List)
+     * Batch update the key index for a new sorted file using pre-collected 
key bytes. Avoids
+     * re-reading the file.
      */
-    public void updateIndex(List<DataFileMeta> originalFiles, 
List<DataFileMeta> newSortedFiles)
-            throws Exception {
-        RowCompactedSerializer keySerializer = new 
RowCompactedSerializer(keyType);
-
-        Set<String> originalFileNames = new HashSet<>();
-        for (DataFileMeta file : originalFiles) {
-            originalFileNames.add(file.fileName());
-        }
-
-        for (DataFileMeta sortedFile : newSortedFiles) {
-            int fileId = fileLevels.getFileIdByName(sortedFile.fileName());
-            int position = 0;
-            try (CloseableIterator<InternalRow> iterator = 
readKeyIterator(sortedFile)) {
-                while (iterator.hasNext()) {
-                    byte[] key = 
keySerializer.serializeToBytes(iterator.next());
-                    byte[] oldValue = kvDb.get(key);
-                    if (oldValue != null) {
-                        ByteArrayInputStream valueIn = new 
ByteArrayInputStream(oldValue);
-                        int oldFileId = decodeInt(valueIn);
-                        int oldPosition = decodeInt(valueIn);
-                        DataFileMeta oldFile = 
fileLevels.getFileById(oldFileId);
-                        if (oldFile != null && 
!originalFileNames.contains(oldFile.fileName())) {
-                            if (firstRow) {
-                                
dvMaintainer.notifyNewDeletion(sortedFile.fileName(), position);
-                                position++;
-                                continue;
-                            } else {
-                                
dvMaintainer.notifyNewDeletion(oldFile.fileName(), oldPosition);
-                            }
-                        }
-                    }
-                    ByteArrayOutputStream value = new ByteArrayOutputStream(8);
-                    encodeInt(value, fileId);
-                    encodeInt(value, position);
-                    kvDb.put(key, value.toByteArray());
-                    position++;
-                }
-            }
+    public void batchPutIndex(DataFileMeta sortedFile, List<byte[]> 
keyBytesList) throws Exception {
+        int fileId = fileLevels.getFileIdByName(sortedFile.fileName());
+        for (int position = 0; position < keyBytesList.size(); position++) {
+            ByteArrayOutputStream value = new ByteArrayOutputStream(8);
+            encodeInt(value, fileId);
+            encodeInt(value, position);
+            kvDb.put(keyBytesList.get(position), value.toByteArray());
         }
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 2ff1080c4a..271709c47e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -522,7 +522,7 @@ public class SchemaValidation {
                         || options.changelogProducer() == 
ChangelogProducer.LOOKUP,
                 "Deletion vectors mode is only supported for NONE/INPUT/LOOKUP 
changelog producer now.");
 
-        // pk-clustering-override mode requires deletion vectors even for 
first-row
+        // pk-clustering-override mode allows deletion vectors for first-row
         if (!options.pkClusteringOverride()) {
             checkArgument(
                     !options.mergeEngine().equals(MergeEngine.FIRST_ROW),
@@ -847,7 +847,8 @@ public class SchemaValidation {
                 throw new IllegalArgumentException(
                         "Cannot support 'pk-clustering-override' mode without 
'clustering.columns'.");
             }
-            if (!options.deletionVectorsEnabled()) {
+            if (!options.deletionVectorsEnabled()
+                    && options.mergeEngine() != 
CoreOptions.MergeEngine.FIRST_ROW) {
                 throw new UnsupportedOperationException(
                         "Cannot support deletion-vectors disabled in 
'pk-clustering-override' mode.");
             }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
index e9ddc9e000..dc1aa5febe 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
@@ -28,6 +28,7 @@ import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
@@ -519,6 +520,22 @@ class ClusteringTableTest {
                 .containsExactlyInAnyOrder(GenericRow.of(1, 100), 
GenericRow.of(2, 200));
     }
 
+    /** Test first-row mode without explicit deletion-vectors enabled. */
+    @Test
+    public void testFirstRowWithoutDeletionVectors() throws Exception {
+        Table firstRowTable = createFirstRowTableWithoutDv();
+
+        // Write initial data
+        writeRows(firstRowTable, Arrays.asList(GenericRow.of(1, 100), 
GenericRow.of(2, 200)));
+
+        // Write same keys with different values - should be ignored 
(first-row keeps first)
+        writeRows(firstRowTable, Arrays.asList(GenericRow.of(1, 999), 
GenericRow.of(2, 888)));
+
+        // Should still see the first values
+        assertThat(readRows(firstRowTable))
+                .containsExactlyInAnyOrder(GenericRow.of(1, 100), 
GenericRow.of(2, 200));
+    }
+
     /** Test first-row mode with multiple commits. */
     @Test
     public void testFirstRowMultipleCommits() throws Exception {
@@ -617,6 +634,59 @@ class ClusteringTableTest {
                 .containsExactlyInAnyOrder(GenericRow.of(1, 10), 
GenericRow.of(2, 20));
     }
 
+    /**
+     * Test that FIRST_ROW inline dedup actually reduces the number of records 
written. Duplicate
+     * keys should be dropped during sort-and-rewrite, resulting in fewer 
total rows across data
+     * files compared to the number of rows written.
+     */
+    @Test
+    public void testFirstRowInlineDedupReducesFileRows() throws Exception {
+        Table firstRowTable = createFirstRowTable();
+
+        // Commit 1: write 5 unique keys
+        writeRows(
+                firstRowTable,
+                Arrays.asList(
+                        GenericRow.of(1, 10),
+                        GenericRow.of(2, 20),
+                        GenericRow.of(3, 30),
+                        GenericRow.of(4, 40),
+                        GenericRow.of(5, 50)));
+
+        // Commit 2: write 5 duplicate keys (all should be dropped inline)
+        writeRows(
+                firstRowTable,
+                Arrays.asList(
+                        GenericRow.of(1, 99),
+                        GenericRow.of(2, 99),
+                        GenericRow.of(3, 99),
+                        GenericRow.of(4, 99),
+                        GenericRow.of(5, 99)));
+
+        // Verify correctness: still see first values
+        assertThat(readRows(firstRowTable))
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(1, 10),
+                        GenericRow.of(2, 20),
+                        GenericRow.of(3, 30),
+                        GenericRow.of(4, 40),
+                        GenericRow.of(5, 50));
+
+        // Verify optimization: total row count across all data files should 
be exactly 5
+        // (duplicates dropped during writing, not just DV-marked)
+        List<Split> splits = 
firstRowTable.newReadBuilder().newScan().plan().splits();
+        long totalRows =
+                splits.stream()
+                        .mapToLong(
+                                split ->
+                                        ((DataSplit) split)
+                                                .dataFiles().stream()
+                                                        
.mapToLong(DataFileMeta::rowCount)
+                                                        .sum())
+                        .sum();
+        assertThat(totalRows).isEqualTo(5);
+    }
+
     /** Test first-row mode with many writes to trigger compaction. */
     @Test
     public void testFirstRowManyWrites() throws Exception {
@@ -915,6 +985,22 @@ class ClusteringTableTest {
         return catalog.getTable(identifier);
     }
 
+    private Table createFirstRowTableWithoutDv() throws Exception {
+        Identifier identifier = Identifier.create("default", 
"first_row_no_dv_table");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.INT())
+                        .primaryKey("a")
+                        .option(BUCKET.key(), "1")
+                        .option(CLUSTERING_COLUMNS.key(), "b")
+                        .option(PK_CLUSTERING_OVERRIDE.key(), "true")
+                        .option(MERGE_ENGINE.key(), "first-row")
+                        .build();
+        catalog.createTable(identifier, schema, false);
+        return catalog.getTable(identifier);
+    }
+
     private void writeRows(List<GenericRow> rows) throws Exception {
         writeRows(table, rows);
     }

Reply via email to