This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ff7a540db [flink] Add local-sort mode for incremental clustering 
compaction (#7466)
3ff7a540db is described below

commit 3ff7a540dbf288ac7e3c58323e55cbc3a2abd957
Author: sanshi <[email protected]>
AuthorDate: Fri Mar 20 12:12:07 2026 +0800

    [flink] Add local-sort mode for incremental clustering compaction (#7466)
---
 .../shortcodes/generated/core_configuration.html   |   6 +
 .../main/java/org/apache/paimon/CoreOptions.java   |  53 +++++++
 .../append/cluster/IncrementalClusterManager.java  |   4 +
 .../flink/compact/IncrementalClusterCompact.java   | 171 +++++++++++++++++++--
 .../action/IncrementalClusterActionITCase.java     | 163 ++++++++++++++++++++
 5 files changed, 380 insertions(+), 17 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 278d164d68..e24b4a2795 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -218,6 +218,12 @@ under the License.
             <td>Boolean</td>
             <td>Whether enable incremental clustering.</td>
         </tr>
+        <tr>
+            <td><h5>clustering.incremental.mode</h5></td>
+            <td style="word-wrap: break-word;">global-sort</td>
+            <td><p>Enum</p></td>
+            <td>The sort mode for incremental clustering compaction. 
'global-sort' (default) performs a global range shuffle so output files are 
globally ordered. 'local-sort' skips the global shuffle and only sorts rows 
within each compaction task, producing files that are internally ordered. 
'local-sort' is cheaper and sufficient for Parquet lookup optimizations.<br 
/><br />Possible values:<ul><li>"global-sort": Perform global range shuffle and 
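then local sort. Output files are globally ordered but require network 
shuffling.</li><li>"local-sort": Sort rows only within each compaction task 
without global shuffle. Every output file is internally ordered.</li></ul></td>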
+        </tr>
         <tr>
             <td><h5>clustering.incremental.optimize-write</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index b500cb8833..78ce3d9367 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2122,6 +2122,17 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "Whether enable perform clustering before write 
phase when incremental clustering is enabled.");
 
+    public static final ConfigOption<ClusteringIncrementalMode> 
CLUSTERING_INCREMENTAL_MODE =
+            key("clustering.incremental.mode")
+                    .enumType(ClusteringIncrementalMode.class)
+                    .defaultValue(ClusteringIncrementalMode.GLOBAL_SORT)
+                    .withDescription(
+                            "The sort mode for incremental clustering 
compaction. "
+                                    + "'global-sort' (default) performs a 
global range shuffle so output files are globally ordered. "
+                                    + "'local-sort' skips the global shuffle 
and only sorts rows within each compaction task, "
+                                    + "producing files that are internally 
ordered. "
+                                    + "'local-sort' is cheaper and sufficient 
for Parquet lookup optimizations.");
+
     @Immutable
     public static final ConfigOption<Boolean> ROW_TRACKING_ENABLED =
             key("row-tracking.enabled")
@@ -3593,6 +3604,10 @@ public class CoreOptions implements Serializable {
         return options.get(CLUSTERING_INCREMENTAL_OPTIMIZE_WRITE);
     }
 
+    public ClusteringIncrementalMode clusteringIncrementalMode() {
+        return options.get(CLUSTERING_INCREMENTAL_MODE);
+    }
+
     public boolean bucketClusterEnabled() {
         return !bucketAppendOrdered()
                 && !deletionVectorsEnabled()
@@ -4352,6 +4367,44 @@ public class CoreOptions implements Serializable {
         }
     }
 
+    /** The incremental clustering mode for append table. */
+    public enum ClusteringIncrementalMode implements DescribedEnum {
+        /**
+         * Perform a global range shuffle before sorting, so that rows with 
the same key range land
+         * in the same file. This produces globally ordered output but 
requires network shuffling.
+         */
+        GLOBAL_SORT(
+                "global-sort",
+                "Perform global range shuffle and then local sort. Output 
files are globally ordered but require network shuffling."),
+
+        /**
+         * Sort rows only within each compaction task (no global shuffle). 
Every output file is
+         * internally ordered by the clustering columns, which is sufficient 
for per-file Parquet
+         * lookup optimizations.
+         */
+        LOCAL_SORT(
+                "local-sort",
+                "Sort rows only within each compaction task without global 
shuffle. Every output file is internally ordered.");
+
+        private final String value;
+        private final String description;
+
+        ClusteringIncrementalMode(String value, String description) {
+            this.value = value;
+            this.description = description;
+        }
+
+        @Override
+        public String toString() {
+            return value;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return text(description);
+        }
+    }
+
     /** The compact mode for lookup compaction. */
     public enum LookupCompactMode {
         /**
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
 
b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
index 1c8da3c033..f5642f6648 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
@@ -319,6 +319,10 @@ public class IncrementalClusterManager {
         return clusterCurve;
     }
 
+    public CoreOptions.ClusteringIncrementalMode clusteringIncrementalMode() {
+        return table.coreOptions().clusteringIncrementalMode();
+    }
+
     public List<String> clusterKeys() {
         return clusterKeys;
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java
index ee3b68f609..db2230eda6 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java
@@ -20,24 +20,37 @@ package org.apache.paimon.flink.compact;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.append.cluster.IncrementalClusterManager;
+import org.apache.paimon.codegen.CodeGenUtils;
 import org.apache.paimon.compact.CompactUnit;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.flink.FlinkRowWrapper;
 import org.apache.paimon.flink.cluster.IncrementalClusterSplitSource;
 import 
org.apache.paimon.flink.cluster.RewriteIncrementalClusterCommittableOperator;
 import org.apache.paimon.flink.sink.Committable;
 import org.apache.paimon.flink.sink.CommittableTypeInfo;
 import org.apache.paimon.flink.sink.FlinkSinkBuilder;
 import org.apache.paimon.flink.sink.RowAppendTableSink;
+import org.apache.paimon.flink.sorter.SortOperator;
 import org.apache.paimon.flink.sorter.TableSortInfo;
 import org.apache.paimon.flink.sorter.TableSorter;
+import org.apache.paimon.flink.utils.InternalTypeInfo;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.InternalRowPartitionComputer;
+import org.apache.paimon.utils.KeyProjectedRow;
 import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Projection;
 
+import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
@@ -52,6 +65,7 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_PARALLELISM;
+import static org.apache.paimon.table.PrimaryKeyTableUtils.addKeyNamePrefix;
 
 /** Compact for incremental clustering. */
 public class IncrementalClusterCompact {
@@ -136,23 +150,31 @@ public class IncrementalClusterCompact {
         if (sinkParallelism == null) {
             sinkParallelism = sourcePair.getLeft().getParallelism();
         }
-        TableSortInfo sortInfo =
-                new TableSortInfo.Builder()
-                        .setSortColumns(clusterManager.clusterKeys())
-                        .setSortStrategy(clusterManager.clusterCurve())
-                        .setSinkParallelism(sinkParallelism)
-                        .setLocalSampleSize(sinkParallelism * 
localSampleMagnification)
-                        .setGlobalSampleSize(sinkParallelism * 1000)
-                        .setRangeNumber(sinkParallelism * 10)
-                        .build();
-        DataStream<RowData> sorted =
-                TableSorter.getSorter(
-                                env,
-                                sourcePair.getLeft(),
-                                table.coreOptions(),
-                                table.rowType(),
-                                sortInfo)
-                        .sort();
+
+        DataStream<RowData> sorted;
+        CoreOptions.ClusteringIncrementalMode mode = 
clusterManager.clusteringIncrementalMode();
+        if (mode == CoreOptions.ClusteringIncrementalMode.LOCAL_SORT) {
+            sorted = localSort(sourcePair.getLeft(), sinkParallelism);
+        } else {
+            // GLOBAL_SORT: range shuffle + local sort (default)
+            TableSortInfo sortInfo =
+                    new TableSortInfo.Builder()
+                            .setSortColumns(clusterManager.clusterKeys())
+                            .setSortStrategy(clusterManager.clusterCurve())
+                            .setSinkParallelism(sinkParallelism)
+                            .setLocalSampleSize(sinkParallelism * 
localSampleMagnification)
+                            .setGlobalSampleSize(sinkParallelism * 1000)
+                            .setRangeNumber(sinkParallelism * 10)
+                            .build();
+            sorted =
+                    TableSorter.getSorter(
+                                    env,
+                                    sourcePair.getLeft(),
+                                    table.coreOptions(),
+                                    table.rowType(),
+                                    sortInfo)
+                            .sort();
+        }
 
         // 2.3 write and then reorganize the committable
         // set parallelism to null, and it'll forward parallelism when 
doWrite()
@@ -187,6 +209,121 @@ public class IncrementalClusterCompact {
         return dataStreams;
     }
 
+    /**
+     * Local sort: sort rows within each task independently by the clustering 
columns, without any
+     * global range shuffle. Every output file will be internally ordered, 
which is required for
+     * Parquet-based lookup optimizations (PIP-42).
+     *
+     * <p>Pipeline: add key prefix → SortOperator → remove key prefix → back 
to RowData
+     */
+    private DataStream<RowData> localSort(DataStream<RowData> input, int 
sinkParallelism) {
+        CoreOptions options = table.coreOptions();
+        RowType rowType = table.rowType();
+        List<String> clusterKeys = clusterManager.clusterKeys();
+
+        final int[] keyProjectionMap = rowType.projectIndexes(clusterKeys);
+        final RowType keyRowType =
+                
addKeyNamePrefix(Projection.of(keyProjectionMap).project(rowType));
+
+        // Build the combined type: [key fields ... | value fields ...]
+        List<org.apache.paimon.types.DataField> combinedFields = new 
ArrayList<>();
+        combinedFields.addAll(keyRowType.getFields());
+        combinedFields.addAll(rowType.getFields());
+        final RowType longRowType = new RowType(combinedFields);
+
+        final int valueFieldCount = rowType.getFieldCount();
+        final int[] valueProjectionMap = new int[valueFieldCount];
+        for (int i = 0; i < valueFieldCount; i++) {
+            valueProjectionMap[i] = keyRowType.getFieldCount() + i;
+        }
+
+        // Step 1: prepend key columns to each row: (key, value) -> JoinedRow
+        DataStream<InternalRow> withKey =
+                input.map(
+                                new RichMapFunction<RowData, InternalRow>() {
+
+                                    private transient 
org.apache.paimon.codegen.Projection
+                                            keyProjection;
+
+                                    /**
+                                     * Do not annotate with 
<code>@Override</code> here to maintain
+                                     * compatibility with Flink 1.18-.
+                                     */
+                                    public void open(OpenContext openContext) 
throws Exception {
+                                        open(new Configuration());
+                                    }
+
+                                    /**
+                                     * Do not annotate with 
<code>@Override</code> here to maintain
+                                     * compatibility with Flink 2.0+.
+                                     */
+                                    public void open(Configuration parameters) 
throws Exception {
+                                        keyProjection =
+                                                CodeGenUtils.newProjection(
+                                                        rowType, 
keyProjectionMap);
+                                    }
+
+                                    @Override
+                                    public InternalRow map(RowData value) {
+                                        FlinkRowWrapper wrapped = new 
FlinkRowWrapper(value);
+                                        return new JoinedRow(
+                                                
keyProjection.apply(wrapped).copy(), wrapped);
+                                    }
+                                },
+                                InternalTypeInfo.fromRowType(longRowType))
+                        .setParallelism(input.getParallelism());
+
+        // Step 2: local sort by key columns (no shuffle)
+        DataStream<InternalRow> sortedWithKey =
+                withKey.transform(
+                                "LOCAL SORT",
+                                InternalTypeInfo.fromRowType(longRowType),
+                                new SortOperator(
+                                        keyRowType,
+                                        longRowType,
+                                        options.writeBufferSize(),
+                                        options.pageSize(),
+                                        options.localSortMaxNumFileHandles(),
+                                        options.spillCompressOptions(),
+                                        sinkParallelism,
+                                        options.writeBufferSpillDiskSize(),
+                                        
options.sequenceFieldSortOrderIsAscending()))
+                        .setParallelism(sinkParallelism);
+
+        // Step 3: remove the prepended key columns and convert back to RowData
+        return sortedWithKey
+                .map(
+                        new RichMapFunction<InternalRow, InternalRow>() {
+
+                            private transient KeyProjectedRow keyProjectedRow;
+
+                            /**
+                             * Do not annotate with <code>@Override</code> 
here to maintain
+                             * compatibility with Flink 1.18-.
+                             */
+                            public void open(OpenContext openContext) {
+                                open(new Configuration());
+                            }
+
+                            /**
+                             * Do not annotate with <code>@Override</code> 
here to maintain
+                             * compatibility with Flink 2.0+.
+                             */
+                            public void open(Configuration parameters) {
+                                keyProjectedRow = new 
KeyProjectedRow(valueProjectionMap);
+                            }
+
+                            @Override
+                            public InternalRow map(InternalRow value) {
+                                return keyProjectedRow.replaceRow(value);
+                            }
+                        },
+                        InternalTypeInfo.fromRowType(rowType))
+                .setParallelism(sinkParallelism)
+                .map(FlinkRowData::new, input.getType())
+                .setParallelism(sinkParallelism);
+    }
+
     protected void buildCommitOperator(List<DataStream<Committable>> 
dataStreams) {
         RowAppendTableSink sink = new RowAppendTableSink(table, null, null);
         DataStream<Committable> dataStream = dataStreams.get(0);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
index 86800b590f..9914bc68d3 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
@@ -1006,6 +1006,169 @@ public class IncrementalClusterActionITCase extends 
ActionITCaseBase {
                                 deletedIndexFiles)));
     }
 
+    @Test
+    public void testLocalSortClusterUnpartitionedTable() throws Exception {
+        // local-sort mode with ORDER strategy: every output file must be 
internally ordered
+        Map<String, String> options = new HashMap<>();
+        options.put("bucket", "-1");
+        options.put("num-levels", "6");
+        options.put("num-sorted-run.compaction-trigger", "2");
+        options.put("clustering.columns", "a,b");
+        options.put("clustering.strategy", "order");
+        options.put("clustering.incremental", "true");
+        options.put("clustering.incremental.mode", "local-sort");
+        options.put("scan.parallelism", "1");
+        options.put("sink.parallelism", "1");
+        FileStoreTable table = createTable(null, options);
+
+        List<CommitMessage> messages = new ArrayList<>();
+        // write rows in reverse order so that after sorting they should be 
ascending
+        for (int i = 2; i >= 0; i--) {
+            for (int j = 2; j >= 0; j--) {
+                messages.addAll(write(GenericRow.of(i, j, null, 0)));
+            }
+        }
+        commit(messages);
+
+        ReadBuilder readBuilder = table.newReadBuilder().withProjection(new 
int[] {0, 1});
+        List<String> beforeCluster =
+                getResult(
+                        readBuilder.newRead(),
+                        readBuilder.newScan().plan().splits(),
+                        readBuilder.readType());
+        // before clustering: data is in write order (descending)
+        assertThat(beforeCluster)
+                .containsExactlyElementsOf(
+                        Lists.newArrayList(
+                                "+I[2, 2]",
+                                "+I[2, 1]",
+                                "+I[2, 0]",
+                                "+I[1, 2]",
+                                "+I[1, 1]",
+                                "+I[1, 0]",
+                                "+I[0, 2]",
+                                "+I[0, 1]",
+                                "+I[0, 0]"));
+
+        // run incremental clustering with local-sort
+        runAction(
+                Lists.newArrayList(
+                        "--table_conf", 
"clustering.incremental.mode=local-sort",
+                        "--table_conf", "clustering.strategy=order"));
+        checkSnapshot(table);
+
+        List<Split> splits = readBuilder.newScan().plan().splits();
+        assertThat(splits.size()).isEqualTo(1);
+        assertThat(((DataSplit) 
splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
+
+        // after local-sort clustering: all data present (order not globally 
guaranteed,
+        // but within each file data must be sorted ascending by a, b)
+        List<String> afterCluster =
+                getResult(readBuilder.newRead(), splits, 
readBuilder.readType());
+        assertThat(afterCluster)
+                .containsExactlyInAnyOrder(
+                        "+I[0, 0]",
+                        "+I[0, 1]",
+                        "+I[0, 2]",
+                        "+I[1, 0]",
+                        "+I[1, 1]",
+                        "+I[1, 2]",
+                        "+I[2, 0]",
+                        "+I[2, 1]",
+                        "+I[2, 2]");
+
+        // verify internal order: within the single output file, rows must be
+        // sorted ascending by (a, b) since parallelism=1 guarantees all data 
is in one task
+        assertThat(afterCluster)
+                .containsExactlyElementsOf(
+                        Lists.newArrayList(
+                                "+I[0, 0]",
+                                "+I[0, 1]",
+                                "+I[0, 2]",
+                                "+I[1, 0]",
+                                "+I[1, 1]",
+                                "+I[1, 2]",
+                                "+I[2, 0]",
+                                "+I[2, 1]",
+                                "+I[2, 2]"));
+    }
+
+    @Test
+    public void testLocalSortClusterPartitionedTable() throws Exception {
+        // local-sort mode with ORDER strategy for partitioned table
+        Map<String, String> options = new HashMap<>();
+        options.put("bucket", "-1");
+        options.put("num-levels", "6");
+        options.put("num-sorted-run.compaction-trigger", "2");
+        options.put("scan.plan-sort-partition", "true");
+        options.put("clustering.columns", "a,b");
+        options.put("clustering.strategy", "order");
+        options.put("clustering.incremental", "true");
+        options.put("clustering.incremental.mode", "local-sort");
+        options.put("scan.parallelism", "1");
+        options.put("sink.parallelism", "1");
+        FileStoreTable table = createTable("pt", options);
+
+        List<CommitMessage> messages = new ArrayList<>();
+        for (int pt = 0; pt < 2; pt++) {
+            // write in reverse order within each partition
+            for (int i = 2; i >= 0; i--) {
+                for (int j = 2; j >= 0; j--) {
+                    messages.addAll(write(GenericRow.of(i, j, null, pt)));
+                }
+            }
+        }
+        commit(messages);
+
+        // run incremental clustering with local-sort
+        runAction(
+                Lists.newArrayList(
+                        "--table_conf", 
"clustering.incremental.mode=local-sort",
+                        "--table_conf", "clustering.strategy=order"));
+        checkSnapshot(table);
+
+        ReadBuilder readBuilder = table.newReadBuilder().withProjection(new 
int[] {0, 1, 3});
+        List<Split> splits = readBuilder.newScan().plan().splits();
+        assertThat(splits.size()).isEqualTo(2);
+
+        for (Split split : splits) {
+            DataSplit dataSplit = (DataSplit) split;
+            assertThat(dataSplit.dataFiles().size()).isEqualTo(1);
+            assertThat(dataSplit.dataFiles().get(0).level()).isEqualTo(5);
+        }
+
+        // both partitions have all 9 rows, sorted within each partition
+        List<String> result = getResult(readBuilder.newRead(), splits, 
readBuilder.readType());
+        assertThat(result).hasSize(18);
+        // data correctness: all rows present
+        for (int pt = 0; pt < 2; pt++) {
+            for (int i = 0; i < 3; i++) {
+                for (int j = 0; j < 3; j++) {
+                    assertThat(result).contains(String.format("+I[%s, %s, 
%s]", i, j, pt));
+                }
+            }
+        }
+        // each partition's rows should be in sorted order (parallelism=1, one 
task per partition)
+        for (int pt = 0; pt < 2; pt++) {
+            final int finalPt = pt;
+            List<String> partitionRows =
+                    result.stream()
+                            .filter(r -> r.endsWith(", " + finalPt + "]"))
+                            .collect(Collectors.toList());
+            assertThat(partitionRows)
+                    .containsExactly(
+                            String.format("+I[0, 0, %s]", pt),
+                            String.format("+I[0, 1, %s]", pt),
+                            String.format("+I[0, 2, %s]", pt),
+                            String.format("+I[1, 0, %s]", pt),
+                            String.format("+I[1, 1, %s]", pt),
+                            String.format("+I[1, 2, %s]", pt),
+                            String.format("+I[2, 0, %s]", pt),
+                            String.format("+I[2, 1, %s]", pt),
+                            String.format("+I[2, 2, %s]", pt));
+        }
+    }
+
     private void runAction(List<String> extra) throws Exception {
         runAction(false, extra);
     }

Reply via email to