This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2f647dd29e [core] Support config compaction file num limit and report
if reach the limit in batch job (#7520)
2f647dd29e is described below
commit 2f647dd29e19acba6b56675b4cc8cdeaf4a685b8
Author: yuzelin <[email protected]>
AuthorDate: Wed Mar 25 16:25:47 2026 +0800
[core] Support config compaction file num limit and report if reach the
limit in batch job (#7520)
1. In a batch job, we should notify the external system whether the
current compaction has handled all files, so that it can determine
whether another compaction round is needed.
2. Make the limit configurable.
---
.../shortcodes/generated/core_configuration.html | 6 ++
.../main/java/org/apache/paimon/CoreOptions.java | 12 ++++
.../paimon/append/AppendCompactCoordinator.java | 25 ++++++-
.../append/AppendOnlyTableCompactionTest.java | 78 ++++++++++++++++++++++
4 files changed, 118 insertions(+), 3 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 44aeee4980..67618e4e83 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -308,6 +308,12 @@ under the License.
<td>Double</td>
<td>Ratio of the deleted rows in a data file to be forced
compacted for append-only table.</td>
</tr>
+ <tr>
+ <td><h5>compaction.file-num-limit</h5></td>
+ <td style="word-wrap: break-word;">100000</td>
+ <td>Integer</td>
+ <td>To avoid OOM caused by scanning compaction files, you can use
this option to limit the for unaware-bucket append table compaction.</td>
+ </tr>
<tr>
<td><h5>compaction.force-rewrite-all-files</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 569615f8b0..e8ae69e9be 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -874,6 +874,14 @@ public class CoreOptions implements Serializable {
"For file set [f_0,...,f_N], the minimum file
number to trigger a compaction for "
+ "append-only table.");
+ public static final ConfigOption<Integer> COMPACTION_FILE_NUM_LIMIT =
+ key("compaction.file-num-limit")
+ .intType()
+ .defaultValue(100_000)
+ .withDescription(
+ "To avoid OOM caused by scanning compaction files,
you can use this option to limit the "
+ + "for unaware-bucket append table
compaction.");
+
public static final ConfigOption<Double> COMPACTION_DELETE_RATIO_THRESHOLD
=
key("compaction.delete-ratio-threshold")
.doubleType()
@@ -3026,6 +3034,10 @@ public class CoreOptions implements Serializable {
return options.get(COMPACTION_MIN_FILE_NUM);
}
+ public int compactionFileNumLimit() {
+ return options.get(COMPACTION_FILE_NUM_LIMIT);
+ }
+
public double compactionDeleteRatioThreshold() {
return options.get(COMPACTION_DELETE_RATIO_THRESHOLD);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactCoordinator.java
b/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactCoordinator.java
index 07374831e7..70e5040e3b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactCoordinator.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactCoordinator.java
@@ -69,8 +69,6 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
*/
public class AppendCompactCoordinator {
- private static final int FILES_BATCH = 100_000;
-
protected static final int REMOVE_AGE = 10;
protected static final int COMPACT_AGE = 5;
@@ -80,11 +78,15 @@ public class AppendCompactCoordinator {
private final double deleteThreshold;
private final long openFileCost;
private final int minFileNum;
+ private final int fileNumLimit;
private final DvMaintainerCache dvMaintainerCache;
private final FilesIterator filesIterator;
final Map<BinaryRow, SubCoordinator> subCoordinators = new HashMap<>();
+ private final boolean isStreaming;
+ private boolean batchRemainFiles;
+
public AppendCompactCoordinator(FileStoreTable table, boolean isStreaming)
{
this(table, isStreaming, null);
}
@@ -101,11 +103,13 @@ public class AppendCompactCoordinator {
this.deleteThreshold = options.compactionDeleteRatioThreshold();
this.openFileCost = options.splitOpenFileCost();
this.minFileNum = options.compactionMinFileNum();
+ this.fileNumLimit = options.compactionFileNumLimit();
this.dvMaintainerCache =
options.deletionVectorsEnabled()
? new
DvMaintainerCache(table.store().newIndexFileHandler())
: null;
this.filesIterator = new FilesIterator(table, isStreaming,
partitionPredicate);
+ this.isStreaming = isStreaming;
}
public List<AppendCompactTask> run() {
@@ -121,11 +125,12 @@ public class AppendCompactCoordinator {
@VisibleForTesting
boolean scan() {
Map<BinaryRow, List<DataFileMeta>> files = new HashMap<>();
- for (int i = 0; i < FILES_BATCH; i++) {
+ for (int i = 0; i < fileNumLimit; i++) {
ManifestEntry entry;
try {
entry = filesIterator.next();
} catch (EndOfScanException e) {
+ batchRemainFiles = false;
if (!files.isEmpty()) {
files.forEach(this::notifyNewFiles);
return true;
@@ -140,13 +145,27 @@ public class AppendCompactCoordinator {
}
if (files.isEmpty()) {
+ batchRemainFiles = false;
return false;
}
+ if (!isStreaming) {
+ try {
+ ManifestEntry entry = filesIterator.next();
+ batchRemainFiles = entry != null;
+ } catch (EndOfScanException e) {
+ batchRemainFiles = false;
+ }
+ }
+
files.forEach(this::notifyNewFiles);
return true;
}
+ public boolean batchRemainFiles() {
+ return batchRemainFiles;
+ }
+
@VisibleForTesting
FilesIterator filesIterator() {
return filesIterator;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionTest.java
index 5af16aa316..e4d1c43ed2 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionTest.java
@@ -34,6 +34,7 @@ import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.SnapshotManager;
@@ -43,13 +44,18 @@ import org.junit.jupiter.api.io.TempDir;
import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.UUID;
import static java.util.Collections.singletonMap;
import static org.apache.paimon.SnapshotTest.newSnapshotManager;
+import static
org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for append table compaction. */
public class AppendOnlyTableCompactionTest {
@@ -208,6 +214,78 @@ public class AppendOnlyTableCompactionTest {
return result;
}
+ @Test
+ public void testBatchScanFileNumLimitWithRemainingFiles() throws Exception
{
+ commit(writeCommit(10));
+
+ Map<String, String> options = new HashMap<>();
+ options.put("compaction.file-num-limit", "5");
+ AppendCompactCoordinator batchCoordinator =
createBatchCoordinator(options);
+
+ assertThat(batchCoordinator.scan()).isTrue();
+ assertThat(batchCoordinator.batchRemainFiles()).isTrue();
+ assertThat(batchCoordinator.listRestoredFiles().size()).isEqualTo(5);
+ }
+
+ @Test
+ public void testBatchScanFileNumLimitNoRemainingFiles() throws Exception {
+ commit(writeCommit(10));
+
+ // default 100_000
+ AppendCompactCoordinator batchCoordinator =
createBatchCoordinator(Collections.emptyMap());
+
+ assertThat(batchCoordinator.scan()).isTrue();
+ assertThat(batchCoordinator.batchRemainFiles()).isFalse();
+ assertThat(batchCoordinator.listRestoredFiles().size()).isEqualTo(10);
+ }
+
+ @Test
+ public void testBatchScanFileNumLimitExactMatch() throws Exception {
+ commit(writeCommit(5));
+
+ Map<String, String> options = new HashMap<>();
+ options.put("compaction.file-num-limit", "5");
+ AppendCompactCoordinator batchCoordinator =
createBatchCoordinator(options);
+
+ assertThat(batchCoordinator.scan()).isTrue();
+ assertThat(batchCoordinator.batchRemainFiles()).isFalse();
+ assertThat(batchCoordinator.listRestoredFiles().size()).isEqualTo(5);
+ }
+
+ @Test
+ public void testBatchScanEmptyTableBatchRemainFiles() {
+ AppendCompactCoordinator batchCoordinator =
createBatchCoordinator(Collections.emptyMap());
+
+ assertThatThrownBy(batchCoordinator::scan)
+ .satisfies(anyCauseMatches(EndOfScanException.class));
+ assertThat(batchCoordinator.batchRemainFiles()).isFalse();
+ }
+
+ @Test
+ public void testBatchScanAllFilesFilteredOut() throws Exception {
+ commit(writeCommit(10));
+
+ // Set target-file-size to 1 byte, making compactionFileSize = 0,
+ // so shouldCompact returns false for all files
+ Map<String, String> options = new HashMap<>();
+ options.put("target-file-size", "1 b");
+ AppendCompactCoordinator batchCoordinator =
createBatchCoordinator(options);
+
+ assertThatThrownBy(batchCoordinator::scan)
+ .satisfies(anyCauseMatches(EndOfScanException.class));
+ assertThat(batchCoordinator.batchRemainFiles()).isFalse();
+ }
+
+ private AppendCompactCoordinator createBatchCoordinator(Map<String,
String> extraOptions) {
+ TableSchema schema = tableSchema.copy(extraOptions);
+ FileStoreTable table =
+ FileStoreTableFactory.create(
+ LocalFileIO.create(),
+ new org.apache.paimon.fs.Path(tempDir.toString()),
+ schema);
+ return new AppendCompactCoordinator(table, false);
+ }
+
private InternalRow randomRow() {
return GenericRow.of(
random.nextInt(100),