This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3ff7a540db [flink] Add local-sort mode for incremental clustering
compaction (#7466)
3ff7a540db is described below
commit 3ff7a540dbf288ac7e3c58323e55cbc3a2abd957
Author: sanshi <[email protected]>
AuthorDate: Fri Mar 20 12:12:07 2026 +0800
[flink] Add local-sort mode for incremental clustering compaction (#7466)
---
.../shortcodes/generated/core_configuration.html | 6 +
.../main/java/org/apache/paimon/CoreOptions.java | 53 +++++++
.../append/cluster/IncrementalClusterManager.java | 4 +
.../flink/compact/IncrementalClusterCompact.java | 171 +++++++++++++++++++--
.../action/IncrementalClusterActionITCase.java | 163 ++++++++++++++++++++
5 files changed, 380 insertions(+), 17 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 278d164d68..e24b4a2795 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -218,6 +218,12 @@ under the License.
<td>Boolean</td>
<td>Whether enable incremental clustering.</td>
</tr>
+ <tr>
+ <td><h5>clustering.incremental.mode</h5></td>
+ <td style="word-wrap: break-word;">global-sort</td>
+ <td><p>Enum</p></td>
+ <td>The sort mode for incremental clustering compaction.
'global-sort' (default) performs a global range shuffle so output files are
globally ordered. 'local-sort' skips the global shuffle and only sorts rows
within each compaction task, producing files that are internally ordered.
'local-sort' is cheaper and sufficient for Parquet lookup optimizations.<br
/><br />Possible values:<ul><li>"global-sort": Perform global range shuffle and
then local sort. Output files are globally ordered but require network shuffling.</li><li>"local-sort": Sort rows only within each compaction task without global shuffle. Every output file is internally ordered.</li></ul></td>
+ </tr>
<tr>
<td><h5>clustering.incremental.optimize-write</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index b500cb8833..78ce3d9367 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2122,6 +2122,17 @@ public class CoreOptions implements Serializable {
.withDescription(
"Whether enable perform clustering before write
phase when incremental clustering is enabled.");
+ public static final ConfigOption<ClusteringIncrementalMode>
CLUSTERING_INCREMENTAL_MODE =
+ key("clustering.incremental.mode")
+ .enumType(ClusteringIncrementalMode.class)
+ .defaultValue(ClusteringIncrementalMode.GLOBAL_SORT)
+ .withDescription(
+ "The sort mode for incremental clustering
compaction. "
+ + "'global-sort' (default) performs a
global range shuffle so output files are globally ordered. "
+ + "'local-sort' skips the global shuffle
and only sorts rows within each compaction task, "
+ + "producing files that are internally
ordered. "
+ + "'local-sort' is cheaper and sufficient
for Parquet lookup optimizations.");
+
@Immutable
public static final ConfigOption<Boolean> ROW_TRACKING_ENABLED =
key("row-tracking.enabled")
@@ -3593,6 +3604,10 @@ public class CoreOptions implements Serializable {
return options.get(CLUSTERING_INCREMENTAL_OPTIMIZE_WRITE);
}
+ public ClusteringIncrementalMode clusteringIncrementalMode() {
+ return options.get(CLUSTERING_INCREMENTAL_MODE);
+ }
+
public boolean bucketClusterEnabled() {
return !bucketAppendOrdered()
&& !deletionVectorsEnabled()
@@ -4352,6 +4367,44 @@ public class CoreOptions implements Serializable {
}
}
+    /** The incremental clustering mode for append tables. */
+ public enum ClusteringIncrementalMode implements DescribedEnum {
+ /**
+ * Perform a global range shuffle before sorting, so that rows with
the same key range land
+ * in the same file. This produces globally ordered output but
requires network shuffling.
+ */
+ GLOBAL_SORT(
+ "global-sort",
+ "Perform global range shuffle and then local sort. Output
files are globally ordered but require network shuffling."),
+
+ /**
+ * Sort rows only within each compaction task (no global shuffle).
Every output file is
+ * internally ordered by the clustering columns, which is sufficient
for per-file Parquet
+ * lookup optimizations.
+ */
+ LOCAL_SORT(
+ "local-sort",
+ "Sort rows only within each compaction task without global
shuffle. Every output file is internally ordered.");
+
+ private final String value;
+ private final String description;
+
+ ClusteringIncrementalMode(String value, String description) {
+ this.value = value;
+ this.description = description;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ @Override
+ public InlineElement getDescription() {
+ return text(description);
+ }
+ }
+
/** The compact mode for lookup compaction. */
public enum LookupCompactMode {
/**
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
index 1c8da3c033..f5642f6648 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
@@ -319,6 +319,10 @@ public class IncrementalClusterManager {
return clusterCurve;
}
+ public CoreOptions.ClusteringIncrementalMode clusteringIncrementalMode() {
+ return table.coreOptions().clusteringIncrementalMode();
+ }
+
public List<String> clusterKeys() {
return clusterKeys;
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java
index ee3b68f609..db2230eda6 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java
@@ -20,24 +20,37 @@ package org.apache.paimon.flink.compact;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.append.cluster.IncrementalClusterManager;
+import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.compact.CompactUnit;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.cluster.IncrementalClusterSplitSource;
import
org.apache.paimon.flink.cluster.RewriteIncrementalClusterCommittableOperator;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableTypeInfo;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.flink.sink.RowAppendTableSink;
+import org.apache.paimon.flink.sorter.SortOperator;
import org.apache.paimon.flink.sorter.TableSortInfo;
import org.apache.paimon.flink.sorter.TableSorter;
+import org.apache.paimon.flink.utils.InternalTypeInfo;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.InternalRowPartitionComputer;
+import org.apache.paimon.utils.KeyProjectedRow;
import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Projection;
+import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
@@ -52,6 +65,7 @@ import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_PARALLELISM;
+import static org.apache.paimon.table.PrimaryKeyTableUtils.addKeyNamePrefix;
/** Compact for incremental clustering. */
public class IncrementalClusterCompact {
@@ -136,23 +150,31 @@ public class IncrementalClusterCompact {
if (sinkParallelism == null) {
sinkParallelism = sourcePair.getLeft().getParallelism();
}
- TableSortInfo sortInfo =
- new TableSortInfo.Builder()
- .setSortColumns(clusterManager.clusterKeys())
- .setSortStrategy(clusterManager.clusterCurve())
- .setSinkParallelism(sinkParallelism)
- .setLocalSampleSize(sinkParallelism *
localSampleMagnification)
- .setGlobalSampleSize(sinkParallelism * 1000)
- .setRangeNumber(sinkParallelism * 10)
- .build();
- DataStream<RowData> sorted =
- TableSorter.getSorter(
- env,
- sourcePair.getLeft(),
- table.coreOptions(),
- table.rowType(),
- sortInfo)
- .sort();
+
+ DataStream<RowData> sorted;
+ CoreOptions.ClusteringIncrementalMode mode =
clusterManager.clusteringIncrementalMode();
+ if (mode == CoreOptions.ClusteringIncrementalMode.LOCAL_SORT) {
+ sorted = localSort(sourcePair.getLeft(), sinkParallelism);
+ } else {
+ // GLOBAL_SORT: range shuffle + local sort (default)
+ TableSortInfo sortInfo =
+ new TableSortInfo.Builder()
+ .setSortColumns(clusterManager.clusterKeys())
+ .setSortStrategy(clusterManager.clusterCurve())
+ .setSinkParallelism(sinkParallelism)
+ .setLocalSampleSize(sinkParallelism *
localSampleMagnification)
+ .setGlobalSampleSize(sinkParallelism * 1000)
+ .setRangeNumber(sinkParallelism * 10)
+ .build();
+ sorted =
+ TableSorter.getSorter(
+ env,
+ sourcePair.getLeft(),
+ table.coreOptions(),
+ table.rowType(),
+ sortInfo)
+ .sort();
+ }
// 2.3 write and then reorganize the committable
// set parallelism to null, and it'll forward parallelism when
doWrite()
@@ -187,6 +209,121 @@ public class IncrementalClusterCompact {
return dataStreams;
}
+ /**
+ * Local sort: sort rows within each task independently by the clustering
columns, without any
+ * global range shuffle. Every output file will be internally ordered,
which is required for
+ * Parquet-based lookup optimizations (PIP-42).
+ *
+ * <p>Pipeline: add key prefix → SortOperator → remove key prefix → back
to RowData
+ */
+ private DataStream<RowData> localSort(DataStream<RowData> input, int
sinkParallelism) {
+ CoreOptions options = table.coreOptions();
+ RowType rowType = table.rowType();
+ List<String> clusterKeys = clusterManager.clusterKeys();
+
+ final int[] keyProjectionMap = rowType.projectIndexes(clusterKeys);
+ final RowType keyRowType =
+
addKeyNamePrefix(Projection.of(keyProjectionMap).project(rowType));
+
+ // Build the combined type: [key fields ... | value fields ...]
+ List<org.apache.paimon.types.DataField> combinedFields = new
ArrayList<>();
+ combinedFields.addAll(keyRowType.getFields());
+ combinedFields.addAll(rowType.getFields());
+ final RowType longRowType = new RowType(combinedFields);
+
+ final int valueFieldCount = rowType.getFieldCount();
+ final int[] valueProjectionMap = new int[valueFieldCount];
+ for (int i = 0; i < valueFieldCount; i++) {
+ valueProjectionMap[i] = keyRowType.getFieldCount() + i;
+ }
+
+ // Step 1: prepend key columns to each row: (key, value) -> JoinedRow
+ DataStream<InternalRow> withKey =
+ input.map(
+ new RichMapFunction<RowData, InternalRow>() {
+
+ private transient
org.apache.paimon.codegen.Projection
+ keyProjection;
+
+ /**
+ * Do not annotate with
<code>@Override</code> here to maintain
+ * compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext)
throws Exception {
+ open(new Configuration());
+ }
+
+ /**
+ * Do not annotate with
<code>@Override</code> here to maintain
+ * compatibility with Flink 2.0+.
+ */
+ public void open(Configuration parameters)
throws Exception {
+ keyProjection =
+ CodeGenUtils.newProjection(
+ rowType,
keyProjectionMap);
+ }
+
+ @Override
+ public InternalRow map(RowData value) {
+ FlinkRowWrapper wrapped = new
FlinkRowWrapper(value);
+ return new JoinedRow(
+
keyProjection.apply(wrapped).copy(), wrapped);
+ }
+ },
+ InternalTypeInfo.fromRowType(longRowType))
+ .setParallelism(input.getParallelism());
+
+ // Step 2: local sort by key columns (no shuffle)
+ DataStream<InternalRow> sortedWithKey =
+ withKey.transform(
+ "LOCAL SORT",
+ InternalTypeInfo.fromRowType(longRowType),
+ new SortOperator(
+ keyRowType,
+ longRowType,
+ options.writeBufferSize(),
+ options.pageSize(),
+ options.localSortMaxNumFileHandles(),
+ options.spillCompressOptions(),
+ sinkParallelism,
+ options.writeBufferSpillDiskSize(),
+
options.sequenceFieldSortOrderIsAscending()))
+ .setParallelism(sinkParallelism);
+
+ // Step 3: remove the prepended key columns and convert back to RowData
+ return sortedWithKey
+ .map(
+ new RichMapFunction<InternalRow, InternalRow>() {
+
+ private transient KeyProjectedRow keyProjectedRow;
+
+ /**
+                             * Do not annotate with <code>@Override</code>
here to maintain
+ * compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext) {
+ open(new Configuration());
+ }
+
+ /**
+                             * Do not annotate with <code>@Override</code>
here to maintain
+ * compatibility with Flink 2.0+.
+ */
+ public void open(Configuration parameters) {
+ keyProjectedRow = new
KeyProjectedRow(valueProjectionMap);
+ }
+
+ @Override
+ public InternalRow map(InternalRow value) {
+ return keyProjectedRow.replaceRow(value);
+ }
+ },
+ InternalTypeInfo.fromRowType(rowType))
+ .setParallelism(sinkParallelism)
+ .map(FlinkRowData::new, input.getType())
+ .setParallelism(sinkParallelism);
+ }
+
protected void buildCommitOperator(List<DataStream<Committable>>
dataStreams) {
RowAppendTableSink sink = new RowAppendTableSink(table, null, null);
DataStream<Committable> dataStream = dataStreams.get(0);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
index 86800b590f..9914bc68d3 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
@@ -1006,6 +1006,169 @@ public class IncrementalClusterActionITCase extends
ActionITCaseBase {
deletedIndexFiles)));
}
+ @Test
+ public void testLocalSortClusterUnpartitionedTable() throws Exception {
+ // local-sort mode with ORDER strategy: every output file must be
internally ordered
+ Map<String, String> options = new HashMap<>();
+ options.put("bucket", "-1");
+ options.put("num-levels", "6");
+ options.put("num-sorted-run.compaction-trigger", "2");
+ options.put("clustering.columns", "a,b");
+ options.put("clustering.strategy", "order");
+ options.put("clustering.incremental", "true");
+ options.put("clustering.incremental.mode", "local-sort");
+ options.put("scan.parallelism", "1");
+ options.put("sink.parallelism", "1");
+ FileStoreTable table = createTable(null, options);
+
+ List<CommitMessage> messages = new ArrayList<>();
+ // write rows in reverse order so that after sorting they should be
ascending
+ for (int i = 2; i >= 0; i--) {
+ for (int j = 2; j >= 0; j--) {
+ messages.addAll(write(GenericRow.of(i, j, null, 0)));
+ }
+ }
+ commit(messages);
+
+ ReadBuilder readBuilder = table.newReadBuilder().withProjection(new
int[] {0, 1});
+ List<String> beforeCluster =
+ getResult(
+ readBuilder.newRead(),
+ readBuilder.newScan().plan().splits(),
+ readBuilder.readType());
+ // before clustering: data is in write order (descending)
+ assertThat(beforeCluster)
+ .containsExactlyElementsOf(
+ Lists.newArrayList(
+ "+I[2, 2]",
+ "+I[2, 1]",
+ "+I[2, 0]",
+ "+I[1, 2]",
+ "+I[1, 1]",
+ "+I[1, 0]",
+ "+I[0, 2]",
+ "+I[0, 1]",
+ "+I[0, 0]"));
+
+ // run incremental clustering with local-sort
+ runAction(
+ Lists.newArrayList(
+ "--table_conf",
"clustering.incremental.mode=local-sort",
+ "--table_conf", "clustering.strategy=order"));
+ checkSnapshot(table);
+
+ List<Split> splits = readBuilder.newScan().plan().splits();
+ assertThat(splits.size()).isEqualTo(1);
+ assertThat(((DataSplit)
splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
+
+ // after local-sort clustering: all data present (order not globally
guaranteed,
+ // but within each file data must be sorted ascending by a, b)
+ List<String> afterCluster =
+ getResult(readBuilder.newRead(), splits,
readBuilder.readType());
+ assertThat(afterCluster)
+ .containsExactlyInAnyOrder(
+ "+I[0, 0]",
+ "+I[0, 1]",
+ "+I[0, 2]",
+ "+I[1, 0]",
+ "+I[1, 1]",
+ "+I[1, 2]",
+ "+I[2, 0]",
+ "+I[2, 1]",
+ "+I[2, 2]");
+
+ // verify internal order: within the single output file, rows must be
+ // sorted ascending by (a, b) since parallelism=1 guarantees all data
is in one task
+ assertThat(afterCluster)
+ .containsExactlyElementsOf(
+ Lists.newArrayList(
+ "+I[0, 0]",
+ "+I[0, 1]",
+ "+I[0, 2]",
+ "+I[1, 0]",
+ "+I[1, 1]",
+ "+I[1, 2]",
+ "+I[2, 0]",
+ "+I[2, 1]",
+ "+I[2, 2]"));
+ }
+
+ @Test
+ public void testLocalSortClusterPartitionedTable() throws Exception {
+ // local-sort mode with ORDER strategy for partitioned table
+ Map<String, String> options = new HashMap<>();
+ options.put("bucket", "-1");
+ options.put("num-levels", "6");
+ options.put("num-sorted-run.compaction-trigger", "2");
+ options.put("scan.plan-sort-partition", "true");
+ options.put("clustering.columns", "a,b");
+ options.put("clustering.strategy", "order");
+ options.put("clustering.incremental", "true");
+ options.put("clustering.incremental.mode", "local-sort");
+ options.put("scan.parallelism", "1");
+ options.put("sink.parallelism", "1");
+ FileStoreTable table = createTable("pt", options);
+
+ List<CommitMessage> messages = new ArrayList<>();
+ for (int pt = 0; pt < 2; pt++) {
+ // write in reverse order within each partition
+ for (int i = 2; i >= 0; i--) {
+ for (int j = 2; j >= 0; j--) {
+ messages.addAll(write(GenericRow.of(i, j, null, pt)));
+ }
+ }
+ }
+ commit(messages);
+
+ // run incremental clustering with local-sort
+ runAction(
+ Lists.newArrayList(
+ "--table_conf",
"clustering.incremental.mode=local-sort",
+ "--table_conf", "clustering.strategy=order"));
+ checkSnapshot(table);
+
+ ReadBuilder readBuilder = table.newReadBuilder().withProjection(new
int[] {0, 1, 3});
+ List<Split> splits = readBuilder.newScan().plan().splits();
+ assertThat(splits.size()).isEqualTo(2);
+
+ for (Split split : splits) {
+ DataSplit dataSplit = (DataSplit) split;
+ assertThat(dataSplit.dataFiles().size()).isEqualTo(1);
+ assertThat(dataSplit.dataFiles().get(0).level()).isEqualTo(5);
+ }
+
+ // both partitions have all 9 rows, sorted within each partition
+ List<String> result = getResult(readBuilder.newRead(), splits,
readBuilder.readType());
+ assertThat(result).hasSize(18);
+ // data correctness: all rows present
+ for (int pt = 0; pt < 2; pt++) {
+ for (int i = 0; i < 3; i++) {
+ for (int j = 0; j < 3; j++) {
+ assertThat(result).contains(String.format("+I[%s, %s,
%s]", i, j, pt));
+ }
+ }
+ }
+ // each partition's rows should be in sorted order (parallelism=1, one
task per partition)
+ for (int pt = 0; pt < 2; pt++) {
+ final int finalPt = pt;
+ List<String> partitionRows =
+ result.stream()
+ .filter(r -> r.endsWith(", " + finalPt + "]"))
+ .collect(Collectors.toList());
+ assertThat(partitionRows)
+ .containsExactly(
+ String.format("+I[0, 0, %s]", pt),
+ String.format("+I[0, 1, %s]", pt),
+ String.format("+I[0, 2, %s]", pt),
+ String.format("+I[1, 0, %s]", pt),
+ String.format("+I[1, 1, %s]", pt),
+ String.format("+I[1, 2, %s]", pt),
+ String.format("+I[2, 0, %s]", pt),
+ String.format("+I[2, 1, %s]", pt),
+ String.format("+I[2, 2, %s]", pt));
+ }
+ }
+
private void runAction(List<String> extra) throws Exception {
runAction(false, extra);
}