This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 271ad81c84 [core] Add bulk load for bootstrapping in clustering compact (#7497)
271ad81c84 is described below

commit 271ad81c843bb262e02da9f3eb435b6f680e53cb
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Mar 23 11:48:56 2026 +0800

    [core] Add bulk load for bootstrapping in clustering compact (#7497)
    
    Adds a bulkLoad() method to `SimpleLsmKvDb` that writes globally sorted
    entries directly into SST files at the deepest level (L3), bypassing
    MemTable/flush/compaction. This is used by
    `ClusteringKeyIndex.bootstrap()` to efficiently populate the key index
    during restore.
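
    A minimal caller-side sketch (mirroring the new testBulkLoad test; the
    dbDir variable, default builder options, and byte-wise key ordering are
    illustrative assumptions, not part of this commit):

        // Entries must be globally sorted by the DB's key comparator.
        List<Map.Entry<byte[], byte[]>> entries = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            entries.add(new AbstractMap.SimpleImmutableEntry<>(
                    String.format("key-%05d", i).getBytes(UTF_8),
                    String.format("value-%05d", i).getBytes(UTF_8)));
        }
        try (SimpleLsmKvDb db = SimpleLsmKvDb.builder(dbDir).build()) {
            db.bulkLoad(entries.iterator()); // SSTs land directly at MAX_LEVELS - 1
            byte[] hit = db.get("key-00042".getBytes(UTF_8)); // readable immediately
        }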
---
 .../paimon/lookup/sort/db/SimpleLsmKvDb.java       |  86 +++
 .../paimon/lookup/sort/db/SimpleLsmKvDbTest.java   | 124 ++++
 .../clustering/ClusteringCompactManager.java       | 657 ++-------------------
 ...actManager.java => ClusteringFileRewriter.java} | 545 ++++-------------
 .../compact/clustering/ClusteringKeyIndex.java     | 317 ++++++++++
 .../paimon/separated/ClusteringTableTest.java      |  50 ++
 6 files changed, 740 insertions(+), 1039 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDb.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDb.java
index 7ee095da86..8871fcab48 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDb.java
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDb.java
@@ -38,6 +38,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -94,6 +95,7 @@ public class SimpleLsmKvDb implements Closeable {
     private final SortLookupStoreFactory storeFactory;
     private final Comparator<MemorySlice> keyComparator;
     private final long memTableFlushThreshold;
+    private final long maxSstFileSize;
     private final LsmCompactor compactor;
 
     /** Active MemTable: key -> value bytes (empty byte[] = tombstone). */
@@ -127,6 +129,7 @@ public class SimpleLsmKvDb implements Closeable {
         this.storeFactory = storeFactory;
         this.keyComparator = keyComparator;
         this.memTableFlushThreshold = memTableFlushThreshold;
+        this.maxSstFileSize = maxSstFileSize;
         this.memTable = new TreeMap<>(keyComparator);
         this.memTableSize = 0;
         this.levels = new ArrayList<>();
@@ -225,6 +228,89 @@ public class SimpleLsmKvDb implements Closeable {
         maybeFlushMemTable();
     }
 
+    /**
+     * Bulk-load globally sorted entries directly into SST files at the deepest level, bypassing
+     * MemTable, flush, and compaction entirely. The database must be empty when this is called.
+     *
+     * @param sortedEntries an iterator of key-value pairs in sorted order (by the DB's key
+     *     comparator)
+     */
+    public void bulkLoad(Iterator<Map.Entry<byte[], byte[]>> sortedEntries) throws IOException {
+        ensureOpen();
+        if (!memTable.isEmpty() || getSstFileCount() > 0) {
+            throw new IllegalStateException(
+                    "bulkLoad requires an empty database (no memTable entries and no SST files)");
+        }
+
+        int targetLevel = MAX_LEVELS - 1;
+        List<SstFileMetadata> targetLevelFiles = levels.get(targetLevel);
+
+        SortLookupStoreWriter currentWriter = null;
+        File currentSstFile = null;
+        MemorySlice currentFileMinKey = null;
+        MemorySlice currentFileMaxKey = null;
+        long currentBatchSize = 0;
+
+        try {
+            while (sortedEntries.hasNext()) {
+                Map.Entry<byte[], byte[]> entry = sortedEntries.next();
+                byte[] key = entry.getKey();
+                byte[] value = entry.getValue();
+
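+                // Lazily open a new SST file for the first entry of a batch.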
+                if (currentWriter == null) {
+                    currentSstFile = newSstFile();
+                    currentWriter = storeFactory.createWriter(currentSstFile, null);
+                    currentFileMinKey = MemorySlice.wrap(key);
+                    currentBatchSize = 0;
+                }
+
+                currentWriter.put(key, value);
+                currentFileMaxKey = MemorySlice.wrap(key);
+                currentBatchSize += key.length + value.length;
+
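+                // Seal the current SST and roll to a new one once the batch reaches maxSstFileSize.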
+                if (currentBatchSize >= maxSstFileSize) {
+                    currentWriter.close();
+                    targetLevelFiles.add(
+                            new SstFileMetadata(
+                                    currentSstFile,
+                                    currentFileMinKey,
+                                    currentFileMaxKey,
+                                    0,
+                                    targetLevel));
+                    currentWriter = null;
+                    currentSstFile = null;
+                    currentFileMinKey = null;
+                    currentFileMaxKey = null;
+                }
+            }
+
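+            // Seal the final, possibly partially filled SST file.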
+            if (currentWriter != null) {
+                currentWriter.close();
+                targetLevelFiles.add(
+                        new SstFileMetadata(
+                                currentSstFile,
+                                currentFileMinKey,
+                                currentFileMaxKey,
+                                0,
+                                targetLevel));
+            }
+        } catch (IOException | RuntimeException e) {
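+            // Best-effort close of the in-flight writer; the original error is rethrown.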
+            if (currentWriter != null) {
+                try {
+                    currentWriter.close();
+                } catch (IOException suppressed) {
+                    e.addSuppressed(suppressed);
+                }
+            }
+            throw e;
+        }
+
+        LOG.info(
+                "Bulk-loaded {} SST files directly to level {}",
+                targetLevelFiles.size(),
+                targetLevel);
+    }
+
     // -------------------------------------------------------------------------
     //  Read Operations
     // -------------------------------------------------------------------------
diff --git a/paimon-common/src/test/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDbTest.java b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDbTest.java
index 52dbfd3d01..44d7d46ff5 100644
--- a/paimon-common/src/test/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDbTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDbTest.java
@@ -28,7 +28,11 @@ import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
@@ -1321,6 +1325,126 @@ public class SimpleLsmKvDbTest {
         }
     }
 
+    @Test
+    public void testBulkLoad() throws IOException {
+        try (SimpleLsmKvDb db = createDb()) {
+            // Prepare sorted entries
+            List<Map.Entry<byte[], byte[]>> entries = new ArrayList<>();
+            for (int i = 0; i < 100; i++) {
+                String key = String.format("key-%05d", i);
+                String value = String.format("value-%05d", i);
+                entries.add(
+                        new AbstractMap.SimpleImmutableEntry<>(
+                                key.getBytes(UTF_8), value.getBytes(UTF_8)));
+            }
+
+            db.bulkLoad(entries.iterator());
+
+            // All data at deepest level, no L0 files
+            Assertions.assertEquals(0, db.getLevelFileCount(0));
+            Assertions.assertTrue(db.getLevelFileCount(SimpleLsmKvDb.MAX_LEVELS - 1) > 0);
+
+            // All keys should be readable
+            for (int i = 0; i < 100; i++) {
+                String expected = String.format("value-%05d", i);
+                String actual = getString(db, String.format("key-%05d", i));
+                Assertions.assertEquals(expected, actual, "Mismatch at index " + i);
+            }
+        }
+    }
+
+    @Test
+    public void testBulkLoadMultipleSstFiles() throws IOException {
+        // Use a small maxSstFileSize to force multiple SST files
+        SimpleLsmKvDb db =
+                SimpleLsmKvDb.builder(new File(tempDir.toFile(), "bulk-multi-db"))
+                        .memTableFlushThreshold(1024)
+                        .maxSstFileSize(512)
+                        .blockSize(128)
+                        .level0FileNumCompactTrigger(4)
+                        .compressOptions(new CompressOptions("none", 1))
+                        .build();
+
+        try {
+            List<Map.Entry<byte[], byte[]>> entries = new ArrayList<>();
+            for (int i = 0; i < 200; i++) {
+                String key = String.format("key-%05d", i);
+                String value = String.format("value-%05d", i);
+                entries.add(
+                        new AbstractMap.SimpleImmutableEntry<>(
+                                key.getBytes(UTF_8), value.getBytes(UTF_8)));
+            }
+
+            db.bulkLoad(entries.iterator());
+
+            // Multiple SST files should be created at the deepest level
+            int deepestLevelFiles = db.getLevelFileCount(SimpleLsmKvDb.MAX_LEVELS - 1);
+            Assertions.assertTrue(
+                    deepestLevelFiles > 1,
+                    "Expected multiple SST files at deepest level, got " + deepestLevelFiles);
+            Assertions.assertEquals(0, db.getLevelFileCount(0));
+
+            // All keys should be readable
+            for (int i = 0; i < 200; i++) {
+                String expected = String.format("value-%05d", i);
+                String actual = getString(db, String.format("key-%05d", i));
+                Assertions.assertEquals(expected, actual, "Mismatch at index " + i);
+            }
+        } finally {
+            db.close();
+        }
+    }
+
+    @Test
+    public void testBulkLoadEmptyIterator() throws IOException {
+        try (SimpleLsmKvDb db = createDb()) {
+            List<Map.Entry<byte[], byte[]>> empty = new ArrayList<>();
+            db.bulkLoad(empty.iterator());
+
+            Assertions.assertEquals(0, db.getSstFileCount());
+            Assertions.assertNull(getString(db, "any-key"));
+        }
+    }
+
+    @Test
+    public void testBulkLoadThenPutAndGet() throws IOException {
+        try (SimpleLsmKvDb db = createDb()) {
+            // Bulk load initial data
+            List<Map.Entry<byte[], byte[]>> entries = new ArrayList<>();
+            for (int i = 0; i < 50; i++) {
+                String key = String.format("key-%05d", i);
+                String value = String.format("value-%05d", i);
+                entries.add(
+                        new AbstractMap.SimpleImmutableEntry<>(
+                                key.getBytes(UTF_8), value.getBytes(UTF_8)));
+            }
+            db.bulkLoad(entries.iterator());
+
+            // Now use normal put to add/overwrite data
+            putString(db, "key-00000", "overwritten");
+            putString(db, "key-99999", "new-key");
+
+            Assertions.assertEquals("overwritten", getString(db, "key-00000"));
+            Assertions.assertEquals("new-key", getString(db, "key-99999"));
+            Assertions.assertEquals("value-00025", getString(db, String.format("key-%05d", 25)));
+        }
+    }
+
+    @Test
+    public void testBulkLoadFailsOnNonEmptyDb() throws IOException {
+        try (SimpleLsmKvDb db = createDb()) {
+            putString(db, "existing", "data");
+
+            List<Map.Entry<byte[], byte[]>> entries = new ArrayList<>();
+            entries.add(
+                    new AbstractMap.SimpleImmutableEntry<>(
+                            "key".getBytes(UTF_8), "value".getBytes(UTF_8)));
+
+            Assertions.assertThrows(
+                    IllegalStateException.class, () -> db.bulkLoad(entries.iterator()));
+        }
+    }
+
     private static void putString(SimpleLsmKvDb db, String key, String value) throws IOException {
         db.put(key.getBytes(UTF_8), value.getBytes(UTF_8));
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
index 8c8806daeb..9e925d0764 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
@@ -27,57 +27,29 @@ import org.apache.paimon.compact.CompactDeletionFile;
 import org.apache.paimon.compact.CompactFutureManager;
 import org.apache.paimon.compact.CompactResult;
 import org.apache.paimon.compact.CompactTask;
-import org.apache.paimon.compression.BlockCompressionFactory;
 import org.apache.paimon.compression.CompressOptions;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.serializer.BinaryRowSerializer;
-import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.data.serializer.RowCompactedSerializer;
 import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
-import org.apache.paimon.disk.ChannelReaderInputView;
-import org.apache.paimon.disk.ChannelReaderInputViewIterator;
-import org.apache.paimon.disk.ChannelWithMeta;
-import org.apache.paimon.disk.ChannelWriterOutputView;
-import org.apache.paimon.disk.FileChannelUtil;
-import org.apache.paimon.disk.FileIOChannel;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.io.KeyValueFileWriterFactory;
-import org.apache.paimon.io.RollingFileWriter;
 import org.apache.paimon.io.cache.CacheManager;
 import org.apache.paimon.lookup.sort.db.SimpleLsmKvDb;
 import org.apache.paimon.operation.metrics.CompactionMetrics;
-import org.apache.paimon.options.MemorySize;
-import org.apache.paimon.reader.FileRecordIterator;
-import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.reader.RecordReader.RecordIterator;
-import org.apache.paimon.sort.BinaryExternalSortBuffer;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.CloseableIterator;
-import org.apache.paimon.utils.KeyValueWithLevelNoReusingSerializer;
-import org.apache.paimon.utils.MutableObjectIterator;
 
 import javax.annotation.Nullable;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Optional;
-import java.util.PriorityQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.stream.IntStream;
 
 import static java.util.Collections.singletonList;
-import static org.apache.paimon.utils.VarLengthIntUtils.decodeInt;
-import static org.apache.paimon.utils.VarLengthIntUtils.encodeInt;
 
 /**
  * Key Value clustering compact manager for {@link KeyValueFileStore}.
@@ -96,27 +68,14 @@ public class ClusteringCompactManager extends CompactFutureManager {
 
     private final RowType keyType;
     private final RowType valueType;
-    private final long sortSpillBufferSize;
-    private final int pageSize;
-    private final int maxNumFileHandles;
-    private final int spillThreshold;
-    private final CompressOptions compression;
-    private final int[] clusteringColumns;
-    private final RecordComparator clusteringComparatorAlone;
-    private final RecordComparator clusteringComparatorInValue;
-    private final IOManager ioManager;
-    private final KeyValueFileReaderFactory keyReaderFactory;
-    private final KeyValueFileReaderFactory valueReaderFactory;
-    private final KeyValueFileWriterFactory writerFactory;
     private final ExecutorService executor;
     private final BucketedDvMaintainer dvMaintainer;
-    private final SimpleLsmKvDb kvDb;
     private final boolean lazyGenDeletionFile;
-    private final boolean firstRow;
     @Nullable private final CompactionMetrics.Reporter metricsReporter;
 
     private final ClusteringFiles fileLevels;
-    private final long targetFileSize;
+    private final ClusteringKeyIndex keyIndex;
+    private final ClusteringFileRewriter fileRewriter;
 
     public ClusteringCompactManager(
             RowType keyType,
@@ -139,78 +98,64 @@ public class ClusteringCompactManager extends CompactFutureManager {
             CompressOptions compression,
             boolean firstRow,
             @Nullable CompactionMetrics.Reporter metricsReporter) {
-        this.targetFileSize = targetFileSize;
         this.keyType = keyType;
         this.valueType = valueType;
-        this.sortSpillBufferSize = sortSpillBufferSize;
-        this.pageSize = pageSize;
-        this.maxNumFileHandles = maxNumFileHandles;
-        this.spillThreshold = spillThreshold;
-        this.compression = compression;
-        this.firstRow = firstRow;
-        this.clusteringColumns = valueType.projectIndexes(clusteringColumns);
-        this.clusteringComparatorAlone =
-                CodeGenUtils.newRecordComparator(
-                        valueType.project(clusteringColumns).getFieldTypes(),
-                        IntStream.range(0, clusteringColumns.size()).toArray(),
-                        true);
-        this.clusteringComparatorInValue =
-                CodeGenUtils.newRecordComparator(
-                        valueType.getFieldTypes(), this.clusteringColumns, true);
-        this.ioManager = ioManager;
-        this.keyReaderFactory = keyReaderFactory;
-        this.valueReaderFactory = valueReaderFactory;
-        this.writerFactory = writerFactory;
         this.executor = executor;
         this.dvMaintainer = dvMaintainer;
         this.lazyGenDeletionFile = lazyGenDeletionFile;
         this.metricsReporter = metricsReporter;
+
         this.fileLevels = new ClusteringFiles();
         restoreFiles.forEach(this::addNewFile);
 
-        this.kvDb =
+        int[] clusteringColumnIndexes = valueType.projectIndexes(clusteringColumns);
+        RecordComparator clusteringComparatorAlone =
+                CodeGenUtils.newRecordComparator(
+                        valueType.project(clusteringColumns).getFieldTypes(),
+                        IntStream.range(0, clusteringColumns.size()).toArray(),
+                        true);
+        RecordComparator clusteringComparatorInValue =
+                CodeGenUtils.newRecordComparator(
+                        valueType.getFieldTypes(), clusteringColumnIndexes, true);
+
+        SimpleLsmKvDb kvDb =
                 SimpleLsmKvDb.builder(new File(ioManager.pickRandomTempDir()))
                         .cacheManager(cacheManager)
                         .keyComparator(new RowCompactedSerializer(keyType).createSliceComparator())
                         .build();
-        bootstrapKeyIndex(restoreFiles);
-    }
-
-    private void bootstrapKeyIndex(List<DataFileMeta> restoreFiles) {
-        RowCompactedSerializer keySerializer = new RowCompactedSerializer(keyType);
-        for (DataFileMeta file : restoreFiles) {
-            if (file.level() == 0) {
-                continue;
-            }
-            int fileId = fileLevels.getFileIdByName(file.fileName());
-            // Read with DV (auto-skips deleted rows). Use FileRecordIterator.returnedPosition()
-            // to get correct physical positions even after DV filtering.
-            try (RecordReader<KeyValue> reader = keyReaderFactory.createRecordReader(file)) {
-                FileRecordIterator<KeyValue> batch;
-                while ((batch = (FileRecordIterator<KeyValue>) reader.readBatch()) != null) {
-                    KeyValue kv;
-                    while ((kv = batch.next()) != null) {
-                        int position = (int) batch.returnedPosition();
-                        byte[] keyBytes = keySerializer.serializeToBytes(kv.key());
-                        ByteArrayOutputStream value = new ByteArrayOutputStream(8);
-                        encodeInt(value, fileId);
-                        encodeInt(value, position);
-                        kvDb.put(keyBytes, value.toByteArray());
-                    }
-                    batch.releaseBatch();
-                }
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
 
-    private CloseableIterator<InternalRow> readKeyIterator(DataFileMeta file) throws IOException {
-        //noinspection resource
-        return keyReaderFactory
-                .createRecordReader(file)
-                .transform(KeyValue::key)
-                .toCloseableIterator();
+        this.keyIndex =
+                new ClusteringKeyIndex(
+                        keyType,
+                        ioManager,
+                        keyReaderFactory,
+                        dvMaintainer,
+                        kvDb,
+                        fileLevels,
+                        firstRow,
+                        sortSpillBufferSize,
+                        pageSize,
+                        maxNumFileHandles,
+                        compression);
+        keyIndex.bootstrap(restoreFiles);
+
+        this.fileRewriter =
+                new ClusteringFileRewriter(
+                        keyType,
+                        valueType,
+                        clusteringColumnIndexes,
+                        clusteringComparatorAlone,
+                        clusteringComparatorInValue,
+                        ioManager,
+                        valueReaderFactory,
+                        writerFactory,
+                        fileLevels,
+                        targetFileSize,
+                        sortSpillBufferSize,
+                        pageSize,
+                        maxNumFileHandles,
+                        spillThreshold,
+                        compression);
     }
 
     @Override
@@ -246,7 +191,6 @@ public class ClusteringCompactManager extends CompactFutureManager {
     }
 
     private CompactResult compact(boolean fullCompaction) throws Exception {
-        RowCompactedSerializer keySerializer = new RowCompactedSerializer(keyType);
         KeyValueSerializer kvSerializer = new KeyValueSerializer(keyType, valueType);
         RowType kvSchemaType = KeyValue.schema(keyType, valueType);
 
@@ -258,25 +202,32 @@ public class ClusteringCompactManager extends CompactFutureManager {
         List<DataFileMeta> existingSortedFiles = fileLevels.sortedFiles();
         for (DataFileMeta file : unsortedFiles) {
             List<DataFileMeta> sortedFiles =
-                    sortAndRewriteFiles(singletonList(file), kvSerializer, kvSchemaType);
-            updateKeyIndex(keySerializer, file, sortedFiles);
+                    fileRewriter.sortAndRewriteFiles(
+                            singletonList(file), kvSerializer, kvSchemaType);
+            keyIndex.updateIndex(file, sortedFiles);
             result.before().add(file);
             result.after().addAll(sortedFiles);
         }
 
         // Phase 2: Universal Compaction on sorted files that existed before Phase 1.
-        // Files produced by Phase 1 are excluded to avoid the same file appearing in both
-        // result.before() and result.after().
         List<List<DataFileMeta>> mergeGroups;
         if (fullCompaction) {
             mergeGroups = singletonList(existingSortedFiles);
         } else {
-            mergeGroups = pickMergeCandidates(existingSortedFiles);
+            mergeGroups = fileRewriter.pickMergeCandidates(existingSortedFiles);
         }
 
         for (List<DataFileMeta> mergeGroup : mergeGroups) {
             if (mergeGroup.size() >= 2) {
-                List<DataFileMeta> mergedFiles = mergeAndRewriteFiles(mergeGroup, keySerializer);
+                // Delete key index entries before merge
+                for (DataFileMeta file : mergeGroup) {
+                    keyIndex.deleteIndex(file);
+                }
+                List<DataFileMeta> mergedFiles = fileRewriter.mergeAndRewriteFiles(mergeGroup);
+                // Rebuild key index for new files
+                for (DataFileMeta newFile : mergedFiles) {
+                    keyIndex.rebuildIndex(newFile);
+                }
                 result.before().addAll(mergeGroup);
                 result.after().addAll(mergedFiles);
             }
@@ -290,494 +241,6 @@ public class ClusteringCompactManager extends CompactFutureManager {
         return result;
     }
 
-    /**
-     * Pick merge candidate groups based on clustering column range overlap 
and file sizes.
-     *
-     * <ol>
-     *   <li><b>Group into sections</b>: Files are sorted by minKey and grouped into sections based
-     *       on clustering column key range overlap. Overlapping files belong to the same section.
-     *   <li><b>Merge adjacent sections</b>: Sections that have overlapping files (size &gt;= 2) or
-     *       are small (total size &lt; targetFileSize/2) are accumulated together. Large
-     *       single-file sections act as barriers, flushing accumulated files into a merge group.
-     * </ol>
-     *
-     * @param sortedFiles all sorted files
-     * @return list of merge groups; each group contains files to merge together
-     */
-    private List<List<DataFileMeta>> pickMergeCandidates(List<DataFileMeta> sortedFiles) {
-        if (sortedFiles.size() < 2) {
-            return java.util.Collections.emptyList();
-        }
-
-        // Step 1: Group files into sections based on clustering column range overlap.
-        List<List<DataFileMeta>> sections = groupIntoSections(sortedFiles);
-
-        // Step 2: Merge adjacent sections when beneficial to reduce small files.
-        // A section should be merged if it has overlapping files (size >= 2) or is small.
-        long smallSectionThreshold = targetFileSize / 2;
-        List<List<DataFileMeta>> mergeGroups = new ArrayList<>();
-        List<DataFileMeta> pending = new ArrayList<>();
-
-        for (List<DataFileMeta> section : sections) {
-            boolean needsMerge = section.size() >= 2;
-            boolean isSmall = sectionSize(section) < smallSectionThreshold;
-
-            if (needsMerge || isSmall) {
-                // This section should be merged, accumulate it
-                pending.addAll(section);
-            } else {
-                // This section is a single large file, flush pending if any
-                if (pending.size() >= 2) {
-                    mergeGroups.add(new ArrayList<>(pending));
-                }
-                pending.clear();
-            }
-        }
-
-        // Flush remaining pending files
-        if (pending.size() >= 2) {
-            mergeGroups.add(pending);
-        }
-
-        return mergeGroups;
-    }
-
-    private long sectionSize(List<DataFileMeta> section) {
-        long total = 0;
-        for (DataFileMeta file : section) {
-            total += file.fileSize();
-        }
-        return total;
-    }
-
-    /**
-     * Group files into sections based on clustering column key range overlap. Files are first
-     * sorted by minKey, then adjacent files with overlapping ranges are grouped into the same
-     * section.
-     *
-     * @param files input files
-     * @return list of sections, each section contains overlapping files
-     */
-    private List<List<DataFileMeta>> groupIntoSections(List<DataFileMeta> files) {
-        // Sort files by minKey to properly detect overlapping ranges
-        List<DataFileMeta> sorted = new ArrayList<>(files);
-        sorted.sort((a, b) -> clusteringComparatorAlone.compare(a.minKey(), b.minKey()));
-
-        List<List<DataFileMeta>> sections = new ArrayList<>();
-        List<DataFileMeta> currentSection = new ArrayList<>();
-        currentSection.add(sorted.get(0));
-        BinaryRow currentMax = sorted.get(0).maxKey();
-
-        for (int i = 1; i < sorted.size(); i++) {
-            DataFileMeta file = sorted.get(i);
-            if (clusteringComparatorAlone.compare(currentMax, file.minKey()) >= 0) {
-                // Overlaps with current section
-                currentSection.add(file);
-                if (clusteringComparatorAlone.compare(file.maxKey(), currentMax) > 0) {
-                    currentMax = file.maxKey();
-                }
-            } else {
-                sections.add(currentSection);
-                currentSection = new ArrayList<>();
-                currentSection.add(file);
-                currentMax = file.maxKey();
-            }
-        }
-        sections.add(currentSection);
-        return sections;
-    }
-
-    /**
-     * Update the key index for a single original file replaced by new sorted files. Marks old key
-     * positions in deletion vectors and registers new positions.
-     */
-    private void updateKeyIndex(
-            RowCompactedSerializer keySerializer,
-            DataFileMeta originalFile,
-            List<DataFileMeta> newSortedFiles)
-            throws Exception {
-        updateKeyIndex(keySerializer, singletonList(originalFile), newSortedFiles);
-    }
-
-    /**
-     * Update the key index for multiple original files replaced by new sorted files.
-     *
-     * <p>For DEDUPLICATE mode: mark the old position in deletion vectors, keep the new position.
-     *
-     * <p>For FIRST_ROW mode: if key exists, mark the new position in deletion vectors (keep the
-     * first/old one); if key is new, store the new position.
-     */
-    private void updateKeyIndex(
-            RowCompactedSerializer keySerializer,
-            List<DataFileMeta> originalFiles,
-            List<DataFileMeta> newSortedFiles)
-            throws Exception {
-        // Collect file names of original files to avoid self-deletion marking
-        java.util.Set<String> originalFileNames = new java.util.HashSet<>();
-        for (DataFileMeta file : originalFiles) {
-            originalFileNames.add(file.fileName());
-        }
-
-        for (DataFileMeta sortedFile : newSortedFiles) {
-            int fileId = fileLevels.getFileIdByName(sortedFile.fileName());
-            int position = 0;
-            try (CloseableIterator<InternalRow> iterator = readKeyIterator(sortedFile)) {
-                while (iterator.hasNext()) {
-                    byte[] key = keySerializer.serializeToBytes(iterator.next());
-                    byte[] oldValue = kvDb.get(key);
-                    if (oldValue != null) {
-                        ByteArrayInputStream valueIn = new ByteArrayInputStream(oldValue);
-                        int oldFileId = decodeInt(valueIn);
-                        int oldPosition = decodeInt(valueIn);
-                        DataFileMeta oldFile = fileLevels.getFileById(oldFileId);
-                        if (oldFile != null && !originalFileNames.contains(oldFile.fileName())) {
-                            if (firstRow) {
-                                // First-row mode: keep the old (first) record, delete the new one
-                                dvMaintainer.notifyNewDeletion(sortedFile.fileName(), position);
-                                position++;
-                                continue;
-                            } else {
-                                // Deduplicate mode: keep the new record, delete the old one
-                                dvMaintainer.notifyNewDeletion(oldFile.fileName(), oldPosition);
-                            }
-                        }
-                    }
-                    ByteArrayOutputStream value = new ByteArrayOutputStream(8);
-                    encodeInt(value, fileId);
-                    encodeInt(value, position);
-                    kvDb.put(key, value.toByteArray());
-                    position++;
-                }
-            }
-        }
-    }
-
-    /**
-     * Sort and rewrite one or more unsorted files by clustering columns. Reads all KeyValue records
-     * from the input files, sorts them using an external sort buffer, and writes to new level-1
-     * files.
-     */
-    private List<DataFileMeta> sortAndRewriteFiles(
-            List<DataFileMeta> inputFiles, KeyValueSerializer kvSerializer, RowType kvSchemaType)
-            throws Exception {
-        int[] sortFieldsInKeyValue =
-                Arrays.stream(clusteringColumns)
-                        .map(i -> i + keyType.getFieldCount() + 2)
-                        .toArray();
-        BinaryExternalSortBuffer sortBuffer =
-                BinaryExternalSortBuffer.create(
-                        ioManager,
-                        kvSchemaType,
-                        sortFieldsInKeyValue,
-                        sortSpillBufferSize,
-                        pageSize,
-                        maxNumFileHandles,
-                        compression,
-                        MemorySize.MAX_VALUE,
-                        false);
-
-        for (DataFileMeta file : inputFiles) {
-            try (RecordReader<KeyValue> reader = valueReaderFactory.createRecordReader(file)) {
-                try (CloseableIterator<KeyValue> iterator = reader.toCloseableIterator()) {
-                    while (iterator.hasNext()) {
-                        KeyValue kv = iterator.next();
-                        InternalRow serializedRow = kvSerializer.toRow(kv);
-                        sortBuffer.write(serializedRow);
-                    }
-                }
-            }
-        }
-
-        RollingFileWriter<KeyValue, DataFileMeta> writer =
-                writerFactory.createRollingClusteringFileWriter();
-        try {
-            MutableObjectIterator<BinaryRow> sortedIterator = sortBuffer.sortedIterator();
-            BinaryRow binaryRow = new BinaryRow(kvSchemaType.getFieldCount());
-            while ((binaryRow = sortedIterator.next(binaryRow)) != null) {
-                KeyValue kv = kvSerializer.fromRow(binaryRow);
-                writer.write(
-                        kv.copy(
-                                new InternalRowSerializer(keyType),
-                                new InternalRowSerializer(valueType)));
-            }
-        } finally {
-            sortBuffer.clear();
-            writer.close();
-        }
-
-        List<DataFileMeta> newFiles = writer.result();
-        for (DataFileMeta file : inputFiles) {
-            fileLevels.removeFile(file);
-        }
-        for (DataFileMeta newFile : newFiles) {
-            fileLevels.addNewFile(newFile);
-        }
-
-        return newFiles;
-    }
-
-    /**
-     * Merge sorted files using min-heap based multi-way merge. Since all input files are already
-     * sorted by clustering columns, we use a PriorityQueue to merge them efficiently without
-     * re-sorting. Key index entries are deleted during reading and rebuilt after writing.
-     *
-     * <p>When the number of input files exceeds spillThreshold, smaller files are spilled to
-     * row-based temp files first. Row-based iterators consume much less memory than columnar file
-     * readers.
-     */
-    private List<DataFileMeta> mergeAndRewriteFiles(
-            List<DataFileMeta> inputFiles, RowCompactedSerializer keySerializer) throws Exception {
-        InternalRowSerializer keyRowSerializer = new InternalRowSerializer(keyType);
-        InternalRowSerializer valueRowSerializer = new InternalRowSerializer(valueType);
-
-        // Delete key index entries for all input files before reading
-        for (DataFileMeta file : inputFiles) {
-            deleteKeyIndexForFile(keySerializer, file);
-        }
-
-        // Determine which files to spill to row-based temp files
-        List<DataFileMeta> filesToSpill = new ArrayList<>();
-        List<DataFileMeta> filesToKeep = new ArrayList<>();
-        if (inputFiles.size() > spillThreshold) {
-            List<DataFileMeta> sortedBySize = new ArrayList<>(inputFiles);
-            sortedBySize.sort(Comparator.comparingLong(DataFileMeta::fileSize));
-            int spillCount = inputFiles.size() - spillThreshold;
-            filesToSpill = new ArrayList<>(sortedBySize.subList(0, spillCount));
-            filesToKeep = new ArrayList<>(sortedBySize.subList(spillCount, sortedBySize.size()));
-        } else {
-            filesToKeep = inputFiles;
-        }
-
-        // Spill smaller files to row-based temp files
-        List<SpilledChannel> spilledChannels = new ArrayList<>();
-        for (DataFileMeta file : filesToSpill) {
-            spilledChannels.add(spillToRowBasedFile(file));
-        }
-
-        // Open iterators and initialize the min-heap
-        List<CloseableIterator<KeyValue>> openIterators = new ArrayList<>();
-        PriorityQueue<MergeEntry> minHeap =
-                new PriorityQueue<>(
-                        (a, b) ->
-                                clusteringComparatorInValue.compare(
-                                        a.currentKeyValue.value(), b.currentKeyValue.value()));
-
-        try {
-            // Add iterators for columnar files (kept in memory)
-            for (DataFileMeta file : filesToKeep) {
-                @SuppressWarnings("resource")
-                CloseableIterator<KeyValue> iterator =
-                        valueReaderFactory.createRecordReader(file).toCloseableIterator();
-                openIterators.add(iterator);
-                if (iterator.hasNext()) {
-                    KeyValue firstKv = iterator.next().copy(keyRowSerializer, valueRowSerializer);
-                    minHeap.add(new MergeEntry(firstKv, iterator));
-                }
-            }
-
-            // Add iterators for row-based spilled files (low memory consumption)
-            for (SpilledChannel spilled : spilledChannels) {
-                CloseableIterator<KeyValue> iterator = spilled.createIterator();
-                openIterators.add(iterator);
-                if (iterator.hasNext()) {
-                    KeyValue firstKv = iterator.next().copy(keyRowSerializer, valueRowSerializer);
-                    minHeap.add(new MergeEntry(firstKv, iterator));
-                }
-            }
-
-            // Multi-way merge: write records in sorted order
-            RollingFileWriter<KeyValue, DataFileMeta> writer =
-                    writerFactory.createRollingClusteringFileWriter();
-            try {
-                while (!minHeap.isEmpty()) {
-                    MergeEntry entry = minHeap.poll();
-                    writer.write(entry.currentKeyValue);
-                    if (entry.iterator.hasNext()) {
-                        entry.currentKeyValue =
-                                entry.iterator.next().copy(keyRowSerializer, valueRowSerializer);
-                        minHeap.add(entry);
-                    }
-                }
-            } finally {
-                writer.close();
-            }
-
-            // Remove original files and register new sorted files
-            List<DataFileMeta> newFiles = writer.result();
-            for (DataFileMeta file : inputFiles) {
-                fileLevels.removeFile(file);
-            }
-            for (DataFileMeta newFile : newFiles) {
-                fileLevels.addNewFile(newFile);
-            }
-
-            // Rebuild key index for the new files
-            for (DataFileMeta newFile : newFiles) {
-                int fileId = fileLevels.getFileIdByName(newFile.fileName());
-                int position = 0;
-                try (CloseableIterator<InternalRow> keyIterator = readKeyIterator(newFile)) {
-                    while (keyIterator.hasNext()) {
-                        byte[] key = keySerializer.serializeToBytes(keyIterator.next());
-                        ByteArrayOutputStream value = new ByteArrayOutputStream(8);
-                        encodeInt(value, fileId);
-                        encodeInt(value, position);
-                        kvDb.put(key, value.toByteArray());
-                        position++;
-                    }
-                }
-            }
-
-            return newFiles;
-        } finally {
-            for (CloseableIterator<KeyValue> iterator : openIterators) {
-                try {
-                    iterator.close();
-                } catch (Exception ignored) {
-                }
-            }
-        }
-    }
-
-    /**
-     * Spill a columnar DataFileMeta to a row-based temp file. Row-based files consume much less
-     * memory when reading compared to columnar files.
-     */
-    private SpilledChannel spillToRowBasedFile(DataFileMeta file) throws Exception {
-        FileIOChannel.ID channel = ioManager.createChannel();
-        KeyValueWithLevelNoReusingSerializer serializer =
-                new KeyValueWithLevelNoReusingSerializer(keyType, valueType);
-        BlockCompressionFactory compressFactory = BlockCompressionFactory.create(compression);
-        int compressBlock = (int) MemorySize.parse("64 kb").getBytes();
-
-        ChannelWithMeta channelWithMeta;
-        ChannelWriterOutputView out =
-                FileChannelUtil.createOutputView(
-                        ioManager, channel, compressFactory, compressBlock);
-        try (RecordReader<KeyValue> reader = valueReaderFactory.createRecordReader(file)) {
-            RecordIterator<KeyValue> batch;
-            KeyValue record;
-            while ((batch = reader.readBatch()) != null) {
-                while ((record = batch.next()) != null) {
-                    serializer.serialize(record, out);
-                }
-                batch.releaseBatch();
-            }
-        } finally {
-            out.close();
-            channelWithMeta =
-                    new ChannelWithMeta(channel, out.getBlockCount(), out.getWriteBytes());
-        }
-
-        return new SpilledChannel(channelWithMeta, compressFactory, compressBlock, serializer);
-    }
-
-    /** Holds metadata for a spilled row-based temp file. */
-    private class SpilledChannel {
-        private final ChannelWithMeta channel;
-        private final BlockCompressionFactory compressFactory;
-        private final int compressBlock;
-        private final KeyValueWithLevelNoReusingSerializer serializer;
-
-        SpilledChannel(
-                ChannelWithMeta channel,
-                BlockCompressionFactory compressFactory,
-                int compressBlock,
-                KeyValueWithLevelNoReusingSerializer serializer) {
-            this.channel = channel;
-            this.compressFactory = compressFactory;
-            this.compressBlock = compressBlock;
-            this.serializer = serializer;
-        }
-
-        CloseableIterator<KeyValue> createIterator() throws IOException {
-            ChannelReaderInputView view =
-                    FileChannelUtil.createInputView(
-                            ioManager, channel, new ArrayList<>(), compressFactory, compressBlock);
-            BinaryRowSerializer rowSerializer = new BinaryRowSerializer(serializer.numFields());
-            ChannelReaderInputViewIterator iterator =
-                    new ChannelReaderInputViewIterator(view, null, rowSerializer);
-            return new SpilledChannelIterator(view, iterator, serializer);
-        }
-    }
-
-    /** Iterator that reads KeyValue records from a spilled row-based temp file. */
-    private static class SpilledChannelIterator implements CloseableIterator<KeyValue> {
-        private final ChannelReaderInputView view;
-        private final ChannelReaderInputViewIterator iterator;
-        private final KeyValueWithLevelNoReusingSerializer serializer;
-        private KeyValue next;
-
-        SpilledChannelIterator(
-                ChannelReaderInputView view,
-                ChannelReaderInputViewIterator iterator,
-                KeyValueWithLevelNoReusingSerializer serializer) {
-            this.view = view;
-            this.iterator = iterator;
-            this.serializer = serializer;
-        }
-
-        @Override
-        public boolean hasNext() {
-            if (next != null) {
-                return true;
-            }
-            try {
-                BinaryRow row = iterator.next();
-                if (row == null) {
-                    return false;
-                }
-                next = serializer.fromRow(row);
-                return true;
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-        }
-
-        @Override
-        public KeyValue next() {
-            if (!hasNext()) {
-                throw new java.util.NoSuchElementException();
-            }
-            KeyValue result = next;
-            next = null;
-            return result;
-        }
-
-        @Override
-        public void close() throws Exception {
-            view.getChannel().closeAndDelete();
-        }
-    }
-
-    /** Delete key index entries for the given file from kvDb (only if they still point to it). */
-    private void deleteKeyIndexForFile(RowCompactedSerializer keySerializer, DataFileMeta file)
-            throws Exception {
-        int fileId = fileLevels.getFileIdByName(file.fileName());
-        try (CloseableIterator<InternalRow> iterator = readKeyIterator(file)) {
-            while (iterator.hasNext()) {
-                byte[] key = keySerializer.serializeToBytes(iterator.next());
-                byte[] value = kvDb.get(key);
-                if (value != null) {
-                    int storedFileId = decodeInt(new ByteArrayInputStream(value));
-                    if (storedFileId == fileId) {
-                        kvDb.delete(key);
-                    }
-                }
-            }
-        }
-    }
-
-    /** Entry in the min-heap for multi-way merge, holding the current KeyValue and its iterator. */
-    private static class MergeEntry {
-        KeyValue currentKeyValue;
-        final CloseableIterator<KeyValue> iterator;
-
-        MergeEntry(KeyValue currentKeyValue, CloseableIterator<KeyValue> iterator) {
-            this.currentKeyValue = currentKeyValue;
-            this.iterator = iterator;
-        }
-    }
-
     @Override
     public Optional<CompactResult> getCompactionResult(boolean blocking)
             throws ExecutionException, InterruptedException {
@@ -791,6 +254,6 @@ public class ClusteringCompactManager extends CompactFutureManager {
 
     @Override
     public void close() throws IOException {
-        kvDb.close();
+        keyIndex.close();
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java
similarity index 50%
copy from paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
copy to paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java
index 8c8806daeb..ece4813764 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java
@@ -19,22 +19,14 @@
 package org.apache.paimon.mergetree.compact.clustering;
 
 import org.apache.paimon.KeyValue;
-import org.apache.paimon.KeyValueFileStore;
 import org.apache.paimon.KeyValueSerializer;
-import org.apache.paimon.codegen.CodeGenUtils;
 import org.apache.paimon.codegen.RecordComparator;
-import org.apache.paimon.compact.CompactDeletionFile;
-import org.apache.paimon.compact.CompactFutureManager;
-import org.apache.paimon.compact.CompactResult;
-import org.apache.paimon.compact.CompactTask;
 import org.apache.paimon.compression.BlockCompressionFactory;
 import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.BinaryRowSerializer;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
-import org.apache.paimon.data.serializer.RowCompactedSerializer;
-import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
 import org.apache.paimon.disk.ChannelReaderInputView;
 import org.apache.paimon.disk.ChannelReaderInputViewIterator;
 import org.apache.paimon.disk.ChannelWithMeta;
@@ -46,11 +38,7 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.io.KeyValueFileWriterFactory;
 import org.apache.paimon.io.RollingFileWriter;
-import org.apache.paimon.io.cache.CacheManager;
-import org.apache.paimon.lookup.sort.db.SimpleLsmKvDb;
-import org.apache.paimon.operation.metrics.CompactionMetrics;
 import org.apache.paimon.options.MemorySize;
-import org.apache.paimon.reader.FileRecordIterator;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.reader.RecordReader.RecordIterator;
 import org.apache.paimon.sort.BinaryExternalSortBuffer;
@@ -59,405 +47,74 @@ import org.apache.paimon.utils.CloseableIterator;
 import org.apache.paimon.utils.KeyValueWithLevelNoReusingSerializer;
 import org.apache.paimon.utils.MutableObjectIterator;
 
-import javax.annotation.Nullable;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Optional;
 import java.util.PriorityQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.stream.IntStream;
-
-import static java.util.Collections.singletonList;
-import static org.apache.paimon.utils.VarLengthIntUtils.decodeInt;
-import static org.apache.paimon.utils.VarLengthIntUtils.encodeInt;
 
 /**
- * Key Value clustering compact manager for {@link KeyValueFileStore}.
- *
- * <p>Compaction is triggered when unsorted files exist. The compaction process has two phases:
- *
- * <ol>
- *   <li><b>Phase 1</b>: Sort and rewrite all unsorted (level 0) files by clustering columns.
- *   <li><b>Phase 2</b>: Merge sorted files based on clustering column key range overlap. Files are
- *       grouped into sections where each section contains overlapping files. Adjacent sections are
- *       merged when beneficial (overlapping files or small sections) to reduce IO amplification
- *       while consolidating small files.
- * </ol>
+ * Handles file rewriting for clustering compaction, including sorting unsorted files (Phase 1) and
+ * merging sorted files via multi-way merge (Phase 2).
  */
-public class ClusteringCompactManager extends CompactFutureManager {
+public class ClusteringFileRewriter {
 
     private final RowType keyType;
     private final RowType valueType;
-    private final long sortSpillBufferSize;
-    private final int pageSize;
-    private final int maxNumFileHandles;
-    private final int spillThreshold;
-    private final CompressOptions compression;
     private final int[] clusteringColumns;
     private final RecordComparator clusteringComparatorAlone;
     private final RecordComparator clusteringComparatorInValue;
     private final IOManager ioManager;
-    private final KeyValueFileReaderFactory keyReaderFactory;
     private final KeyValueFileReaderFactory valueReaderFactory;
     private final KeyValueFileWriterFactory writerFactory;
-    private final ExecutorService executor;
-    private final BucketedDvMaintainer dvMaintainer;
-    private final SimpleLsmKvDb kvDb;
-    private final boolean lazyGenDeletionFile;
-    private final boolean firstRow;
-    @Nullable private final CompactionMetrics.Reporter metricsReporter;
-
     private final ClusteringFiles fileLevels;
     private final long targetFileSize;
+    private final long sortSpillBufferSize;
+    private final int pageSize;
+    private final int maxNumFileHandles;
+    private final int spillThreshold;
+    private final CompressOptions compression;
 
-    public ClusteringCompactManager(
+    public ClusteringFileRewriter(
             RowType keyType,
             RowType valueType,
-            List<String> clusteringColumns,
+            int[] clusteringColumns,
+            RecordComparator clusteringComparatorAlone,
+            RecordComparator clusteringComparatorInValue,
             IOManager ioManager,
-            CacheManager cacheManager,
-            KeyValueFileReaderFactory keyReaderFactory,
             KeyValueFileReaderFactory valueReaderFactory,
             KeyValueFileWriterFactory writerFactory,
-            ExecutorService executor,
-            BucketedDvMaintainer dvMaintainer,
-            boolean lazyGenDeletionFile,
-            List<DataFileMeta> restoreFiles,
+            ClusteringFiles fileLevels,
             long targetFileSize,
             long sortSpillBufferSize,
             int pageSize,
             int maxNumFileHandles,
             int spillThreshold,
-            CompressOptions compression,
-            boolean firstRow,
-            @Nullable CompactionMetrics.Reporter metricsReporter) {
-        this.targetFileSize = targetFileSize;
+            CompressOptions compression) {
         this.keyType = keyType;
         this.valueType = valueType;
+        this.clusteringColumns = clusteringColumns;
+        this.clusteringComparatorAlone = clusteringComparatorAlone;
+        this.clusteringComparatorInValue = clusteringComparatorInValue;
+        this.ioManager = ioManager;
+        this.valueReaderFactory = valueReaderFactory;
+        this.writerFactory = writerFactory;
+        this.fileLevels = fileLevels;
+        this.targetFileSize = targetFileSize;
         this.sortSpillBufferSize = sortSpillBufferSize;
         this.pageSize = pageSize;
         this.maxNumFileHandles = maxNumFileHandles;
         this.spillThreshold = spillThreshold;
         this.compression = compression;
-        this.firstRow = firstRow;
-        this.clusteringColumns = valueType.projectIndexes(clusteringColumns);
-        this.clusteringComparatorAlone =
-                CodeGenUtils.newRecordComparator(
-                        valueType.project(clusteringColumns).getFieldTypes(),
-                        IntStream.range(0, clusteringColumns.size()).toArray(),
-                        true);
-        this.clusteringComparatorInValue =
-                CodeGenUtils.newRecordComparator(
-                        valueType.getFieldTypes(), this.clusteringColumns, true);
-        this.ioManager = ioManager;
-        this.keyReaderFactory = keyReaderFactory;
-        this.valueReaderFactory = valueReaderFactory;
-        this.writerFactory = writerFactory;
-        this.executor = executor;
-        this.dvMaintainer = dvMaintainer;
-        this.lazyGenDeletionFile = lazyGenDeletionFile;
-        this.metricsReporter = metricsReporter;
-        this.fileLevels = new ClusteringFiles();
-        restoreFiles.forEach(this::addNewFile);
-
-        this.kvDb =
-                SimpleLsmKvDb.builder(new File(ioManager.pickRandomTempDir()))
-                        .cacheManager(cacheManager)
-                        .keyComparator(new RowCompactedSerializer(keyType).createSliceComparator())
-                        .build();
-        bootstrapKeyIndex(restoreFiles);
-    }
-
-    private void bootstrapKeyIndex(List<DataFileMeta> restoreFiles) {
-        RowCompactedSerializer keySerializer = new RowCompactedSerializer(keyType);
-        for (DataFileMeta file : restoreFiles) {
-            if (file.level() == 0) {
-                continue;
-            }
-            int fileId = fileLevels.getFileIdByName(file.fileName());
-            // Read with DV (auto-skips deleted rows). Use FileRecordIterator.returnedPosition()
-            // to get correct physical positions even after DV filtering.
-            try (RecordReader<KeyValue> reader = keyReaderFactory.createRecordReader(file)) {
-                FileRecordIterator<KeyValue> batch;
-                while ((batch = (FileRecordIterator<KeyValue>) reader.readBatch()) != null) {
-                    KeyValue kv;
-                    while ((kv = batch.next()) != null) {
-                        int position = (int) batch.returnedPosition();
-                        byte[] keyBytes = 
keySerializer.serializeToBytes(kv.key());
-                        ByteArrayOutputStream value = new 
ByteArrayOutputStream(8);
-                        encodeInt(value, fileId);
-                        encodeInt(value, position);
-                        kvDb.put(keyBytes, value.toByteArray());
-                    }
-                    batch.releaseBatch();
-                }
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
-    private CloseableIterator<InternalRow> readKeyIterator(DataFileMeta file) throws IOException {
-        //noinspection resource
-        return keyReaderFactory
-                .createRecordReader(file)
-                .transform(KeyValue::key)
-                .toCloseableIterator();
-    }
-
-    @Override
-    public boolean shouldWaitForLatestCompaction() {
-        return false;
-    }
-
-    @Override
-    public boolean shouldWaitForPreparingCheckpoint() {
-        return false;
-    }
-
-    @Override
-    public void addNewFile(DataFileMeta file) {
-        fileLevels.addNewFile(file);
-    }
-
-    @Override
-    public List<DataFileMeta> allFiles() {
-        return fileLevels.allFiles();
-    }
-
-    @Override
-    public void triggerCompaction(boolean fullCompaction) {
-        taskFuture =
-                executor.submit(
-                        new CompactTask(metricsReporter) {
-                            @Override
-                            protected CompactResult doCompact() throws Exception {
-                                return compact(fullCompaction);
-                            }
-                        });
-    }
-
-    private CompactResult compact(boolean fullCompaction) throws Exception {
-        RowCompactedSerializer keySerializer = new RowCompactedSerializer(keyType);
-        KeyValueSerializer kvSerializer = new KeyValueSerializer(keyType, valueType);
-        RowType kvSchemaType = KeyValue.schema(keyType, valueType);
-
-        CompactResult result = new CompactResult();
-
-        // Phase 1: Sort and rewrite all unsorted (level 0) files
-        List<DataFileMeta> unsortedFiles = fileLevels.unsortedFiles();
-        // Snapshot sorted files before Phase 1 to avoid including newly created files in Phase 2
-        List<DataFileMeta> existingSortedFiles = fileLevels.sortedFiles();
-        for (DataFileMeta file : unsortedFiles) {
-            List<DataFileMeta> sortedFiles =
-                    sortAndRewriteFiles(singletonList(file), kvSerializer, kvSchemaType);
-            updateKeyIndex(keySerializer, file, sortedFiles);
-            result.before().add(file);
-            result.after().addAll(sortedFiles);
-        }
-
-        // Phase 2: Universal Compaction on sorted files that existed before Phase 1.
-        // Files produced by Phase 1 are excluded to avoid the same file appearing in both
-        // result.before() and result.after().
-        List<List<DataFileMeta>> mergeGroups;
-        if (fullCompaction) {
-            mergeGroups = singletonList(existingSortedFiles);
-        } else {
-            mergeGroups = pickMergeCandidates(existingSortedFiles);
-        }
-
-        for (List<DataFileMeta> mergeGroup : mergeGroups) {
-            if (mergeGroup.size() >= 2) {
-                List<DataFileMeta> mergedFiles = mergeAndRewriteFiles(mergeGroup, keySerializer);
-                result.before().addAll(mergeGroup);
-                result.after().addAll(mergedFiles);
-            }
-        }
-
-        CompactDeletionFile deletionFile =
-                lazyGenDeletionFile
-                        ? CompactDeletionFile.lazyGeneration(dvMaintainer)
-                        : CompactDeletionFile.generateFiles(dvMaintainer);
-        result.setDeletionFile(deletionFile);
-        return result;
-    }
-
-    /**
-     * Pick merge candidate groups based on clustering column range overlap and file sizes.
-     *
-     * <ol>
-     *   <li><b>Group into sections</b>: Files are sorted by minKey and grouped into sections based
-     *       on clustering column key range overlap. Overlapping files belong to the same section.
-     *   <li><b>Merge adjacent sections</b>: Sections that have overlapping files (size &gt;= 2) or
-     *       are small (total size &lt; targetFileSize/2) are accumulated together. Large
-     *       single-file sections act as barriers, flushing accumulated files into a merge group.
-     * </ol>
-     *
-     * @param sortedFiles all sorted files
-     * @return list of merge groups; each group contains files to merge together
-     */
-    private List<List<DataFileMeta>> pickMergeCandidates(List<DataFileMeta> sortedFiles) {
-        if (sortedFiles.size() < 2) {
-            return java.util.Collections.emptyList();
-        }
-
-        // Step 1: Group files into sections based on clustering column range overlap.
-        List<List<DataFileMeta>> sections = groupIntoSections(sortedFiles);
-
-        // Step 2: Merge adjacent sections when beneficial to reduce small files.
-        // A section should be merged if it has overlapping files (size >= 2) or is small.
-        long smallSectionThreshold = targetFileSize / 2;
-        List<List<DataFileMeta>> mergeGroups = new ArrayList<>();
-        List<DataFileMeta> pending = new ArrayList<>();
-
-        for (List<DataFileMeta> section : sections) {
-            boolean needsMerge = section.size() >= 2;
-            boolean isSmall = sectionSize(section) < smallSectionThreshold;
-
-            if (needsMerge || isSmall) {
-                // This section should be merged, accumulate it
-                pending.addAll(section);
-            } else {
-                // This section is a single large file, flush pending if any
-                if (pending.size() >= 2) {
-                    mergeGroups.add(new ArrayList<>(pending));
-                }
-                pending.clear();
-            }
-        }
-
-        // Flush remaining pending files
-        if (pending.size() >= 2) {
-            mergeGroups.add(pending);
-        }
-
-        return mergeGroups;
-    }
-
-    private long sectionSize(List<DataFileMeta> section) {
-        long total = 0;
-        for (DataFileMeta file : section) {
-            total += file.fileSize();
-        }
-        return total;
     }
 
     /**
-     * Group files into sections based on clustering column key range overlap. Files are first
-     * sorted by minKey, then adjacent files with overlapping ranges are grouped into the same
-     * section.
-     *
-     * @param files input files
-     * @return list of sections, each section contains overlapping files
+     * Sort and rewrite unsorted files by clustering columns. Reads all KeyValue records, sorts them
+     * using an external sort buffer, and writes to new level-1 files.
      */
-    private List<List<DataFileMeta>> groupIntoSections(List<DataFileMeta> files) {
-        // Sort files by minKey to properly detect overlapping ranges
-        List<DataFileMeta> sorted = new ArrayList<>(files);
-        sorted.sort((a, b) -> clusteringComparatorAlone.compare(a.minKey(), b.minKey()));
-
-        List<List<DataFileMeta>> sections = new ArrayList<>();
-        List<DataFileMeta> currentSection = new ArrayList<>();
-        currentSection.add(sorted.get(0));
-        BinaryRow currentMax = sorted.get(0).maxKey();
-
-        for (int i = 1; i < sorted.size(); i++) {
-            DataFileMeta file = sorted.get(i);
-            if (clusteringComparatorAlone.compare(currentMax, file.minKey()) >= 0) {
-                // Overlaps with current section
-                currentSection.add(file);
-                if (clusteringComparatorAlone.compare(file.maxKey(), currentMax) > 0) {
-                    currentMax = file.maxKey();
-                }
-            } else {
-                sections.add(currentSection);
-                currentSection = new ArrayList<>();
-                currentSection.add(file);
-                currentMax = file.maxKey();
-            }
-        }
-        sections.add(currentSection);
-        return sections;
-    }
-
-    /**
-     * Update the key index for a single original file replaced by new sorted files. Marks old key
-     * positions in deletion vectors and registers new positions.
-     */
-    private void updateKeyIndex(
-            RowCompactedSerializer keySerializer,
-            DataFileMeta originalFile,
-            List<DataFileMeta> newSortedFiles)
-            throws Exception {
-        updateKeyIndex(keySerializer, singletonList(originalFile), newSortedFiles);
-    }
-
-    /**
-     * Update the key index for multiple original files replaced by new sorted files.
-     *
-     * <p>For DEDUPLICATE mode: mark the old position in deletion vectors, keep the new position.
-     *
-     * <p>For FIRST_ROW mode: if key exists, mark the new position in deletion vectors (keep the
-     * first/old one); if key is new, store the new position.
-     */
-    private void updateKeyIndex(
-            RowCompactedSerializer keySerializer,
-            List<DataFileMeta> originalFiles,
-            List<DataFileMeta> newSortedFiles)
-            throws Exception {
-        // Collect file names of original files to avoid self-deletion marking
-        java.util.Set<String> originalFileNames = new java.util.HashSet<>();
-        for (DataFileMeta file : originalFiles) {
-            originalFileNames.add(file.fileName());
-        }
-
-        for (DataFileMeta sortedFile : newSortedFiles) {
-            int fileId = fileLevels.getFileIdByName(sortedFile.fileName());
-            int position = 0;
-            try (CloseableIterator<InternalRow> iterator = readKeyIterator(sortedFile)) {
-                while (iterator.hasNext()) {
-                    byte[] key = keySerializer.serializeToBytes(iterator.next());
-                    byte[] oldValue = kvDb.get(key);
-                    if (oldValue != null) {
-                        ByteArrayInputStream valueIn = new ByteArrayInputStream(oldValue);
-                        int oldFileId = decodeInt(valueIn);
-                        int oldPosition = decodeInt(valueIn);
-                        DataFileMeta oldFile = fileLevels.getFileById(oldFileId);
-                        if (oldFile != null && !originalFileNames.contains(oldFile.fileName())) {
-                            if (firstRow) {
-                                // First-row mode: keep the old (first) record, delete the new one
-                                dvMaintainer.notifyNewDeletion(sortedFile.fileName(), position);
-                                position++;
-                                continue;
-                            } else {
-                                // Deduplicate mode: keep the new record, delete the old one
-                                dvMaintainer.notifyNewDeletion(oldFile.fileName(), oldPosition);
-                            }
-                        }
-                    }
-                    ByteArrayOutputStream value = new ByteArrayOutputStream(8);
-                    encodeInt(value, fileId);
-                    encodeInt(value, position);
-                    kvDb.put(key, value.toByteArray());
-                    position++;
-                }
-            }
-        }
-    }
-
-    /**
-     * Sort and rewrite one or more unsorted files by clustering columns. Reads all KeyValue records
-     * from the input files, sorts them using an external sort buffer, and writes to new level-1
-     * files.
-     */
-    private List<DataFileMeta> sortAndRewriteFiles(
+    public List<DataFileMeta> sortAndRewriteFiles(
             List<DataFileMeta> inputFiles, KeyValueSerializer kvSerializer, RowType kvSchemaType)
             throws Exception {
         int[] sortFieldsInKeyValue =
@@ -517,27 +174,54 @@ public class ClusteringCompactManager extends CompactFutureManager {
     }
 
     /**
-     * Merge sorted files using min-heap based multi-way merge. Since all input files are already
-     * sorted by clustering columns, we use a PriorityQueue to merge them efficiently without
-     * re-sorting. Key index entries are deleted during reading and rebuilt after writing.
+     * Pick merge candidate groups based on clustering column range overlap and file sizes.
      *
-     * <p>When the number of input files exceeds spillThreshold, smaller files are spilled to
-     * row-based temp files first. Row-based iterators consume much less memory than columnar file
-     * readers.
+     * @param sortedFiles all sorted files
+     * @return list of merge groups; each group contains files to merge together
      */
-    private List<DataFileMeta> mergeAndRewriteFiles(
-            List<DataFileMeta> inputFiles, RowCompactedSerializer keySerializer) throws Exception {
-        InternalRowSerializer keyRowSerializer = new InternalRowSerializer(keyType);
-        InternalRowSerializer valueRowSerializer = new InternalRowSerializer(valueType);
+    public List<List<DataFileMeta>> pickMergeCandidates(List<DataFileMeta> sortedFiles) {
+        if (sortedFiles.size() < 2) {
+            return Collections.emptyList();
+        }
 
-        // Delete key index entries for all input files before reading
-        for (DataFileMeta file : inputFiles) {
-            deleteKeyIndexForFile(keySerializer, file);
+        List<List<DataFileMeta>> sections = groupIntoSections(sortedFiles);
+
+        long smallSectionThreshold = targetFileSize / 2;
+        List<List<DataFileMeta>> mergeGroups = new ArrayList<>();
+        List<DataFileMeta> pending = new ArrayList<>();
+
+        for (List<DataFileMeta> section : sections) {
+            boolean needsMerge = section.size() >= 2;
+            boolean isSmall = sectionSize(section) < smallSectionThreshold;
+
+            if (needsMerge || isSmall) {
+                pending.addAll(section);
+            } else {
+                if (pending.size() >= 2) {
+                    mergeGroups.add(new ArrayList<>(pending));
+                }
+                pending.clear();
+            }
         }
 
+        if (pending.size() >= 2) {
+            mergeGroups.add(pending);
+        }
+
+        return mergeGroups;
+    }
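+    // Illustrative walk-through (numbers are hypothetical, not from this patch):
+    // with targetFileSize = 128 MB, files A[0,10] and B[5,20] overlap and form one
+    // section (needsMerge); C[30,40] at 200 MB is a large single-file section that
+    // acts as a barrier and flushes {A, B} as a merge group; a trailing 10 MB
+    // section D[50,60] stays pending but alone never reaches the size-2 minimum.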
+
+    /**
+     * Merge sorted files using min-heap based multi-way merge. Key index entries are deleted before
+     * reading and rebuilt after writing by the caller.
+     */
+    public List<DataFileMeta> mergeAndRewriteFiles(List<DataFileMeta> inputFiles) throws Exception {
+        InternalRowSerializer keyRowSerializer = new InternalRowSerializer(keyType);
+        InternalRowSerializer valueRowSerializer = new InternalRowSerializer(valueType);
+
         // Determine which files to spill to row-based temp files
         List<DataFileMeta> filesToSpill = new ArrayList<>();
-        List<DataFileMeta> filesToKeep = new ArrayList<>();
+        List<DataFileMeta> filesToKeep;
         if (inputFiles.size() > spillThreshold) {
             List<DataFileMeta> sortedBySize = new ArrayList<>(inputFiles);
             sortedBySize.sort(Comparator.comparingLong(DataFileMeta::fileSize));
@@ -563,7 +247,6 @@ public class ClusteringCompactManager extends CompactFutureManager {
                                         a.currentKeyValue.value(), b.currentKeyValue.value()));
 
         try {
-            // Add iterators for columnar files (kept in memory)
             for (DataFileMeta file : filesToKeep) {
                 @SuppressWarnings("resource")
                 CloseableIterator<KeyValue> iterator =
@@ -575,7 +258,6 @@ public class ClusteringCompactManager extends CompactFutureManager {
                 }
             }
 
-            // Add iterators for row-based spilled files (low memory consumption)
             for (SpilledChannel spilled : spilledChannels) {
                 CloseableIterator<KeyValue> iterator = spilled.createIterator();
                 openIterators.add(iterator);
@@ -585,7 +267,6 @@ public class ClusteringCompactManager extends CompactFutureManager {
                 }
             }
 
-            // Multi-way merge: write records in sorted order
             RollingFileWriter<KeyValue, DataFileMeta> writer =
                     writerFactory.createRollingClusteringFileWriter();
             try {
@@ -602,7 +283,6 @@ public class ClusteringCompactManager extends CompactFutureManager {
                 writer.close();
             }
 
-            // Remove original files and register new sorted files
             List<DataFileMeta> newFiles = writer.result();
             for (DataFileMeta file : inputFiles) {
                 fileLevels.removeFile(file);
@@ -611,22 +291,6 @@ public class ClusteringCompactManager extends CompactFutureManager {
                 fileLevels.addNewFile(newFile);
             }
 
-            // Rebuild key index for the new files
-            for (DataFileMeta newFile : newFiles) {
-                int fileId = fileLevels.getFileIdByName(newFile.fileName());
-                int position = 0;
-                try (CloseableIterator<InternalRow> keyIterator = readKeyIterator(newFile)) {
-                    while (keyIterator.hasNext()) {
-                        byte[] key = keySerializer.serializeToBytes(keyIterator.next());
-                        ByteArrayOutputStream value = new ByteArrayOutputStream(8);
-                        encodeInt(value, fileId);
-                        encodeInt(value, position);
-                        kvDb.put(key, value.toByteArray());
-                        position++;
-                    }
-                }
-            }
-
             return newFiles;
         } finally {
             for (CloseableIterator<KeyValue> iterator : openIterators) {
@@ -638,10 +302,41 @@ public class ClusteringCompactManager extends CompactFutureManager {
         }
     }
 
-    /**
-     * Spill a columnar DataFileMeta to a row-based temp file. Row-based files consume much less
-     * memory when reading compared to columnar files.
-     */
+    private List<List<DataFileMeta>> groupIntoSections(List<DataFileMeta> files) {
+        List<DataFileMeta> sorted = new ArrayList<>(files);
+        sorted.sort((a, b) -> clusteringComparatorAlone.compare(a.minKey(), b.minKey()));
+
+        List<List<DataFileMeta>> sections = new ArrayList<>();
+        List<DataFileMeta> currentSection = new ArrayList<>();
+        currentSection.add(sorted.get(0));
+        BinaryRow currentMax = sorted.get(0).maxKey();
+
+        for (int i = 1; i < sorted.size(); i++) {
+            DataFileMeta file = sorted.get(i);
+            if (clusteringComparatorAlone.compare(currentMax, file.minKey()) >= 0) {
+                currentSection.add(file);
+                if (clusteringComparatorAlone.compare(file.maxKey(), currentMax) > 0) {
+                    currentMax = file.maxKey();
+                }
+            } else {
+                sections.add(currentSection);
+                currentSection = new ArrayList<>();
+                currentSection.add(file);
+                currentMax = file.maxKey();
+            }
+        }
+        sections.add(currentSection);
+        return sections;
+    }
+
+    private long sectionSize(List<DataFileMeta> section) {
+        long total = 0;
+        for (DataFileMeta file : section) {
+            total += file.fileSize();
+        }
+        return total;
+    }
+
     private SpilledChannel spillToRowBasedFile(DataFileMeta file) throws Exception {
         FileIOChannel.ID channel = ioManager.createChannel();
         KeyValueWithLevelNoReusingSerializer serializer =
@@ -749,25 +444,7 @@ public class ClusteringCompactManager extends CompactFutureManager {
         }
     }
 
-    /** Delete key index entries for the given file from kvDb (only if they still point to it). */
-    private void deleteKeyIndexForFile(RowCompactedSerializer keySerializer, DataFileMeta file)
-            throws Exception {
-        int fileId = fileLevels.getFileIdByName(file.fileName());
-        try (CloseableIterator<InternalRow> iterator = readKeyIterator(file)) {
-            while (iterator.hasNext()) {
-                byte[] key = keySerializer.serializeToBytes(iterator.next());
-                byte[] value = kvDb.get(key);
-                if (value != null) {
-                    int storedFileId = decodeInt(new ByteArrayInputStream(value));
-                    if (storedFileId == fileId) {
-                        kvDb.delete(key);
-                    }
-                }
-            }
-        }
-    }
-
-    /** Entry in the min-heap for multi-way merge, holding the current KeyValue and its iterator. */
+    /** Entry in the min-heap for multi-way merge. */
     private static class MergeEntry {
         KeyValue currentKeyValue;
         final CloseableIterator<KeyValue> iterator;
@@ -777,20 +454,4 @@ public class ClusteringCompactManager extends CompactFutureManager {
             this.iterator = iterator;
         }
     }
-
-    @Override
-    public Optional<CompactResult> getCompactionResult(boolean blocking)
-            throws ExecutionException, InterruptedException {
-        return innerGetCompactionResult(blocking);
-    }
-
-    @Override
-    public boolean compactNotCompleted() {
-        return super.compactNotCompleted() || fileLevels.compactNotCompleted();
-    }
-
-    @Override
-    public void close() throws IOException {
-        kvDb.close();
-    }
 }
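
For readers following the rewrite: mergeAndRewriteFiles relies on the classic
min-heap multi-way merge. Below is a minimal standalone sketch of the pattern,
simplified to integer inputs; the class and method names are illustrative and
not part of the patch:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    import java.util.PriorityQueue;

    public class MultiWayMergeSketch {

        /** Heap entry pairing the current head element with its source iterator. */
        static final class Entry {
            int head;
            final Iterator<Integer> source;

            Entry(int head, Iterator<Integer> source) {
                this.head = head;
                this.source = source;
            }
        }

        /** Merges already-sorted inputs into one sorted output without re-sorting. */
        static List<Integer> merge(List<List<Integer>> sortedInputs) {
            PriorityQueue<Entry> heap =
                    new PriorityQueue<>(Comparator.comparingInt((Entry e) -> e.head));
            for (List<Integer> input : sortedInputs) {
                Iterator<Integer> it = input.iterator();
                if (it.hasNext()) {
                    heap.add(new Entry(it.next(), it));
                }
            }
            List<Integer> out = new ArrayList<>();
            while (!heap.isEmpty()) {
                Entry smallest = heap.poll();
                out.add(smallest.head);
                if (smallest.source.hasNext()) {
                    // Re-insert with the next element from the same input.
                    smallest.head = smallest.source.next();
                    heap.add(smallest);
                }
            }
            return out;
        }

        public static void main(String[] args) {
            System.out.println(
                    merge(
                            Arrays.asList(
                                    Arrays.asList(1, 4, 9),
                                    Arrays.asList(2, 3, 8),
                                    Arrays.asList(5, 6, 7))));
            // Prints [1, 2, 3, 4, 5, 6, 7, 8, 9].
        }
    }

The patch does the same with KeyValue iterators and the clustering comparator,
spilling the smallest inputs to row-based temp files when their count exceeds
spillThreshold.
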
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java
new file mode 100644
index 0000000000..d7234345f6
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact.clustering;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.compression.CompressOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.RowCompactedSerializer;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.KeyValueFileReaderFactory;
+import org.apache.paimon.lookup.sort.db.SimpleLsmKvDb;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.reader.FileRecordIterator;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.sort.BinaryExternalSortBuffer;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.MutableObjectIterator;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import static org.apache.paimon.utils.VarLengthIntUtils.decodeInt;
+import static org.apache.paimon.utils.VarLengthIntUtils.encodeInt;
+
+/**
+ * Manages the primary key index for clustering compaction. Maps each primary key to its file
+ * location (fileId + row position) using a {@link SimpleLsmKvDb}.
+ */
+public class ClusteringKeyIndex implements Closeable {
+
+    private final RowType keyType;
+    private final IOManager ioManager;
+    private final KeyValueFileReaderFactory keyReaderFactory;
+    private final BucketedDvMaintainer dvMaintainer;
+    private final SimpleLsmKvDb kvDb;
+    private final ClusteringFiles fileLevels;
+    private final boolean firstRow;
+    private final long sortSpillBufferSize;
+    private final int pageSize;
+    private final int maxNumFileHandles;
+    private final CompressOptions compression;
+
+    public ClusteringKeyIndex(
+            RowType keyType,
+            IOManager ioManager,
+            KeyValueFileReaderFactory keyReaderFactory,
+            BucketedDvMaintainer dvMaintainer,
+            SimpleLsmKvDb kvDb,
+            ClusteringFiles fileLevels,
+            boolean firstRow,
+            long sortSpillBufferSize,
+            int pageSize,
+            int maxNumFileHandles,
+            CompressOptions compression) {
+        this.keyType = keyType;
+        this.ioManager = ioManager;
+        this.keyReaderFactory = keyReaderFactory;
+        this.dvMaintainer = dvMaintainer;
+        this.kvDb = kvDb;
+        this.fileLevels = fileLevels;
+        this.firstRow = firstRow;
+        this.sortSpillBufferSize = sortSpillBufferSize;
+        this.pageSize = pageSize;
+        this.maxNumFileHandles = maxNumFileHandles;
+        this.compression = compression;
+    }
+
+    /** Bootstrap the key index from existing sorted files using external sort + bulk load. */
+    public void bootstrap(List<DataFileMeta> restoreFiles) {
+        List<DataField> combinedFields = new ArrayList<>();
+        List<DataField> keyFields = keyType.getFields();
+        for (int i = 0; i < keyFields.size(); i++) {
+            DataField kf = keyFields.get(i);
+            combinedFields.add(new DataField(i, kf.name(), kf.type()));
+        }
+        int valueFieldIndex = keyFields.size();
+        combinedFields.add(
+                new DataField(
+                        valueFieldIndex, "_value_bytes", new VarBinaryType(Integer.MAX_VALUE)));
+        RowType combinedType = new RowType(combinedFields);
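+        // The sort schema is the key fields followed by one VARBINARY column that
+        // carries the already-encoded (fileId, position) payload, so the external
+        // sorter orders rows by primary key only.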
+
+        int[] sortFields = IntStream.range(0, keyType.getFieldCount()).toArray();
+        BinaryExternalSortBuffer sortBuffer =
+                BinaryExternalSortBuffer.create(
+                        ioManager,
+                        combinedType,
+                        sortFields,
+                        sortSpillBufferSize,
+                        pageSize,
+                        maxNumFileHandles,
+                        compression,
+                        MemorySize.MAX_VALUE,
+                        false);
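+        // Sorting all (key, fileId, position) tuples once means bulkLoad later
+        // receives a single globally sorted stream and can write SST files
+        // sequentially, instead of pushing every key through the
+        // MemTable/flush/compaction path one put() at a time.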
+
+        RowCompactedSerializer keySerializer = new RowCompactedSerializer(keyType);
+        InternalRow.FieldGetter[] keyFieldGetters =
+                new InternalRow.FieldGetter[keyType.getFieldCount()];
+        for (int i = 0; i < keyType.getFieldCount(); i++) {
+            keyFieldGetters[i] = InternalRow.createFieldGetter(keyType.getTypeAt(i), i);
+        }
+        try {
+            for (DataFileMeta file : restoreFiles) {
+                if (file.level() == 0) {
+                    continue;
+                }
+                int fileId = fileLevels.getFileIdByName(file.fileName());
+                try (RecordReader<KeyValue> reader = keyReaderFactory.createRecordReader(file)) {
+                    FileRecordIterator<KeyValue> batch;
+                    while ((batch = (FileRecordIterator<KeyValue>) reader.readBatch()) != null) {
+                        KeyValue kv;
+                        while ((kv = batch.next()) != null) {
+                            int position = (int) batch.returnedPosition();
+                            ByteArrayOutputStream valueOut = new ByteArrayOutputStream(8);
+                            encodeInt(valueOut, fileId);
+                            encodeInt(valueOut, position);
+                            byte[] valueBytes = valueOut.toByteArray();
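+                            // The value payload is two varints: fileId, then position.
+                            // With a 7-bit varint layout, fileId=3 / position=17 would
+                            // encode to the two bytes [0x03, 0x11].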
+
+                            GenericRow combinedRow = new GenericRow(combinedType.getFieldCount());
+                            for (int i = 0; i < keyType.getFieldCount(); i++) {
+                                combinedRow.setField(
+                                        i, keyFieldGetters[i].getFieldOrNull(kv.key()));
+                            }
+                            combinedRow.setField(valueFieldIndex, valueBytes);
+                            sortBuffer.write(combinedRow);
+                        }
+                        batch.releaseBatch();
+                    }
+                }
+            }
+
+            MutableObjectIterator<BinaryRow> sortedIterator = sortBuffer.sortedIterator();
+            BinaryRow binaryRow = new BinaryRow(combinedType.getFieldCount());
+            InternalRow.FieldGetter valueGetter =
+                    InternalRow.createFieldGetter(
+                            new VarBinaryType(Integer.MAX_VALUE), valueFieldIndex);
+
+            Iterator<Map.Entry<byte[], byte[]>> entryIterator =
+                    new Iterator<Map.Entry<byte[], byte[]>>() {
+                        private BinaryRow current = binaryRow;
+                        private boolean hasNext;
+
+                        {
+                            advance();
+                        }
+
+                        private void advance() {
+                            try {
+                                current = sortedIterator.next(current);
+                                hasNext = current != null;
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                        }
+
+                        @Override
+                        public boolean hasNext() {
+                            return hasNext;
+                        }
+
+                        @Override
+                        public Map.Entry<byte[], byte[]> next() {
+                            byte[] key = keySerializer.serializeToBytes(current);
+                            byte[] value = (byte[]) valueGetter.getFieldOrNull(current);
+                            advance();
+                            return new AbstractMap.SimpleImmutableEntry<>(key, value);
+                        }
+                    };
+
+            kvDb.bulkLoad(entryIterator);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        } finally {
+            sortBuffer.clear();
+        }
+    }
+
+    /**
+     * Update the key index after a single original file is replaced by new sorted files.
+     *
+     * <p>For DEDUPLICATE mode: mark the old position in deletion vectors, keep the new position.
+     *
+     * <p>For FIRST_ROW mode: if key exists, mark the new position in deletion vectors (keep the
+     * first/old one); if key is new, store the new position.
+     */
+    public void updateIndex(DataFileMeta originalFile, List<DataFileMeta> newSortedFiles)
+            throws Exception {
+        updateIndex(Collections.singletonList(originalFile), newSortedFiles);
+    }
+
+    /**
+     * Update the key index after multiple original files are replaced by new sorted files.
+     *
+     * @see #updateIndex(DataFileMeta, List)
+     */
+    public void updateIndex(List<DataFileMeta> originalFiles, List<DataFileMeta> newSortedFiles)
+            throws Exception {
+        RowCompactedSerializer keySerializer = new RowCompactedSerializer(keyType);
+
+        Set<String> originalFileNames = new HashSet<>();
+        for (DataFileMeta file : originalFiles) {
+            originalFileNames.add(file.fileName());
+        }
+
+        for (DataFileMeta sortedFile : newSortedFiles) {
+            int fileId = fileLevels.getFileIdByName(sortedFile.fileName());
+            int position = 0;
+            try (CloseableIterator<InternalRow> iterator = readKeyIterator(sortedFile)) {
+                while (iterator.hasNext()) {
+                    byte[] key = keySerializer.serializeToBytes(iterator.next());
+                    byte[] oldValue = kvDb.get(key);
+                    if (oldValue != null) {
+                        ByteArrayInputStream valueIn = new ByteArrayInputStream(oldValue);
+                        int oldFileId = decodeInt(valueIn);
+                        int oldPosition = decodeInt(valueIn);
+                        DataFileMeta oldFile = fileLevels.getFileById(oldFileId);
+                        if (oldFile != null && !originalFileNames.contains(oldFile.fileName())) {
+                            if (firstRow) {
+                                dvMaintainer.notifyNewDeletion(sortedFile.fileName(), position);
+                                position++;
+                                continue;
+                            } else {
+                                dvMaintainer.notifyNewDeletion(oldFile.fileName(), oldPosition);
+                            }
+                            }
+                        }
+                    }
+                    ByteArrayOutputStream value = new ByteArrayOutputStream(8);
+                    encodeInt(value, fileId);
+                    encodeInt(value, position);
+                    kvDb.put(key, value.toByteArray());
+                    position++;
+                }
+            }
+        }
+    }
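+    // Illustrative outcome (hypothetical positions): suppose key K is indexed at
+    // (oldFile=1, pos=5) and a new sorted file 2 also contains K at pos 0. In
+    // deduplicate mode the old copy (file 1, pos 5) is marked deleted and the index
+    // moves to (2, 0); in first-row mode the new copy (file 2, pos 0) is marked
+    // deleted and the index keeps pointing at (1, 5).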
+
+    /** Delete key index entries for the given file (only if they still point to it). */
+    public void deleteIndex(DataFileMeta file) throws Exception {
+        RowCompactedSerializer keySerializer = new RowCompactedSerializer(keyType);
+        int fileId = fileLevels.getFileIdByName(file.fileName());
+        try (CloseableIterator<InternalRow> iterator = readKeyIterator(file)) {
+            while (iterator.hasNext()) {
+                byte[] key = keySerializer.serializeToBytes(iterator.next());
+                byte[] value = kvDb.get(key);
+                if (value != null) {
+                    int storedFileId = decodeInt(new ByteArrayInputStream(value));
+                    if (storedFileId == fileId) {
+                        kvDb.delete(key);
+                    }
+                }
+            }
+        }
+    }
+
+    /** Rebuild key index entries for a newly written file. */
+    public void rebuildIndex(DataFileMeta newFile) throws Exception {
+        RowCompactedSerializer keySerializer = new RowCompactedSerializer(keyType);
+        int fileId = fileLevels.getFileIdByName(newFile.fileName());
+        int position = 0;
+        try (CloseableIterator<InternalRow> keyIterator = readKeyIterator(newFile)) {
+            while (keyIterator.hasNext()) {
+                byte[] key = keySerializer.serializeToBytes(keyIterator.next());
+                ByteArrayOutputStream value = new ByteArrayOutputStream(8);
+                encodeInt(value, fileId);
+                encodeInt(value, position);
+                kvDb.put(key, value.toByteArray());
+                position++;
+            }
+        }
+    }
+
+    private CloseableIterator<InternalRow> readKeyIterator(DataFileMeta file) throws IOException {
+        //noinspection resource
+        return keyReaderFactory
+                .createRecordReader(file)
+                .transform(KeyValue::key)
+                .toCloseableIterator();
+    }
+
+    @Override
+    public void close() throws IOException {
+        kvDb.close();
+    }
+}
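
A side note on the anonymous iterator in bootstrap(): it adapts a pull-style
source (next() returns null at end and may throw) into a java.util.Iterator by
priming a lookahead element. A generic sketch of the same adapter pattern, with
illustrative names that are not part of the patch:

    import java.util.Iterator;
    import java.util.NoSuchElementException;

    public final class PullIteratorAdapter<T> implements Iterator<T> {

        /** Pull-style source in the spirit of MutableObjectIterator: null means exhausted. */
        public interface PullSource<T> {
            T next() throws Exception;
        }

        private final PullSource<T> source;
        private T lookahead;

        public PullIteratorAdapter(PullSource<T> source) {
            this.source = source;
            advance(); // prime the lookahead so hasNext() is cheap
        }

        private void advance() {
            try {
                lookahead = source.next();
            } catch (Exception e) {
                // Surface checked errors as unchecked, as bootstrap() does.
                throw new RuntimeException(e);
            }
        }

        @Override
        public boolean hasNext() {
            return lookahead != null;
        }

        @Override
        public T next() {
            if (lookahead == null) {
                throw new NoSuchElementException();
            }
            T result = lookahead;
            advance();
            return result;
        }
    }

One detail the patch gets right implicitly: the BinaryRow is reused by
sortedIterator.next(row), so the key and value must be extracted before
advancing, which is exactly the order bootstrap() uses.
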
diff --git a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
index f69f5fc6ae..aaf661be2f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
@@ -336,6 +336,56 @@ class ClusteringTableTest {
                 .containsExactlyInAnyOrder(GenericRow.of(1, 50), GenericRow.of(2, 60));
     }
 
+    /**
+     * Test that bootstrap correctly rebuilds the key index via bulkLoad from existing sorted files.
+     *
+     * <p>Each writeRows() call creates a new writer (and thus a new ClusteringCompactManager),
+     * which calls {@code keyIndex.bootstrap(restoreFiles)}. The bootstrap method reads all level >
+     * 0 files, sorts them externally, and bulk-loads into the LSM KV DB, bypassing the normal
+     * put-per-entry path. This test verifies that the bulkLoad-based index is correct by checking
+     * deduplication across multiple commits with overlapping keys.
+     */
+    @Test
+    public void testBootstrapBulkLoadIndex() throws Exception {
+        // Commit 1: write initial data → compaction produces level > 0 sorted files
+        writeRows(
+                Arrays.asList(
+                        GenericRow.of(1, 10),
+                        GenericRow.of(2, 20),
+                        GenericRow.of(3, 30),
+                        GenericRow.of(4, 40),
+                        GenericRow.of(5, 50)));
+
+        // Commit 2: new writer bootstraps index from level > 0 files via bulkLoad,
+        // then writes overlapping keys; updateIndex must find existing entries in the
+        // bulkLoaded index to generate correct deletion vectors
+        writeRows(
+                Arrays.asList(GenericRow.of(1, 100), GenericRow.of(3, 300), GenericRow.of(5, 500)));
+
+        // Verify dedup: keys 1,3,5 updated; keys 2,4 unchanged
+        assertThat(readRows())
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(1, 100),
+                        GenericRow.of(2, 20),
+                        GenericRow.of(3, 300),
+                        GenericRow.of(4, 40),
+                        GenericRow.of(5, 500));
+
+        // Commit 3: another bootstrap from the updated sorted files,
+        // verifies bulkLoad works correctly after files have been rewritten
+        writeRows(
+                Arrays.asList(GenericRow.of(2, 200), GenericRow.of(4, 400), GenericRow.of(6, 600)));
+
+        assertThat(readRows())
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(1, 100),
+                        GenericRow.of(2, 200),
+                        GenericRow.of(3, 300),
+                        GenericRow.of(4, 400),
+                        GenericRow.of(5, 500),
+                        GenericRow.of(6, 600));
+    }
+
     // ==================== Clustering Column Filter Tests ====================
 
     /** Test that equality filter on clustering column skips irrelevant files in the scan plan. */

