This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f93192d446ac [HUDI-9558] Make merge handle configurable (#13495)
f93192d446ac is described below

commit f93192d446acebb24b7b9379ded3fadc3f3d1948
Author: Jon Vexler <[email protected]>
AuthorDate: Mon Jul 21 20:43:43 2025 -0400

    [HUDI-9558] Make merge handle configurable (#13495)
    
    * [ENG-21953][INTERNAL][1.0-internal] Add abstraction of HoodieMergeHandle to make its implementations configurable (#1192)
    
    ---------
    
    Co-authored-by: Lin Liu <[email protected]>
    Co-authored-by: Davis-Zhang-Onehouse <[email protected]>
    Co-authored-by: Davis Zhang <[email protected]>
    Co-authored-by: Rajesh Mahindra <[email protected]>
    Co-authored-by: rmahindra123 <[email protected]>
    Co-authored-by: Jonathan Vexler <=>
    Co-authored-by: danny0405 <[email protected]>
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  65 +++
 .../hudi/io/FileGroupReaderBasedMergeHandle.java   |   5 +-
 .../apache/hudi/io/HoodieAbstractMergeHandle.java  | 183 +++++++
 .../org/apache/hudi/io/HoodieConcatHandle.java     |  13 +-
 .../java/org/apache/hudi/io/HoodieMergeHandle.java | 584 ++-------------------
 .../apache/hudi/io/HoodieMergeHandleFactory.java   | 167 +++++-
 .../hudi/io/HoodieMergeHandleWithChangeLog.java    |   2 +-
 .../apache/hudi/io/HoodieSortedMergeHandle.java    |   9 +-
 .../io/HoodieSortedMergeHandleWithChangeLog.java   |   4 +-
 ...ergeHandle.java => HoodieWriteMergeHandle.java} | 176 ++-----
 .../src/main/java/org/apache/hudi/io/IOUtils.java  |  45 +-
 .../java/org/apache/hudi/table/HoodieTable.java    |  10 -
 .../hudi/table/action/commit/BaseMergeHelper.java  |  16 +-
 .../table/action/commit/HoodieMergeHelper.java     |   4 +-
 .../hudi/table/action/compact/HoodieCompactor.java |   7 +-
 .../hudi/io/TestHoodieMergeHandleFactory.java      | 156 ++++++
 .../apache/hudi/io/FlinkMergeAndReplaceHandle.java |   7 +-
 .../java/org/apache/hudi/io/FlinkMergeHandle.java  |   7 +-
 .../hudi/io/FlinkMergeHandleWithChangeLog.java     |   2 +
 .../hudi/table/HoodieFlinkCopyOnWriteTable.java    |  13 +-
 .../commit/BaseFlinkCommitActionExecutor.java      |  15 +-
 .../hudi/table/HoodieJavaCopyOnWriteTable.java     |  11 +-
 .../commit/BaseJavaCommitActionExecutor.java       |  13 +-
 .../TestHoodieJavaClientOnCopyOnWriteStorage.java  |   8 +-
 .../client/common/SparkReaderContextFactory.java   |   2 +
 .../hudi/table/HoodieSparkCopyOnWriteTable.java    |  37 +-
 .../org/apache/hudi/table/HoodieSparkTable.java    |  23 -
 .../commit/BaseSparkCommitActionExecutor.java      |  28 +-
 .../hudi/client/TestUpdateSchemaEvolution.java     |   4 +-
 ...ndle.java => TestHoodieDefaultMergeHandle.java} |   2 +-
 .../table/log/BaseHoodieLogRecordReader.java       |   6 +-
 .../table/log/HoodieMergedLogRecordReader.java     |   8 +-
 .../hudi/command/payload/ExpressionPayload.scala   |   2 +-
 .../TestHoodieClientOnCopyOnWriteStorage.java      |   8 +-
 .../java/org/apache/hudi/io/TestMergeHandle.java   |   2 +-
 35 files changed, 789 insertions(+), 855 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 2ce654d6e644..a1df49505c48 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -71,6 +71,9 @@ import org.apache.hudi.estimator.AverageRecordSizeEstimator;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.io.FileGroupReaderBasedMergeHandle;
+import org.apache.hudi.io.HoodieConcatHandle;
+import org.apache.hudi.io.HoodieWriteMergeHandle;
 import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.keygen.constant.KeyGeneratorType;
@@ -850,6 +853,37 @@ public class HoodieWriteConfig extends HoodieConfig {
       .sinceVersion("1.1.0")
       .withDocumentation("Records event time watermark metadata in commit metadata when enabled");
 
+  public static final ConfigProperty<String> MERGE_HANDLE_CLASS_NAME = ConfigProperty
+      .key("hoodie.write.merge.handle.class")
+      .defaultValue(HoodieWriteMergeHandle.class.getName())
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("The merge handle class that implements the {@link HoodieMergeHandle} interface, used to merge the records "
+          + "from a base file with an iterator of incoming records, or with a map of updates and deletes from log files, at the file group level.");
+
+  public static final ConfigProperty<String> CONCAT_HANDLE_CLASS_NAME = ConfigProperty
+      .key("hoodie.write.concat.handle.class")
+      .defaultValue(HoodieConcatHandle.class.getName())
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("The merge handle class used to concatenate the records from a base file with an iterator of incoming records.");
+
+  public static final ConfigProperty<String> COMPACT_MERGE_HANDLE_CLASS_NAME = ConfigProperty
+      .key("hoodie.compact.merge.handle.class")
+      .defaultValue(FileGroupReaderBasedMergeHandle.class.getName())
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("The merge handle class used for compaction.");
+
+  public static final ConfigProperty<Boolean> MERGE_HANDLE_PERFORM_FALLBACK = ConfigProperty
+      .key("hoodie.write.merge.handle.fallback")
+      .defaultValue(true)
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("When a custom merge handle implementation is configured via " + MERGE_HANDLE_CLASS_NAME.key()
+          + " or a custom concat handle implementation is configured via " + CONCAT_HANDLE_CLASS_NAME.key()
+          + ", enabling this config falls back to the default implementation if instantiation of the custom implementation fails.");
+
   /**
    * Config key with boolean value that indicates whether records being written during a MERGE INTO Spark SQL
    * operation are already prepped.
@@ -1401,6 +1435,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     }
   }
 
+  public boolean isMergeHandleFallbackEnabled() {
+    return getBooleanOrDefault(HoodieWriteConfig.MERGE_HANDLE_PERFORM_FALLBACK);
+  }
+
   public boolean isConsistentLogicalTimestampEnabled() {
     return getBooleanOrDefault(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED);
   }
@@ -1497,6 +1535,18 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getString(WRITE_STATUS_CLASS_NAME);
   }
 
+  public String getMergeHandleClassName() {
+    return getStringOrDefault(MERGE_HANDLE_CLASS_NAME);
+  }
+
+  public String getConcatHandleClassName() {
+    return getStringOrDefault(CONCAT_HANDLE_CLASS_NAME);
+  }
+
+  public String getCompactionMergeHandleClassName() {
+    return getStringOrDefault(COMPACT_MERGE_HANDLE_CLASS_NAME);
+  }
+
   public int getFinalizeWriteParallelism() {
     return getInt(FINALIZE_WRITE_PARALLELISM_VALUE);
   }
@@ -3399,6 +3449,21 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withMergeHandleClassName(String className) {
+      writeConfig.setValue(MERGE_HANDLE_CLASS_NAME, className);
+      return this;
+    }
+
+    public Builder withConcatHandleClassName(String className) {
+      writeConfig.setValue(CONCAT_HANDLE_CLASS_NAME, className);
+      return this;
+    }
+
+    public Builder withFileGroupReaderMergeHandleClassName(String className) {
+      writeConfig.setValue(COMPACT_MERGE_HANDLE_CLASS_NAME, className);
+      return this;
+    }
+
     protected void setDefaults() {
      writeConfig.setDefaultValue(MARKERS_TYPE, getDefaultMarkersType(engineType));
       // Check for mandatory properties
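Usage sketch (not part of the patch): wiring a custom merge handle through the new builder methods above. A minimal sketch, assuming the existing HoodieWriteConfig.newBuilder()/withPath() builder entry points; the base path and class names are hypothetical placeholders.

    import org.apache.hudi.config.HoodieWriteConfig;

    class MergeHandleConfigExample {
      static HoodieWriteConfig buildConfig() {
        // Falls back to the default handle if the custom class cannot be
        // instantiated, since hoodie.write.merge.handle.fallback defaults to true.
        return HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hudi_table")                             // hypothetical base path
            .withMergeHandleClassName("com.example.MyMergeHandle")   // hypothetical custom handle
            .withConcatHandleClassName("com.example.MyConcatHandle") // hypothetical custom concat handle
            .build();
      }
    }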
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java
index 0e7223a06f32..9ce794347b79 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java
@@ -72,7 +72,7 @@ import static org.apache.hudi.common.model.HoodieFileFormat.HFILE;
  * the records, and writes the records to a new base file.
  */
 @NotThreadSafe
-public class FileGroupReaderBasedMergeHandle<T, I, K, O> extends HoodieMergeHandle<T, I, K, O> {
+public class FileGroupReaderBasedMergeHandle<T, I, K, O> extends HoodieWriteMergeHandle<T, I, K, O> {
   private static final Logger LOG = LoggerFactory.getLogger(FileGroupReaderBasedMergeHandle.class);
 
   private final HoodieReaderContext<T> readerContext;
@@ -169,7 +169,8 @@ public class FileGroupReaderBasedMergeHandle<T, I, K, O> extends HoodieMergeHand
    * Reads the file slice of a compaction operation using a file group reader,
    * by getting an iterator of the records; then writes the records to a new base file.
    */
-  public void write() {
+  @Override
+  public void doMerge() {
     boolean usePosition = config.getBooleanOrDefault(MERGE_USE_RECORD_POSITIONS);
     Option<InternalSchema> internalSchemaOption = SerDeHelper.fromJson(config.getInternalSchema());
     TypedProperties props = TypedProperties.copy(config.getProps());
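Usage sketch (not part of the patch): the driver sequence expected for the renamed entry point, per the HoodieMergeHandle interface introduced later in this patch, which is doMerge() to perform the merge and close() to finalize; the runner class is illustrative only.

    import java.io.IOException;
    import java.util.List;

    import org.apache.hudi.client.WriteStatus;
    import org.apache.hudi.io.HoodieMergeHandle;

    class MergeRunner {
      // Assumes `handle` was obtained from HoodieMergeHandleFactory.create(...).
      static List<WriteStatus> runMerge(HoodieMergeHandle<?, ?, ?, ?> handle) throws IOException {
        handle.doMerge();      // read base file + incoming/log records, write the new base file
        return handle.close(); // finalize the handle and collect write statuses
      }
    }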
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAbstractMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAbstractMergeHandle.java
new file mode 100644
index 000000000000..9ff103801042
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAbstractMergeHandle.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.HoodieTable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * Abstract class to implement merging records from a base file with incoming records or records from log blocks
+ * at a file group level.
+ */
+public abstract class HoodieAbstractMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O> implements HoodieMergeHandle<T, I, K, O> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieAbstractMergeHandle.class);
+
+  protected Map<String, HoodieRecord<T>> keyToNewRecords;
+  protected StoragePath newFilePath;
+  protected StoragePath oldFilePath;
+  protected Option<BaseKeyGenerator> keyGeneratorOpt;
+  protected HoodieBaseFile baseFileToMerge;
+  protected Option<String[]> partitionFields = Option.empty();
+  protected Object[] partitionValues = new Object[0];
+
+  /**
+   * Used by the writer code path to upsert new records by providing an iterator over the new records.
+   * @param config Hoodie writer configs.
+   * @param instantTime current instant time.
+   * @param hoodieTable an instance of {@link HoodieTable}
+   * @param partitionPath Partition path of the upsert and insert records.
+   * @param fileId New file id of the target base file.
+   * @param taskContextSupplier Base task context supplier
+   * @param baseFile current base file that needs to be read for the records in storage.
+   * @param keyGeneratorOpt Optional instance of the {@link org.apache.hudi.keygen.KeyGenerator} used.
+   */
+  public HoodieAbstractMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                                   String partitionPath, String fileId, TaskContextSupplier taskContextSupplier,
+                                   HoodieBaseFile baseFile, Option<BaseKeyGenerator> keyGeneratorOpt, boolean preserveMetadata) {
+    super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier, preserveMetadata);
+    this.baseFileToMerge = baseFile;
+    this.keyGeneratorOpt = keyGeneratorOpt;
+    initPartitionMetadataAndFilePaths(fileId, partitionPath);
+    initWriteStatus(fileId, partitionPath);
+    validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
+  }
+
+  /**
+   * Used by the file group reader based merge handle.
+   */
+  protected HoodieAbstractMergeHandle(HoodieWriteConfig config, String instantTime, String partitionPath,
+                                      String fileId, HoodieTable<T, I, K, O> hoodieTable, TaskContextSupplier taskContextSupplier, boolean preserveMetadata) {
+    super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier, preserveMetadata);
+  }
+
+  @Override
+  public StoragePath getOldFilePath() {
+    return oldFilePath;
+  }
+
+  @Override
+  public IOType getIOType() {
+    return IOType.MERGE;
+  }
+
+  @Override
+  public HoodieBaseFile baseFileForMerge() {
+    return baseFileToMerge;
+  }
+
+  public void setPartitionFields(Option<String[]> partitionFields) {
+    this.partitionFields = partitionFields;
+  }
+
+  public Option<String[]> getPartitionFields() {
+    return this.partitionFields;
+  }
+
+  public void setPartitionValues(Object[] partitionValues) {
+    this.partitionValues = partitionValues;
+  }
+
+  public Object[] getPartitionValues() {
+    return this.partitionValues;
+  }
+
+  /**
+   * Save the partition metadata and derive the old and new file paths.
+   */
+  private void initPartitionMetadataAndFilePaths(String targetFileId, String partitionPath) {
+    LOG.info("partitionPath:{}, targetFileId to be merged: {}", partitionPath, targetFileId);
+    String latestValidFilePath = baseFileToMerge == null ? null : baseFileToMerge.getFileName();
+    HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(
+        storage,
+        instantTime,
+        new StoragePath(config.getBasePath()),
+        FSUtils.constructAbsolutePath(config.getBasePath(), partitionPath),
+        hoodieTable.getPartitionMetafileFormat());
+    partitionMetadata.trySave();
+
+    String newFileName = FSUtils.makeBaseFileName(
+        instantTime, writeToken, targetFileId, hoodieTable.getBaseFileExtension());
+    makeOldAndNewFilePaths(partitionPath, latestValidFilePath, newFileName);
+
+    LOG.info(
+        "Merging new data into oldPath: {}, as newPath: {}",
+        oldFilePath.toString(), newFilePath.toString());
+  }
+
+  private void initWriteStatus(String fileId, String partitionPath) {
+    writeStatus.setStat(new HoodieWriteStat());
+    if (baseFileToMerge != null) {
+      writeStatus.getStat().setPrevCommit(baseFileToMerge.getCommitTime());
+      // At the moment, we only support SI for overwrite with latest payload. So, we don't need to embed the entire file slice here.
+      // HUDI-8518 will be taken up to fix it for any payload, during which we might require the entire file slice to be set here.
+      // AppendHandle already adds all log files from the current file slice to HoodieDeltaWriteStat.
+      writeStatus.getStat().setPrevBaseFile(baseFileToMerge.getFileName());
+    } else {
+      writeStatus.getStat().setPrevCommit(HoodieWriteStat.NULL_COMMIT);
+    }
+    // file name is the same for all records in this batch
+    writeStatus.setFileId(fileId);
+    writeStatus.setPartitionPath(partitionPath);
+    writeStatus.getStat().setPartitionPath(partitionPath);
+    writeStatus.getStat().setFileId(fileId);
+    LOG.debug("Initializing WriteStatus with fileId {} partitionPath {}", fileId, partitionPath);
+    setWriteStatusPath();
+  }
+
+  private void validateAndSetAndKeyGenProps(Option<BaseKeyGenerator> keyGeneratorOpt, boolean populateMetaFields) {
+    ValidationUtils.checkArgument(populateMetaFields == !keyGeneratorOpt.isPresent());
+    this.keyGeneratorOpt = keyGeneratorOpt;
+  }
+
+  protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName, String newFileName) {
+    oldFilePath = makeNewFilePath(partitionPath, oldFileName);
+    newFilePath = makeNewFilePath(partitionPath, newFileName);
+  }
+
+  public static HoodieBaseFile getLatestBaseFile(HoodieTable<?, ?, ?, ?> hoodieTable, String partitionPath, String fileId) {
+    Option<HoodieBaseFile> baseFileOp = hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);
+    if (!baseFileOp.isPresent()) {
+      throw new NoSuchElementException(String.format("FileID %s of partition path %s does not exist.", fileId, partitionPath));
+    }
+    return baseFileOp.get();
+  }
+
+  protected void setWriteStatusPath() {
+    writeStatus.getStat().setPath(new StoragePath(config.getBasePath()), newFilePath);
+  }
+}
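Usage sketch (not part of the patch): resolving the current base file of a file group via the static helper this class now hosts; the partition path and file id are hypothetical.

    import org.apache.hudi.common.model.HoodieBaseFile;
    import org.apache.hudi.io.HoodieAbstractMergeHandle;
    import org.apache.hudi.table.HoodieTable;

    class BaseFileLookupExample {
      // Throws NoSuchElementException when the file group has no base file yet.
      static HoodieBaseFile resolve(HoodieTable<?, ?, ?, ?> table) {
        return HoodieAbstractMergeHandle.getLatestBaseFile(
            table, "2025/07/21", "fileId-1"); // hypothetical partition path and file id
      }
    }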
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java
index c8b93c2ac58f..4b3ba020b3cb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.io;
 
 import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -37,10 +38,11 @@ import javax.annotation.concurrent.NotThreadSafe;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.Map;
 
 /**
  * Handle to concatenate new records to old records w/o any merging. If Operation is set to Inserts, and if {{@link HoodieWriteConfig#allowDuplicateInserts()}}
- * is set, this handle will be used instead of {@link HoodieMergeHandle}.
+ * is set, this handle will be used instead of {@link HoodieWriteMergeHandle}.
  *
  * Simplified Logic:
  * For every existing record
@@ -65,7 +67,7 @@ import java.util.Iterator;
  * happen and every batch should have new records to be inserted. Above example is for illustration purposes only.
  */
 @NotThreadSafe
-public class HoodieConcatHandle<T, I, K, O> extends HoodieMergeHandle<T, I, K, O> {
+public class HoodieConcatHandle<T, I, K, O> extends HoodieWriteMergeHandle<T, I, K, O> {
 
   private static final Logger LOG = LoggerFactory.getLogger(HoodieConcatHandle.class);
   // a representation of incoming records that tolerates duplicate keys
@@ -78,6 +80,13 @@ public class HoodieConcatHandle<T, I, K, O> extends HoodieMergeHandle<T, I, K, O
     this.recordItr = recordItr;
   }
 
+  public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable,
+                            Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId,
+                            HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier) {
+    super(config, instantTime, hoodieTable, Collections.emptyMap(), partitionPath, fileId, dataFileToBeMerged, taskContextSupplier, Option.empty());
+    this.recordItr = keyToNewRecords.values().iterator();
+  }
+
   /**
    * Write old record as is w/o merging with incoming record.
    */
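Usage sketch (not part of the patch): the factory changes below select the concat handle only for non-record-changing operations when duplicate inserts are allowed. A minimal sketch, assuming the standard hoodie.merge.allow.duplicate.on.inserts key behind HoodieWriteConfig#allowDuplicateInserts(); the custom class name is hypothetical.

    import java.util.Properties;

    class ConcatHandleConfigExample {
      static Properties concatProps() {
        Properties props = new Properties();
        // Allow duplicate inserts so insert operations take the concat path
        // (assumes the standard hoodie.merge.allow.duplicate.on.inserts key).
        props.setProperty("hoodie.merge.allow.duplicate.on.inserts", "true");
        // Optionally swap in a custom concat handle (hypothetical class name).
        props.setProperty("hoodie.write.concat.handle.class", "com.example.MyConcatHandle");
        return props;
      }
    }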
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 7b81d728a358..abf02d3bd44e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -7,584 +7,62 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.hudi.io;
 
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.engine.TaskContextSupplier;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieOperation;
-import org.apache.hudi.common.model.HoodiePartitionMetadata;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieWriteStat;
-import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
-import org.apache.hudi.common.model.IOType;
-import org.apache.hudi.common.model.MetadataValues;
-import org.apache.hudi.common.serialization.DefaultSerializer;
-import org.apache.hudi.common.util.DefaultSizeEstimator;
-import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.common.util.collection.ExternalSpillableMap;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieCorruptedDataException;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieUpsertException;
-import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileWriter;
-import org.apache.hudi.io.storage.HoodieFileWriterFactory;
-import org.apache.hudi.io.storage.HoodieIOFactory;
-import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.Schema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.NotThreadSafe;
-
-import java.io.Closeable;
 import java.io.IOException;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Properties;
-import java.util.Set;
-
-/**
- * Handle to merge incoming records to those in storage.
- * <p>
- * Simplified Logic:
- * For every existing record
- *     Check if there is a new record coming in. If yes, merge two records and 
write to file
- *     else write the record as is
- * For all pending records from incoming batch, write to file.
- *
- * <p>
- * Illustration with simple data.
- * Incoming data:
- *     rec1_2, rec4_2, rec5_1, rec6_1
- * Existing data:
- *     rec1_1, rec2_1, rec3_1, rec4_1
- * <p>
- * For every existing record, merge w/ incoming if required and write to 
storage.
- *    => rec1_1 and rec1_2 is merged to write rec1_2 to storage
- *    => rec2_1 is written as is
- *    => rec3_1 is written as is
- *    => rec4_2 and rec4_1 is merged to write rec4_2 to storage
- * Write all pending records from incoming set to storage
- *    => rec5_1 and rec6_1
- * <p>
- * Final snapshot in storage
- * rec1_2, rec2_1, rec3_1, rec4_2, rec5_1, rec6_1
- */
-@SuppressWarnings("Duplicates")
-@NotThreadSafe
-public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, 
O> {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieMergeHandle.class);
-
-  protected Map<String, HoodieRecord<T>> keyToNewRecords;
-  protected Set<String> writtenRecordKeys;
-  protected HoodieFileWriter fileWriter;
-
-  protected StoragePath newFilePath;
-  protected StoragePath oldFilePath;
-  protected long recordsWritten = 0;
-  protected long recordsDeleted = 0;
-  protected long updatedRecordsWritten = 0;
-  protected long insertRecordsWritten = 0;
-  protected Option<BaseKeyGenerator> keyGeneratorOpt;
-  protected HoodieBaseFile baseFileToMerge;
-
-  protected Option<String[]> partitionFields = Option.empty();
-  protected Object[] partitionValues = new Object[0];
-
-  public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
-                           Iterator<HoodieRecord<T>> recordItr, String 
partitionPath, String fileId,
-                           TaskContextSupplier taskContextSupplier, 
Option<BaseKeyGenerator> keyGeneratorOpt) {
-    this(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, 
taskContextSupplier,
-        getLatestBaseFile(hoodieTable, partitionPath, fileId), 
keyGeneratorOpt);
-  }
-
-  public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
-                           Iterator<HoodieRecord<T>> recordItr, String 
partitionPath, String fileId,
-                           TaskContextSupplier taskContextSupplier, 
HoodieBaseFile baseFile, Option<BaseKeyGenerator> keyGeneratorOpt) {
-    super(config, instantTime, partitionPath, fileId, hoodieTable, 
taskContextSupplier, false);
-    init(recordItr);
-    init(fileId, partitionPath, baseFile);
-    validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
-  }
-
-  /**
-   * Called by compactor code path.
-   */
-  public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
-                           Map<String, HoodieRecord<T>> keyToNewRecords, 
String partitionPath, String fileId,
-                           HoodieBaseFile dataFileToBeMerged, 
TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> 
keyGeneratorOpt) {
-    super(config, instantTime, partitionPath, fileId, hoodieTable, 
taskContextSupplier, true);
-    this.keyToNewRecords = keyToNewRecords;
-    init(fileId, this.partitionPath, dataFileToBeMerged);
-    validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
-  }
-
-  /**
-   * Used by `HoodieSparkFileGroupReaderBasedMergeHandle`.
-   *
-   * @param config              Hudi write config
-   * @param instantTime         Instant time to use
-   * @param partitionPath       Partition path
-   * @param fileId              File group ID for the merge handle to operate 
on
-   * @param hoodieTable         {@link HoodieTable} instance
-   * @param taskContextSupplier Task context supplier
-   */
-  public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, 
String partitionPath,
-                           String fileId, HoodieTable<T, I, K, O> hoodieTable, 
TaskContextSupplier taskContextSupplier) {
-    super(config, instantTime, partitionPath, fileId, hoodieTable, 
taskContextSupplier, true);
-  }
-
-  private void validateAndSetAndKeyGenProps(Option<BaseKeyGenerator> 
keyGeneratorOpt, boolean populateMetaFields) {
-    ValidationUtils.checkArgument(populateMetaFields == 
!keyGeneratorOpt.isPresent());
-    this.keyGeneratorOpt = keyGeneratorOpt;
-  }
-
-  public static HoodieBaseFile getLatestBaseFile(HoodieTable<?, ?, ?, ?> 
hoodieTable, String partitionPath, String fileId) {
-    Option<HoodieBaseFile> baseFileOp = 
hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);
-    if (!baseFileOp.isPresent()) {
-      throw new NoSuchElementException(String.format("FileID %s of partition 
path %s does not exist.", fileId, partitionPath));
-    }
-    return baseFileOp.get();
-  }
-
-  /**
-   * Extract old file path, initialize StorageWriter and WriteStatus.
-   */
-  private void init(String fileId, String partitionPath, HoodieBaseFile 
baseFileToMerge) {
-    LOG.info("partitionPath:{}, fileId to be merged:{}", partitionPath, 
fileId);
-    this.baseFileToMerge = baseFileToMerge;
-    this.writtenRecordKeys = new HashSet<>();
-    writeStatus.setStat(new HoodieWriteStat());
-    try {
-      String latestValidFilePath = baseFileToMerge.getFileName();
-      writeStatus.getStat().setPrevCommit(baseFileToMerge.getCommitTime());
-      // At the moment, we only support SI for overwrite with latest payload. 
So, we don't need to embed entire file slice here.
-      // HUDI-8518 will be taken up to fix it for any payload during which we 
might require entire file slice to be set here.
-      // Already AppendHandle adds all logs file from current file slice to 
HoodieDeltaWriteStat.
-      writeStatus.getStat().setPrevBaseFile(latestValidFilePath);
-
-      HoodiePartitionMetadata partitionMetadata = new 
HoodiePartitionMetadata(storage, instantTime,
-          new StoragePath(config.getBasePath()),
-          FSUtils.constructAbsolutePath(config.getBasePath(), partitionPath),
-          hoodieTable.getPartitionMetafileFormat());
-      partitionMetadata.trySave();
-
-      String newFileName = FSUtils.makeBaseFileName(instantTime, writeToken, 
fileId, hoodieTable.getBaseFileExtension());
-      makeOldAndNewFilePaths(partitionPath, latestValidFilePath, newFileName);
-
-      LOG.info("Merging new data into oldPath {}, as newPath {}", oldFilePath, 
newFilePath);
-      // file name is same for all records, in this bunch
-      writeStatus.setFileId(fileId);
-      writeStatus.setPartitionPath(partitionPath);
-      writeStatus.getStat().setPartitionPath(partitionPath);
-      writeStatus.getStat().setFileId(fileId);
-      setWriteStatusPath();
-
-      // Create Marker file,
-      // uses name of `newFilePath` instead of `newFileName`
-      // in case the sub-class may roll over the file handle name.
-      createMarkerFile(partitionPath, newFilePath.getName());
-
-      // Create the writer for writing the new version file
-      fileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, 
newFilePath, hoodieTable.getStorage(),
-          config, writeSchemaWithMetaFields, taskContextSupplier, 
recordMerger.getRecordType());
-    } catch (IOException io) {
-      LOG.error("Error in update task at commit {}", instantTime, io);
-      writeStatus.setGlobalError(io);
-      throw new HoodieUpsertException("Failed to initialize HoodieUpdateHandle 
for FileId: " + fileId + " on commit "
-          + instantTime + " on path " + 
hoodieTable.getMetaClient().getBasePath(), io);
-    }
-  }
-
-  protected void setWriteStatusPath() {
-    writeStatus.getStat().setPath(new StoragePath(config.getBasePath()), 
newFilePath);
-  }
-
-  protected void makeOldAndNewFilePaths(String partitionPath, String 
oldFileName, String newFileName) {
-    oldFilePath = makeNewFilePath(partitionPath, oldFileName);
-    newFilePath = makeNewFilePath(partitionPath, newFileName);
-  }
-
-  /**
-   * Initialize a spillable map for incoming records.
-   */
-  protected void initializeIncomingRecordsMap() {
-    try {
-      // Load the new records in a map
-      long memoryForMerge = 
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config);
-      LOG.info("MaxMemoryPerPartitionMerge => {}", memoryForMerge);
-      this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, 
config.getSpillableMapBasePath(),
-          new DefaultSizeEstimator<>(), new 
HoodieRecordSizeEstimator<>(writeSchema),
-          config.getCommonConfig().getSpillableDiskMapType(),
-          new DefaultSerializer<>(),
-          config.getCommonConfig().isBitCaskDiskMapCompressionEnabled(),
-          getClass().getSimpleName());
-    } catch (IOException io) {
-      throw new HoodieIOException("Cannot instantiate an 
ExternalSpillableMap", io);
-    }
-  }
 
-  /**
-   * Whether there is need to update the record location.
-   */
-  boolean needsUpdateLocation() {
-    return true;
-  }
-
-  /**
-   * Load the new incoming records in a map and return partitionPath.
-   */
-  protected void init(Iterator<HoodieRecord<T>> newRecordsItr) {
-    initializeIncomingRecordsMap();
-    while (newRecordsItr.hasNext()) {
-      HoodieRecord<T> record = newRecordsItr.next();
-      // update the new location of the record, so we know where to find it 
next
-      if (needsUpdateLocation()) {
-        record.unseal();
-        record.setNewLocation(newRecordLocation);
-        record.seal();
-      }
-      // NOTE: Once Records are added to map (spillable-map), DO NOT change it 
as they won't persist
-      keyToNewRecords.put(record.getRecordKey(), record);
-    }
-    if (keyToNewRecords instanceof ExternalSpillableMap) {
-      ExternalSpillableMap<String, HoodieRecord<T>> spillableMap = 
(ExternalSpillableMap<String, HoodieRecord<T>>) keyToNewRecords;
-      LOG.info("Number of entries in MemoryBasedMap => {}, Total size in bytes 
of MemoryBasedMap => {}, "
-          + "Number of entries in BitCaskDiskMap => {}, Size of file spilled 
to disk => {}",
-          spillableMap.getInMemoryMapNumEntries(), 
spillableMap.getCurrentInMemoryMapSize(), 
spillableMap.getDiskBasedMapNumEntries(), 
spillableMap.getSizeOfFileOnDiskInBytes());
-    }
-  }
-
-  public boolean isEmptyNewRecords() {
-    return keyToNewRecords.isEmpty();
-  }
-
-  protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, 
HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOpt, Schema 
writerSchema) throws IOException {
-    boolean isDelete = false;
-    if (combineRecordOpt.isPresent()) {
-      if (oldRecord.getData() != combineRecordOpt.get().getData()) {
-        // the incoming record is chosen
-        isDelete = HoodieOperation.isDelete(newRecord.getOperation());
-      } else {
-        // the incoming record is dropped
-        return false;
-      }
-      updatedRecordsWritten++;
-    }
-    return writeRecord(newRecord, oldRecord, combineRecordOpt, writerSchema, 
config.getPayloadConfig().getProps(), isDelete);
-  }
-
-  protected void writeInsertRecord(HoodieRecord<T> newRecord) throws 
IOException {
-    Schema schema = getNewSchema();
-    // just skip the ignored record
-    if (newRecord.shouldIgnore(schema, config.getProps())) {
-      return;
-    }
-    writeInsertRecord(newRecord, schema, config.getProps());
-  }
-
-  protected void writeInsertRecord(HoodieRecord<T> newRecord, Schema schema, 
Properties prop) {
-    if (writeRecord(newRecord, null, Option.of(newRecord), schema, prop, 
HoodieOperation.isDelete(newRecord.getOperation()))) {
-      insertRecordsWritten++;
-    }
-  }
-
-  protected boolean writeRecord(HoodieRecord<T> newRecord, 
Option<HoodieRecord> combineRecord, Schema schema, Properties prop) throws 
IOException {
-    return writeRecord(newRecord, null, combineRecord, schema, prop, false);
-  }
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public interface HoodieMergeHandle<T, I, K, O> {
 
   /**
-   * The function takes the different versions of the record - old record, new incoming record and combined record
-   * created by merging the old record with the new incoming record. It decides whether the combined record needs to be
-   * written to the file and writes the record accordingly.
-   *
-   * @param newRecord     The new incoming record
-   * @param oldRecord     The value of old record
-   * @param combineRecord Record created by merging the old record with the new incoming record
-   * @param schema        Record schema
-   * @param prop          Properties
-   * @param isDelete      Whether the new record is a delete record
-   *
-   * @return true if the record was written successfully
+   * Called to read the base file and the incoming records, merge them, and write the final base file.
    * @throws IOException
    */
-  private boolean writeRecord(HoodieRecord<T> newRecord,
-                              @Nullable HoodieRecord<T> oldRecord,
-                              Option<HoodieRecord> combineRecord,
-                              Schema schema,
-                              Properties prop,
-                              boolean isDelete) {
-    Option recordMetadata = newRecord.getMetadata();
-    if (!partitionPath.equals(newRecord.getPartitionPath())) {
-      HoodieUpsertException failureEx = new HoodieUpsertException("mismatched 
partition path, record partition: "
-          + newRecord.getPartitionPath() + " but trying to insert into 
partition: " + partitionPath);
-      writeStatus.markFailure(newRecord, failureEx, recordMetadata);
-      return false;
-    }
-    try {
-      if (combineRecord.isPresent() && !combineRecord.get().isDelete(schema, 
config.getProps()) && !isDelete) {
-        // Last-minute check.
-        boolean decision = recordMerger.shouldFlush(combineRecord.get(), 
schema, config.getProps());
-
-        if (decision) {
-          // CASE (1): Flush the merged record.
-          HoodieKey hoodieKey = newRecord.getKey();
-          if (isSecondaryIndexStatsStreamingWritesEnabled) {
-            SecondaryIndexStreamingTracker.trackSecondaryIndexStats(hoodieKey, 
combineRecord, oldRecord, false, writeStatus,
-                writeSchemaWithMetaFields, this::getNewSchema, 
secondaryIndexDefns, keyGeneratorOpt, config);
-          }
-          writeToFile(hoodieKey, combineRecord.get(), schema, prop, 
preserveMetadata);
-          recordsWritten++;
-        } else {
-          // CASE (2): A delete operation.
-          if (isSecondaryIndexStatsStreamingWritesEnabled) {
-            
SecondaryIndexStreamingTracker.trackSecondaryIndexStats(newRecord.getKey(), 
combineRecord, oldRecord, true, writeStatus,
-                writeSchemaWithMetaFields, this::getNewSchema, 
secondaryIndexDefns, keyGeneratorOpt, config);
-          }
-          recordsDeleted++;
-        }
-      } else {
-        if (isSecondaryIndexStatsStreamingWritesEnabled) {
-          
SecondaryIndexStreamingTracker.trackSecondaryIndexStats(newRecord.getKey(), 
combineRecord, oldRecord, true, writeStatus,
-              writeSchemaWithMetaFields, this::getNewSchema, 
secondaryIndexDefns, keyGeneratorOpt, config);
-        }
-        recordsDeleted++;
-        // Clear the new location as the record was deleted
-        newRecord.unseal();
-        newRecord.clearNewLocation();
-        newRecord.seal();
-      }
-      writeStatus.markSuccess(newRecord, recordMetadata);
-      // deflate record payload after recording success. This will help users 
access payload as a
-      // part of marking
-      // record successful.
-      newRecord.deflate();
-      return true;
-    } catch (Exception e) {
-      LOG.error("Error writing record {}", newRecord, e);
-      writeStatus.markFailure(newRecord, e, recordMetadata);
-    }
-    return false;
-  }
-
-  /**
-   * Go through an old record. Here if we detect a newer version shows up, we 
write the new one to the file.
-   */
-  public void write(HoodieRecord<T> oldRecord) {
-    // Use schema with metadata files no matter whether 
'hoodie.populate.meta.fields' is enabled
-    // to avoid unnecessary rewrite. Even with metadata table(whereas the 
option 'hoodie.populate.meta.fields' is configured as false),
-    // the record is deserialized with schema including metadata fields,
-    // see HoodieMergeHelper#runMerge for more details.
-    Schema oldSchema = writeSchemaWithMetaFields;
-    Schema newSchema = getNewSchema();
-    boolean copyOldRecord = true;
-    String key = oldRecord.getRecordKey(oldSchema, keyGeneratorOpt);
-    TypedProperties props = config.getPayloadConfig().getProps();
-    if (keyToNewRecords.containsKey(key)) {
-      // If we have duplicate records that we are updating, then the hoodie 
record will be deflated after
-      // writing the first record. So make a copy of the record to be merged
-      HoodieRecord<T> newRecord = keyToNewRecords.get(key).newInstance();
-      try {
-        Option<Pair<HoodieRecord, Schema>> mergeResult = 
recordMerger.merge(oldRecord, oldSchema, newRecord, newSchema, props);
-        Schema combineRecordSchema = 
mergeResult.map(Pair::getRight).orElse(null);
-        Option<HoodieRecord> combinedRecord = mergeResult.map(Pair::getLeft);
-        if (combinedRecord.isPresent() && 
combinedRecord.get().shouldIgnore(combineRecordSchema, props)) {
-          // If it is an IGNORE_RECORD, just copy the old record, and do not 
update the new record.
-          copyOldRecord = true;
-        } else if (writeUpdateRecord(newRecord, oldRecord, combinedRecord, 
combineRecordSchema)) {
-          /*
-           * ONLY WHEN 1) we have an update for this key AND 2) We are able to 
successfully
-           * write the combined new value
-           *
-           * We no longer need to copy the old record over.
-           */
-          copyOldRecord = false;
-        }
-        writtenRecordKeys.add(key);
-      } catch (Exception e) {
-        throw new HoodieUpsertException("Failed to combine/merge new record 
with old value in storage, for new record {"
-            + keyToNewRecords.get(key) + "}, old value {" + oldRecord + "}", 
e);
-      }
-    }
-
-    if (copyOldRecord) {
-      try {
-        // NOTE: We're enforcing preservation of the record metadata to keep 
existing semantic
-        writeToFile(new HoodieKey(key, partitionPath), oldRecord, oldSchema, 
props, true);
-      } catch (IOException | RuntimeException e) {
-        String errMsg = String.format("Failed to merge old record into new 
file for key %s from old file %s to new file %s with writerSchema %s",
-            key, getOldFilePath(), newFilePath, 
writeSchemaWithMetaFields.toString(true));
-        LOG.debug("Old record is {}", oldRecord);
-        throw new HoodieUpsertException(errMsg, e);
-      }
-      recordsWritten++;
-    }
-  }
-
-  protected void writeToFile(HoodieKey key, HoodieRecord<T> record, Schema 
schema, Properties prop, boolean shouldPreserveRecordMetadata) throws 
IOException {
-    if (shouldPreserveRecordMetadata) {
-      // NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly 
point to the
-      //       file holding this record even in cases when overall metadata is 
preserved
-      HoodieRecord populatedRecord = record.updateMetaField(schema, 
HoodieRecord.FILENAME_META_FIELD_ORD, newFilePath.getName());
-      fileWriter.write(key.getRecordKey(), populatedRecord, 
writeSchemaWithMetaFields);
-    } else {
-      // rewrite the record to include metadata fields in schema, and the 
values will be set later.
-      record = record.prependMetaFields(schema, writeSchemaWithMetaFields, new 
MetadataValues(), config.getProps());
-      fileWriter.writeWithMetadata(key, record, writeSchemaWithMetaFields);
-    }
-  }
-
-  protected void writeIncomingRecords() throws IOException {
-    // write out any pending records (this can happen when inserts are turned 
into updates)
-    Iterator<HoodieRecord<T>> newRecordsItr;
-    if (keyToNewRecords instanceof ExternalSpillableMap) {
-      newRecordsItr = ((ExternalSpillableMap) keyToNewRecords).iterator(key -> 
!writtenRecordKeys.contains(key));
-    } else {
-      newRecordsItr = keyToNewRecords.entrySet().stream()
-          .filter(e -> !writtenRecordKeys.contains(e.getKey()))
-          .map(Map.Entry::getValue)
-          .iterator();
-    }
-    while (newRecordsItr.hasNext()) {
-      HoodieRecord<T> hoodieRecord = newRecordsItr.next();
-      writeInsertRecord(hoodieRecord);
-    }
-  }
-
-  private Schema getNewSchema() {
-    return preserveMetadata ? writeSchemaWithMetaFields : writeSchema;
-  }
-
-  @Override
-  public List<WriteStatus> close() {
-    try {
-      if (isClosed()) {
-        // Handle has already been closed
-        return Collections.emptyList();
-      }
-
-      markClosed();
-      writeIncomingRecords();
-
-      if (keyToNewRecords instanceof Closeable) {
-        ((Closeable) keyToNewRecords).close();
-      }
-
-      keyToNewRecords = null;
-      writtenRecordKeys = null;
-
-      fileWriter.close();
-      fileWriter = null;
-
-      long fileSizeInBytes = storage.getPathInfo(newFilePath).getLength();
-      HoodieWriteStat stat = writeStatus.getStat();
-
-      stat.setTotalWriteBytes(fileSizeInBytes);
-      stat.setFileSizeInBytes(fileSizeInBytes);
-      stat.setNumWrites(recordsWritten);
-      stat.setNumDeletes(recordsDeleted);
-      stat.setNumUpdateWrites(updatedRecordsWritten);
-      stat.setNumInserts(insertRecordsWritten);
-      stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
-      RuntimeStats runtimeStats = new RuntimeStats();
-      runtimeStats.setTotalUpsertTime(timer.endTimer());
-      stat.setRuntimeStats(runtimeStats);
-
-      performMergeDataValidationCheck(writeStatus);
-
-      LOG.info("MergeHandle for partitionPath {} fileID {}, took {} ms.", 
stat.getPartitionPath(),
-          stat.getFileId(), runtimeStats.getTotalUpsertTime());
-
-      return Collections.singletonList(writeStatus);
-    } catch (IOException e) {
-      throw new HoodieUpsertException("Failed to close UpdateHandle", e);
-    }
-  }
-
-  public void performMergeDataValidationCheck(WriteStatus writeStatus) {
-    if (!config.isMergeDataValidationCheckEnabled() || baseFileToMerge == 
null) {
-      return;
-    }
-
-    long oldNumWrites = 0;
-    try (HoodieFileReader reader = 
HoodieIOFactory.getIOFactory(hoodieTable.getStorage())
-        .getReaderFactory(this.recordMerger.getRecordType())
-        .getFileReader(config, oldFilePath)) {
-      oldNumWrites = reader.getTotalRecords();
-    } catch (IOException e) {
-      throw new HoodieUpsertException("Failed to check for merge data 
validation", e);
-    }
-
-    if ((writeStatus.getStat().getNumWrites() + 
writeStatus.getStat().getNumDeletes()) < oldNumWrites) {
-      throw new HoodieCorruptedDataException(
-          String.format("Record write count decreased for file: %s, Partition 
Path: %s (%s:%d + %d < %s:%d)",
-              writeStatus.getFileId(), writeStatus.getPartitionPath(),
-              instantTime, writeStatus.getStat().getNumWrites(), 
writeStatus.getStat().getNumDeletes(),
-              baseFileToMerge.getCommitTime(), oldNumWrites));
-    }
-  }
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  void doMerge() throws IOException;
 
-  public Iterator<List<WriteStatus>> getWriteStatusesAsIterator() {
-    List<WriteStatus> statuses = getWriteStatuses();
-    // TODO(vc): This needs to be revisited
-    if (getPartitionPath() == null) {
-      LOG.info("Upsert Handle has partition path as null {}, {}", 
getOldFilePath(), statuses);
-    }
-    return Collections.singletonList(statuses).iterator();
-  }
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  HoodieBaseFile baseFileForMerge();
 
-  public StoragePath getOldFilePath() {
-    return oldFilePath;
-  }
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  void setPartitionFields(Option<String[]> partitionFields);
 
-  @Override
-  public IOType getIOType() {
-    return IOType.MERGE;
-  }
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  StoragePath getOldFilePath();
 
-  public HoodieBaseFile baseFileForMerge() {
-    return baseFileToMerge;
-  }
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  String getPartitionPath();
 
-  public void setPartitionFields(Option<String[]> partitionFields) {
-    this.partitionFields = partitionFields;
-  }
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  Schema getWriterSchema();
 
-  public Option<String[]> getPartitionFields() {
-    return this.partitionFields;
-  }
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  void setPartitionValues(Object[] partitionValues);
 
-  public void setPartitionValues(Object[] partitionValues) {
-    this.partitionValues = partitionValues;
-  }
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  List<WriteStatus> getWriteStatuses();
 
-  public Object[] getPartitionValues() {
-    return this.partitionValues;
-  }
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  List<WriteStatus> close();
 }
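Usage sketch (not part of the patch): because HoodieMergeHandleFactory (changed below) instantiates the configured class reflectively against fixed constructor parameter types, a custom handle must expose the matching constructor. A minimal sketch, assuming HoodieWriteMergeHandle retains the write-path constructor reflected in the factory's constructorParamTypes; the subclass and its overrides are hypothetical.

    import java.util.Iterator;

    import org.apache.hudi.common.engine.TaskContextSupplier;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.io.HoodieWriteMergeHandle;
    import org.apache.hudi.keygen.BaseKeyGenerator;
    import org.apache.hudi.table.HoodieTable;

    // Hypothetical custom handle; keeps the write-path constructor shape
    // (HoodieWriteConfig, String, HoodieTable, Iterator, String, String,
    //  TaskContextSupplier, Option) that the factory reflects on.
    public class MyMergeHandle<T, I, K, O> extends HoodieWriteMergeHandle<T, I, K, O> {
      public MyMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> table,
                           Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
                           TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
        super(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
      }
      // Override merge behavior here, e.g. write(HoodieRecord) or doMerge().
    }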
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java
index d049f884b514..9b8ed2811b0a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java
@@ -18,12 +18,19 @@
 
 package org.apache.hudi.io;
 
+import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.table.HoodieTable;
 
@@ -32,12 +39,16 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.hudi.config.HoodieWriteConfig.COMPACT_MERGE_HANDLE_CLASS_NAME;
 
 /**
- * Factory class for hoodie merge handle.
+ * Factory class for instantiating the appropriate implementation of {@link HoodieMergeHandle}.
  */
 public class HoodieMergeHandleFactory {
   private static final Logger LOG = LoggerFactory.getLogger(HoodieMergeHandleFactory.class);
+
   /**
    * Creates a merge handle for normal write path.
    */
@@ -51,24 +62,20 @@ public class HoodieMergeHandleFactory {
       String fileId,
       TaskContextSupplier taskContextSupplier,
       Option<BaseKeyGenerator> keyGeneratorOpt) {
-    LOG.info("Create update handle for fileId {} and partition path {} at 
commit {}", fileId, partitionPath, instantTime);
-    if (table.requireSortedRecords()) {
-      if (table.getMetaClient().getTableConfig().isCDCEnabled()) {
-        return new HoodieSortedMergeHandleWithChangeLog<>(writeConfig, 
instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier,
-            keyGeneratorOpt);
-      } else {
-        return new HoodieSortedMergeHandle<>(writeConfig, instantTime, table, 
recordItr, partitionPath, fileId, taskContextSupplier,
-            keyGeneratorOpt);
-      }
-    } else if (!WriteOperationType.isChangingRecords(operationType) && 
writeConfig.allowDuplicateInserts()) {
-      return new HoodieConcatHandle<>(writeConfig, instantTime, table, 
recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
-    } else {
-      if (table.getMetaClient().getTableConfig().isCDCEnabled()) {
-        return new HoodieMergeHandleWithChangeLog<>(writeConfig, instantTime, 
table, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
-      } else {
-        return new HoodieMergeHandle<>(writeConfig, instantTime, table, 
recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
-      }
-    }
+
+    boolean isFallbackEnabled = writeConfig.isMergeHandleFallbackEnabled();
+    Pair<String, String> mergeHandleClasses = 
getMergeHandleClassesWrite(operationType, writeConfig, table);
+    String logContext = String.format("for fileId %s and partition path %s at 
commit %s", fileId, partitionPath, instantTime);
+    LOG.info("Create HoodieMergeHandle implementation {} {}", 
mergeHandleClasses.getLeft(), logContext);
+
+    Class<?>[] constructorParamTypes = new Class<?>[] {
+        HoodieWriteConfig.class, String.class, HoodieTable.class, 
Iterator.class,
+        String.class, String.class, TaskContextSupplier.class, Option.class
+    };
+
+    return instantiateMergeHandle(
+        isFallbackEnabled, mergeHandleClasses.getLeft(), 
mergeHandleClasses.getRight(), logContext, constructorParamTypes,
+        writeConfig, instantTime, table, recordItr, partitionPath, fileId, 
taskContextSupplier, keyGeneratorOpt);
   }
 
   /**
@@ -84,19 +91,125 @@ public class HoodieMergeHandleFactory {
       HoodieBaseFile dataFileToBeMerged,
       TaskContextSupplier taskContextSupplier,
       Option<BaseKeyGenerator> keyGeneratorOpt) {
-    LOG.info("Get updateHandle for fileId {} and partitionPath {} at commit 
{}", fileId, partitionPath, instantTime);
+
+    boolean isFallbackEnabled = writeConfig.isMergeHandleFallbackEnabled();
+    Pair<String, String> mergeHandleClasses = 
getMergeHandleClassesCompaction(writeConfig, table);
+    String logContext = String.format("for fileId %s and partitionPath %s at 
commit %s", fileId, partitionPath, instantTime);
+    LOG.info("Create HoodieMergeHandle implementation {} {}", 
mergeHandleClasses.getLeft(), logContext);
+
+    Class<?>[] constructorParamTypes = new Class<?>[] {
+        HoodieWriteConfig.class, String.class, HoodieTable.class, Map.class,
+        String.class, String.class, HoodieBaseFile.class, 
TaskContextSupplier.class, Option.class
+    };
+
+    return instantiateMergeHandle(
+        isFallbackEnabled, mergeHandleClasses.getLeft(), 
mergeHandleClasses.getRight(), logContext, constructorParamTypes,
+        writeConfig, instantTime, table, keyToNewRecords, partitionPath, 
fileId, dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
+  }
+
+  /**
+   * Creates a merge handle for compaction with file group reader.
+   */
+  public static <T, I, K, O> HoodieMergeHandle<T, I, K, O> create(
+      HoodieWriteConfig config,
+      String instantTime,
+      HoodieTable<T, I, K, O> hoodieTable,
+      FileSlice fileSlice,
+      CompactionOperation operation,
+      TaskContextSupplier taskContextSupplier,
+      HoodieReaderContext<T> readerContext,
+      String maxInstantTime,
+      HoodieRecord.HoodieRecordType recordType) {
+
+    boolean isFallbackEnabled = config.isMergeHandleFallbackEnabled();
+
+    String mergeHandleClass = config.getCompactionMergeHandleClassName();
+    String logContext = String.format("for fileId %s and partitionPath %s at commit %s", operation.getFileId(), operation.getPartitionPath(), instantTime);
+    LOG.info("Create HoodieMergeHandle implementation {} {}", mergeHandleClass, logContext);
+
+    Class<?>[] constructorParamTypes = new Class<?>[] {
+        HoodieWriteConfig.class, String.class, HoodieTable.class, FileSlice.class, CompactionOperation.class,
+        TaskContextSupplier.class, HoodieReaderContext.class, String.class, HoodieRecord.HoodieRecordType.class
+    };
+
+    return instantiateMergeHandle(
+        isFallbackEnabled, mergeHandleClass, COMPACT_MERGE_HANDLE_CLASS_NAME.defaultValue(), logContext, constructorParamTypes,
+        config, instantTime, hoodieTable, fileSlice, operation, taskContextSupplier, readerContext, maxInstantTime, recordType);
+  }
+
+  /**
+   * Helper method to instantiate a HoodieMergeHandle via reflection, with an optional fallback.
+   */
+  private static <T, I, K, O> HoodieMergeHandle<T, I, K, O> instantiateMergeHandle(
+      boolean isFallbackEnabled,
+      String primaryClass,
+      String fallbackClass,
+      String logContext,
+      Class<?>[] constructorParamTypes,
+      Object... initargs) {
+    try {
+      return (HoodieMergeHandle<T, I, K, O>) ReflectionUtils.loadClass(primaryClass, constructorParamTypes, initargs);
+    } catch (Throwable e1) {
+      if (isFallbackEnabled && fallbackClass != null && !Objects.equals(primaryClass, fallbackClass)) {
+        try {
+          LOG.warn("HoodieMergeHandle implementation {} failed, now creating fallback implementation {} {}",
+              primaryClass, fallbackClass, logContext);
+          return (HoodieMergeHandle<T, I, K, O>) ReflectionUtils.loadClass(fallbackClass, constructorParamTypes, initargs);
+        } catch (Throwable e2) {
+          throw new HoodieException("Could not instantiate the fallback HoodieMergeHandle implementation: " + fallbackClass, e2);
+        }
+      }
+      throw new HoodieException("Could not instantiate the HoodieMergeHandle implementation: " + primaryClass, e1);
+    }
+  }
+
+  @VisibleForTesting
+  static Pair<String, String> getMergeHandleClassesWrite(WriteOperationType 
operationType, HoodieWriteConfig writeConfig, HoodieTable table) {
+    String mergeHandleClass;
+    String fallbackMergeHandleClass = null;
+    // Override with a different implementation of {@link HoodieWriteMergeHandle} if sorting or CDC is enabled.
     if (table.requireSortedRecords()) {
-      return new HoodieSortedMergeHandle<>(writeConfig, instantTime, table, 
keyToNewRecords, partitionPath, fileId,
-          dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
+      if (table.getMetaClient().getTableConfig().isCDCEnabled()) {
+        mergeHandleClass = 
HoodieSortedMergeHandleWithChangeLog.class.getName();
+      } else {
+        mergeHandleClass = HoodieSortedMergeHandle.class.getName();
+      }
+    } else if (!WriteOperationType.isChangingRecords(operationType) && 
writeConfig.allowDuplicateInserts()) {
+      mergeHandleClass = writeConfig.getConcatHandleClassName();
+      if 
(!mergeHandleClass.equals(HoodieWriteConfig.CONCAT_HANDLE_CLASS_NAME.defaultValue()))
 {
+        fallbackMergeHandleClass = 
HoodieWriteConfig.CONCAT_HANDLE_CLASS_NAME.defaultValue();
+      }
+    } else if (table.getMetaClient().getTableConfig().isCDCEnabled()) {
+      mergeHandleClass = HoodieMergeHandleWithChangeLog.class.getName();
+    } else {
+      mergeHandleClass = writeConfig.getMergeHandleClassName();
+      if 
(!mergeHandleClass.equals(HoodieWriteConfig.MERGE_HANDLE_CLASS_NAME.defaultValue()))
 {
+        fallbackMergeHandleClass = 
HoodieWriteConfig.MERGE_HANDLE_CLASS_NAME.defaultValue();
+      }
+    }
+
+    return Pair.of(mergeHandleClass, fallbackMergeHandleClass);
+  }
+
+  @VisibleForTesting
+  static Pair<String, String> 
getMergeHandleClassesCompaction(HoodieWriteConfig writeConfig, HoodieTable 
table) {
+    String mergeHandleClass;
+    String fallbackMergeHandleClass = null;
+    // Override with the sorted implementation of {@link HoodieWriteMergeHandle} if sorting is required.
+    if (table.requireSortedRecords()) {
+      mergeHandleClass = HoodieSortedMergeHandle.class.getName();
     } else if (table.getMetaClient().getTableConfig().isCDCEnabled() && 
writeConfig.isYieldingPureLogForMor()) {
      // IMPORTANT: only index types that yield pure log files need to enable the cdc log files for compaction;
      // index types such as BLOOM do not need this because they do delta merge for inserts and generate logs for updates,
      // and both of these cases are already handled in HoodieCDCExtractor.
-      return new HoodieMergeHandleWithChangeLog<>(writeConfig, instantTime, 
table, keyToNewRecords, partitionPath, fileId,
-          dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
+      mergeHandleClass = HoodieMergeHandleWithChangeLog.class.getName();
     } else {
-      return new HoodieMergeHandle<>(writeConfig, instantTime, table, 
keyToNewRecords, partitionPath, fileId,
-          dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
+      mergeHandleClass = writeConfig.getMergeHandleClassName();
+      if 
(!mergeHandleClass.equals(HoodieWriteConfig.MERGE_HANDLE_CLASS_NAME.defaultValue()))
 {
+        fallbackMergeHandleClass = 
HoodieWriteConfig.MERGE_HANDLE_CLASS_NAME.defaultValue();
+      }
     }
+
+    return Pair.of(mergeHandleClass, fallbackMergeHandleClass);
   }
-}
+}
\ No newline at end of file
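
For context on wiring in a custom implementation: the reflection call above
resolves the configured class against the exact constructor signature listed
in constructorParamTypes, so a plugin must expose that signature verbatim. A
minimal sketch for the write path follows (package and class name are
hypothetical, extending the HoodieWriteMergeHandle introduced below); the
compaction overload resolves its class the same way against its own
FileSlice/CompactionOperation parameter list:

    package io.custom;

    import org.apache.hudi.common.engine.TaskContextSupplier;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.io.HoodieWriteMergeHandle;
    import org.apache.hudi.keygen.BaseKeyGenerator;
    import org.apache.hudi.table.HoodieTable;

    import java.util.Iterator;

    // Hypothetical plugin: the constructor must match constructorParamTypes,
    // since ReflectionUtils.loadClass looks it up by these parameter types.
    public class CustomMergeHandle<T, I, K, O> extends HoodieWriteMergeHandle<T, I, K, O> {
      public CustomMergeHandle(HoodieWriteConfig config, String instantTime,
                               HoodieTable<T, I, K, O> hoodieTable,
                               Iterator<HoodieRecord<T>> recordItr,
                               String partitionPath, String fileId,
                               TaskContextSupplier taskContextSupplier,
                               Option<BaseKeyGenerator> keyGeneratorOpt) {
        super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId,
            taskContextSupplier, keyGeneratorOpt);
        // custom behavior layers on top of the base row-by-row merge
      }
    }
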
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
index 2f1b629096ff..eda78ecc4423 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
@@ -45,7 +45,7 @@ import java.util.Map;
 /**
  * A merge handle that supports logging change logs.
  */
-public class HoodieMergeHandleWithChangeLog<T, I, K, O> extends 
HoodieMergeHandle<T, I, K, O> {
+public class HoodieMergeHandleWithChangeLog<T, I, K, O> extends 
HoodieWriteMergeHandle<T, I, K, O> {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(HoodieMergeHandleWithChangeLog.class);
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
index b5b0bd6e7be5..3327c781f985 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
@@ -46,7 +46,7 @@ import java.util.Queue;
  * keys in newRecordKeys (sorted in-memory).
  */
 @NotThreadSafe
-public class HoodieSortedMergeHandle<T, I, K, O> extends HoodieMergeHandle<T, 
I, K, O> {
+public class HoodieSortedMergeHandle<T, I, K, O> extends 
HoodieWriteMergeHandle<T, I, K, O> {
 
   private final Queue<String> newRecordKeysSorted = new PriorityQueue<>();
 
@@ -61,10 +61,9 @@ public class HoodieSortedMergeHandle<T, I, K, O> extends 
HoodieMergeHandle<T, I,
    * Called by compactor code path.
    */
   public HoodieSortedMergeHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
-      Map<String, HoodieRecord<T>> keyToNewRecordsOrig, String partitionPath, 
String fileId,
-      HoodieBaseFile dataFileToBeMerged, TaskContextSupplier 
taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
-    super(config, instantTime, hoodieTable, keyToNewRecordsOrig, 
partitionPath, fileId, dataFileToBeMerged,
-        taskContextSupplier, keyGeneratorOpt);
+                                 Map<String, HoodieRecord<T>> 
keyToNewRecordsOrig, String partitionPath, String fileId,
+                                 HoodieBaseFile dataFileToBeMerged, 
TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> 
keyGeneratorOpt) {
+    super(config, instantTime, hoodieTable, keyToNewRecordsOrig, 
partitionPath, fileId, dataFileToBeMerged, taskContextSupplier, 
keyGeneratorOpt);
 
     newRecordKeysSorted.addAll(keyToNewRecords.keySet());
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java
index 4d16876ff5b4..9ba999974d30 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java
@@ -48,8 +48,8 @@ public class HoodieSortedMergeHandleWithChangeLog<T, I, K, O> 
extends HoodieMerg
    * Called by compactor code path.
    */
   public HoodieSortedMergeHandleWithChangeLog(HoodieWriteConfig config, String 
instantTime, HoodieTable<T, I, K, O> hoodieTable,
-                           Map<String, HoodieRecord<T>> keyToNewRecords, 
String partitionPath, String fileId,
-                           HoodieBaseFile dataFileToBeMerged, 
TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> 
keyGeneratorOpt) {
+                                              Map<String, HoodieRecord<T>> 
keyToNewRecords, String partitionPath, String fileId,
+                                              HoodieBaseFile 
dataFileToBeMerged, TaskContextSupplier taskContextSupplier, 
Option<BaseKeyGenerator> keyGeneratorOpt) {
     super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, 
fileId, dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteMergeHandle.java
similarity index 74%
copy from 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
copy to 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteMergeHandle.java
index 7b81d728a358..8d389d7ba88e 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteMergeHandle.java
@@ -21,21 +21,17 @@ package org.apache.hudi.io;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.TaskContextSupplier;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieOperation;
-import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
-import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.model.MetadataValues;
 import org.apache.hudi.common.serialization.DefaultSerializer;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -47,8 +43,9 @@ import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.io.storage.HoodieFileWriterFactory;
 import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.keygen.BaseKeyGenerator;
-import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.commit.HoodieMergeHelper;
 
 import org.apache.avro.Schema;
 import org.slf4j.Logger;
@@ -64,12 +61,11 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.NoSuchElementException;
 import java.util.Properties;
 import java.util.Set;
 
 /**
- * Handle to merge incoming records to those in storage.
+ * Handle to merge incoming records with those in storage, row by row.
  * <p>
  * Simplified Logic:
  * For every existing record
@@ -77,13 +73,12 @@ import java.util.Set;
  *     else write the record as is
  * For all pending records from incoming batch, write to file.
  *
- * <p>
  * Illustration with simple data.
  * Incoming data:
  *     rec1_2, rec4_2, rec5_1, rec6_1
  * Existing data:
  *     rec1_1, rec2_1, rec3_1, rec4_1
- * <p>
+ *
  * For every existing record, merge w/ incoming if required and write to 
storage.
  *    => rec1_1 and rec1_2 is merged to write rec1_2 to storage
  *    => rec2_1 is written as is
@@ -91,62 +86,58 @@ import java.util.Set;
  *    => rec4_2 and rec4_1 is merged to write rec4_2 to storage
  * Write all pending records from incoming set to storage
  *    => rec5_1 and rec6_1
- * <p>
+ *
  * Final snapshot in storage
  * rec1_2, rec2_1, rec3_1, rec4_2, rec5_1, rec6_1
+ *
+ * </p>
  */
 @SuppressWarnings("Duplicates")
 @NotThreadSafe
-public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, 
O> {
+public class HoodieWriteMergeHandle<T, I, K, O> extends 
HoodieAbstractMergeHandle<T, I, K, O> {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieMergeHandle.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieWriteMergeHandle.class);
 
   protected Map<String, HoodieRecord<T>> keyToNewRecords;
   protected Set<String> writtenRecordKeys;
   protected HoodieFileWriter fileWriter;
 
-  protected StoragePath newFilePath;
-  protected StoragePath oldFilePath;
   protected long recordsWritten = 0;
   protected long recordsDeleted = 0;
   protected long updatedRecordsWritten = 0;
   protected long insertRecordsWritten = 0;
-  protected Option<BaseKeyGenerator> keyGeneratorOpt;
-  protected HoodieBaseFile baseFileToMerge;
-
-  protected Option<String[]> partitionFields = Option.empty();
-  protected Object[] partitionValues = new Object[0];
 
-  public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
-                           Iterator<HoodieRecord<T>> recordItr, String 
partitionPath, String fileId,
-                           TaskContextSupplier taskContextSupplier, 
Option<BaseKeyGenerator> keyGeneratorOpt) {
+  public HoodieWriteMergeHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
+                                Iterator<HoodieRecord<T>> recordItr, String 
partitionPath, String fileId,
+                                TaskContextSupplier taskContextSupplier, 
Option<BaseKeyGenerator> keyGeneratorOpt) {
     this(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, 
taskContextSupplier,
         getLatestBaseFile(hoodieTable, partitionPath, fileId), 
keyGeneratorOpt);
   }
 
-  public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
-                           Iterator<HoodieRecord<T>> recordItr, String 
partitionPath, String fileId,
-                           TaskContextSupplier taskContextSupplier, 
HoodieBaseFile baseFile, Option<BaseKeyGenerator> keyGeneratorOpt) {
-    super(config, instantTime, partitionPath, fileId, hoodieTable, 
taskContextSupplier, false);
-    init(recordItr);
-    init(fileId, partitionPath, baseFile);
-    validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
+  public HoodieWriteMergeHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
+                                Iterator<HoodieRecord<T>> recordItr, String 
partitionPath, String fileId,
+                                TaskContextSupplier taskContextSupplier, 
HoodieBaseFile baseFile, Option<BaseKeyGenerator> keyGeneratorOpt) {
+    super(config, instantTime, hoodieTable, partitionPath, fileId, 
taskContextSupplier, baseFile, keyGeneratorOpt, false);
+    populateIncomingRecordsMap(recordItr);
+    initMarkerFileAndFileWriter(fileId, partitionPath);
   }
 
   /**
    * Called by compactor code path.
    */
-  public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
-                           Map<String, HoodieRecord<T>> keyToNewRecords, 
String partitionPath, String fileId,
-                           HoodieBaseFile dataFileToBeMerged, 
TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> 
keyGeneratorOpt) {
-    super(config, instantTime, partitionPath, fileId, hoodieTable, 
taskContextSupplier, true);
+  public HoodieWriteMergeHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
+                                Map<String, HoodieRecord<T>> keyToNewRecords, 
String partitionPath, String fileId,
+                                HoodieBaseFile dataFileToBeMerged, 
TaskContextSupplier taskContextSupplier,
+                                Option<BaseKeyGenerator> keyGeneratorOpt) {
+    super(config, instantTime, hoodieTable, partitionPath, fileId, 
taskContextSupplier, dataFileToBeMerged, keyGeneratorOpt,
+        // preserveMetadata is disabled by default for MDT but enabled 
otherwise
+        !HoodieTableMetadata.isMetadataTable(config.getBasePath()));
     this.keyToNewRecords = keyToNewRecords;
-    init(fileId, this.partitionPath, dataFileToBeMerged);
-    validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
+    initMarkerFileAndFileWriter(fileId, this.partitionPath);
   }
 
   /**
-   * Used by `HoodieSparkFileGroupReaderBasedMergeHandle`.
+   * Used by `FileGroupReaderBasedMergeHandle`.
    *
    * @param config              Hudi write config
    * @param instantTime         Instant time to use
@@ -155,65 +146,30 @@ public class HoodieMergeHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O>
    * @param hoodieTable         {@link HoodieTable} instance
    * @param taskContextSupplier Task context supplier
    */
-  public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, 
String partitionPath,
-                           String fileId, HoodieTable<T, I, K, O> hoodieTable, 
TaskContextSupplier taskContextSupplier) {
+  public HoodieWriteMergeHandle(HoodieWriteConfig config, String instantTime, 
String partitionPath,
+                                String fileId, HoodieTable<T, I, K, O> 
hoodieTable, TaskContextSupplier taskContextSupplier) {
     super(config, instantTime, partitionPath, fileId, hoodieTable, 
taskContextSupplier, true);
   }
 
-  private void validateAndSetAndKeyGenProps(Option<BaseKeyGenerator> 
keyGeneratorOpt, boolean populateMetaFields) {
-    ValidationUtils.checkArgument(populateMetaFields == 
!keyGeneratorOpt.isPresent());
-    this.keyGeneratorOpt = keyGeneratorOpt;
-  }
-
-  public static HoodieBaseFile getLatestBaseFile(HoodieTable<?, ?, ?, ?> 
hoodieTable, String partitionPath, String fileId) {
-    Option<HoodieBaseFile> baseFileOp = 
hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);
-    if (!baseFileOp.isPresent()) {
-      throw new NoSuchElementException(String.format("FileID %s of partition 
path %s does not exist.", fileId, partitionPath));
-    }
-    return baseFileOp.get();
+  @Override
+  public void doMerge() throws IOException {
+    HoodieMergeHelper.newInstance().runMerge(hoodieTable, this);
   }
 
   /**
-   * Extract old file path, initialize StorageWriter and WriteStatus.
+   * Initialize marker file and file writer.
    */
-  private void init(String fileId, String partitionPath, HoodieBaseFile 
baseFileToMerge) {
-    LOG.info("partitionPath:{}, fileId to be merged:{}", partitionPath, 
fileId);
-    this.baseFileToMerge = baseFileToMerge;
+  private void initMarkerFileAndFileWriter(String fileId, String 
partitionPath) {
     this.writtenRecordKeys = new HashSet<>();
-    writeStatus.setStat(new HoodieWriteStat());
     try {
-      String latestValidFilePath = baseFileToMerge.getFileName();
-      writeStatus.getStat().setPrevCommit(baseFileToMerge.getCommitTime());
-      // At the moment, we only support SI for overwrite with latest payload. 
So, we don't need to embed entire file slice here.
-      // HUDI-8518 will be taken up to fix it for any payload during which we 
might require entire file slice to be set here.
-      // Already AppendHandle adds all logs file from current file slice to 
HoodieDeltaWriteStat.
-      writeStatus.getStat().setPrevBaseFile(latestValidFilePath);
-
-      HoodiePartitionMetadata partitionMetadata = new 
HoodiePartitionMetadata(storage, instantTime,
-          new StoragePath(config.getBasePath()),
-          FSUtils.constructAbsolutePath(config.getBasePath(), partitionPath),
-          hoodieTable.getPartitionMetafileFormat());
-      partitionMetadata.trySave();
-
-      String newFileName = FSUtils.makeBaseFileName(instantTime, writeToken, 
fileId, hoodieTable.getBaseFileExtension());
-      makeOldAndNewFilePaths(partitionPath, latestValidFilePath, newFileName);
-
-      LOG.info("Merging new data into oldPath {}, as newPath {}", oldFilePath, 
newFilePath);
-      // file name is same for all records, in this bunch
-      writeStatus.setFileId(fileId);
-      writeStatus.setPartitionPath(partitionPath);
-      writeStatus.getStat().setPartitionPath(partitionPath);
-      writeStatus.getStat().setFileId(fileId);
-      setWriteStatusPath();
-
       // Create Marker file,
       // uses name of `newFilePath` instead of `newFileName`
-      // in case the sub-class may roll over the file handle name.
+      // in case a subclass rolls over the file handle name.
       createMarkerFile(partitionPath, newFilePath.getName());
-
       // Create the writer for writing the new version file
-      fileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, 
newFilePath, hoodieTable.getStorage(),
-          config, writeSchemaWithMetaFields, taskContextSupplier, 
recordMerger.getRecordType());
+      fileWriter = HoodieFileWriterFactory.getFileWriter(
+          instantTime, newFilePath, hoodieTable.getStorage(),
+          config, writeSchemaWithMetaFields, taskContextSupplier, 
getRecordType());
     } catch (IOException io) {
       LOG.error("Error in update task at commit {}", instantTime, io);
       writeStatus.setGlobalError(io);
@@ -222,19 +178,14 @@ public class HoodieMergeHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O>
     }
   }
 
-  protected void setWriteStatusPath() {
-    writeStatus.getStat().setPath(new StoragePath(config.getBasePath()), 
newFilePath);
-  }
-
-  protected void makeOldAndNewFilePaths(String partitionPath, String 
oldFileName, String newFileName) {
-    oldFilePath = makeNewFilePath(partitionPath, oldFileName);
-    newFilePath = makeNewFilePath(partitionPath, newFileName);
+  protected HoodieRecord.HoodieRecordType getRecordType() {
+    return recordMerger.getRecordType();
   }
 
   /**
    * Initialize a spillable map for incoming records.
    */
-  protected void initializeIncomingRecordsMap() {
+  protected void initIncomingRecordsMap() {
     try {
       // Load the new records in a map
       long memoryForMerge = 
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config);
@@ -260,8 +211,8 @@ public class HoodieMergeHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O>
   /**
   * Load the new incoming records into a map.
    */
-  protected void init(Iterator<HoodieRecord<T>> newRecordsItr) {
-    initializeIncomingRecordsMap();
+  protected void populateIncomingRecordsMap(Iterator<HoodieRecord<T>> 
newRecordsItr) {
+    initIncomingRecordsMap();
     while (newRecordsItr.hasNext()) {
       HoodieRecord<T> record = newRecordsItr.next();
       // update the new location of the record, so we know where to find it 
next
@@ -309,7 +260,8 @@ public class HoodieMergeHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O>
     writeInsertRecord(newRecord, schema, config.getProps());
   }
 
-  protected void writeInsertRecord(HoodieRecord<T> newRecord, Schema schema, 
Properties prop) {
+  protected void writeInsertRecord(HoodieRecord<T> newRecord, Schema schema, 
Properties prop)
+      throws IOException {
     if (writeRecord(newRecord, null, Option.of(newRecord), schema, prop, 
HoodieOperation.isDelete(newRecord.getOperation()))) {
       insertRecordsWritten++;
     }
@@ -549,42 +501,4 @@ public class HoodieMergeHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O>
               baseFileToMerge.getCommitTime(), oldNumWrites));
     }
   }
-
-  public Iterator<List<WriteStatus>> getWriteStatusesAsIterator() {
-    List<WriteStatus> statuses = getWriteStatuses();
-    // TODO(vc): This needs to be revisited
-    if (getPartitionPath() == null) {
-      LOG.info("Upsert Handle has partition path as null {}, {}", 
getOldFilePath(), statuses);
-    }
-    return Collections.singletonList(statuses).iterator();
-  }
-
-  public StoragePath getOldFilePath() {
-    return oldFilePath;
-  }
-
-  @Override
-  public IOType getIOType() {
-    return IOType.MERGE;
-  }
-
-  public HoodieBaseFile baseFileForMerge() {
-    return baseFileToMerge;
-  }
-
-  public void setPartitionFields(Option<String[]> partitionFields) {
-    this.partitionFields = partitionFields;
-  }
-
-  public Option<String[]> getPartitionFields() {
-    return this.partitionFields;
-  }
-
-  public void setPartitionValues(Object[] partitionValues) {
-    this.partitionValues = partitionValues;
-  }
-
-  public Object[] getPartitionValues() {
-    return this.partitionValues;
-  }
 }
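
The class Javadoc above describes the row-by-row walk; below is a
self-contained toy model of that decision logic (names and types are
illustrative only, not the handle's actual code):

    import java.util.*;

    // Toy model of the merge walk: an existing row passes through unless the
    // incoming batch carries a newer version of the same key; leftovers from
    // the incoming batch are written as inserts.
    public class MergeSketch {
      public static void main(String[] args) {
        List<String> existing = Arrays.asList("rec1_1", "rec2_1", "rec3_1", "rec4_1");
        Map<String, String> incoming = new LinkedHashMap<>();
        for (String r : Arrays.asList("rec1_2", "rec4_2", "rec5_1", "rec6_1")) {
          incoming.put(r.split("_")[0], r);  // key is "rec1", "rec4", ...
        }
        Set<String> written = new HashSet<>();
        List<String> out = new ArrayList<>();
        for (String old : existing) {
          String key = old.split("_")[0];
          out.add(incoming.getOrDefault(key, old));  // merged version wins, else pass through
          written.add(key);
        }
        for (Map.Entry<String, String> e : incoming.entrySet()) {
          if (!written.contains(e.getKey())) {
            out.add(e.getValue());  // pending inserts: rec5_1, rec6_1
          }
        }
        System.out.println(out);  // [rec1_2, rec2_1, rec3_1, rec4_2, rec5_1, rec6_1]
      }
    }
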
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java
index a457cbf5b324..29fe5246b613 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java
@@ -18,11 +18,20 @@
 
 package org.apache.hudi.io;
 
+import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.engine.EngineProperty;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieUpsertException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 import static 
org.apache.hudi.common.config.HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
@@ -33,6 +42,8 @@ import static 
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FRACTI
 import static 
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_MERGE;
 
 public class IOUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(IOUtils.class);
+
   /**
    * Dynamic calculation of max memory to use for spillable map. There is 
always more than one task
    * running on an executor and each task maintains a spillable map.
@@ -81,10 +92,40 @@ public class IOUtils {
   }
 
   public static long getMaxMemoryPerCompaction(TaskContextSupplier context, 
Map<String, String> options) {
-    if (options.containsKey(MAX_MEMORY_FOR_COMPACTION)) {
-      return Long.parseLong(options.get(MAX_MEMORY_FOR_COMPACTION));
+    if (options.containsKey(MAX_MEMORY_FOR_COMPACTION.key())) {
+      return Long.parseLong(options.get(MAX_MEMORY_FOR_COMPACTION.key()));
     }
     String fraction = 
options.getOrDefault(MAX_MEMORY_FRACTION_FOR_COMPACTION.key(), 
MAX_MEMORY_FRACTION_FOR_COMPACTION.defaultValue());
     return getMaxMemoryAllowedForMerge(context, fraction);
   }
+
+  /**
+   * Triggers the merge action with the given merge handle {@code HoodieMergeHandle}.
+   *
+   * <p>Note: this can be either regular write-path merging or compaction
+   * merging, depending on the implementation of the {@link HoodieMergeHandle}.
+   *
+   * @param mergeHandle The merge handle
+   * @param instantTime The instant time
+   * @param fileId      The file ID
+   *
+   * @return the write status iterator
+   */
+  public static Iterator<List<WriteStatus>> runMerge(HoodieMergeHandle<?, ?, 
?, ?> mergeHandle,
+                                                     String instantTime,
+                                                     String fileId) throws 
IOException {
+    if (mergeHandle.getOldFilePath() == null) {
+      throw new HoodieUpsertException(
+          "Error in finding the old file path at commit " + instantTime + " 
for fileId: " + fileId);
+    } else {
+      mergeHandle.doMerge();
+    }
+
+    // TODO(vc): This needs to be revisited
+    if (mergeHandle.getPartitionPath() == null) {
+      LOG.info("Upsert Handle has partition path as null " + 
mergeHandle.getOldFilePath() + ", " + mergeHandle.getWriteStatuses());
+    }
+
+    return 
Collections.singletonList(mergeHandle.getWriteStatuses()).iterator();
+  }
 }
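
This helper centralizes the handleUpdateInternal boilerplate that each engine
previously duplicated; the updated call sites below all reduce to roughly this
shape (sketch taken from the copy-on-write update path in this patch):

    // Build the handle for the file group being updated, then run the merge.
    HoodieMergeHandle<?, ?, ?, ?> mergeHandle =
        getUpdateHandle(instantTime, partitionPath, fileId, keyToNewRecords, oldDataFile);
    return IOUtils.runMerge(mergeHandle, instantTime, fileId);
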
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 667e2a183a77..29f9ff7b4696 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -77,7 +77,6 @@ import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.exception.SchemaCompatibilityException;
 import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.MetadataPartitionType;
@@ -86,7 +85,6 @@ import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
-import org.apache.hudi.table.action.commit.HoodieMergeHelper;
 import org.apache.hudi.table.marker.WriteMarkers;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.table.storage.HoodieLayoutFactory;
@@ -1207,12 +1205,4 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
     }
     return new HashSet<>(Arrays.asList(partitionFields.get()));
   }
-
-  public void runMerge(HoodieMergeHandle<?, ?, ?, ?> upsertHandle, String 
instantTime, String fileId) throws IOException {
-    if (upsertHandle.getOldFilePath() == null) {
-      throw new HoodieUpsertException("Error in finding the old file path at 
commit " + instantTime + " for fileId: " + fileId);
-    } else {
-      HoodieMergeHelper.newInstance().runMerge(this, upsertHandle);
-    }
-  }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
index 138e6a840d4b..b96848ae40fc 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
@@ -20,7 +20,7 @@ package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.queue.HoodieConsumer;
-import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.HoodieWriteMergeHandle;
 import org.apache.hudi.table.HoodieTable;
 
 import java.io.IOException;
@@ -33,30 +33,30 @@ public abstract class BaseMergeHelper {
   /**
    * Read records from previous version of base file and merge.
    * @param table Hoodie Table
-   * @param upsertHandle Merge Handle
+   * @param mergeHandle Merge Handle
    * @throws IOException in case of error
    */
-  public abstract void runMerge(HoodieTable<?, ?, ?, ?> table, 
HoodieMergeHandle<?, ?, ?, ?> upsertHandle) throws IOException;
+  public abstract void runMerge(HoodieTable<?, ?, ?, ?> table, 
HoodieWriteMergeHandle<?, ?, ?, ?> mergeHandle) throws IOException;
 
   /**
    * Consumer that dequeues records from queue and sends to Merge Handle.
    */
   protected static class UpdateHandler implements HoodieConsumer<HoodieRecord, 
Void> {
 
-    private final HoodieMergeHandle upsertHandle;
+    private final HoodieWriteMergeHandle mergeHandle;
 
-    protected UpdateHandler(HoodieMergeHandle upsertHandle) {
-      this.upsertHandle = upsertHandle;
+    protected UpdateHandler(HoodieWriteMergeHandle mergeHandle) {
+      this.mergeHandle = mergeHandle;
     }
 
     @Override
     public void consume(HoodieRecord record) {
-      upsertHandle.write(record);
+      mergeHandle.write(record);
     }
 
     @Override
     public Void finish() {
-      upsertHandle.close();
+      mergeHandle.close();
       return null;
     }
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
index f7b93b59457e..296a178c275f 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
@@ -35,7 +35,7 @@ import 
org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
 import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils;
 import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
 import org.apache.hudi.internal.schema.utils.SerDeHelper;
-import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.HoodieWriteMergeHandle;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.HoodieStorage;
@@ -74,7 +74,7 @@ public class HoodieMergeHelper<T> extends BaseMergeHelper {
 
   @Override
   public void runMerge(HoodieTable<?, ?, ?, ?> table,
-                       HoodieMergeHandle<?, ?, ?, ?> mergeHandle) throws 
IOException {
+                       HoodieWriteMergeHandle<?, ?, ?, ?> mergeHandle) throws 
IOException {
     HoodieWriteConfig writeConfig = table.getConfig();
     HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index f3757fe9e3ac..c9ddce099eeb 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -44,7 +44,8 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.FileGroupReaderBasedAppendHandle;
-import org.apache.hudi.io.FileGroupReaderBasedMergeHandle;
+import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.HoodieMergeHandleFactory;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 
@@ -157,9 +158,9 @@ public abstract class HoodieCompactor<T, I, K, O> 
implements Serializable {
                                    HoodieTable table,
                                    String maxInstantTime,
                                    TaskContextSupplier taskContextSupplier) 
throws IOException {
-    FileGroupReaderBasedMergeHandle<T, ?, ?, ?> mergeHandle = new 
FileGroupReaderBasedMergeHandle<>(writeConfig,
+    HoodieMergeHandle<T, ?, ?, ?> mergeHandle = 
HoodieMergeHandleFactory.create(writeConfig,
         instantTime, table, getFileSliceFromOperation(operation, 
writeConfig.getBasePath()), operation, taskContextSupplier, 
hoodieReaderContext, maxInstantTime, getEngineRecordType());
-    mergeHandle.write();
+    mergeHandle.doMerge();
     return mergeHandle.close();
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/TestHoodieMergeHandleFactory.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/TestHoodieMergeHandleFactory.java
new file mode 100644
index 000000000000..ca59cde89c66
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/TestHoodieMergeHandleFactory.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Properties;
+
+import static 
org.apache.hudi.config.HoodieWriteConfig.MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE;
+import static org.mockito.Mockito.when;
+
+public class TestHoodieMergeHandleFactory {
+
+  private static final String CUSTOM_MERGE_HANDLE = 
"io.custom.CustomMergeHandle.java";
+  private static final String BASE_PATH = "base_path";
+
+  @Mock
+  private HoodieTable mockHoodieTable;
+  @Mock
+  private HoodieTableConfig mockHoodieTableConfig;
+  @Mock
+  private HoodieTableMetaClient mockMetaClient;
+
+  @BeforeEach
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+    when(mockHoodieTable.getMetaClient()).thenReturn(mockMetaClient);
+    when(mockMetaClient.getTableConfig()).thenReturn(mockHoodieTableConfig);
+  }
+
+  @Test
+  public void validateWriterPathFactoryImpl() {
+    // default case
+    Properties properties = new Properties();
+    properties.setProperty(MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.key(), 
"false");
+    Pair mergeHandleClasses = 
HoodieMergeHandleFactory.getMergeHandleClassesWrite(WriteOperationType.UPSERT, 
getWriterConfig(properties), mockHoodieTable);
+    validateMergeClasses(mergeHandleClasses, 
HoodieWriteMergeHandle.class.getName());
+
+    // sorted case
+    when(mockHoodieTable.requireSortedRecords()).thenReturn(true);
+    when(mockHoodieTableConfig.isCDCEnabled()).thenReturn(true);
+    mergeHandleClasses = 
HoodieMergeHandleFactory.getMergeHandleClassesWrite(WriteOperationType.UPSERT, 
getWriterConfig(properties), mockHoodieTable);
+    validateMergeClasses(mergeHandleClasses, 
HoodieSortedMergeHandleWithChangeLog.class.getName());
+    when(mockHoodieTableConfig.isCDCEnabled()).thenReturn(false);
+    mergeHandleClasses = 
HoodieMergeHandleFactory.getMergeHandleClassesWrite(WriteOperationType.UPSERT, 
getWriterConfig(properties), mockHoodieTable);
+    validateMergeClasses(mergeHandleClasses, 
HoodieSortedMergeHandle.class.getName());
+
+    // non-sorted: no CDC cases
+    when(mockHoodieTable.requireSortedRecords()).thenReturn(false);
+    Properties propsWithDups = new Properties();
+    propsWithDups.setProperty(MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.key(), 
"true");
+    mergeHandleClasses = 
HoodieMergeHandleFactory.getMergeHandleClassesWrite(WriteOperationType.INSERT, 
getWriterConfig(propsWithDups), mockHoodieTable);
+    validateMergeClasses(mergeHandleClasses, 
HoodieConcatHandle.class.getName());
+
+    mergeHandleClasses = 
HoodieMergeHandleFactory.getMergeHandleClassesWrite(WriteOperationType.UPSERT, 
getWriterConfig(propsWithDups), mockHoodieTable);
+    validateMergeClasses(mergeHandleClasses, 
HoodieWriteMergeHandle.class.getName());
+
+    mergeHandleClasses = 
HoodieMergeHandleFactory.getMergeHandleClassesWrite(WriteOperationType.INSERT, 
getWriterConfig(properties), mockHoodieTable);
+    validateMergeClasses(mergeHandleClasses, 
HoodieWriteMergeHandle.class.getName());
+
+    // non-sorted: CDC enabled
+    when(mockHoodieTableConfig.isCDCEnabled()).thenReturn(true);
+    mergeHandleClasses = 
HoodieMergeHandleFactory.getMergeHandleClassesWrite(WriteOperationType.UPSERT, 
getWriterConfig(propsWithDups), mockHoodieTable);
+    validateMergeClasses(mergeHandleClasses, 
HoodieMergeHandleWithChangeLog.class.getName());
+
+    mergeHandleClasses = 
HoodieMergeHandleFactory.getMergeHandleClassesWrite(WriteOperationType.INSERT, 
getWriterConfig(properties), mockHoodieTable);
+    validateMergeClasses(mergeHandleClasses, 
HoodieMergeHandleWithChangeLog.class.getName());
+
+    mergeHandleClasses = 
HoodieMergeHandleFactory.getMergeHandleClassesWrite(WriteOperationType.UPSERT, 
getWriterConfig(properties), mockHoodieTable);
+    validateMergeClasses(mergeHandleClasses, 
HoodieMergeHandleWithChangeLog.class.getName());
+
+    // custom merge handle
+    when(mockHoodieTableConfig.isCDCEnabled()).thenReturn(false);
+    properties.setProperty(HoodieWriteConfig.MERGE_HANDLE_CLASS_NAME.key(), 
CUSTOM_MERGE_HANDLE);
+    properties.setProperty(HoodieWriteConfig.CONCAT_HANDLE_CLASS_NAME.key(), 
CUSTOM_MERGE_HANDLE);
+    propsWithDups.setProperty(HoodieWriteConfig.MERGE_HANDLE_CLASS_NAME.key(), 
CUSTOM_MERGE_HANDLE);
+    mergeHandleClasses = 
HoodieMergeHandleFactory.getMergeHandleClassesWrite(WriteOperationType.UPSERT, 
getWriterConfig(properties), mockHoodieTable);
+    validateMergeClasses(mergeHandleClasses, CUSTOM_MERGE_HANDLE, 
HoodieWriteMergeHandle.class.getName());
+
+    when(mockHoodieTable.requireSortedRecords()).thenReturn(true);
+    mergeHandleClasses = 
HoodieMergeHandleFactory.getMergeHandleClassesWrite(WriteOperationType.UPSERT, 
getWriterConfig(properties), mockHoodieTable);
+    validateMergeClasses(mergeHandleClasses, 
HoodieSortedMergeHandle.class.getName());
+
+    when(mockHoodieTable.requireSortedRecords()).thenReturn(false);
+    mergeHandleClasses = 
HoodieMergeHandleFactory.getMergeHandleClassesWrite(WriteOperationType.INSERT, 
getWriterConfig(propsWithDups), mockHoodieTable);
+    validateMergeClasses(mergeHandleClasses, 
HoodieConcatHandle.class.getName());
+
+    
propsWithDups.setProperty(HoodieWriteConfig.CONCAT_HANDLE_CLASS_NAME.key(), 
CUSTOM_MERGE_HANDLE);
+    mergeHandleClasses = 
HoodieMergeHandleFactory.getMergeHandleClassesWrite(WriteOperationType.INSERT, 
getWriterConfig(propsWithDups), mockHoodieTable);
+    validateMergeClasses(mergeHandleClasses, CUSTOM_MERGE_HANDLE, 
HoodieConcatHandle.class.getName());
+  }
+
+  @Test
+  public void validateCompactionPathFactoryImpl() {
+    // default case
+    Properties properties = new Properties();
+    Pair mergeHandleClasses = 
HoodieMergeHandleFactory.getMergeHandleClassesCompaction(getWriterConfig(properties),
 mockHoodieTable);
+    validateMergeClasses(mergeHandleClasses, 
HoodieWriteMergeHandle.class.getName());
+
+    // sorted case
+    when(mockHoodieTable.requireSortedRecords()).thenReturn(true);
+    mergeHandleClasses = 
HoodieMergeHandleFactory.getMergeHandleClassesCompaction(getWriterConfig(properties),
 mockHoodieTable);
+    validateMergeClasses(mergeHandleClasses, 
HoodieSortedMergeHandle.class.getName());
+
+    // custom case
+    when(mockHoodieTable.requireSortedRecords()).thenReturn(false);
+    properties.setProperty(HoodieWriteConfig.MERGE_HANDLE_CLASS_NAME.key(), 
CUSTOM_MERGE_HANDLE);
+    mergeHandleClasses = 
HoodieMergeHandleFactory.getMergeHandleClassesCompaction(getWriterConfig(properties),
 mockHoodieTable);
+    validateMergeClasses(mergeHandleClasses, CUSTOM_MERGE_HANDLE, 
HoodieWriteMergeHandle.class.getName());
+
+    when(mockHoodieTable.requireSortedRecords()).thenReturn(true);
+    mergeHandleClasses = 
HoodieMergeHandleFactory.getMergeHandleClassesCompaction(getWriterConfig(properties),
 mockHoodieTable);
+    validateMergeClasses(mergeHandleClasses, 
HoodieSortedMergeHandle.class.getName());
+
+  }
+
+  private void validateMergeClasses(Pair<String, String> mergeHandleClasses, 
String expectedMergeHandleClasses) {
+    validateMergeClasses(mergeHandleClasses, expectedMergeHandleClasses, null);
+  }
+
+  private void validateMergeClasses(Pair<String, String> mergeHandleClasses, 
String expectedMergeHandleClass, String expectedFallbackClass) {
+    Assertions.assertEquals(expectedMergeHandleClass, 
mergeHandleClasses.getLeft());
+    Assertions.assertEquals(expectedFallbackClass, 
mergeHandleClasses.getRight());
+  }
+
+  private HoodieWriteConfig getWriterConfig(Properties properties) {
+    return 
HoodieWriteConfig.newBuilder().withPath(BASE_PATH).withProperties(properties).build();
+  }
+}
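
The keys this test exercises are regular write-config properties, so selecting
a custom handle from application code reduces to the following (a sketch; the
class name and base path are hypothetical, and with the fallback flag on, a
failed instantiation falls back to the default HoodieWriteMergeHandle):

    Properties props = new Properties();
    props.setProperty(HoodieWriteConfig.MERGE_HANDLE_CLASS_NAME.key(),
        "io.custom.CustomMergeHandle");  // hypothetical implementation
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi-table")     // illustrative base path
        .withProperties(props)
        .build();
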
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
index 126ad3f47477..2888faf0368c 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
@@ -41,7 +41,7 @@ import java.util.Iterator;
 import java.util.List;
 
 /**
- * A {@link HoodieMergeHandle} that supports MERGE write incrementally(small 
data buffers).
+ * A {@link HoodieWriteMergeHandle} that supports MERGE writes incrementally (small data buffers).
  *
  * <P>This handle is needed from the second mini-batch write for COW data 
bucket
  * when the data bucket is written using multiple mini-batches.
@@ -51,7 +51,7 @@ import java.util.List;
  * behaves like the new data buffer are appended to the old file.
  */
 public class FlinkMergeAndReplaceHandle<T, I, K, O>
-    extends HoodieMergeHandle<T, I, K, O>
+    extends HoodieWriteMergeHandle<T, I, K, O>
     implements MiniBatchHandle {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkMergeAndReplaceHandle.class);
@@ -135,7 +135,7 @@ public class FlinkMergeAndReplaceHandle<T, I, K, O>
   }
 
   @Override
-  protected void initializeIncomingRecordsMap() {
+  protected void initIncomingRecordsMap() {
     LOG.info("Initialize on-heap keyToNewRecords for incoming records.");
     // the incoming records are already buffered on heap and the underlying 
bytes are managed by memory pool
     // in Flink write buffer, so there is no need to use ExternalSpillableMap.
@@ -156,6 +156,7 @@ public class FlinkMergeAndReplaceHandle<T, I, K, O>
     writeStatus.getStat().setPath(new StoragePath(config.getBasePath()), 
oldFilePath);
   }
 
+  @Override
   boolean needsUpdateLocation() {
     // No need to update location for Flink hoodie records because all the 
records are pre-tagged
     // with the desired locations.
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
index 1d8078707e77..ab4862bbb975 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
@@ -41,7 +41,7 @@ import java.util.Iterator;
 import java.util.List;
 
 /**
- * A {@link HoodieMergeHandle} that supports MERGE write incrementally(small 
data buffers).
+ * A {@link HoodieWriteMergeHandle} that supports MERGE writes incrementally (small data buffers).
  *
  * <p>For a new data buffer, it initializes and set up the next file path to 
write,
  * and closes the file path when the data buffer write finish. When next data 
buffer
@@ -52,7 +52,7 @@ import java.util.List;
  * @see FlinkMergeAndReplaceHandle
  */
 public class FlinkMergeHandle<T, I, K, O>
-    extends HoodieMergeHandle<T, I, K, O>
+    extends HoodieWriteMergeHandle<T, I, K, O>
     implements MiniBatchHandle {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkMergeHandle.class);
@@ -155,7 +155,7 @@ public class FlinkMergeHandle<T, I, K, O>
   }
 
   @Override
-  protected void initializeIncomingRecordsMap() {
+  protected void initIncomingRecordsMap() {
     LOG.info("Initialize on-heap keyToNewRecords for incoming records.");
     // the incoming records are already buffered on heap and the underlying 
bytes are managed by memory pool
     // in Flink write buffer, so there is no need to use ExternalSpillableMap.
@@ -187,6 +187,7 @@ public class FlinkMergeHandle<T, I, K, O>
     }
   }
 
+  @Override
   boolean needsUpdateLocation() {
     // No need to update location for Flink hoodie records because all the 
records are pre-tagged
     // with the desired locations.
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
index d574125eb1eb..438c5d563df4 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
@@ -65,6 +65,7 @@ public class FlinkMergeHandleWithChangeLog<T, I, K, O>
         IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
   }
 
+  @Override
   protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, 
HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOpt, Schema 
writerSchema)
       throws IOException {
     // TODO [HUDI-5019] Remove these unnecessary newInstance invocations
@@ -80,6 +81,7 @@ public class FlinkMergeHandleWithChangeLog<T, I, K, O>
     return result;
   }
 
+  @Override
   protected void writeInsertRecord(HoodieRecord<T> newRecord) throws 
IOException {
     Schema schema = preserveMetadata ? writeSchemaWithMetaFields : writeSchema;
     // TODO Remove these unnecessary newInstance invocations
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 815baf39d843..aee1da3a8134 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -41,10 +41,11 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
-import org.apache.hudi.io.HoodieCreateHandle;
 import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.HoodieCreateHandle;
 import org.apache.hudi.io.HoodieMergeHandleFactory;
 import org.apache.hudi.io.HoodieWriteHandle;
+import org.apache.hudi.io.IOUtils;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
 import org.apache.hudi.metadata.MetadataPartitionType;
@@ -413,14 +414,8 @@ public class HoodieFlinkCopyOnWriteTable<T>
     // always using avro record merger for legacy compaction since log scanner 
do not support rowdata reading yet.
     config.setRecordMergerClass(HoodieAvroRecordMerger.class.getName());
     // these are updates
-    HoodieMergeHandle upsertHandle = getUpdateHandle(instantTime, 
partitionPath, fileId, keyToNewRecords, oldDataFile);
-    return handleUpdateInternal(upsertHandle, instantTime, fileId);
-  }
-
-  protected Iterator<List<WriteStatus>> 
handleUpdateInternal(HoodieMergeHandle<?, ?, ?, ?> upsertHandle, String 
instantTime,
-                                                             String fileId) 
throws IOException {
-    runMerge(upsertHandle, instantTime, fileId);
-    return upsertHandle.getWriteStatusesAsIterator();
+    HoodieMergeHandle mergeHandle = getUpdateHandle(instantTime, 
partitionPath, fileId, keyToNewRecords, oldDataFile);
+    return IOUtils.runMerge(mergeHandle, instantTime, fileId);
   }
 
   protected HoodieMergeHandle getUpdateHandle(String instantTime, String 
partitionPath, String fileId,
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
index d156a6e6e1e4..d713ead1d53a 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
@@ -32,8 +32,9 @@ import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.execution.FlinkLazyInsertIterable;
 import org.apache.hudi.io.ExplicitWriteHandleFactory;
 import org.apache.hudi.io.HoodieCreateHandle;
-import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.HoodieWriteMergeHandle;
 import org.apache.hudi.io.HoodieWriteHandle;
+import org.apache.hudi.io.IOUtils;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
@@ -166,7 +167,7 @@ public abstract class BaseFlinkCommitActionExecutor<T> 
extends
         // the second batch batch2 tries to reuse the same bucket
         // and append instead of UPDATE.
         return handleInsert(fileIdHint, recordItr);
-      } else if (this.writeHandle instanceof HoodieMergeHandle) {
+      } else if (this.writeHandle instanceof HoodieWriteMergeHandle) {
         return handleUpdate(partitionPath, fileIdHint, recordItr);
       } else {
         switch (bucketType) {
@@ -190,19 +191,13 @@ public abstract class BaseFlinkCommitActionExecutor<T> 
extends
                                                   Iterator<HoodieRecord<T>> 
recordItr)
       throws IOException {
     // This is needed since sometimes some buckets are never picked in 
getPartition() and end up with 0 records
-    HoodieMergeHandle<?, ?, ?, ?> upsertHandle = (HoodieMergeHandle<?, ?, ?, 
?>) this.writeHandle;
+    HoodieWriteMergeHandle<?, ?, ?, ?> upsertHandle = 
(HoodieWriteMergeHandle<?, ?, ?, ?>) this.writeHandle;
     if (upsertHandle.isEmptyNewRecords() && !recordItr.hasNext()) {
       LOG.info("Empty partition with fileId => {}.", fileId);
       return Collections.singletonList((List<WriteStatus>) 
Collections.EMPTY_LIST).iterator();
     }
     // these are updates
-    return handleUpdateInternal(upsertHandle, fileId);
-  }
-
-  protected Iterator<List<WriteStatus>> 
handleUpdateInternal(HoodieMergeHandle<?, ?, ?, ?> upsertHandle, String fileId)
-      throws IOException {
-    table.runMerge(upsertHandle, instantTime, fileId);
-    return upsertHandle.getWriteStatusesAsIterator();
+    return IOUtils.runMerge(upsertHandle, instantTime, fileId);
   }
 
   @Override
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index f5857ca8eb18..873d0f55d338 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -44,6 +44,7 @@ import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.io.HoodieCreateHandle;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.HoodieMergeHandleFactory;
+import org.apache.hudi.io.IOUtils;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
 import org.apache.hudi.metadata.MetadataPartitionType;
@@ -282,14 +283,8 @@ public class HoodieJavaCopyOnWriteTable<T>
       Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile)
       throws IOException {
     // these are updates
-    HoodieMergeHandle upsertHandle = getUpdateHandle(instantTime, partitionPath, fileId, keyToNewRecords, oldDataFile);
-    return handleUpdateInternal(upsertHandle, instantTime, fileId);
-  }
-
-  protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?, ?, ?, ?> upsertHandle, String instantTime,
-                                                             String fileId) throws IOException {
-    runMerge(upsertHandle, instantTime, fileId);
-    return upsertHandle.getWriteStatusesAsIterator();
+    HoodieMergeHandle mergeHandle = getUpdateHandle(instantTime, partitionPath, fileId, keyToNewRecords, oldDataFile);
+    return IOUtils.runMerge(mergeHandle, instantTime, fileId);
   }
 
   protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId,
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
index 3e9974fc5ebe..092daf2cead2 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
@@ -39,6 +39,7 @@ import org.apache.hudi.execution.JavaLazyInsertIterable;
 import org.apache.hudi.io.CreateHandleFactory;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.HoodieMergeHandleFactory;
+import org.apache.hudi.io.IOUtils;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
 import org.apache.hudi.storage.StoragePath;
@@ -241,17 +242,11 @@ public abstract class BaseJavaCommitActionExecutor<T> extends
       return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
     }
     // these are updates
-    HoodieMergeHandle upsertHandle = getUpdateHandle(partitionPath, fileId, recordItr);
-    return handleUpdateInternal(upsertHandle, fileId);
+    HoodieMergeHandle<?, ?, ?, ?> mergeHandle = getUpdateHandle(partitionPath, fileId, recordItr);
+    return IOUtils.runMerge(mergeHandle, instantTime, fileId);
   }
 
-  protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?, ?, ?, ?> upsertHandle, String fileId)
-      throws IOException {
-    table.runMerge(upsertHandle, instantTime, fileId);
-    return upsertHandle.getWriteStatusesAsIterator();
-  }
-
-  protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
+  protected HoodieMergeHandle<?, ?, ?, ?> getUpdateHandle(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
     Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();
     if (!config.populateMetaFields()) {
       try {
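
The trailing context above cuts off inside the populateMetaFields() guard. The full pattern, as a hedged sketch modeled on the Spark-side version further down in this patch (exception type and message wording are illustrative):

    Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();
    if (!config.populateMetaFields()) {
      try {
        // only BaseKeyGenerator subclasses can recompute record keys once meta fields are disabled
        keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieAvroKeyGeneratorFactory.createKeyGenerator(config.getProps()));
      } catch (Exception e) {
        throw new HoodieException("Only BaseKeyGenerator (or any key generator that extends from BaseKeyGenerator) "
            + "are supported when meta columns are disabled.", e);
      }
    }
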
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
index a8d4e5990f1b..0d2daf9fe34c 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
@@ -42,7 +42,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCorruptedDataException;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieUpsertException;
-import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.HoodieWriteMergeHandle;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
 import org.apache.hudi.table.HoodieTable;
@@ -261,9 +261,9 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage extends HoodieJavaClientTe
     Pair<String, String> partitionAndBaseFilePaths = getPartitionAndBaseFilePathsFromLatestCommitMetadata(metaClient);
     HoodieBaseFile baseFile = new HoodieBaseFile(partitionAndBaseFilePaths.getRight());
 
-    HoodieMergeHandle handle = null;
+    HoodieWriteMergeHandle handle = null;
     try {
-      handle = new HoodieMergeHandle(config, instantTime, table, new HashMap<>(),
+      handle = new HoodieWriteMergeHandle(config, instantTime, table, new HashMap<>(),
           partitionAndBaseFilePaths.getLeft(), FSUtils.getFileId(baseFile.getFileName()), baseFile, new JavaTaskContextSupplier(),
           config.populateMetaFields() ? Option.empty() :
               Option.of((BaseKeyGenerator) HoodieAvroKeyGeneratorFactory.createKeyGenerator(config.getProps())));
@@ -281,7 +281,7 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage extends HoodieJavaClientTe
       config.getProps().setProperty("hoodie.merge.data.validation.enabled", "true");
       HoodieWriteConfig cfg2 = HoodieWriteConfig.newBuilder().withProps(config.getProps()).build();
       // does the handle need to be closed to clean up the writer it contains?
-      handle = new HoodieMergeHandle(cfg2, newInstantTime, table, new HashMap<>(),
+      handle = new HoodieWriteMergeHandle(cfg2, newInstantTime, table, new HashMap<>(),
           partitionAndBaseFilePaths.getLeft(), FSUtils.getFileId(baseFile.getFileName()), baseFile, new JavaTaskContextSupplier(),
           config.populateMetaFields() ? Option.empty() :
              Option.of((BaseKeyGenerator) HoodieAvroKeyGeneratorFactory.createKeyGenerator(config.getProps())));
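
The question raised in the test comment above ("does the handle need to be closed...") is commonly settled with an explicit cleanup. A hedged sketch, assuming HoodieWriteMergeHandle keeps the close() contract of the merge handle it replaces:

    try {
      // ... exercise the handle ...
    } finally {
      if (handle != null) {
        handle.close(); // flushes and releases the underlying file writer
      }
    }
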
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
index f10985ab3694..fd88f4108ba3 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
@@ -85,6 +85,8 @@ class SparkReaderContextFactory implements ReaderContextFactory<InternalRow> {
     // Broadcast: Configuration.
     Configuration configs = getHadoopConfiguration(jsc.hadoopConfiguration());
     schemaEvolutionConfigs.forEach(configs::set);
+    configs.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE().key(), sqlConf.getConfString(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE().key()));
+    configs.set(SQLConf.PARQUET_WRITE_LEGACY_FORMAT().key(), sqlConf.getConfString(SQLConf.PARQUET_WRITE_LEGACY_FORMAT().key()));
     configurationBroadcast = jsc.broadcast(new SerializableConfiguration(configs));
     // Broadcast: ParquetReader.
     // Spark parquet reader has to be instantiated on the driver and broadcast to the executors
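
The two settings added above copy the driver's Spark SQL parquet-writer options into the Hadoop configuration before it is broadcast, so executor-side writers honor the session settings. A sketch of the executor-side readback, assuming the SerializableConfiguration wrapper exposes the wrapped Configuration via get() (accessor name is an assumption):

    // on an executor, after the broadcast is materialized
    Configuration conf = configurationBroadcast.value().get();
    String timestampType = conf.get(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE().key());
    String writeLegacyFormat = conf.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT().key());
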
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index 4de3cc12ae74..1e7af1c31e6d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -31,6 +31,8 @@ import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.client.utils.SparkPartitionUtils;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieBaseFile;
@@ -47,6 +49,7 @@ import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.io.HoodieCreateHandle;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.HoodieMergeHandleFactory;
+import org.apache.hudi.io.IOUtils;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.metadata.MetadataPartitionType;
@@ -77,6 +80,7 @@ import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
 import org.apache.hudi.table.action.rollback.RestorePlanActionExecutor;
 import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
 
+import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -237,21 +241,32 @@ public class HoodieSparkCopyOnWriteTable<T>
       String instantTime, String partitionPath, String fileId,
       Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException {
     // these are updates
-    HoodieMergeHandle upsertHandle = getUpdateHandle(instantTime, partitionPath, fileId, keyToNewRecords, oldDataFile);
-    return handleUpdateInternal(upsertHandle, instantTime, fileId);
-  }
-
-  protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?, ?, ?, ?> upsertHandle, String instantTime,
-                                                             String fileId) throws IOException {
-    runMerge(upsertHandle, instantTime, fileId);
-    return upsertHandle.getWriteStatusesAsIterator();
+    HoodieMergeHandle mergeHandle = getUpdateHandle(instantTime, partitionPath, fileId, keyToNewRecords, oldDataFile);
+    return IOUtils.runMerge(mergeHandle, instantTime, fileId);
   }
 
   protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId,
-      Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
-    Option<BaseKeyGenerator> keyGeneratorOpt = HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(config);
-    return HoodieMergeHandleFactory.create(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
+                                              Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
+    Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();
+    if (!config.populateMetaFields()) {
+      try {
+        keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(TypedProperties.copy(config.getProps())));
+      } catch (Exception e) {
+        throw new HoodieException("Only BaseKeyGenerator (or any key generator that extends from BaseKeyGenerator) are supported when meta "
+            + "columns are disabled. Please choose the right key generator if you wish to disable meta fields.", e);
+      }
+    }
+    HoodieMergeHandle mergeHandle = HoodieMergeHandleFactory.create(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
         dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
+    if (mergeHandle.getOldFilePath() != null && mergeHandle.baseFileForMerge().getBootstrapBaseFile().isPresent()) {
+      Option<String[]> partitionFields = getMetaClient().getTableConfig().getPartitionFields();
+      Object[] partitionValues = SparkPartitionUtils.getPartitionFieldVals(partitionFields, mergeHandle.getPartitionPath(),
+          getMetaClient().getTableConfig().getBootstrapBasePath().get(),
+          mergeHandle.getWriterSchema(), (Configuration) getStorageConf().unwrap());
+      mergeHandle.setPartitionFields(partitionFields);
+      mergeHandle.setPartitionValues(partitionValues);
+    }
+    return mergeHandle;
   }
 
   @Override
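
With the bootstrap wiring moved into getUpdateHandle, HoodieMergeHandleFactory.create becomes the single point where a merge handle is materialized. A hedged usage sketch, with argument names taken from the hunk above:

    HoodieMergeHandle mergeHandle = HoodieMergeHandleFactory.create(config, instantTime, this,
        keyToNewRecords, partitionPath, fileId, dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
    // the factory selects the concrete implementation (e.g. HoodieWriteMergeHandle, sorted
    // or changelog variants) based on the write config
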
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
index 23c88718f183..a9276fd37902 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.table;
 
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.utils.SparkPartitionUtils;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
@@ -30,16 +29,12 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieMetadataException;
-import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.SparkHoodieIndexFactory;
-import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.SparkMetadataWriterFactory;
-import org.apache.hudi.table.action.commit.HoodieMergeHelper;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.TaskContext;
 import org.apache.spark.TaskContext$;
 
@@ -138,22 +133,4 @@ public abstract class HoodieSparkTable<T>
     final TaskContext taskContext = TaskContext.get();
     return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
   }
-
-  @Override
-  public void runMerge(HoodieMergeHandle<?, ?, ?, ?> upsertHandle, String instantTime, String fileId) throws IOException {
-    if (upsertHandle.getOldFilePath() == null) {
-      throw new HoodieUpsertException("Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
-    } else {
-      if (upsertHandle.baseFileForMerge().getBootstrapBaseFile().isPresent()) {
-        Option<String[]> partitionFields = getMetaClient().getTableConfig().getPartitionFields();
-        Object[] partitionValues = SparkPartitionUtils.getPartitionFieldVals(partitionFields, upsertHandle.getPartitionPath(),
-            getMetaClient().getTableConfig().getBootstrapBasePath().get(),
-            upsertHandle.getWriterSchema(), getStorageConf().unwrapAs(Configuration.class));
-        upsertHandle.setPartitionFields(partitionFields);
-        upsertHandle.setPartitionValues(partitionValues);
-      }
-      HoodieMergeHelper.newInstance().runMerge(this, upsertHandle);
-    }
-  }
-
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index 8ae910934464..b2119ddf7ce2 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.table.action.commit;
 
+import org.apache.hudi.client.utils.SparkPartitionUtils;
+import org.apache.hudi.index.HoodieSparkIndexClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -45,10 +47,10 @@ import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.execution.SparkLazyInsertIterable;
 import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.index.HoodieSparkIndexClient;
-import org.apache.hudi.io.CreateHandleFactory;
 import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.CreateHandleFactory;
 import org.apache.hudi.io.HoodieMergeHandleFactory;
+import org.apache.hudi.io.IOUtils;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.table.HoodieTable;
@@ -57,6 +59,7 @@ import org.apache.hudi.table.WorkloadStat;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
@@ -379,19 +382,22 @@ public abstract class BaseSparkCommitActionExecutor<T> extends
     }
 
     // these are updates
-    HoodieMergeHandle upsertHandle = getUpdateHandle(partitionPath, fileId, recordItr);
-    return handleUpdateInternal(upsertHandle, fileId);
-  }
-
-  protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?, ?, ?, ?> upsertHandle, String fileId)
-      throws IOException {
-    table.runMerge(upsertHandle, instantTime, fileId);
-    return upsertHandle.getWriteStatusesAsIterator();
+    HoodieMergeHandle mergeHandle = getUpdateHandle(partitionPath, fileId, recordItr);
+    return IOUtils.runMerge(mergeHandle, instantTime, fileId);
   }
 
   protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
-    return HoodieMergeHandleFactory.create(operationType, config, instantTime, table, recordItr, partitionPath, fileId,
+    HoodieMergeHandle mergeHandle = HoodieMergeHandleFactory.create(operationType, config, instantTime, table, recordItr, partitionPath, fileId,
         taskContextSupplier, keyGeneratorOpt);
+    if (mergeHandle.getOldFilePath() != null && mergeHandle.baseFileForMerge().getBootstrapBaseFile().isPresent()) {
+      Option<String[]> partitionFields = table.getMetaClient().getTableConfig().getPartitionFields();
+      Object[] partitionValues = SparkPartitionUtils.getPartitionFieldVals(partitionFields, mergeHandle.getPartitionPath(),
+          table.getMetaClient().getTableConfig().getBootstrapBasePath().get(),
+          mergeHandle.getWriterSchema(), (Configuration) table.getStorageConf().unwrap());
+      mergeHandle.setPartitionFields(partitionFields);
+      mergeHandle.setPartitionValues(partitionValues);
+    }
+    return mergeHandle;
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
index 687d92ccd65d..a4168958e6b5 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
@@ -32,7 +32,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.CreateHandleFactory;
-import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.HoodieWriteMergeHandle;
 import org.apache.hudi.io.HoodieWriteHandle;
 import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.StoragePath;
@@ -132,7 +132,7 @@ public class TestUpdateSchemaEvolution extends HoodieSparkClientTestHarness impl
                                                    List<HoodieRecord> updateRecords, String assertMsg, boolean isAssertThrow, Class expectedExceptionType) {
     jsc.parallelize(Arrays.asList(1)).map(x -> {
       Executable executable = () -> {
-        HoodieMergeHandle mergeHandle = new HoodieMergeHandle(updateTable.getConfig(), "101", updateTable,
+        HoodieWriteMergeHandle mergeHandle = new HoodieWriteMergeHandle(updateTable.getConfig(), "101", updateTable,
             updateRecords.iterator(), updateRecords.get(0).getPartitionPath(), insertResult.getFileId(), supplier, Option.empty());
         List<GenericRecord> oldRecords = HoodieIOFactory.getIOFactory(updateTable.getStorage())
             .getFileFormatUtils(updateTable.getBaseFileFormat())
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieDefaultMergeHandle.java
similarity index 99%
rename from hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
rename to hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieDefaultMergeHandle.java
index 590c32298823..2fab2abf24ec 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieDefaultMergeHandle.java
@@ -68,7 +68,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.params.provider.Arguments.arguments;
 
 @SuppressWarnings("unchecked")
-public class TestHoodieMergeHandle extends HoodieSparkClientTestHarness {
+public class TestHoodieDefaultMergeHandle extends HoodieSparkClientTestHarness {
 
   @BeforeEach
   public void setUp() throws Exception {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
index be168c9d73dd..0362a619499f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
@@ -31,7 +31,7 @@ import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
-import org.apache.hudi.common.table.read.FileGroupRecordBuffer;
+import org.apache.hudi.common.table.read.HoodieFileGroupRecordBuffer;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
@@ -133,7 +133,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
   private final List<String> validBlockInstants = new ArrayList<>();
   // Use scanV2 method.
   private final boolean enableOptimizedLogBlocksScan;
-  protected FileGroupRecordBuffer<T> recordBuffer;
+  protected HoodieFileGroupRecordBuffer<T> recordBuffer;
   // Allows to consider inflight instants while merging log records
   protected boolean allowInflightInstants;
   // table version for compatibility
@@ -142,7 +142,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
   protected BaseHoodieLogRecordReader(HoodieReaderContext<T> readerContext, HoodieTableMetaClient hoodieTableMetaClient, HoodieStorage storage, List<String> logFilePaths,
                                       boolean reverseReader, int bufferSize, Option<InstantRange> instantRange,
                                       boolean withOperationField, boolean forceFullScan, Option<String> partitionNameOverride,
-                                      Option<String> keyFieldOverride, boolean enableOptimizedLogBlocksScan, FileGroupRecordBuffer<T> recordBuffer,
+                                      Option<String> keyFieldOverride, boolean enableOptimizedLogBlocksScan, HoodieFileGroupRecordBuffer<T> recordBuffer,
                                       boolean allowInflightInstants) {
     this.readerContext = readerContext;
     this.readerSchema = readerContext.getSchemaHandler() != null ? readerContext.getSchemaHandler().getRequiredSchema() : null;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
index f6efc50426d0..36ee213c44f4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
@@ -22,8 +22,8 @@ package org.apache.hudi.common.table.log;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.read.FileGroupRecordBuffer;
 import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.HoodieFileGroupRecordBuffer;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
@@ -66,7 +66,7 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
   private HoodieMergedLogRecordReader(HoodieReaderContext<T> readerContext, HoodieTableMetaClient metaClient, HoodieStorage storage, List<String> logFilePaths, boolean reverseReader,
                                       int bufferSize, Option<InstantRange> instantRange, boolean withOperationField, boolean forceFullScan,
                                       Option<String> partitionName, Option<String> keyFieldOverride, boolean enableOptimizedLogBlocksScan,
-                                      FileGroupRecordBuffer<T> recordBuffer, boolean allowInflightInstants) {
+                                      HoodieFileGroupRecordBuffer<T> recordBuffer, boolean allowInflightInstants) {
     super(readerContext, metaClient, storage, logFilePaths, reverseReader, bufferSize, instantRange, withOperationField,
         forceFullScan, partitionName, keyFieldOverride, enableOptimizedLogBlocksScan, recordBuffer, allowInflightInstants);
 
@@ -174,7 +174,7 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
     private boolean forceFullScan = true;
     private boolean enableOptimizedLogBlocksScan = false;
 
-    private FileGroupRecordBuffer<T> recordBuffer;
+    private HoodieFileGroupRecordBuffer<T> recordBuffer;
     private boolean allowInflightInstants = false;
     private HoodieTableMetaClient metaClient;
 
@@ -244,7 +244,7 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
       return this;
     }
 
-    public Builder<T> withRecordBuffer(FileGroupRecordBuffer<T> recordBuffer) {
+    public Builder<T> withRecordBuffer(HoodieFileGroupRecordBuffer<T> recordBuffer) {
       this.recordBuffer = recordBuffer;
       return this;
     }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
index ec1da45c8c41..0881a95f6ae2 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
@@ -156,7 +156,7 @@ class ExpressionPayload(@transient record: GenericRecord,
     }
     if (resultRecordOpt == null) {
       // If there is no condition matched, just filter this record.
-      // here we return a IGNORE_RECORD, HoodieMergeHandle will not handle it.
+      // here we return a IGNORE_RECORD, HoodieWriteMergeHandle will not handle it.
       HOption.of(HoodieRecord.SENTINEL)
     } else {
       resultRecordOpt
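
On the write path, the SENTINEL returned here is recognized and silently dropped. A minimal sketch of that guard, assuming an Option-wrapped combined record (variable name illustrative; the actual check inside HoodieWriteMergeHandle may be structured differently):

    if (combinedRecordOpt.isPresent() && combinedRecordOpt.get().getData() == HoodieRecord.SENTINEL) {
      // the payload matched no merge condition: skip it without writing or counting an error
      return;
    }
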
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index a718a1f20c81..52ef94ab85fa 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -78,7 +78,7 @@ import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.exception.HoodieWriteConflictException;
 import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
-import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.HoodieWriteMergeHandle;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
@@ -467,9 +467,9 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
 
       HoodieBaseFile baseFile = new HoodieBaseFile(baseFilePath);
 
-      HoodieMergeHandle handle = null;
+      HoodieWriteMergeHandle handle = null;
       try {
-        handle = new HoodieMergeHandle(config, instantTime, table, new HashMap<>(),
+        handle = new HoodieWriteMergeHandle(config, instantTime, table, new HashMap<>(),
             partitionPath, FSUtils.getFileId(baseFile.getFileName()), baseFile, new SparkTaskContextSupplier(),
             config.populateMetaFields() ? Option.empty() :
                 Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(config.getProps())));
@@ -490,7 +490,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
         final String newInstantTime = "006";
         config.getProps().setProperty("hoodie.merge.data.validation.enabled", "true");
         HoodieWriteConfig cfg2 = HoodieWriteConfig.newBuilder().withProps(config.getProps()).build();
-        handle = new HoodieMergeHandle(cfg2, newInstantTime, table, new HashMap<>(),
+        handle = new HoodieWriteMergeHandle(cfg2, newInstantTime, table, new HashMap<>(),
             partitionPath, FSUtils.getFileId(baseFile.getFileName()), baseFile, new SparkTaskContextSupplier(),
             config.populateMetaFields() ? Option.empty() :
                 Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(config.getProps())));
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMergeHandle.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMergeHandle.java
index 4adbb5afa416..4452268fa445 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMergeHandle.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMergeHandle.java
@@ -93,7 +93,7 @@ public class TestMergeHandle extends BaseTestHandle {
     List<HoodieRecord> newRecords = dataGenerator.generateUniqueUpdates(instantTime, numUpdates);
     int numDeletes = generateDeleteRecords(newRecords, dataGenerator, instantTime);
     assertTrue(numDeletes > 0);
-    HoodieMergeHandle mergeHandle = new HoodieMergeHandle(config, instantTime, table, newRecords.iterator(), partitionPath, fileId, new LocalTaskContextSupplier(),
+    HoodieWriteMergeHandle mergeHandle = new HoodieWriteMergeHandle(config, instantTime, table, newRecords.iterator(), partitionPath, fileId, new LocalTaskContextSupplier(),
        new HoodieBaseFile(fileGroup.getAllBaseFiles().findFirst().get()), Option.empty());
     HoodieMergeHelper.newInstance().runMerge(table, mergeHandle);
     WriteStatus writeStatus = mergeHandle.writeStatus;

