This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a1a8dcb165 [core] Allow pk-clustering-override without explicit DV for first-row merge engine (#7622)
a1a8dcb165 is described below
commit a1a8dcb1652222ee87a7555e0044e1d10f3942a4
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Apr 13 14:48:03 2026 +0800
[core] Allow pk-clustering-override without explicit DV for first-row merge engine (#7622)
First-row merge engine already has built-in deletion vector semantics,
so requiring users to explicitly enable deletion-vectors is unnecessary.
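For context, the configuration that this change newly accepts (clustering override on a first-row table with no explicit deletion-vector option) appears in the docs and test hunks below. A condensed, illustrative Java version of the test helper added in ClusteringTableTest, with option keys written as plain strings here rather than the CoreOptions constants the test uses:

```java
// Sketch only: mirrors createFirstRowTableWithoutDv() from the test diff below.
// Note there is no 'deletion-vectors.enabled' option; first-row now passes validation without it.
Schema schema =
        Schema.newBuilder()
                .column("a", DataTypes.INT())
                .column("b", DataTypes.INT())
                .primaryKey("a")
                .option("bucket", "1")
                .option("clustering.columns", "b")
                .option("pk-clustering-override", "true")
                .option("merge-engine", "first-row")
                .build();
catalog.createTable(identifier, schema, false);
```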
---
.../primary-key-table/pk-clustering-override.md | 19 ++++-
.../clustering/ClusteringCompactManager.java | 28 ++++---
.../compact/clustering/ClusteringFileRewriter.java | 43 ++++++++++-
.../compact/clustering/ClusteringKeyIndex.java | 84 +++++++++------------
.../org/apache/paimon/schema/SchemaValidation.java | 5 +-
.../paimon/separated/ClusteringTableTest.java | 86 ++++++++++++++++++++++
6 files changed, 199 insertions(+), 66 deletions(-)
diff --git a/docs/content/primary-key-table/pk-clustering-override.md b/docs/content/primary-key-table/pk-clustering-override.md
index 3348bc0482..91b3fa14e8 100644
--- a/docs/content/primary-key-table/pk-clustering-override.md
+++ b/docs/content/primary-key-table/pk-clustering-override.md
@@ -50,6 +50,23 @@ CREATE TABLE my_table (
);
```
+For `first-row` merge engine, deletion vectors are already built-in, so you don't need to enable them explicitly:
+
+```sql
+CREATE TABLE my_table (
+ id BIGINT,
+ dt STRING,
+ city STRING,
+ amount DOUBLE,
+ PRIMARY KEY (id) NOT ENFORCED
+) WITH (
+ 'pk-clustering-override' = 'true',
+ 'clustering.columns' = 'city',
+ 'merge-engine' = 'first-row',
+ 'bucket' = '4'
+);
+```
+
After this, data files within each bucket will be physically sorted by `city` instead of `id`. Queries like
`SELECT * FROM my_table WHERE city = 'Beijing'` can skip irrelevant data files by checking their min/max statistics
on the clustering column.
@@ -60,7 +77,7 @@ on the clustering column.
|--------|-------------|
| `pk-clustering-override` | `true` |
| `clustering.columns` | Must be set (one or more non-primary-key columns) |
-| `deletion-vectors.enabled` | Must be `true` |
+| `deletion-vectors.enabled` | Must be `true` (not required for `first-row` merge engine) |
| `merge-engine` | `deduplicate` (default) or `first-row` only |
## When to Use
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
index 1e9b02b32b..9aac4ba08e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
@@ -43,8 +43,10 @@ import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.stream.IntStream;
@@ -204,10 +206,14 @@ public class ClusteringCompactManager extends CompactFutureManager {
// Snapshot sorted files before Phase 1 to avoid including newly created files in Phase 2
List<DataFileMeta> existingSortedFiles = fileLevels.sortedFiles();
for (DataFileMeta file : unsortedFiles) {
+            Set<String> originalFileNames = Collections.singleton(file.fileName());
List<DataFileMeta> sortedFiles =
fileRewriter.sortAndRewriteFiles(
- singletonList(file), kvSerializer, kvSchemaType);
- keyIndex.updateIndex(file, sortedFiles);
+ singletonList(file),
+ kvSerializer,
+ kvSchemaType,
+ keyIndex,
+ originalFileNames);
result.before().add(file);
result.after().addAll(sortedFiles);
}
@@ -232,19 +238,23 @@ public class ClusteringCompactManager extends CompactFutureManager {
keyIndex.rebuildIndex(newFile);
}
// Remove stale deletion vectors for merged-away files
- for (DataFileMeta file : mergeGroup) {
- dvMaintainer.removeDeletionVectorOf(file.fileName());
+ if (dvMaintainer != null) {
+ for (DataFileMeta file : mergeGroup) {
+ dvMaintainer.removeDeletionVectorOf(file.fileName());
+ }
}
result.before().addAll(mergeGroup);
result.after().addAll(mergedFiles);
}
}
- CompactDeletionFile deletionFile =
- lazyGenDeletionFile
- ? CompactDeletionFile.lazyGeneration(dvMaintainer)
- : CompactDeletionFile.generateFiles(dvMaintainer);
- result.setDeletionFile(deletionFile);
+ if (dvMaintainer != null) {
+ CompactDeletionFile deletionFile =
+ lazyGenDeletionFile
+ ? CompactDeletionFile.lazyGeneration(dvMaintainer)
+ : CompactDeletionFile.generateFiles(dvMaintainer);
+ result.setDeletionFile(deletionFile);
+ }
return result;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java
index ece4813764..3689e70483 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java
@@ -27,6 +27,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.BinaryRowSerializer;
import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.data.serializer.RowCompactedSerializer;
import org.apache.paimon.disk.ChannelReaderInputView;
import org.apache.paimon.disk.ChannelReaderInputViewIterator;
import org.apache.paimon.disk.ChannelWithMeta;
@@ -47,6 +48,8 @@ import org.apache.paimon.utils.CloseableIterator;
import org.apache.paimon.utils.KeyValueWithLevelNoReusingSerializer;
import org.apache.paimon.utils.MutableObjectIterator;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -54,6 +57,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
+import java.util.Set;
/**
* Handles file rewriting for clustering compaction, including sorting unsorted files (Phase 1) and
@@ -112,10 +116,19 @@ public class ClusteringFileRewriter {
/**
* Sort and rewrite unsorted files by clustering columns. Reads all KeyValue records, sorts them
- * using an external sort buffer, and writes to new level-1 files.
+     * using an external sort buffer, and writes to new level-1 files. Checks the key index inline
+     * during writing to handle deduplication (FIRST_ROW skips duplicates, DEDUPLICATE marks old
+     * positions in DV) and updates the index without re-reading the output files.
+ *
+     * @param keyIndex the key index for inline checking and batch update, or null to skip
+     * @param originalFileNames file names of the original files being replaced (for index check)
*/
public List<DataFileMeta> sortAndRewriteFiles(
-            List<DataFileMeta> inputFiles, KeyValueSerializer kvSerializer, RowType kvSchemaType)
+ List<DataFileMeta> inputFiles,
+ KeyValueSerializer kvSerializer,
+ RowType kvSchemaType,
+ @Nullable ClusteringKeyIndex keyIndex,
+ Set<String> originalFileNames)
throws Exception {
int[] sortFieldsInKeyValue =
Arrays.stream(clusteringColumns)
@@ -145,6 +158,10 @@ public class ClusteringFileRewriter {
}
}
+ RowCompactedSerializer keySerializer =
+ keyIndex != null ? new RowCompactedSerializer(keyType) : null;
+        List<byte[]> collectedKeys = keyIndex != null ? new ArrayList<>() : null;
+
RollingFileWriter<KeyValue, DataFileMeta> writer =
writerFactory.createRollingClusteringFileWriter();
try {
@@ -152,10 +169,18 @@ public class ClusteringFileRewriter {
BinaryRow binaryRow = new BinaryRow(kvSchemaType.getFieldCount());
while ((binaryRow = sortedIterator.next(binaryRow)) != null) {
KeyValue kv = kvSerializer.fromRow(binaryRow);
- writer.write(
+ KeyValue copied =
kv.copy(
new InternalRowSerializer(keyType),
- new InternalRowSerializer(valueType)));
+ new InternalRowSerializer(valueType));
+ if (keyIndex != null) {
+                    byte[] keyBytes = keySerializer.serializeToBytes(copied.key());
+ if (!keyIndex.checkKey(keyBytes, originalFileNames)) {
+ continue;
+ }
+ collectedKeys.add(keyBytes);
+ }
+ writer.write(copied);
}
} finally {
sortBuffer.clear();
@@ -170,6 +195,16 @@ public class ClusteringFileRewriter {
fileLevels.addNewFile(newFile);
}
+ // Batch update index using collected keys, split by file rowCount
+ if (keyIndex != null) {
+ int offset = 0;
+ for (DataFileMeta newFile : newFiles) {
+ int count = (int) newFile.rowCount();
+                keyIndex.batchPutIndex(newFile, collectedKeys.subList(offset, offset + count));
+ offset += count;
+ }
+ }
+
return newFiles;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java
index d7234345f6..2b4e412727 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java
@@ -45,8 +45,6 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -209,61 +207,47 @@ public class ClusteringKeyIndex implements Closeable {
}
/**
-     * Update the key index after a single original file is replaced by new sorted files.
+ * Check a key against the index during sort-and-rewrite writing.
*
-     * <p>For DEDUPLICATE mode: mark the old position in deletion vectors, keep the new position.
+     * <p>For FIRST_ROW mode: if key exists pointing to a non-original file, return false (skip
+ * writing this record — it's a duplicate).
*
-     * <p>For FIRST_ROW mode: if key exists, mark the new position in deletion vectors (keep the
- * first/old one); if key is new, store the new position.
+     * <p>For DEDUPLICATE mode: if key exists pointing to a non-original file, mark the old position
+ * in deletion vectors, return true (write the new record).
+ *
+ * @param keyBytes serialized key bytes
+     * @param originalFileNames file names of the original unsorted files being replaced
+     * @return true if the record should be written, false to skip (FIRST_ROW dedup)
*/
-    public void updateIndex(DataFileMeta originalFile, List<DataFileMeta> newSortedFiles)
- throws Exception {
- updateIndex(Collections.singletonList(originalFile), newSortedFiles);
+    public boolean checkKey(byte[] keyBytes, Set<String> originalFileNames) throws Exception {
+ byte[] oldValue = kvDb.get(keyBytes);
+ if (oldValue != null) {
+ ByteArrayInputStream valueIn = new ByteArrayInputStream(oldValue);
+ int oldFileId = decodeInt(valueIn);
+ int oldPosition = decodeInt(valueIn);
+ DataFileMeta oldFile = fileLevels.getFileById(oldFileId);
+            if (oldFile != null && !originalFileNames.contains(oldFile.fileName())) {
+ if (firstRow) {
+ return false;
+ } else {
+                    dvMaintainer.notifyNewDeletion(oldFile.fileName(), oldPosition);
+ }
+ }
+ }
+ return true;
}
/**
-     * Update the key index after multiple original files are replaced by new sorted files.
- *
- * @see #updateIndex(DataFileMeta, List)
+     * Batch update the key index for a new sorted file using pre-collected key bytes. Avoids
+ * re-reading the file.
*/
-    public void updateIndex(List<DataFileMeta> originalFiles, List<DataFileMeta> newSortedFiles)
- throws Exception {
-        RowCompactedSerializer keySerializer = new RowCompactedSerializer(keyType);
-
- Set<String> originalFileNames = new HashSet<>();
- for (DataFileMeta file : originalFiles) {
- originalFileNames.add(file.fileName());
- }
-
- for (DataFileMeta sortedFile : newSortedFiles) {
- int fileId = fileLevels.getFileIdByName(sortedFile.fileName());
- int position = 0;
-            try (CloseableIterator<InternalRow> iterator = readKeyIterator(sortedFile)) {
- while (iterator.hasNext()) {
-                    byte[] key = keySerializer.serializeToBytes(iterator.next());
- byte[] oldValue = kvDb.get(key);
- if (oldValue != null) {
-                        ByteArrayInputStream valueIn = new ByteArrayInputStream(oldValue);
- int oldFileId = decodeInt(valueIn);
- int oldPosition = decodeInt(valueIn);
-                        DataFileMeta oldFile = fileLevels.getFileById(oldFileId);
-                        if (oldFile != null && !originalFileNames.contains(oldFile.fileName())) {
- if (firstRow) {
-                                dvMaintainer.notifyNewDeletion(sortedFile.fileName(), position);
- position++;
- continue;
- } else {
-                                dvMaintainer.notifyNewDeletion(oldFile.fileName(), oldPosition);
- }
- }
- }
- ByteArrayOutputStream value = new ByteArrayOutputStream(8);
- encodeInt(value, fileId);
- encodeInt(value, position);
- kvDb.put(key, value.toByteArray());
- position++;
- }
- }
+    public void batchPutIndex(DataFileMeta sortedFile, List<byte[]> keyBytesList) throws Exception {
+ int fileId = fileLevels.getFileIdByName(sortedFile.fileName());
+ for (int position = 0; position < keyBytesList.size(); position++) {
+ ByteArrayOutputStream value = new ByteArrayOutputStream(8);
+ encodeInt(value, fileId);
+ encodeInt(value, position);
+ kvDb.put(keyBytesList.get(position), value.toByteArray());
}
}
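Read together with the ClusteringFileRewriter change above, the two new ClusteringKeyIndex methods replace the old post-hoc updateIndex pass with a check-while-writing flow. A condensed sketch of that flow (not the literal production code; the external sort buffer, serializer setup and spill handling are omitted, and nextSortedKeyValue() is a hypothetical stand-in for the MutableObjectIterator plumbing):

```java
// Sketch: inline dedup during Phase 1 sort-and-rewrite, then batch index update.
List<byte[]> collectedKeys = new ArrayList<>();
KeyValue kv;
while ((kv = nextSortedKeyValue()) != null) {
    byte[] keyBytes = keySerializer.serializeToBytes(kv.key());
    // FIRST_ROW: a key already indexed in a non-original file returns false -> skip the record.
    // DEDUPLICATE: the old position is DV-marked inside checkKey and the new record is kept.
    if (!keyIndex.checkKey(keyBytes, originalFileNames)) {
        continue;
    }
    collectedKeys.add(keyBytes);
    writer.write(kv);
}
// Row positions within each rolled output file follow write order, so the index can be
// filled from the collected keys without re-reading the new files.
int offset = 0;
for (DataFileMeta newFile : newFiles) {
    int count = (int) newFile.rowCount();
    keyIndex.batchPutIndex(newFile, collectedKeys.subList(offset, offset + count));
    offset += count;
}
```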
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 2ff1080c4a..271709c47e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -522,7 +522,7 @@ public class SchemaValidation {
|| options.changelogProducer() == ChangelogProducer.LOOKUP,
"Deletion vectors mode is only supported for NONE/INPUT/LOOKUP changelog producer now.");
-        // pk-clustering-override mode requires deletion vectors even for first-row
+ // pk-clustering-override mode allows deletion vectors for first-row
if (!options.pkClusteringOverride()) {
checkArgument(
!options.mergeEngine().equals(MergeEngine.FIRST_ROW),
@@ -847,7 +847,8 @@ public class SchemaValidation {
throw new IllegalArgumentException(
"Cannot support 'pk-clustering-override' mode without
'clustering.columns'.");
}
- if (!options.deletionVectorsEnabled()) {
+ if (!options.deletionVectorsEnabled()
+                && options.mergeEngine() != CoreOptions.MergeEngine.FIRST_ROW) {
throw new UnsupportedOperationException(
"Cannot support deletion-vectors disabled in
'pk-clustering-override' mode.");
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
index e9ddc9e000..dc1aa5febe 100644
--- a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
@@ -28,6 +28,7 @@ import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
@@ -519,6 +520,22 @@ class ClusteringTableTest {
.containsExactlyInAnyOrder(GenericRow.of(1, 100), GenericRow.of(2, 200));
}
+ /** Test first-row mode without explicit deletion-vectors enabled. */
+ @Test
+ public void testFirstRowWithoutDeletionVectors() throws Exception {
+ Table firstRowTable = createFirstRowTableWithoutDv();
+
+ // Write initial data
+        writeRows(firstRowTable, Arrays.asList(GenericRow.of(1, 100), GenericRow.of(2, 200)));
+
+        // Write same keys with different values - should be ignored (first-row keeps first)
+        writeRows(firstRowTable, Arrays.asList(GenericRow.of(1, 999), GenericRow.of(2, 888)));
+
+ // Should still see the first values
+ assertThat(readRows(firstRowTable))
+                .containsExactlyInAnyOrder(GenericRow.of(1, 100), GenericRow.of(2, 200));
+ }
+
/** Test first-row mode with multiple commits. */
@Test
public void testFirstRowMultipleCommits() throws Exception {
@@ -617,6 +634,59 @@ class ClusteringTableTest {
.containsExactlyInAnyOrder(GenericRow.of(1, 10), GenericRow.of(2, 20));
}
+ /**
+     * Test that FIRST_ROW inline dedup actually reduces the number of records written. Duplicate
+     * keys should be dropped during sort-and-rewrite, resulting in fewer total rows across data
+ * files compared to the number of rows written.
+ */
+ @Test
+ public void testFirstRowInlineDedupReducesFileRows() throws Exception {
+ Table firstRowTable = createFirstRowTable();
+
+ // Commit 1: write 5 unique keys
+ writeRows(
+ firstRowTable,
+ Arrays.asList(
+ GenericRow.of(1, 10),
+ GenericRow.of(2, 20),
+ GenericRow.of(3, 30),
+ GenericRow.of(4, 40),
+ GenericRow.of(5, 50)));
+
+ // Commit 2: write 5 duplicate keys (all should be dropped inline)
+ writeRows(
+ firstRowTable,
+ Arrays.asList(
+ GenericRow.of(1, 99),
+ GenericRow.of(2, 99),
+ GenericRow.of(3, 99),
+ GenericRow.of(4, 99),
+ GenericRow.of(5, 99)));
+
+ // Verify correctness: still see first values
+ assertThat(readRows(firstRowTable))
+ .containsExactlyInAnyOrder(
+ GenericRow.of(1, 10),
+ GenericRow.of(2, 20),
+ GenericRow.of(3, 30),
+ GenericRow.of(4, 40),
+ GenericRow.of(5, 50));
+
+        // Verify optimization: total row count across all data files should be exactly 5
+ // (duplicates dropped during writing, not just DV-marked)
+        List<Split> splits = firstRowTable.newReadBuilder().newScan().plan().splits();
+ long totalRows =
+ splits.stream()
+ .mapToLong(
+ split ->
+ ((DataSplit) split)
+ .dataFiles().stream()
+                                                .mapToLong(DataFileMeta::rowCount)
+ .sum())
+ .sum();
+ assertThat(totalRows).isEqualTo(5);
+ }
+
/** Test first-row mode with many writes to trigger compaction. */
@Test
public void testFirstRowManyWrites() throws Exception {
@@ -915,6 +985,22 @@ class ClusteringTableTest {
return catalog.getTable(identifier);
}
+ private Table createFirstRowTableWithoutDv() throws Exception {
+        Identifier identifier = Identifier.create("default", "first_row_no_dv_table");
+ Schema schema =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("b", DataTypes.INT())
+ .primaryKey("a")
+ .option(BUCKET.key(), "1")
+ .option(CLUSTERING_COLUMNS.key(), "b")
+ .option(PK_CLUSTERING_OVERRIDE.key(), "true")
+ .option(MERGE_ENGINE.key(), "first-row")
+ .build();
+ catalog.createTable(identifier, schema, false);
+ return catalog.getTable(identifier);
+ }
+
private void writeRows(List<GenericRow> rows) throws Exception {
writeRows(table, rows);
}