This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 16ecc13f40 [core] Fix frequent compact triggers during streaming writes for pk clustering override (#7540)
16ecc13f40 is described below

commit 16ecc13f40e9f58954d9235969a4fab0ef860a9e
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Mar 26 22:17:27 2026 +0800

    [core] Fix frequent compact triggers during streaming writes for pk clustering override (#7540)
---
 .../clustering/ClusteringCompactManager.java       |  3 +
 .../paimon/separated/ClusteringTableTest.java      | 76 ++++++++++++++++++++++
 2 files changed, 79 insertions(+)

diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
index 3e288b22ca..1e9b02b32b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
@@ -180,6 +180,9 @@ public class ClusteringCompactManager extends CompactFutureManager {
 
     @Override
     public void triggerCompaction(boolean fullCompaction) {
+        if (taskFuture != null) {
+            return;
+        }
         taskFuture =
                 executor.submit(
                         new CompactTask(metricsReporter) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
index e33afb7927..e96a18cdd6 100644
--- a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
@@ -23,6 +23,7 @@ import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
@@ -35,6 +36,9 @@ import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.StreamTableCommit;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.RawFile;
 import org.apache.paimon.table.source.ReadBuilder;
@@ -787,6 +791,78 @@ class ClusteringTableTest {
                         GenericRow.of(4, 44));
     }
 
+    // ==================== Stream Write with Frequent Compact Tests ====================
+
+    /**
+     * Test that frequent compact triggers during streaming writes don't corrupt data. This
+     * exercises the {@code if (taskFuture != null) return} guard in {@code
+     * ClusteringCompactManager.triggerCompaction}.
+     */
+    @Test
+    public void testStreamWriteFrequentCompact() throws Exception {
+        StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder();
+        try (StreamTableWrite write =
+                        (StreamTableWrite) writeBuilder.newWrite().withIOManager(ioManager);
+                StreamTableCommit commit = writeBuilder.newCommit()) {
+            long commitId = 0;
+            for (int round = 0; round < 20; round++) {
+                write.write(GenericRow.of(round, round * 10));
+                // Trigger compact on every round for both buckets
+                write.compact(BinaryRow.EMPTY_ROW, 0, false);
+                write.compact(BinaryRow.EMPTY_ROW, 1, false);
+                commit.commit(commitId, write.prepareCommit(false, commitId));
+                commitId++;
+            }
+
+            // Final commit with waitCompaction to ensure all compactions finish
+            commit.commit(commitId, write.prepareCommit(true, commitId));
+
+            // All 20 distinct keys should be present
+            List<GenericRow> expected = new ArrayList<>();
+            for (int i = 0; i < 20; i++) {
+                expected.add(GenericRow.of(i, i * 10));
+            }
+            assertThat(readRows()).containsExactlyInAnyOrderElementsOf(expected);
+        }
+    }
+
+    /**
+     * Test streaming writes with overlapping keys and frequent compaction. The {@code taskFuture !=
+     * null} guard should safely skip redundant compaction triggers while still producing correct
+     * deduplicated results.
+     */
+    @Test
+    public void testStreamWriteFrequentCompactWithOverlappingKeys() throws Exception {
+        StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder();
+        try (StreamTableWrite write =
+                        (StreamTableWrite) 
writeBuilder.newWrite().withIOManager(ioManager);
+                StreamTableCommit commit = writeBuilder.newCommit()) {
+            long commitId = 0;
+            for (int round = 0; round < 20; round++) {
+                // Write overlapping keys: keys 0-4 are updated every round
+                for (int i = 0; i < 5; i++) {
+                    write.write(GenericRow.of(i, round * 100 + i));
+                }
+                write.compact(BinaryRow.EMPTY_ROW, 0, false);
+                write.compact(BinaryRow.EMPTY_ROW, 1, false);
+                commit.commit(commitId, write.prepareCommit(false, commitId));
+                commitId++;
+            }
+
+            // Final wait for compaction
+            commit.commit(commitId, write.prepareCommit(true, commitId));
+
+            // Should see only the latest values from round 19
+            assertThat(readRows())
+                    .containsExactlyInAnyOrder(
+                            GenericRow.of(0, 1900),
+                            GenericRow.of(1, 1901),
+                            GenericRow.of(2, 1902),
+                            GenericRow.of(3, 1903),
+                            GenericRow.of(4, 1904));
+        }
+    }
+
     private Table createFirstRowTableWithLowSpillThreshold() throws Exception {
         Identifier identifier = Identifier.create("default", "first_row_spill_table");
         Schema schema =

Reply via email to