This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c93bd7206 [core] Avoid key bytes OOM in ClusteringFileRewriter.sortAndRewriteFile (#7642)
7c93bd7206 is described below

commit 7c93bd7206fa01c2896cad1a2118c8039018d40d
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Apr 14 17:48:36 2026 +0800

    [core] Avoid key bytes OOM in ClusteringFileRewriter.sortAndRewriteFile (#7642)
    
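    Avoid key bytes OOM in ClusteringFileRewriter.sortAndRewriteFile.
    Removing the in-memory List<byte[]> collectedKeys and the batchPutIndex
    method eliminates the unbounded memory accumulation. The key index is
    instead updated by calling keyIndex.rebuildIndex for each newly written
    sorted file.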
---
 .../clustering/ClusteringCompactManager.java       | 14 ++---
 .../compact/clustering/ClusteringFileRewriter.java | 60 +++++++---------------
 .../compact/clustering/ClusteringKeyIndex.java     | 28 +++-------
 3 files changed, 29 insertions(+), 73 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
index 9aac4ba08e..61813513d1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
@@ -43,10 +43,8 @@ import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.stream.IntStream;
@@ -71,7 +69,7 @@ public class ClusteringCompactManager extends CompactFutureManager {
     private final RowType keyType;
     private final RowType valueType;
     private final ExecutorService executor;
-    private final BucketedDvMaintainer dvMaintainer;
+    @Nullable private final BucketedDvMaintainer dvMaintainer;
     private final boolean lazyGenDeletionFile;
     @Nullable private final CompactionMetrics.Reporter metricsReporter;
 
@@ -89,7 +87,7 @@ public class ClusteringCompactManager extends CompactFutureManager {
             KeyValueFileReaderFactory valueReaderFactory,
             KeyValueFileWriterFactory writerFactory,
             ExecutorService executor,
-            BucketedDvMaintainer dvMaintainer,
+            @Nullable BucketedDvMaintainer dvMaintainer,
             boolean lazyGenDeletionFile,
             List<DataFileMeta> restoreFiles,
             long targetFileSize,
@@ -206,14 +204,8 @@ public class ClusteringCompactManager extends CompactFutureManager {
         // Snapshot sorted files before Phase 1 to avoid including newly 
created files in Phase 2
         List<DataFileMeta> existingSortedFiles = fileLevels.sortedFiles();
         for (DataFileMeta file : unsortedFiles) {
-            Set<String> originalFileNames = 
Collections.singleton(file.fileName());
             List<DataFileMeta> sortedFiles =
-                    fileRewriter.sortAndRewriteFiles(
-                            singletonList(file),
-                            kvSerializer,
-                            kvSchemaType,
-                            keyIndex,
-                            originalFileNames);
+                    fileRewriter.sortAndRewriteFile(file, kvSerializer, 
kvSchemaType, keyIndex);
             result.before().add(file);
             result.after().addAll(sortedFiles);
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java
index 3689e70483..7788d4ced9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java
@@ -48,8 +48,6 @@ import org.apache.paimon.utils.CloseableIterator;
 import org.apache.paimon.utils.KeyValueWithLevelNoReusingSerializer;
 import org.apache.paimon.utils.MutableObjectIterator;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -57,7 +55,6 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.PriorityQueue;
-import java.util.Set;
 
 /**
  * Handles file rewriting for clustering compaction, including sorting 
unsorted files (Phase 1) and
@@ -115,20 +112,16 @@ public class ClusteringFileRewriter {
     }
 
     /**
-     * Sort and rewrite unsorted files by clustering columns. Reads all 
KeyValue records, sorts them
+     * Sort and rewrite unsorted file by clustering columns. Reads all 
KeyValue records, sorts them
      * using an external sort buffer, and writes to new level-1 files. Checks 
the key index inline
      * during writing to handle deduplication (FIRST_ROW skips duplicates, 
DEDUPLICATE marks old
-     * positions in DV) and updates the index without re-reading the output 
files.
-     *
-     * @param keyIndex the key index for inline checking and batch update, or 
null to skip
-     * @param originalFileNames file names of the original files being 
replaced (for index check)
+     * positions in DV) and updates the index.
      */
-    public List<DataFileMeta> sortAndRewriteFiles(
-            List<DataFileMeta> inputFiles,
+    public List<DataFileMeta> sortAndRewriteFile(
+            DataFileMeta inputFile,
             KeyValueSerializer kvSerializer,
             RowType kvSchemaType,
-            @Nullable ClusteringKeyIndex keyIndex,
-            Set<String> originalFileNames)
+            ClusteringKeyIndex keyIndex)
             throws Exception {
         int[] sortFieldsInKeyValue =
                 Arrays.stream(clusteringColumns)
@@ -146,21 +139,17 @@ public class ClusteringFileRewriter {
                         MemorySize.MAX_VALUE,
                         false);
 
-        for (DataFileMeta file : inputFiles) {
-            try (RecordReader<KeyValue> reader = 
valueReaderFactory.createRecordReader(file)) {
-                try (CloseableIterator<KeyValue> iterator = 
reader.toCloseableIterator()) {
-                    while (iterator.hasNext()) {
-                        KeyValue kv = iterator.next();
-                        InternalRow serializedRow = kvSerializer.toRow(kv);
-                        sortBuffer.write(serializedRow);
-                    }
+        try (RecordReader<KeyValue> reader = 
valueReaderFactory.createRecordReader(inputFile)) {
+            try (CloseableIterator<KeyValue> iterator = 
reader.toCloseableIterator()) {
+                while (iterator.hasNext()) {
+                    KeyValue kv = iterator.next();
+                    InternalRow serializedRow = kvSerializer.toRow(kv);
+                    sortBuffer.write(serializedRow);
                 }
             }
         }
 
-        RowCompactedSerializer keySerializer =
-                keyIndex != null ? new RowCompactedSerializer(keyType) : null;
-        List<byte[]> collectedKeys = keyIndex != null ? new ArrayList<>() : 
null;
+        RowCompactedSerializer keySerializer = new 
RowCompactedSerializer(keyType);
 
         RollingFileWriter<KeyValue, DataFileMeta> writer =
                 writerFactory.createRollingClusteringFileWriter();
@@ -173,12 +162,9 @@ public class ClusteringFileRewriter {
                         kv.copy(
                                 new InternalRowSerializer(keyType),
                                 new InternalRowSerializer(valueType));
-                if (keyIndex != null) {
-                    byte[] keyBytes = 
keySerializer.serializeToBytes(copied.key());
-                    if (!keyIndex.checkKey(keyBytes, originalFileNames)) {
-                        continue;
-                    }
-                    collectedKeys.add(keyBytes);
+                byte[] keyBytes = keySerializer.serializeToBytes(copied.key());
+                if (!keyIndex.checkKey(keyBytes)) {
+                    continue;
                 }
                 writer.write(copied);
             }
@@ -188,23 +174,13 @@ public class ClusteringFileRewriter {
         }
 
         List<DataFileMeta> newFiles = writer.result();
-        for (DataFileMeta file : inputFiles) {
-            fileLevels.removeFile(file);
-        }
+        fileLevels.removeFile(inputFile);
         for (DataFileMeta newFile : newFiles) {
             fileLevels.addNewFile(newFile);
         }
-
-        // Batch update index using collected keys, split by file rowCount
-        if (keyIndex != null) {
-            int offset = 0;
-            for (DataFileMeta newFile : newFiles) {
-                int count = (int) newFile.rowCount();
-                keyIndex.batchPutIndex(newFile, collectedKeys.subList(offset, 
offset + count));
-                offset += count;
-            }
+        for (DataFileMeta sortedFile : newFiles) {
+            keyIndex.rebuildIndex(sortedFile);
         }
-
         return newFiles;
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java
index 2b4e412727..fee1abae2b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java
@@ -39,6 +39,8 @@ import org.apache.paimon.types.VarBinaryType;
 import org.apache.paimon.utils.CloseableIterator;
 import org.apache.paimon.utils.MutableObjectIterator;
 
+import javax.annotation.Nullable;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
@@ -48,9 +50,9 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.stream.IntStream;
 
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
 import static org.apache.paimon.utils.VarLengthIntUtils.decodeInt;
 import static org.apache.paimon.utils.VarLengthIntUtils.encodeInt;
 
@@ -63,7 +65,7 @@ public class ClusteringKeyIndex implements Closeable {
     private final RowType keyType;
     private final IOManager ioManager;
     private final KeyValueFileReaderFactory keyReaderFactory;
-    private final BucketedDvMaintainer dvMaintainer;
+    private final @Nullable BucketedDvMaintainer dvMaintainer;
     private final SimpleLsmKvDb kvDb;
     private final ClusteringFiles fileLevels;
     private final boolean firstRow;
@@ -76,7 +78,7 @@ public class ClusteringKeyIndex implements Closeable {
             RowType keyType,
             IOManager ioManager,
             KeyValueFileReaderFactory keyReaderFactory,
-            BucketedDvMaintainer dvMaintainer,
+            @Nullable BucketedDvMaintainer dvMaintainer,
             SimpleLsmKvDb kvDb,
             ClusteringFiles fileLevels,
             boolean firstRow,
@@ -216,20 +218,20 @@ public class ClusteringKeyIndex implements Closeable {
      * in deletion vectors, return true (write the new record).
      *
      * @param keyBytes serialized key bytes
-     * @param originalFileNames file names of the original unsorted files 
being replaced
      * @return true if the record should be written, false to skip (FIRST_ROW 
dedup)
      */
-    public boolean checkKey(byte[] keyBytes, Set<String> originalFileNames) 
throws Exception {
+    public boolean checkKey(byte[] keyBytes) throws Exception {
         byte[] oldValue = kvDb.get(keyBytes);
         if (oldValue != null) {
             ByteArrayInputStream valueIn = new ByteArrayInputStream(oldValue);
             int oldFileId = decodeInt(valueIn);
             int oldPosition = decodeInt(valueIn);
             DataFileMeta oldFile = fileLevels.getFileById(oldFileId);
-            if (oldFile != null && 
!originalFileNames.contains(oldFile.fileName())) {
+            if (oldFile != null) {
                 if (firstRow) {
                     return false;
                 } else {
+                    checkNotNull(dvMaintainer, "DvMaintainer cannot be null 
for DEDUPLICATE mode.");
                     dvMaintainer.notifyNewDeletion(oldFile.fileName(), 
oldPosition);
                 }
             }
@@ -237,20 +239,6 @@ public class ClusteringKeyIndex implements Closeable {
         return true;
     }
 
-    /**
-     * Batch update the key index for a new sorted file using pre-collected 
key bytes. Avoids
-     * re-reading the file.
-     */
-    public void batchPutIndex(DataFileMeta sortedFile, List<byte[]> 
keyBytesList) throws Exception {
-        int fileId = fileLevels.getFileIdByName(sortedFile.fileName());
-        for (int position = 0; position < keyBytesList.size(); position++) {
-            ByteArrayOutputStream value = new ByteArrayOutputStream(8);
-            encodeInt(value, fileId);
-            encodeInt(value, position);
-            kvDb.put(keyBytesList.get(position), value.toByteArray());
-        }
-    }
-
     /** Delete key index entries for the given file (only if they still point 
to it). */
     public void deleteIndex(DataFileMeta file) throws Exception {
         RowCompactedSerializer keySerializer = new 
RowCompactedSerializer(keyType);

Reply via email to