This is an automated email from the ASF dual-hosted git repository. hope pushed a commit to branch release-1.4 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 8904cc0409b52820a64c6ac4f63986a2d59f6805 Author: Jingsong Lee <[email protected]> AuthorDate: Wed Mar 25 22:29:10 2026 +0800 [core] Add validation for 'pk-clustering-override' (#7528) --- .../primary-key-table/pk-clustering-override.md | 13 ++----- .../ClusteringCompactManagerFactory.java | 28 ++------------- .../org/apache/paimon/schema/SchemaValidation.java | 40 ++++++++++++++++++++++ 3 files changed, 44 insertions(+), 37 deletions(-) diff --git a/docs/content/primary-key-table/pk-clustering-override.md b/docs/content/primary-key-table/pk-clustering-override.md index 0c37456a9e..534c3c3785 100644 --- a/docs/content/primary-key-table/pk-clustering-override.md +++ b/docs/content/primary-key-table/pk-clustering-override.md @@ -53,7 +53,7 @@ CREATE TABLE my_table ( After this, data files within each bucket will be physically sorted by `city` instead of `id`. Queries like `SELECT * FROM my_table WHERE city = 'Beijing'` can skip irrelevant data files by checking their min/max statistics on the clustering column. - + ## How It Works PK Clustering Override replaces the default LSM compaction with a two-phase clustering compaction: @@ -82,16 +82,6 @@ temporary files to reduce memory consumption, preventing OOM during multi-way me | `clustering.columns` | Must be set (one or more non-primary-key columns) | | `deletion-vectors.enabled` | Must be `true` | | `merge-engine` | `deduplicate` (default) or `first-row` only | -| `sequence.fields` | Must **not** be set | -| `record-level.expire-time` | Must **not** be set | - -## Related Options - -| Option | Default | Description | -|--------|---------|-------------| -| `clustering.columns` | (none) | Comma-separated column names used as the physical sort order for data files. | -| `sort-spill-threshold` | (auto) | When the number of merge readers exceeds this value, smaller files are spilled to row-based temp files to reduce memory usage. 
| -| `sort-spill-buffer-size` | `64 mb` | Buffer size used for external sort during Phase 1 rewrite. | ## When to Use @@ -106,3 +96,4 @@ It is **not** suitable when: - Point lookups by primary key are the dominant access pattern (default LSM sort is already optimal). - You need `partial-update` or `aggregation` merge engine. - `sequence.fields` or `record-level.expire-time` is required. +- Changelog producer `lookup` or `full-compaction` is required. diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManagerFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManagerFactory.java index b58f5941a0..f0b117df8b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManagerFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManagerFactory.java @@ -38,6 +38,7 @@ import javax.annotation.Nullable; import java.util.List; import java.util.concurrent.ExecutorService; +import static org.apache.paimon.schema.SchemaValidation.validatePkClusteringOverride; import static org.apache.paimon.utils.Preconditions.checkNotNull; /** Factory to create {@link ClusteringCompactManager}.
*/ @@ -66,32 +67,7 @@ public class ClusteringCompactManagerFactory implements KvCompactionManagerFacto this.keyType = keyType; this.valueType = valueType; this.cacheManager = cacheManager; - - if (options.clusteringColumns().isEmpty()) { - throw new IllegalArgumentException( - "Cannot support 'pk-clustering-override' mode without 'clustering.columns'."); - } - if (!options.deletionVectorsEnabled()) { - throw new UnsupportedOperationException( - "Cannot support deletion-vectors disabled in 'pk-clustering-override' mode."); - } - if (options.recordLevelExpireTime() != null) { - throw new UnsupportedOperationException( - "Cannot support record level expire time enabled in 'pk-clustering-override' mode."); - } - if (options.mergeEngine() != CoreOptions.MergeEngine.DEDUPLICATE - && options.mergeEngine() != CoreOptions.MergeEngine.FIRST_ROW) { - throw new UnsupportedOperationException( - "Cannot support merge engine: " - + options.mergeEngine() - + " in 'pk-clustering-override' mode."); - } - if (!options.sequenceField().isEmpty()) { - throw new UnsupportedOperationException( - "Cannot support sequence field: " - + options.sequenceField() - + " in 'pk-clustering-override' mode."); - } + validatePkClusteringOverride(options); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 689f65276b..130f9b29eb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -282,6 +282,8 @@ public class SchemaValidation { validateChainTable(schema, options); validateChangelogReadSequenceNumber(schema, options); + + validatePkClusteringOverride(options); } public static void validateFallbackBranch(SchemaManager schemaManager, TableSchema schema) { @@ -795,4 +797,42 @@ public class SchemaValidation { CoreOptions.TABLE_READ_SEQUENCE_NUMBER_ENABLED.key()); } } 
+ + /** Validates the option combination required by 'pk-clustering-override' mode. Does nothing unless {@code options.pkClusteringOverride()} is true. When the mode is enabled: 'clustering.columns' must be non-empty; deletion vectors must be enabled; record-level expire time must not be set; the merge engine must be {@code DEDUPLICATE} or {@code FIRST_ROW}; no sequence field may be set; and the changelog producer must be {@code NONE} or {@code INPUT}. This single method is invoked both from schema validation and from the clustering compact-manager factory constructor, so both paths enforce the same rules. @param options the table options to check @throws IllegalArgumentException if 'clustering.columns' is empty @throws UnsupportedOperationException if any other requirement above is violated */ public static void validatePkClusteringOverride(CoreOptions options) { + if (options.pkClusteringOverride()) { + if (options.clusteringColumns().isEmpty()) { + throw new IllegalArgumentException( + "Cannot support 'pk-clustering-override' mode without 'clustering.columns'."); + } + if (!options.deletionVectorsEnabled()) { + throw new UnsupportedOperationException( + "Cannot support deletion-vectors disabled in 'pk-clustering-override' mode."); + } + if (options.recordLevelExpireTime() != null) { + throw new UnsupportedOperationException( + "Cannot support record level expire time enabled in 'pk-clustering-override' mode."); + } + if (options.mergeEngine() != CoreOptions.MergeEngine.DEDUPLICATE + && options.mergeEngine() != CoreOptions.MergeEngine.FIRST_ROW) { + throw new UnsupportedOperationException( + "Cannot support merge engine: " + + options.mergeEngine() + + " in 'pk-clustering-override' mode."); + } + if (!options.sequenceField().isEmpty()) { + throw new UnsupportedOperationException( + "Cannot support sequence field: " + + options.sequenceField() + + " in 'pk-clustering-override' mode."); + } + /* Only the NONE and INPUT changelog producers are accepted in this mode. */ ChangelogProducer changelogProducer = options.changelogProducer(); + if (changelogProducer != ChangelogProducer.NONE + && changelogProducer != ChangelogProducer.INPUT) { + throw new UnsupportedOperationException( + "Cannot support changelog producer: " + + changelogProducer + + " in 'pk-clustering-override' mode."); + } + } + } }
