This is an automated email from the ASF dual-hosted git repository. hope pushed a commit to branch release-1.4 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 868b0800607b7edea77c5e96044d04b9e1bd039a Author: yuzelin <[email protected]> AuthorDate: Wed Mar 25 16:25:47 2026 +0800 [core] Support config compaction file num limit and report if reach the limit in batch job (#7520) 1. In a batch job, we should notify the outside system whether the current compaction has handled all files, so it can determine whether it needs to compact again. 2. Make the limit configurable. --- .../shortcodes/generated/core_configuration.html | 6 ++ .../main/java/org/apache/paimon/CoreOptions.java | 12 ++++ .../paimon/append/AppendCompactCoordinator.java | 25 ++++++- .../append/AppendOnlyTableCompactionTest.java | 78 ++++++++++++++++++++++ 4 files changed, 118 insertions(+), 3 deletions(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 44aeee4980..67618e4e83 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -308,6 +308,12 @@ under the License. 
<td>Double</td> <td>Ratio of the deleted rows in a data file to be forced compacted for append-only table.</td> </tr> + <tr> + <td><h5>compaction.file-num-limit</h5></td> + <td style="word-wrap: break-word;">100000</td> + <td>Integer</td> + <td>To avoid OOM caused by scanning compaction files, you can use this option to limit the scanned file number for unaware-bucket append table compaction.</td> + </tr> <tr> <td><h5>compaction.force-rewrite-all-files</h5></td> + <td style="word-wrap: break-word;">false</td> diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 569615f8b0..e8ae69e9be 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -874,6 +874,14 @@ public class CoreOptions implements Serializable { "For file set [f_0,...,f_N], the minimum file number to trigger a compaction for " + "append-only table."); + public static final ConfigOption<Integer> COMPACTION_FILE_NUM_LIMIT = + key("compaction.file-num-limit") + .intType() + .defaultValue(100_000) + .withDescription( + "To avoid OOM caused by scanning compaction files, you can use this option to limit the scanned file number " + + "for unaware-bucket append table compaction."); + public static final ConfigOption<Double> COMPACTION_DELETE_RATIO_THRESHOLD = key("compaction.delete-ratio-threshold") .doubleType() @@ -3026,6 +3034,10 @@ public class CoreOptions implements Serializable { return options.get(COMPACTION_MIN_FILE_NUM); } + public int compactionFileNumLimit() { + return options.get(COMPACTION_FILE_NUM_LIMIT); + } + public double compactionDeleteRatioThreshold() { return options.get(COMPACTION_DELETE_RATIO_THRESHOLD); } diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactCoordinator.java index 07374831e7..70e5040e3b 100644 --- 
a/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactCoordinator.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactCoordinator.java @@ -69,8 +69,6 @@ import static org.apache.paimon.utils.Preconditions.checkArgument; */ public class AppendCompactCoordinator { - private static final int FILES_BATCH = 100_000; - protected static final int REMOVE_AGE = 10; protected static final int COMPACT_AGE = 5; @@ -80,11 +78,15 @@ public class AppendCompactCoordinator { private final double deleteThreshold; private final long openFileCost; private final int minFileNum; + private final int fileNumLimit; private final DvMaintainerCache dvMaintainerCache; private final FilesIterator filesIterator; final Map<BinaryRow, SubCoordinator> subCoordinators = new HashMap<>(); + private final boolean isStreaming; + private boolean batchRemainFiles; + public AppendCompactCoordinator(FileStoreTable table, boolean isStreaming) { this(table, isStreaming, null); } @@ -101,11 +103,13 @@ public class AppendCompactCoordinator { this.deleteThreshold = options.compactionDeleteRatioThreshold(); this.openFileCost = options.splitOpenFileCost(); this.minFileNum = options.compactionMinFileNum(); + this.fileNumLimit = options.compactionFileNumLimit(); this.dvMaintainerCache = options.deletionVectorsEnabled() ? 
new DvMaintainerCache(table.store().newIndexFileHandler()) : null; this.filesIterator = new FilesIterator(table, isStreaming, partitionPredicate); + this.isStreaming = isStreaming; } public List<AppendCompactTask> run() { @@ -121,11 +125,12 @@ public class AppendCompactCoordinator { @VisibleForTesting boolean scan() { Map<BinaryRow, List<DataFileMeta>> files = new HashMap<>(); - for (int i = 0; i < FILES_BATCH; i++) { + for (int i = 0; i < fileNumLimit; i++) { ManifestEntry entry; try { entry = filesIterator.next(); } catch (EndOfScanException e) { + batchRemainFiles = false; if (!files.isEmpty()) { files.forEach(this::notifyNewFiles); return true; @@ -140,13 +145,27 @@ public class AppendCompactCoordinator { } if (files.isEmpty()) { + batchRemainFiles = false; return false; } + if (!isStreaming) { + try { + ManifestEntry entry = filesIterator.next(); + batchRemainFiles = entry != null; + } catch (EndOfScanException e) { + batchRemainFiles = false; + } + } + files.forEach(this::notifyNewFiles); return true; } + public boolean batchRemainFiles() { + return batchRemainFiles; + } + @VisibleForTesting FilesIterator filesIterator() { return filesIterator; diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionTest.java index 5af16aa316..e4d1c43ed2 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionTest.java @@ -34,6 +34,7 @@ import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.table.sink.StreamTableWrite; import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.table.source.EndOfScanException; import org.apache.paimon.types.DataTypes; import org.apache.paimon.utils.SnapshotManager; @@ -43,13 +44,18 @@ import 
org.junit.jupiter.api.io.TempDir; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.UUID; import static java.util.Collections.singletonMap; import static org.apache.paimon.SnapshotTest.newSnapshotManager; +import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test for append table compaction. */ public class AppendOnlyTableCompactionTest { @@ -208,6 +214,78 @@ public class AppendOnlyTableCompactionTest { return result; } + @Test + public void testBatchScanFileNumLimitWithRemainingFiles() throws Exception { + commit(writeCommit(10)); + + Map<String, String> options = new HashMap<>(); + options.put("compaction.file-num-limit", "5"); + AppendCompactCoordinator batchCoordinator = createBatchCoordinator(options); + + assertThat(batchCoordinator.scan()).isTrue(); + assertThat(batchCoordinator.batchRemainFiles()).isTrue(); + assertThat(batchCoordinator.listRestoredFiles().size()).isEqualTo(5); + } + + @Test + public void testBatchScanFileNumLimitNoRemainingFiles() throws Exception { + commit(writeCommit(10)); + + // default 100_000 + AppendCompactCoordinator batchCoordinator = createBatchCoordinator(Collections.emptyMap()); + + assertThat(batchCoordinator.scan()).isTrue(); + assertThat(batchCoordinator.batchRemainFiles()).isFalse(); + assertThat(batchCoordinator.listRestoredFiles().size()).isEqualTo(10); + } + + @Test + public void testBatchScanFileNumLimitExactMatch() throws Exception { + commit(writeCommit(5)); + + Map<String, String> options = new HashMap<>(); + options.put("compaction.file-num-limit", "5"); + AppendCompactCoordinator batchCoordinator = createBatchCoordinator(options); + + assertThat(batchCoordinator.scan()).isTrue(); + 
assertThat(batchCoordinator.batchRemainFiles()).isFalse(); + assertThat(batchCoordinator.listRestoredFiles().size()).isEqualTo(5); + } + + @Test + public void testBatchScanEmptyTableBatchRemainFiles() { + AppendCompactCoordinator batchCoordinator = createBatchCoordinator(Collections.emptyMap()); + + assertThatThrownBy(batchCoordinator::scan) + .satisfies(anyCauseMatches(EndOfScanException.class)); + assertThat(batchCoordinator.batchRemainFiles()).isFalse(); + } + + @Test + public void testBatchScanAllFilesFilteredOut() throws Exception { + commit(writeCommit(10)); + + // Set target-file-size to 1 byte, making compactionFileSize = 0, + // so shouldCompact returns false for all files + Map<String, String> options = new HashMap<>(); + options.put("target-file-size", "1 b"); + AppendCompactCoordinator batchCoordinator = createBatchCoordinator(options); + + assertThatThrownBy(batchCoordinator::scan) + .satisfies(anyCauseMatches(EndOfScanException.class)); + assertThat(batchCoordinator.batchRemainFiles()).isFalse(); + } + + private AppendCompactCoordinator createBatchCoordinator(Map<String, String> extraOptions) { + TableSchema schema = tableSchema.copy(extraOptions); + FileStoreTable table = + FileStoreTableFactory.create( + LocalFileIO.create(), + new org.apache.paimon.fs.Path(tempDir.toString()), + schema); + return new AppendCompactCoordinator(table, false); + } + private InternalRow randomRow() { return GenericRow.of( random.nextInt(100),
