This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7c93bd7206 [core] Avoid key bytes OOM in
ClusteringFileRewriter.sortAndRewriteFile (#7642)
7c93bd7206 is described below
commit 7c93bd7206fa01c2896cad1a2118c8039018d40d
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Apr 14 17:48:36 2026 +0800
[core] Avoid key bytes OOM in ClusteringFileRewriter.sortAndRewriteFile
(#7642)
Avoid key bytes OOM in ClusteringFileRewriter.sortAndRewriteFile.
Removing the in-memory List<byte[]> collectedKeys and the batchPutIndex
method eliminates the unbounded memory accumulation: instead of buffering
every serialized key on the heap while rewriting, the key index is now
rebuilt directly from each sorted output file via
ClusteringKeyIndex.rebuildIndex, so memory use no longer grows with the
number of keys in the input file.
---
.../clustering/ClusteringCompactManager.java | 14 ++---
.../compact/clustering/ClusteringFileRewriter.java | 60 +++++++---------------
.../compact/clustering/ClusteringKeyIndex.java | 28 +++-------
3 files changed, 29 insertions(+), 73 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
index 9aac4ba08e..61813513d1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
@@ -43,10 +43,8 @@ import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
-import java.util.Collections;
import java.util.List;
import java.util.Optional;
-import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.stream.IntStream;
@@ -71,7 +69,7 @@ public class ClusteringCompactManager extends
CompactFutureManager {
private final RowType keyType;
private final RowType valueType;
private final ExecutorService executor;
- private final BucketedDvMaintainer dvMaintainer;
+ @Nullable private final BucketedDvMaintainer dvMaintainer;
private final boolean lazyGenDeletionFile;
@Nullable private final CompactionMetrics.Reporter metricsReporter;
@@ -89,7 +87,7 @@ public class ClusteringCompactManager extends
CompactFutureManager {
KeyValueFileReaderFactory valueReaderFactory,
KeyValueFileWriterFactory writerFactory,
ExecutorService executor,
- BucketedDvMaintainer dvMaintainer,
+ @Nullable BucketedDvMaintainer dvMaintainer,
boolean lazyGenDeletionFile,
List<DataFileMeta> restoreFiles,
long targetFileSize,
@@ -206,14 +204,8 @@ public class ClusteringCompactManager extends
CompactFutureManager {
// Snapshot sorted files before Phase 1 to avoid including newly
created files in Phase 2
List<DataFileMeta> existingSortedFiles = fileLevels.sortedFiles();
for (DataFileMeta file : unsortedFiles) {
- Set<String> originalFileNames =
Collections.singleton(file.fileName());
List<DataFileMeta> sortedFiles =
- fileRewriter.sortAndRewriteFiles(
- singletonList(file),
- kvSerializer,
- kvSchemaType,
- keyIndex,
- originalFileNames);
+ fileRewriter.sortAndRewriteFile(file, kvSerializer,
kvSchemaType, keyIndex);
result.before().add(file);
result.after().addAll(sortedFiles);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java
index 3689e70483..7788d4ced9 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java
@@ -48,8 +48,6 @@ import org.apache.paimon.utils.CloseableIterator;
import org.apache.paimon.utils.KeyValueWithLevelNoReusingSerializer;
import org.apache.paimon.utils.MutableObjectIterator;
-import javax.annotation.Nullable;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -57,7 +55,6 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
-import java.util.Set;
/**
* Handles file rewriting for clustering compaction, including sorting
unsorted files (Phase 1) and
@@ -115,20 +112,16 @@ public class ClusteringFileRewriter {
}
/**
- * Sort and rewrite unsorted files by clustering columns. Reads all
KeyValue records, sorts them
+ * Sort and rewrite unsorted file by clustering columns. Reads all
KeyValue records, sorts them
* using an external sort buffer, and writes to new level-1 files. Checks
the key index inline
* during writing to handle deduplication (FIRST_ROW skips duplicates,
DEDUPLICATE marks old
- * positions in DV) and updates the index without re-reading the output
files.
- *
- * @param keyIndex the key index for inline checking and batch update, or
null to skip
- * @param originalFileNames file names of the original files being
replaced (for index check)
+ * positions in DV) and updates the index.
*/
- public List<DataFileMeta> sortAndRewriteFiles(
- List<DataFileMeta> inputFiles,
+ public List<DataFileMeta> sortAndRewriteFile(
+ DataFileMeta inputFile,
KeyValueSerializer kvSerializer,
RowType kvSchemaType,
- @Nullable ClusteringKeyIndex keyIndex,
- Set<String> originalFileNames)
+ ClusteringKeyIndex keyIndex)
throws Exception {
int[] sortFieldsInKeyValue =
Arrays.stream(clusteringColumns)
@@ -146,21 +139,17 @@ public class ClusteringFileRewriter {
MemorySize.MAX_VALUE,
false);
- for (DataFileMeta file : inputFiles) {
- try (RecordReader<KeyValue> reader =
valueReaderFactory.createRecordReader(file)) {
- try (CloseableIterator<KeyValue> iterator =
reader.toCloseableIterator()) {
- while (iterator.hasNext()) {
- KeyValue kv = iterator.next();
- InternalRow serializedRow = kvSerializer.toRow(kv);
- sortBuffer.write(serializedRow);
- }
+ try (RecordReader<KeyValue> reader =
valueReaderFactory.createRecordReader(inputFile)) {
+ try (CloseableIterator<KeyValue> iterator =
reader.toCloseableIterator()) {
+ while (iterator.hasNext()) {
+ KeyValue kv = iterator.next();
+ InternalRow serializedRow = kvSerializer.toRow(kv);
+ sortBuffer.write(serializedRow);
}
}
}
- RowCompactedSerializer keySerializer =
- keyIndex != null ? new RowCompactedSerializer(keyType) : null;
- List<byte[]> collectedKeys = keyIndex != null ? new ArrayList<>() :
null;
+ RowCompactedSerializer keySerializer = new
RowCompactedSerializer(keyType);
RollingFileWriter<KeyValue, DataFileMeta> writer =
writerFactory.createRollingClusteringFileWriter();
@@ -173,12 +162,9 @@ public class ClusteringFileRewriter {
kv.copy(
new InternalRowSerializer(keyType),
new InternalRowSerializer(valueType));
- if (keyIndex != null) {
- byte[] keyBytes =
keySerializer.serializeToBytes(copied.key());
- if (!keyIndex.checkKey(keyBytes, originalFileNames)) {
- continue;
- }
- collectedKeys.add(keyBytes);
+ byte[] keyBytes = keySerializer.serializeToBytes(copied.key());
+ if (!keyIndex.checkKey(keyBytes)) {
+ continue;
}
writer.write(copied);
}
@@ -188,23 +174,13 @@ public class ClusteringFileRewriter {
}
List<DataFileMeta> newFiles = writer.result();
- for (DataFileMeta file : inputFiles) {
- fileLevels.removeFile(file);
- }
+ fileLevels.removeFile(inputFile);
for (DataFileMeta newFile : newFiles) {
fileLevels.addNewFile(newFile);
}
-
- // Batch update index using collected keys, split by file rowCount
- if (keyIndex != null) {
- int offset = 0;
- for (DataFileMeta newFile : newFiles) {
- int count = (int) newFile.rowCount();
- keyIndex.batchPutIndex(newFile, collectedKeys.subList(offset,
offset + count));
- offset += count;
- }
+ for (DataFileMeta sortedFile : newFiles) {
+ keyIndex.rebuildIndex(sortedFile);
}
-
return newFiles;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java
index 2b4e412727..fee1abae2b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java
@@ -39,6 +39,8 @@ import org.apache.paimon.types.VarBinaryType;
import org.apache.paimon.utils.CloseableIterator;
import org.apache.paimon.utils.MutableObjectIterator;
+import javax.annotation.Nullable;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
@@ -48,9 +50,9 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.stream.IntStream;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
import static org.apache.paimon.utils.VarLengthIntUtils.decodeInt;
import static org.apache.paimon.utils.VarLengthIntUtils.encodeInt;
@@ -63,7 +65,7 @@ public class ClusteringKeyIndex implements Closeable {
private final RowType keyType;
private final IOManager ioManager;
private final KeyValueFileReaderFactory keyReaderFactory;
- private final BucketedDvMaintainer dvMaintainer;
+ private final @Nullable BucketedDvMaintainer dvMaintainer;
private final SimpleLsmKvDb kvDb;
private final ClusteringFiles fileLevels;
private final boolean firstRow;
@@ -76,7 +78,7 @@ public class ClusteringKeyIndex implements Closeable {
RowType keyType,
IOManager ioManager,
KeyValueFileReaderFactory keyReaderFactory,
- BucketedDvMaintainer dvMaintainer,
+ @Nullable BucketedDvMaintainer dvMaintainer,
SimpleLsmKvDb kvDb,
ClusteringFiles fileLevels,
boolean firstRow,
@@ -216,20 +218,20 @@ public class ClusteringKeyIndex implements Closeable {
* in deletion vectors, return true (write the new record).
*
* @param keyBytes serialized key bytes
- * @param originalFileNames file names of the original unsorted files
being replaced
* @return true if the record should be written, false to skip (FIRST_ROW
dedup)
*/
- public boolean checkKey(byte[] keyBytes, Set<String> originalFileNames)
throws Exception {
+ public boolean checkKey(byte[] keyBytes) throws Exception {
byte[] oldValue = kvDb.get(keyBytes);
if (oldValue != null) {
ByteArrayInputStream valueIn = new ByteArrayInputStream(oldValue);
int oldFileId = decodeInt(valueIn);
int oldPosition = decodeInt(valueIn);
DataFileMeta oldFile = fileLevels.getFileById(oldFileId);
- if (oldFile != null &&
!originalFileNames.contains(oldFile.fileName())) {
+ if (oldFile != null) {
if (firstRow) {
return false;
} else {
+ checkNotNull(dvMaintainer, "DvMaintainer cannot be null
for DEDUPLICATE mode.");
dvMaintainer.notifyNewDeletion(oldFile.fileName(),
oldPosition);
}
}
@@ -237,20 +239,6 @@ public class ClusteringKeyIndex implements Closeable {
return true;
}
- /**
- * Batch update the key index for a new sorted file using pre-collected
key bytes. Avoids
- * re-reading the file.
- */
- public void batchPutIndex(DataFileMeta sortedFile, List<byte[]>
keyBytesList) throws Exception {
- int fileId = fileLevels.getFileIdByName(sortedFile.fileName());
- for (int position = 0; position < keyBytesList.size(); position++) {
- ByteArrayOutputStream value = new ByteArrayOutputStream(8);
- encodeInt(value, fileId);
- encodeInt(value, position);
- kvDb.put(keyBytesList.get(position), value.toByteArray());
- }
- }
-
/** Delete key index entries for the given file (only if they still point
to it). */
public void deleteIndex(DataFileMeta file) throws Exception {
RowCompactedSerializer keySerializer = new
RowCompactedSerializer(keyType);