This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 16ecc13f40 [core] Fix frequent compact triggers during streaming writes for pk clustering override (#7540)
16ecc13f40 is described below
commit 16ecc13f40e9f58954d9235969a4fab0ef860a9e
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Mar 26 22:17:27 2026 +0800
[core] Fix frequent compact triggers during streaming writes for pk
clustering override (#7540)
---
.../clustering/ClusteringCompactManager.java | 3 +
.../paimon/separated/ClusteringTableTest.java | 76 ++++++++++++++++++++++
2 files changed, 79 insertions(+)
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
index 3e288b22ca..1e9b02b32b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
@@ -180,6 +180,9 @@ public class ClusteringCompactManager extends CompactFutureManager {
@Override
public void triggerCompaction(boolean fullCompaction) {
+ if (taskFuture != null) {
+ return;
+ }
taskFuture =
executor.submit(
new CompactTask(metricsReporter) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
index e33afb7927..e96a18cdd6 100644
--- a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
@@ -23,6 +23,7 @@ import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
@@ -35,6 +36,9 @@ import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.StreamTableCommit;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.RawFile;
import org.apache.paimon.table.source.ReadBuilder;
@@ -787,6 +791,78 @@ class ClusteringTableTest {
GenericRow.of(4, 44));
}
+    // ==================== Stream Write with Frequent Compact Tests ====================
+
+    /**
+     * Test that frequent compact triggers during streaming writes don't corrupt data. This
+     * exercises the {@code if (taskFuture != null) return} guard in {@code
+     * ClusteringCompactManager.triggerCompaction}.
+     */
+    @Test
+    public void testStreamWriteFrequentCompact() throws Exception {
+        StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder();
+        try (StreamTableWrite write =
+                        (StreamTableWrite) writeBuilder.newWrite().withIOManager(ioManager);
+                StreamTableCommit commit = writeBuilder.newCommit()) {
+            long commitId = 0;
+            for (int round = 0; round < 20; round++) {
+                write.write(GenericRow.of(round, round * 10));
+                // Trigger compact on every round for both buckets
+                write.compact(BinaryRow.EMPTY_ROW, 0, false);
+                write.compact(BinaryRow.EMPTY_ROW, 1, false);
+                commit.commit(commitId, write.prepareCommit(false, commitId));
+                commitId++;
+            }
+
+            // Final commit with waitCompaction to ensure all compactions finish
+            commit.commit(commitId, write.prepareCommit(true, commitId));
+
+            // All 20 distinct keys should be present
+            List<GenericRow> expected = new ArrayList<>();
+            for (int i = 0; i < 20; i++) {
+                expected.add(GenericRow.of(i, i * 10));
+            }
+
+            assertThat(readRows()).containsExactlyInAnyOrderElementsOf(expected);
+        }
+    }
+
+    /**
+     * Test streaming writes with overlapping keys and frequent compaction. The {@code taskFuture !=
+     * null} guard should safely skip redundant compaction triggers while still producing correct
+     * deduplicated results.
+     */
+    @Test
+    public void testStreamWriteFrequentCompactWithOverlappingKeys() throws Exception {
+        StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder();
+        try (StreamTableWrite write =
+                        (StreamTableWrite) writeBuilder.newWrite().withIOManager(ioManager);
+                StreamTableCommit commit = writeBuilder.newCommit()) {
+            long commitId = 0;
+            for (int round = 0; round < 20; round++) {
+                // Write overlapping keys: keys 0-4 are updated every round
+                for (int i = 0; i < 5; i++) {
+                    write.write(GenericRow.of(i, round * 100 + i));
+                }
+                write.compact(BinaryRow.EMPTY_ROW, 0, false);
+                write.compact(BinaryRow.EMPTY_ROW, 1, false);
+                commit.commit(commitId, write.prepareCommit(false, commitId));
+                commitId++;
+            }
+
+            // Final wait for compaction
+            commit.commit(commitId, write.prepareCommit(true, commitId));
+
+            // Should see only the latest values from round 19
+            assertThat(readRows())
+                    .containsExactlyInAnyOrder(
+                            GenericRow.of(0, 1900),
+                            GenericRow.of(1, 1901),
+                            GenericRow.of(2, 1902),
+                            GenericRow.of(3, 1903),
+                            GenericRow.of(4, 1904));
+        }
+    }
+
private Table createFirstRowTableWithLowSpillThreshold() throws Exception {
Identifier identifier = Identifier.create("default",
"first_row_spill_table");
Schema schema =