This is an automated email from the ASF dual-hosted git repository. hope pushed a commit to branch release-1.4 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 235e7e316e8d3010d8595c1ab70db224d63cb73e
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Mar 26 22:17:27 2026 +0800

    [core] Fix frequent compact triggers during streaming writes for pk clustering override (#7540)
---
 .../clustering/ClusteringCompactManager.java       |  3 +
 .../paimon/separated/ClusteringTableTest.java      | 76 ++++++++++++++++++++++
 2 files changed, 79 insertions(+)

diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
index 3e288b22ca..1e9b02b32b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
@@ -180,6 +180,9 @@ public class ClusteringCompactManager extends CompactFutureManager {
 
     @Override
     public void triggerCompaction(boolean fullCompaction) {
+        if (taskFuture != null) {
+            return;
+        }
         taskFuture =
                 executor.submit(
                         new CompactTask(metricsReporter) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
index e33afb7927..e96a18cdd6 100644
--- a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
@@ -23,6 +23,7 @@ import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
@@ -35,6 +36,9 @@ import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.StreamTableCommit;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.RawFile;
 import org.apache.paimon.table.source.ReadBuilder;
@@ -787,6 +791,78 @@ class ClusteringTableTest {
                         GenericRow.of(4, 44));
     }
 
+    // ==================== Stream Write with Frequent Compact Tests ====================
+
+    /**
+     * Test that frequent compact triggers during streaming writes don't corrupt data. This
+     * exercises the {@code if (taskFuture != null) return} guard in {@code
+     * ClusteringCompactManager.triggerCompaction}.
+     */
+    @Test
+    public void testStreamWriteFrequentCompact() throws Exception {
+        StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder();
+        try (StreamTableWrite write =
+                        (StreamTableWrite) writeBuilder.newWrite().withIOManager(ioManager);
+                StreamTableCommit commit = writeBuilder.newCommit()) {
+            long commitId = 0;
+            for (int round = 0; round < 20; round++) {
+                write.write(GenericRow.of(round, round * 10));
+                // Trigger compact on every round for both buckets
+                write.compact(BinaryRow.EMPTY_ROW, 0, false);
+                write.compact(BinaryRow.EMPTY_ROW, 1, false);
+                commit.commit(commitId, write.prepareCommit(false, commitId));
+                commitId++;
+            }
+
+            // Final commit with waitCompaction to ensure all compactions finish
+            commit.commit(commitId, write.prepareCommit(true, commitId));
+
+            // All 20 distinct keys should be present
+            List<GenericRow> expected = new ArrayList<>();
+            for (int i = 0; i < 20; i++) {
+                expected.add(GenericRow.of(i, i * 10));
+            }
+            assertThat(readRows()).containsExactlyInAnyOrderElementsOf(expected);
+        }
+    }
+
+    /**
+     * Test streaming writes with overlapping keys and frequent compaction. The {@code taskFuture !=
+     * null} guard should safely skip redundant compaction triggers while still producing correct
+     * deduplicated results.
+     */
+    @Test
+    public void testStreamWriteFrequentCompactWithOverlappingKeys() throws Exception {
+        StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder();
+        try (StreamTableWrite write =
+                        (StreamTableWrite) writeBuilder.newWrite().withIOManager(ioManager);
+                StreamTableCommit commit = writeBuilder.newCommit()) {
+            long commitId = 0;
+            for (int round = 0; round < 20; round++) {
+                // Write overlapping keys: keys 0-4 are updated every round
+                for (int i = 0; i < 5; i++) {
+                    write.write(GenericRow.of(i, round * 100 + i));
+                }
+                write.compact(BinaryRow.EMPTY_ROW, 0, false);
+                write.compact(BinaryRow.EMPTY_ROW, 1, false);
+                commit.commit(commitId, write.prepareCommit(false, commitId));
+                commitId++;
+            }
+
+            // Final wait for compaction
+            commit.commit(commitId, write.prepareCommit(true, commitId));
+
+            // Should see only the latest values from round 19
+            assertThat(readRows())
+                    .containsExactlyInAnyOrder(
+                            GenericRow.of(0, 1900),
+                            GenericRow.of(1, 1901),
+                            GenericRow.of(2, 1902),
+                            GenericRow.of(3, 1903),
+                            GenericRow.of(4, 1904));
+        }
+    }
+
     private Table createFirstRowTableWithLowSpillThreshold() throws Exception {
         Identifier identifier = Identifier.create("default", "first_row_spill_table");
         Schema schema =
