This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5143a982bca [HUDI-7269] Fallback to key based merge if positions are 
missing from log block (#11415)
5143a982bca is described below

commit 5143a982bca79fadef1a819c17b73c123f5376ac
Author: Jon Vexler <[email protected]>
AuthorDate: Tue Jun 11 09:09:23 2024 -0400

    [HUDI-7269] Fallback to key based merge if positions are missing from log 
block (#11415)
    
    Fall back to key-based merging if positions are missing from a log block when
    doing a position-based read in the file group (fg) reader. Changes:
    
    - Make position based buffer extend key based buffer so we can fall back
       to key based buffer.
    - Move some position based logic from the base buffer into the position
       buffer because that is the only place it is used.
    - If a log block is found to not have positions, we call a method to convert
       the map to use keys instead of positions: fallbackToKeyBasedBuffer().
       This conversion may be incomplete because delete records may not have
       their key stored; such entries stay keyed by position. We set a flag
       "needToDoHybridStrategy" to true and then handle this issue when merging
       with the base file.
    
    ---------
    
    Co-authored-by: Jonathan Vexler <=>
    Co-authored-by: Sagar Sumit <[email protected]>
---
 .../hudi/OverwriteWithLatestSparkMerger.java       |  46 +++++
 .../hudi/BaseSparkInternalRowReaderContext.java    |   3 +
 .../hudi/common/model/HoodieAvroIndexedRecord.java |   8 +
 .../hudi/common/model/HoodieRecordMerger.java      |  17 ++
 .../common/model/OverwriteWithLatestMerger.java    |  49 +++++
 .../hudi/common/table/HoodieTableConfig.java       |  11 ++
 .../hudi/common/table/HoodieTableMetaClient.java   |  14 +-
 .../read/HoodieBaseFileGroupRecordBuffer.java      | 103 +++--------
 .../read/HoodieKeyBasedFileGroupRecordBuffer.java  |  14 +-
 .../HoodiePositionBasedFileGroupRecordBuffer.java  | 198 +++++++++++++++++----
 .../common/table/read/CustomPayloadForTesting.java |  35 ++++
 .../table/read/TestHoodieFileGroupReaderBase.java  |  30 +++-
 .../testutils/reader/DataGenerationPlan.java       |  18 +-
 .../testutils/reader/HoodieFileSliceTestUtils.java |  47 +++--
 .../hudi/common/table/read/TestCustomMerger.java   |  56 +++++-
 .../common/table/read/TestEventTimeMerging.java    |  59 +++++-
 ...ing.java => TestOverwriteWithLatestMerger.java} | 101 +++++++----
 .../reader/HoodieFileGroupReaderTestHarness.java   |   9 +-
 .../org/apache/hudi/HoodieCreateRecordUtils.scala  |   8 +-
 ...stHoodiePositionBasedFileGroupRecordBuffer.java |  34 ++--
 .../read/TestHoodieFileGroupReaderOnSpark.scala    |  17 +-
 .../apache/hudi/utilities/streamer/StreamSync.java |   6 +-
 22 files changed, 665 insertions(+), 218 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/OverwriteWithLatestSparkMerger.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/OverwriteWithLatestSparkMerger.java
new file mode 100644
index 00000000000..611f045f645
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/OverwriteWithLatestSparkMerger.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+
+/**
+ * Spark merger that always chooses the newer record
+ */
+public class OverwriteWithLatestSparkMerger extends HoodieSparkRecordMerger {
+
+  @Override
+  public String getMergingStrategy() {
+    return OVERWRITE_MERGER_STRATEGY_UUID;
+  }
+
+  @Override
+  public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema 
oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws 
IOException {
+    return Option.of(Pair.of(newer, newSchema));
+  }
+
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
index 7fb4577f896..36bcba9214c 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
@@ -48,6 +48,7 @@ import scala.Function1;
 
 import static 
org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;
 import static 
org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
+import static 
org.apache.hudi.common.model.HoodieRecordMerger.OVERWRITE_MERGER_STRATEGY_UUID;
 import static org.apache.spark.sql.HoodieInternalRowUtils.getCachedSchema;
 
 /**
@@ -65,6 +66,8 @@ public abstract class BaseSparkInternalRowReaderContext 
extends HoodieReaderCont
     switch (mergerStrategy) {
       case DEFAULT_MERGER_STRATEGY_UUID:
         return new HoodieSparkRecordMerger();
+      case OVERWRITE_MERGER_STRATEGY_UUID:
+        return new OverwriteWithLatestSparkMerger();
       default:
         throw new HoodieException("The merger strategy UUID is not supported: 
" + mergerStrategy);
     }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
index 77f724249c7..96f6700ef91 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
@@ -53,6 +53,14 @@ public class HoodieAvroIndexedRecord extends 
HoodieRecord<IndexedRecord> {
     super(key, data);
   }
 
+  public HoodieAvroIndexedRecord(HoodieKey key, IndexedRecord data, 
HoodieRecordLocation currentLocation) {
+    super(key, data, null, currentLocation, null);
+  }
+
+  public HoodieAvroIndexedRecord(IndexedRecord data, HoodieRecordLocation 
currentLocation) {
+    super(null, data, null, currentLocation, null);
+  }
+
   public HoodieAvroIndexedRecord(
       HoodieKey key,
       IndexedRecord data,
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
index 62cde38a351..94d6509a7d8 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.model;
 
 import org.apache.hudi.ApiMaturityLevel;
 import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.table.HoodieTableConfig;
@@ -45,6 +46,8 @@ public interface HoodieRecordMerger extends Serializable {
 
   String DEFAULT_MERGER_STRATEGY_UUID = "eeb8d96f-b1e4-49fd-bbf8-28ac514178e5";
 
+  String OVERWRITE_MERGER_STRATEGY_UUID = 
"ce9acb64-bde0-424c-9b91-f6ebba25356d";
+
   /**
    * This method converges combineAndGetUpdateValue and precombine from 
HoodiePayload.
    * It'd be associative operation: f(a, f(b, c)) = f(f(a, b), c) (which we 
can translate as having 3 versions A, B, C
@@ -163,4 +166,18 @@ public interface HoodieRecordMerger extends Serializable {
    * The kind of merging strategy this recordMerger belongs to. An UUID 
represents merging strategy.
    */
   String getMergingStrategy();
+
+  /**
+   * The record merge mode that corresponds to this record merger
+   */
+  default RecordMergeMode getRecordMergeMode() {
+    switch (getMergingStrategy()) {
+      case DEFAULT_MERGER_STRATEGY_UUID:
+        return RecordMergeMode.EVENT_TIME_ORDERING;
+      case OVERWRITE_MERGER_STRATEGY_UUID:
+        return RecordMergeMode.OVERWRITE_WITH_LATEST;
+      default:
+        return RecordMergeMode.CUSTOM;
+    }
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestMerger.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestMerger.java
new file mode 100644
index 00000000000..2311b030a18
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestMerger.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+
+/**
+ * Avro Merger that always chooses the newer record
+ */
+public class OverwriteWithLatestMerger implements HoodieRecordMerger {
+
+  @Override
+  public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema 
oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws 
IOException {
+    return Option.of(Pair.of(newer, newSchema));
+  }
+
+  @Override
+  public HoodieRecord.HoodieRecordType getRecordType() {
+    return HoodieRecord.HoodieRecordType.AVRO;
+  }
+
+  @Override
+  public String getMergingStrategy() {
+    return OVERWRITE_MERGER_STRATEGY_UUID;
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index b3bf9668d93..a295f14108d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -198,6 +198,17 @@ public class HoodieTableConfig extends HoodieConfig {
   public static final ConfigProperty<String> RECORD_MERGER_STRATEGY = 
ConfigProperty
       .key("hoodie.compaction.record.merger.strategy")
       .defaultValue(HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
+      .withInferFunction(cfg -> {
+        switch 
(RecordMergeMode.valueOf(cfg.getStringOrDefault(RECORD_MERGE_MODE))) {
+          case EVENT_TIME_ORDERING:
+            return Option.of(HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID);
+          case OVERWRITE_WITH_LATEST:
+            return 
Option.of(HoodieRecordMerger.OVERWRITE_MERGER_STRATEGY_UUID);
+          case CUSTOM:
+          default:
+            return Option.empty();
+        }
+      })
       .sinceVersion("0.13.0")
       .withDocumentation("Id of merger strategy. Hudi will pick 
HoodieRecordMerger implementations in 
hoodie.datasource.write.record.merger.impls which has the same merger strategy 
id");
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 250091ecec6..fdf843f078f 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -79,6 +79,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static 
org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
+import static 
org.apache.hudi.common.model.HoodieRecordMerger.OVERWRITE_MERGER_STRATEGY_UUID;
 import static org.apache.hudi.common.table.HoodieTableConfig.INITIAL_VERSION;
 import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
 import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty;
@@ -1307,6 +1308,9 @@ public class HoodieTableMetaClient implements 
Serializable {
         if (recordMergeMode != null) {
           tableConfig.setValue(RECORD_MERGE_MODE, recordMergeMode.name());
         }
+        if (recordMergerStrategy != null) {
+          tableConfig.setValue(HoodieTableConfig.RECORD_MERGER_STRATEGY, 
recordMergerStrategy);
+        }
       }
 
       if (null != tableCreateSchema) {
@@ -1415,20 +1419,25 @@ public class HoodieTableMetaClient implements 
Serializable {
         boolean recordMergerStrategySet = null != recordMergerStrategy;
 
         if (!recordMergerStrategySet
-            || recordMergerStrategy.equals(DEFAULT_MERGER_STRATEGY_UUID)) {
+            || recordMergerStrategy.equals(DEFAULT_MERGER_STRATEGY_UUID)
+            || recordMergerStrategy.equals(OVERWRITE_MERGER_STRATEGY_UUID)) {
           if (payloadClassNameSet) {
             if 
(payloadClassName.equals(OverwriteWithLatestAvroPayload.class.getName())) {
               recordMergeMode = RecordMergeMode.OVERWRITE_WITH_LATEST;
+              recordMergerStrategy = OVERWRITE_MERGER_STRATEGY_UUID;
             } else if 
(payloadClassName.equals(DefaultHoodieRecordPayload.class.getName())) {
               recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
+              recordMergerStrategy = DEFAULT_MERGER_STRATEGY_UUID;
             } else {
               recordMergeMode = RecordMergeMode.CUSTOM;
             }
           } else if (payloadTypeSet) {
             if 
(payloadType.equals(RecordPayloadType.OVERWRITE_LATEST_AVRO.name())) {
               recordMergeMode = RecordMergeMode.OVERWRITE_WITH_LATEST;
+              recordMergerStrategy = OVERWRITE_MERGER_STRATEGY_UUID;
             } else if 
(payloadType.equals(RecordPayloadType.HOODIE_AVRO_DEFAULT.name())) {
               recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
+              recordMergerStrategy = DEFAULT_MERGER_STRATEGY_UUID;
             } else {
               recordMergeMode = RecordMergeMode.CUSTOM;
             }
@@ -1458,6 +1467,9 @@ public class HoodieTableMetaClient implements 
Serializable {
                   || (payloadClassNameSet && 
payloadClassName.equals(OverwriteWithLatestAvroPayload.class.getName()))
                   || (payloadTypeSet && 
payloadType.equals(RecordPayloadType.OVERWRITE_LATEST_AVRO.name())),
               constructMergeConfigErrorMessage());
+          checkArgument(recordMergerStrategySet && 
recordMergerStrategy.equals(OVERWRITE_MERGER_STRATEGY_UUID),
+              "Record merger strategy (" + (recordMergerStrategySet ? 
recordMergerStrategy : "null")
+                  + ") should be consistent with the record merging mode 
OVERWRITE_WITH_LATEST");
           break;
         case EVENT_TIME_ORDERING:
           checkArgument((!payloadClassNameSet && !payloadTypeSet)
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
index c25fba0e928..9e0763d6116 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
@@ -28,7 +28,6 @@ import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.KeySpec;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
 import org.apache.hudi.common.util.InternalSchemaCache;
@@ -39,26 +38,19 @@ import 
org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieCorruptedDataException;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieKeyException;
-import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
 
 import org.apache.avro.Schema;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.function.Function;
 
 import static 
org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_SCHEMA;
@@ -97,6 +89,10 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
     this.partitionPathFieldOpt = partitionPathFieldOpt;
     this.recordMergeMode = getRecordMergeMode(payloadProps);
     this.recordMerger = recordMerger;
+    //Custom merge mode should produce the same results for any merger so we 
won't fail if there is a mismatch
+    if (recordMerger.getRecordMergeMode() != this.recordMergeMode && 
this.recordMergeMode != RecordMergeMode.CUSTOM) {
+      throw new IllegalStateException("Record merger is " + 
recordMerger.getClass().getName() + " but merge mode is " + 
this.recordMergeMode);
+    }
     this.payloadProps = payloadProps;
     this.internalSchema = readerContext.getSchemaHandler().getInternalSchema();
     this.hoodieTableMetaClient = hoodieTableMetaClient;
@@ -293,21 +289,28 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
   protected Option<DeleteRecord> doProcessNextDeletedRecord(DeleteRecord 
deleteRecord,
                                                             Pair<Option<T>, 
Map<String, Object>> existingRecordMetadataPair) {
     if (existingRecordMetadataPair != null) {
-      // Merge and store the merged record. The ordering val is taken to 
decide whether the same key record
-      // should be deleted or be kept. The old record is kept only if the 
DELETE record has smaller ordering val.
-      // For same ordering values, uses the natural order(arrival time 
semantics).
-      Comparable existingOrderingVal = readerContext.getOrderingValue(
-          existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight(), readerSchema,
-          payloadProps);
-      Comparable deleteOrderingVal = deleteRecord.getOrderingValue();
-      // Checks the ordering value does not equal to 0
-      // because we use 0 as the default value which means natural order
-      boolean chooseExisting = !deleteOrderingVal.equals(0)
-          && ReflectionUtils.isSameClass(existingOrderingVal, 
deleteOrderingVal)
-          && existingOrderingVal.compareTo(deleteOrderingVal) > 0;
-      if (chooseExisting) {
-        // The DELETE message is obsolete if the old message has greater 
orderingVal.
-        return Option.empty();
+      switch (recordMergeMode) {
+        case OVERWRITE_WITH_LATEST:
+          return Option.empty();
+        case EVENT_TIME_ORDERING:
+        case CUSTOM:
+        default:
+          Comparable existingOrderingVal = readerContext.getOrderingValue(
+              existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight(), readerSchema,
+              payloadProps);
+          if 
(isDeleteRecordWithNaturalOrder(existingRecordMetadataPair.getLeft(), 
existingOrderingVal)) {
+            return Option.empty();
+          }
+          Comparable deleteOrderingVal = deleteRecord.getOrderingValue();
+          // Checks the ordering value does not equal to 0
+          // because we use 0 as the default value which means natural order
+          boolean chooseExisting = !deleteOrderingVal.equals(0)
+              && ReflectionUtils.isSameClass(existingOrderingVal, 
deleteOrderingVal)
+              && existingOrderingVal.compareTo(deleteOrderingVal) > 0;
+          if (chooseExisting) {
+            // The DELETE message is obsolete if the old message has greater 
orderingVal.
+            return Option.empty();
+          }
       }
     }
     // Do delete.
@@ -428,60 +431,6 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
     }
   }
 
-  /**
-   * Filter a record for downstream processing when:
-   * 1. A set of pre-specified keys exists.
-   * 2. The key of the record is not contained in the set.
-   */
-  protected boolean shouldSkip(T record, String keyFieldName, boolean 
isFullKey, Set<String> keys, Schema writerSchema) {
-    String recordKey = readerContext.getValue(record, writerSchema, 
keyFieldName).toString();
-    // Can not extract the record key, throw.
-    if (recordKey == null || recordKey.isEmpty()) {
-      throw new HoodieKeyException("Can not extract the key for a record");
-    }
-
-    // No keys are specified. Cannot skip at all.
-    if (keys.isEmpty()) {
-      return false;
-    }
-
-    // When the record key matches with one of the keys or key prefixes, can 
not skip.
-    if ((isFullKey && keys.contains(recordKey))
-        || (!isFullKey && keys.stream().anyMatch(recordKey::startsWith))) {
-      return false;
-    }
-
-    // Otherwise, this record is not needed.
-    return true;
-  }
-
-  /**
-   * Extract the record positions from a log block header.
-   *
-   * @param logBlock
-   * @return
-   * @throws IOException
-   */
-  protected static List<Long> extractRecordPositions(HoodieLogBlock logBlock) 
throws IOException {
-    List<Long> blockPositions = new ArrayList<>();
-
-    Roaring64NavigableMap positions = logBlock.getRecordPositions();
-    if (positions == null || positions.isEmpty()) {
-      throw new HoodieValidationException("No record position info is found 
when attempt to do position based merge.");
-    }
-
-    Iterator<Long> iterator = positions.iterator();
-    while (iterator.hasNext()) {
-      blockPositions.add(iterator.next());
-    }
-
-    if (blockPositions.isEmpty()) {
-      throw new HoodieCorruptedDataException("No positions are extracted.");
-    }
-
-    return blockPositions;
-  }
-
   protected boolean hasNextBaseRecord(T baseRecord, Pair<Option<T>, 
Map<String, Object>> logRecordInfo) throws IOException {
     Map<String, Object> metadata = readerContext.generateMetadataForRecord(
         baseRecord, readerSchema);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
index 96d4306afd4..d311923c625 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
@@ -102,7 +102,7 @@ public class HoodieKeyBasedFileGroupRecordBuffer<T> extends 
HoodieBaseFileGroupR
   }
 
   @Override
-  public void processDeleteBlock(HoodieDeleteBlock deleteBlock) {
+  public void processDeleteBlock(HoodieDeleteBlock deleteBlock) throws 
IOException {
     Iterator<DeleteRecord> it = 
Arrays.stream(deleteBlock.getRecordsToDelete()).iterator();
     while (it.hasNext()) {
       DeleteRecord record = it.next();
@@ -126,17 +126,19 @@ public class HoodieKeyBasedFileGroupRecordBuffer<T> 
extends HoodieBaseFileGroupR
     return records.containsKey(recordKey);
   }
 
+  protected boolean hasNextBaseRecord(T baseRecord) throws IOException {
+    String recordKey = readerContext.getRecordKey(baseRecord, readerSchema);
+    Pair<Option<T>, Map<String, Object>> logRecordInfo = 
records.remove(recordKey);
+    return hasNextBaseRecord(baseRecord, logRecordInfo);
+  }
+
   @Override
   protected boolean doHasNext() throws IOException {
     ValidationUtils.checkState(baseFileIterator != null, "Base file iterator 
has not been set yet");
 
     // Handle merging.
     while (baseFileIterator.hasNext()) {
-      T baseRecord = baseFileIterator.next();
-
-      String recordKey = readerContext.getRecordKey(baseRecord, readerSchema);
-      Pair<Option<T>, Map<String, Object>> logRecordInfo = 
records.remove(recordKey);
-      if (hasNextBaseRecord(baseRecord, logRecordInfo)) {
+      if (hasNextBaseRecord(baseFileIterator.next())) {
         return true;
       }
     }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
index bc25bb96f5e..79011a81d6b 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
@@ -27,17 +27,23 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.KeySpec;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieKeyException;
 
 import org.apache.avro.Schema;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -45,17 +51,21 @@ import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
 
-import static 
org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
+import static 
org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
+
 /**
  * A buffer that is used to store log records by {@link 
org.apache.hudi.common.table.log.HoodieMergedLogRecordReader}
  * by calling the {@link #processDataBlock} and {@link #processDeleteBlock} 
methods into record position based map.
  * Here the position means that record position in the base file. The records 
from the base file is accessed from an iterator object. These records are 
merged when the
  * {@link #hasNext} method is called.
  */
-public class HoodiePositionBasedFileGroupRecordBuffer<T> extends 
HoodieBaseFileGroupRecordBuffer<T> {
+public class HoodiePositionBasedFileGroupRecordBuffer<T> extends 
HoodieKeyBasedFileGroupRecordBuffer<T> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodiePositionBasedFileGroupRecordBuffer.class);
+
   private static final String ROW_INDEX_COLUMN_NAME = "row_index";
   public static final String ROW_INDEX_TEMPORARY_COLUMN_NAME = 
"_tmp_metadata_" + ROW_INDEX_COLUMN_NAME;
   private long nextRecordPosition = 0L;
+  private boolean needToDoHybridStrategy = false;
 
   public HoodiePositionBasedFileGroupRecordBuffer(HoodieReaderContext<T> 
readerContext,
                                                   HoodieTableMetaClient 
hoodieTableMetaClient,
@@ -73,11 +83,23 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> 
extends HoodieBaseFileG
 
   @Override
   public BufferType getBufferType() {
-    return BufferType.POSITION_BASED_MERGE;
+    return readerContext.getShouldMergeUseRecordPosition() ? 
BufferType.POSITION_BASED_MERGE : super.getBufferType();
   }
 
   @Override
   public void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> 
keySpecOpt) throws IOException {
+    if (!readerContext.getShouldMergeUseRecordPosition()) {
+      super.processDataBlock(dataBlock, keySpecOpt);
+      return;
+    }
+    // Extract positions from data block.
+    List<Long> recordPositions = extractRecordPositions(dataBlock);
+    if (recordPositions == null) {
+      LOG.warn("Falling back to key based merge for Read");
+      fallbackToKeyBasedBuffer();
+      super.processDataBlock(dataBlock, keySpecOpt);
+      return;
+    }
     // Prepare key filters.
     Set<String> keys = new HashSet<>();
     boolean isFullKey = true;
@@ -94,8 +116,6 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> 
extends HoodieBaseFileG
       enablePartialMerging = true;
     }
 
-    // Extract positions from data block.
-    List<Long> recordPositions = extractRecordPositions(dataBlock);
     Pair<Function<T, T>, Schema> schemaTransformerWithEvolvedSchema = 
getSchemaTransformerWithEvolvedSchema(dataBlock);
 
     // TODO: Return an iterator that can generate sequence number with the 
record.
@@ -123,35 +143,58 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> 
extends HoodieBaseFileG
     }
   }
 
-  @Override
-  public void processNextDataRecord(T record, Map<String, Object> metadata, 
Serializable recordPosition) throws IOException {
-    Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair = 
records.get(recordPosition);
-    Option<Pair<T, Map<String, Object>>> mergedRecordAndMetadata =
-        doProcessNextDataRecord(record, metadata, existingRecordMetadataPair);
-    if (mergedRecordAndMetadata.isPresent()) {
-      records.put(recordPosition, Pair.of(
-          
Option.ofNullable(readerContext.seal(mergedRecordAndMetadata.get().getLeft())),
-          mergedRecordAndMetadata.get().getRight()));
+  private void fallbackToKeyBasedBuffer() {
+    readerContext.setShouldMergeUseRecordPosition(false);
+    //need to make a copy of the keys to avoid concurrent modification 
exception
+    ArrayList<Serializable> positions = new ArrayList<>(records.keySet());
+    for (Serializable position : positions) {
+      Pair<Option<T>, Map<String, Object>> entry = records.get(position);
+      Object recordKey = entry.getRight().get(INTERNAL_META_RECORD_KEY);
+      if (entry.getLeft().isPresent() || recordKey != null) {
+
+        records.put((String) recordKey, entry);
+        records.remove(position);
+      } else {
+        //if it's a delete record and the key is null, then we need to still 
use positions
+        //this happens when we read the positions using 
logBlock.getRecordPositions()
+        //instead of reading the delete records themselves
+        needToDoHybridStrategy = true;
+      }
     }
   }
 
   @Override
   public void processDeleteBlock(HoodieDeleteBlock deleteBlock) throws 
IOException {
+    if (!readerContext.getShouldMergeUseRecordPosition()) {
+      super.processDeleteBlock(deleteBlock);
+      return;
+    }
+
     List<Long> recordPositions = extractRecordPositions(deleteBlock);
-    if 
(recordMerger.getMergingStrategy().equals(DEFAULT_MERGER_STRATEGY_UUID)) {
-      for (Long recordPosition : recordPositions) {
-        records.put(recordPosition,
-            Pair.of(Option.empty(), 
readerContext.generateMetadataForRecord(null, "", 0)));
-      }
+    if (recordPositions == null) {
+      LOG.warn("Falling back to key based merge for Read");
+      fallbackToKeyBasedBuffer();
+      super.processDeleteBlock(deleteBlock);
       return;
     }
 
-    int recordIndex = 0;
-    Iterator<DeleteRecord> it = 
Arrays.stream(deleteBlock.getRecordsToDelete()).iterator();
-    while (it.hasNext()) {
-      DeleteRecord record = it.next();
-      long recordPosition = recordPositions.get(recordIndex++);
-      processNextDeletedRecord(record, recordPosition);
+    switch (recordMergeMode) {
+      case OVERWRITE_WITH_LATEST:
+        for (Long recordPosition : recordPositions) {
+          records.putIfAbsent(recordPosition,
+              Pair.of(Option.empty(), 
readerContext.generateMetadataForRecord(null, "", 0L)));
+        }
+        return;
+      case EVENT_TIME_ORDERING:
+      case CUSTOM:
+      default:
+        int recordIndex = 0;
+        Iterator<DeleteRecord> it = 
Arrays.stream(deleteBlock.getRecordsToDelete()).iterator();
+        while (it.hasNext()) {
+          DeleteRecord record = it.next();
+          long recordPosition = recordPositions.get(recordIndex++);
+          processNextDeletedRecord(record, recordPosition);
+        }
     }
   }
 
@@ -174,20 +217,97 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> 
extends HoodieBaseFileG
   }
 
   @Override
-  protected boolean doHasNext() throws IOException {
-    ValidationUtils.checkState(baseFileIterator != null, "Base file iterator 
has not been set yet");
-
-    // Handle merging.
-    while (baseFileIterator.hasNext()) {
-      T baseRecord = baseFileIterator.next();
-      nextRecordPosition = readerContext.extractRecordPosition(baseRecord, 
readerSchema, ROW_INDEX_TEMPORARY_COLUMN_NAME, nextRecordPosition);
-      Pair<Option<T>, Map<String, Object>> logRecordInfo = 
records.remove(nextRecordPosition++);
-      if (hasNextBaseRecord(baseRecord, logRecordInfo)) {
-        return true;
+  protected boolean hasNextBaseRecord(T baseRecord) throws IOException {
+    if (!readerContext.getShouldMergeUseRecordPosition()) {
+      return doHasNextFallbackBaseRecord(baseRecord);
+    }
+
+    nextRecordPosition = readerContext.extractRecordPosition(baseRecord, 
readerSchema,
+        ROW_INDEX_TEMPORARY_COLUMN_NAME, nextRecordPosition);
+    Pair<Option<T>, Map<String, Object>> logRecordInfo = 
records.remove(nextRecordPosition++);
+
+    Map<String, Object> metadata = readerContext.generateMetadataForRecord(
+        baseRecord, readerSchema);
+
+    Option<T> resultRecord = logRecordInfo != null
+        ? merge(Option.of(baseRecord), metadata, logRecordInfo.getLeft(), 
logRecordInfo.getRight())
+        : merge(Option.empty(), Collections.emptyMap(), Option.of(baseRecord), 
metadata);
+    if (resultRecord.isPresent()) {
+      nextRecord = readerContext.seal(resultRecord.get());
+      return true;
+    }
+    return false;
+  }
+
+  private boolean doHasNextFallbackBaseRecord(T baseRecord) throws IOException 
{
+    if (needToDoHybridStrategy) {
+      //see if there is a delete block with record positions
+      nextRecordPosition = readerContext.extractRecordPosition(baseRecord, 
readerSchema,
+          ROW_INDEX_TEMPORARY_COLUMN_NAME, nextRecordPosition);
+      Pair<Option<T>, Map<String, Object>> logRecordInfo  = 
records.remove(nextRecordPosition++);
+      if (logRecordInfo != null) {
+        //we have a delete that was not able to be converted. Since it is the 
newest version, the record is deleted
+        //remove a key based record if it exists
+        records.remove(readerContext.getRecordKey(baseRecord, readerSchema));
+        return false;
       }
     }
+    return super.hasNextBaseRecord(baseRecord);
+  }
+
+  /**
+   * Filter a record for downstream processing when:
+   * 1. A set of pre-specified keys exists.
+   * 2. The key of the record is not contained in the set.
+   */
+  protected boolean shouldSkip(T record, String keyFieldName, boolean 
isFullKey, Set<String> keys, Schema writerSchema) {
+    // No keys are specified. Cannot skip at all.
+    if (keys.isEmpty()) {
+      return false;
+    }
+
+    String recordKey = readerContext.getValue(record, writerSchema, 
keyFieldName).toString();
+    // Can not extract the record key, throw.
+    if (recordKey == null || recordKey.isEmpty()) {
+      throw new HoodieKeyException("Can not extract the key for a record");
+    }
+
+    // When the record key matches with one of the keys or key prefixes, can 
not skip.
+    if ((isFullKey && keys.contains(recordKey))
+        || (!isFullKey && keys.stream().anyMatch(recordKey::startsWith))) {
+      return false;
+    }
+
+    // Otherwise, this record is not needed.
+    return true;
+  }
+
+  /**
+   * Extract the record positions from a log block header.
+   *
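+   * @param logBlock the log block to read the record positions from
+   * @return the record positions, or {@code null} if the block has none
+   * @throws IOException if reading the positions from the block fails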
+   */
+  protected static List<Long> extractRecordPositions(HoodieLogBlock logBlock) 
throws IOException {
+    List<Long> blockPositions = new ArrayList<>();
+
+    Roaring64NavigableMap positions = logBlock.getRecordPositions();
+    if (positions == null || positions.isEmpty()) {
+      LOG.warn("No record position info is found when attempt to do position 
based merge.");
+      return null;
+    }
+
+    Iterator<Long> iterator = positions.iterator();
+    while (iterator.hasNext()) {
+      blockPositions.add(iterator.next());
+    }
+
+    if (blockPositions.isEmpty()) {
+      LOG.warn("No positions are extracted.");
+      return null;
+    }
 
-    // Handle records solely from log files.
-    return hasNextLogRecord();
+    return blockPositions;
   }
-}
+}
\ No newline at end of file
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/CustomPayloadForTesting.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/CustomPayloadForTesting.java
new file mode 100644
index 00000000000..b69d4421f73
--- /dev/null
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/CustomPayloadForTesting.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.generic.GenericRecord;
+
+public class CustomPayloadForTesting extends DefaultHoodieRecordPayload {
+  public CustomPayloadForTesting(GenericRecord record, Comparable orderingVal) 
{
+    super(record, orderingVal);
+  }
+
+  public CustomPayloadForTesting(Option<GenericRecord> record) {
+    super(record);
+  }
+}
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index a8717d8a8e3..8128a32d5ef 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
@@ -55,6 +56,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;
 
+import static 
org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
+import static 
org.apache.hudi.common.model.HoodieRecordMerger.OVERWRITE_MERGER_STRATEGY_UUID;
 import static org.apache.hudi.common.model.WriteOperationType.BULK_INSERT;
 import static org.apache.hudi.common.model.WriteOperationType.INSERT;
 import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
@@ -75,12 +78,16 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
   @TempDir
   protected java.nio.file.Path tempDir;
 
+  protected String customRecordMergerStrategy = 
HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
+
   public abstract StorageConfiguration<?> getStorageConf();
 
   public abstract String getBasePath();
 
   public abstract HoodieReaderContext<T> getHoodieReaderContext(String 
tablePath, Schema avroSchema, StorageConfiguration<?> storageConf);
 
+  public abstract String getRecordPayloadForMergeMode(RecordMergeMode 
mergeMode);
+
   public abstract void commitToTable(List<String> recordList, String operation,
                                      Map<String, String> writeConfigs);
 
@@ -93,7 +100,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
 
   @Test
   public void testCompareToComparable() throws Exception {
-    Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs());
+    Map<String, String> writeConfigs = new 
HashMap<>(getCommonConfigs(RecordMergeMode.EVENT_TIME_ORDERING));
     // Prepare a table for initializing reader context
     try (HoodieTestDataGenerator dataGen = new 
HoodieTestDataGenerator(0xDEEF)) {
       commitToTable(recordsToStrings(dataGen.generateInserts("001", 1)), 
BULK_INSERT.value(), writeConfigs);
@@ -170,7 +177,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
   @ParameterizedTest
   @MethodSource("testArguments")
   public void testReadFileGroupInMergeOnReadTable(RecordMergeMode 
recordMergeMode, String logDataBlockFormat) throws Exception {
-    Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs());
+    Map<String, String> writeConfigs = new 
HashMap<>(getCommonConfigs(recordMergeMode));
     writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), 
logDataBlockFormat);
 
     try (HoodieTestDataGenerator dataGen = new 
HoodieTestDataGenerator(0xDEEF)) {
@@ -194,7 +201,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
   @ParameterizedTest
   @MethodSource("testArguments")
   public void testReadLogFilesOnlyInMergeOnReadTable(RecordMergeMode 
recordMergeMode, String logDataBlockFormat) throws Exception {
-    Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs());
+    Map<String, String> writeConfigs = new 
HashMap<>(getCommonConfigs(recordMergeMode));
     writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), 
logDataBlockFormat);
     // Use InMemoryIndex to generate log only mor table
     writeConfigs.put("hoodie.index.type", "INMEMORY");
@@ -212,7 +219,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
     }
   }
 
-  private Map<String, String> getCommonConfigs() {
+  private Map<String, String> getCommonConfigs(RecordMergeMode 
recordMergeMode) {
     Map<String, String> configMapping = new HashMap<>();
     configMapping.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), 
"_row_key");
     configMapping.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), 
"partition_path");
@@ -225,6 +232,21 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
     configMapping.put("hoodie.delete.shuffle.parallelism", "1");
     configMapping.put("hoodie.merge.small.file.group.candidates.limit", "0");
     configMapping.put("hoodie.compact.inline", "false");
+    configMapping.put(RECORD_MERGE_MODE.key(), recordMergeMode.name());
+    configMapping.put("hoodie.datasource.write.payload.class", 
getRecordPayloadForMergeMode(recordMergeMode));
+    switch (recordMergeMode) {
+      case OVERWRITE_WITH_LATEST:
+        configMapping.put("hoodie.datasource.write.record.merger.strategy", 
OVERWRITE_MERGER_STRATEGY_UUID);
+        configMapping.put("hoodie.datasource.write.precombine.field", "");
+        break;
+      case CUSTOM:
+        configMapping.put("hoodie.datasource.write.record.merger.strategy", 
customRecordMergerStrategy);
+        break;
+      case EVENT_TIME_ORDERING:
+      default:
+        configMapping.put("hoodie.datasource.write.record.merger.strategy", 
DEFAULT_MERGER_STRATEGY_UUID);
+        break;
+    }
     return configMapping;
   }
 
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/DataGenerationPlan.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/DataGenerationPlan.java
index 29b0090a5db..666a2ff11ce 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/DataGenerationPlan.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/DataGenerationPlan.java
@@ -39,6 +39,7 @@ public class DataGenerationPlan {
   // The operation type of the record.
   private final OperationType operationType;
   private final String instantTime;
+  private final boolean writePositions;
 
   public enum OperationType {
     INSERT,
@@ -50,12 +51,14 @@ public class DataGenerationPlan {
                             String partitionPath,
                             long timestamp,
                             OperationType operationType,
-                            String instantTime) {
+                            String instantTime,
+                            boolean writePositions) {
     this.recordKeys = recordKeys;
     this.partitionPath = partitionPath;
     this.timestamp = timestamp;
     this.operationType = operationType;
     this.instantTime = instantTime;
+    this.writePositions = writePositions;
   }
 
   public List<String> getRecordKeys() {
@@ -78,6 +81,10 @@ public class DataGenerationPlan {
     return instantTime;
   }
 
+  public boolean getWritePositions() {
+    return writePositions;
+  }
+
   public static Builder newBuilder() {
     return new Builder();
   }
@@ -88,6 +95,7 @@ public class DataGenerationPlan {
     private long timestamp;
     private OperationType operationType;
     private String instantTime;
+    private boolean writePositions;
 
     public Builder withRecordKeys(List<String> recordKeys) {
       this.recordKeys = recordKeys;
@@ -114,13 +122,19 @@ public class DataGenerationPlan {
       return this;
     }
 
+    public Builder withWritePositions(boolean writePositions) {
+      this.writePositions = writePositions;
+      return this;
+    }
+
     public DataGenerationPlan build() {
       return new DataGenerationPlan(
           recordKeys,
           partitionPath,
           timestamp,
           operationType,
-          instantTime);
+          instantTime,
+          writePositions);
     }
   }
 }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
index 2ca33678adc..01ce1f168f6 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
@@ -85,6 +86,7 @@ public class HoodieFileSliceTestUtils {
   public static final String PARTITION_PATH = "partition_path";
   public static final String RIDER = "rider";
   public static final String ROW_KEY = "_row_key";
+  public static final int RECORD_KEY_INDEX = 
AVRO_SCHEMA.getField(ROW_KEY).pos();
   public static final String TIMESTAMP = "timestamp";
   public static final HoodieTestDataGenerator DATA_GEN =
       new HoodieTestDataGenerator(0xDEED);
@@ -165,21 +167,25 @@ public class HoodieFileSliceTestUtils {
       HoodieLogBlock.HoodieLogBlockType dataBlockType,
       List<IndexedRecord> records,
       Map<HoodieLogBlock.HeaderMetadataType, String> header,
-      StoragePath logFilePath
+      StoragePath logFilePath,
+      boolean writePositions,
+      Map<String, Long> keyToPositionMap
   ) {
     return createDataBlock(
         dataBlockType,
-        records.stream().map(HoodieAvroIndexedRecord::new)
+        records.stream().map(r -> new HoodieAvroIndexedRecord(r, new 
HoodieRecordLocation("", "", keyToPositionMap.get(r.get(RECORD_KEY_INDEX)))))
             .collect(Collectors.toList()),
         header,
-        logFilePath);
+        logFilePath,
+        writePositions);
   }
 
   private static HoodieDataBlock createDataBlock(
       HoodieLogBlock.HoodieLogBlockType dataBlockType,
       List<HoodieRecord> records,
       Map<HoodieLogBlock.HeaderMetadataType, String> header,
-      StoragePath pathForReader
+      StoragePath pathForReader,
+      boolean writePositions
   ) {
     switch (dataBlockType) {
       case CDC_DATA_BLOCK:
@@ -190,7 +196,7 @@ public class HoodieFileSliceTestUtils {
       case AVRO_DATA_BLOCK:
         return new HoodieAvroDataBlock(
             records,
-            false,
+            writePositions,
             header,
             HoodieRecord.RECORD_KEY_METADATA_FIELD);
       case HFILE_DATA_BLOCK:
@@ -203,7 +209,7 @@ public class HoodieFileSliceTestUtils {
       case PARQUET_DATA_BLOCK:
         return new HoodieParquetDataBlock(
             records,
-            false,
+            writePositions,
             header,
             HoodieRecord.RECORD_KEY_METADATA_FIELD,
             PARQUET_COMPRESSION_CODEC_NAME.defaultValue(),
@@ -219,21 +225,23 @@ public class HoodieFileSliceTestUtils {
       List<IndexedRecord> records,
       Map<HoodieLogBlock.HeaderMetadataType, String> header,
       Schema schema,
-      Properties props
+      Properties props,
+      boolean writePositions,
+      Map<String, Long> keyToPositionMap
   ) {
     List<HoodieRecord> hoodieRecords = records.stream()
         .map(r -> {
           String rowKey = (String) 
r.get(r.getSchema().getField(ROW_KEY).pos());
           String partitionPath = (String) 
r.get(r.getSchema().getField(PARTITION_PATH).pos());
-          return new HoodieAvroIndexedRecord(new HoodieKey(rowKey, 
partitionPath), r);
+          return new HoodieAvroIndexedRecord(new HoodieKey(rowKey, 
partitionPath), r, new HoodieRecordLocation("", "",  
keyToPositionMap.get(r.get(RECORD_KEY_INDEX))));
         })
         .collect(Collectors.toList());
     return new HoodieDeleteBlock(
         hoodieRecords.stream().map(
             r -> Pair.of(DeleteRecord.create(
-                r.getKey(), r.getOrderingValue(schema, props)), -1L))
+                r.getKey(), r.getOrderingValue(schema, props)), 
r.getCurrentLocation().getPosition()))
             .collect(Collectors.toList()),
-        false,
+        writePositions,
         header
     );
   }
@@ -285,7 +293,9 @@ public class HoodieFileSliceTestUtils {
       String fileId,
       String logInstantTime,
       int version,
-      HoodieLogBlock.HoodieLogBlockType blockType
+      HoodieLogBlock.HoodieLogBlockType blockType,
+      boolean writePositions,
+      Map<String, Long> keyToPositionMap
   ) throws InterruptedException, IOException {
     try (HoodieLogFormat.Writer writer =
              HoodieLogFormat.newWriterBuilder()
@@ -301,11 +311,11 @@ public class HoodieFileSliceTestUtils {
 
       if (blockType != DELETE_BLOCK) {
         HoodieDataBlock dataBlock = getDataBlock(
-            blockType, records, header, new StoragePath(logFilePath));
+            blockType, records, header, new StoragePath(logFilePath), 
writePositions, keyToPositionMap);
         writer.appendBlock(dataBlock);
       } else {
         HoodieDeleteBlock deleteBlock = getDeleteBlock(
-            records, header, schema, PROPERTIES);
+            records, header, schema, PROPERTIES, writePositions, 
keyToPositionMap);
         writer.appendBlock(deleteBlock);
       }
     }
@@ -328,6 +338,7 @@ public class HoodieFileSliceTestUtils {
     HoodieBaseFile baseFile = null;
     List<HoodieLogFile> logFiles = new ArrayList<>();
 
+    Map<String, Long> keyToPositionMap = new HashMap<>();
     // Generate a base file with records.
     DataGenerationPlan baseFilePlan = plans.get(0);
     if (!baseFilePlan.getRecordKeys().isEmpty()) {
@@ -339,6 +350,9 @@ public class HoodieFileSliceTestUtils {
           records,
           schema,
           baseFilePlan.getInstantTime());
+      for (int i = 0; i < baseFilePlan.getRecordKeys().size(); i++) {
+        keyToPositionMap.put(baseFilePlan.getRecordKeys().get(i), (long) i);
+      }
     }
 
     // Rest of plans are for log files.
@@ -361,7 +375,9 @@ public class HoodieFileSliceTestUtils {
           fileId,
           logFilePlan.getInstantTime(),
           i,
-          blockType));
+          blockType,
+          logFilePlan.getWritePositions(),
+          keyToPositionMap));
     }
 
     // Assemble the FileSlice finally.
@@ -391,6 +407,7 @@ public class HoodieFileSliceTestUtils {
         .withPartitionPath(partitionPath)
         .withTimeStamp(timestamp)
         .withInstantTime(baseInstantTime)
+        .withWritePositions(false)
         .build();
     plans.add(baseFilePlan);
 
@@ -412,6 +429,7 @@ public class HoodieFileSliceTestUtils {
       List<Long> timestamps,
       List<DataGenerationPlan.OperationType> operationTypes,
       List<String> instantTimes,
+      List<Boolean> shouldWritePositions,
       String basePath,
       String partitionPath,
       String fileId
@@ -425,6 +443,7 @@ public class HoodieFileSliceTestUtils {
           .withRecordKeys(keys)
           .withTimeStamp(timestamps.get(i))
           .withInstantTime(instantTimes.get(i))
+          .withWritePositions(shouldWritePositions.get(i))
           .build());
     }
 
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
index 4ec1c0556b0..c6f949f56a7 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
@@ -39,11 +39,16 @@ import org.apache.avro.generic.IndexedRecord;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Stream;
 
 import static org.apache.hudi.common.model.HoodieRecord.HoodieRecordType.AVRO;
 import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
@@ -103,6 +108,7 @@ public class TestCustomMerger extends 
HoodieFileGroupReaderTestHarness {
         INSERT, DELETE, UPDATE, DELETE, UPDATE);
     instantTimes = Arrays.asList(
         "001", "002", "003", "004", "005");
+    shouldWritePositions = Arrays.asList(false, false, false, false, false);
   }
 
   @BeforeEach
@@ -115,9 +121,11 @@ public class TestCustomMerger extends 
HoodieFileGroupReaderTestHarness {
     setUpMockCommits();
   }
 
-  @Test
-  public void testWithOneLogFile() throws IOException, InterruptedException {
-    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(2);
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testWithOneLogFile(boolean useRecordPositions) throws 
IOException, InterruptedException {
+    shouldWritePositions = Arrays.asList(useRecordPositions, 
useRecordPositions);
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(2, 
useRecordPositions);
     List<String> leftKeysExpected =
         Arrays.asList("6", "7", "8", "9", "10");
     List<String> leftKeysActual = new ArrayList<>();
@@ -129,9 +137,11 @@ public class TestCustomMerger extends 
HoodieFileGroupReaderTestHarness {
     assertEquals(leftKeysExpected, leftKeysActual);
   }
 
-  @Test
-  public void testWithTwoLogFiles() throws IOException, InterruptedException {
-    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(3);
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testWithTwoLogFiles(boolean useRecordPositions) throws 
IOException, InterruptedException {
+    shouldWritePositions = Arrays.asList(useRecordPositions, 
useRecordPositions, useRecordPositions);
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(3, 
useRecordPositions);
     List<String> leftKeysExpected =
         Arrays.asList("1", "3", "6", "7", "8", "9", "10");
     List<String> leftKeysActual = new ArrayList<>();
@@ -143,9 +153,11 @@ public class TestCustomMerger extends 
HoodieFileGroupReaderTestHarness {
     assertEquals(leftKeysExpected, leftKeysActual);
   }
 
-  @Test
-  public void testWithThreeLogFiles() throws IOException, InterruptedException 
{
-    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(4);
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testWithThreeLogFiles(boolean useRecordPositions) throws 
IOException, InterruptedException {
+    shouldWritePositions = Arrays.asList(useRecordPositions, 
useRecordPositions, useRecordPositions, useRecordPositions);
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(4, 
useRecordPositions);
     List<String> leftKeysExpected =
         Arrays.asList("1", "3", "7", "9", "10");
     List<String> leftKeysActual = new ArrayList<>();
@@ -171,6 +183,32 @@ public class TestCustomMerger extends 
HoodieFileGroupReaderTestHarness {
     assertEquals(leftKeysExpected, leftKeysActual);
   }
 
+  @ParameterizedTest
+  @MethodSource("testArgs")
+  public void testPositionMergeFallback(boolean log1haspositions, boolean 
log2haspositions,
+                                        boolean log3haspositions, boolean 
log4haspositions) throws IOException, InterruptedException {
+    shouldWritePositions = Arrays.asList(true, log1haspositions, 
log2haspositions, log3haspositions, log4haspositions);
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(5, true);
+    List<String> leftKeysExpected =
+        Arrays.asList("1", "3", "5", "7", "9");
+    List<String> leftKeysActual = new ArrayList<>();
+    while (iterator.hasNext()) {
+      leftKeysActual.add(iterator.next()
+          .get(AVRO_SCHEMA.getField(ROW_KEY).pos())
+          .toString());
+    }
+    assertEquals(leftKeysExpected, leftKeysActual);
+  }
+
+  //generate all possible combos of 4 booleans
+  private static Stream<Arguments> testArgs() {
+    Stream.Builder<Arguments> b = Stream.builder();
+    for (int i = 0; i < 16; i++) {
+      b.add(Arguments.of(i % 2 == 0, (i / 2) % 2 == 0,  (i / 4) % 2 == 0, (i / 
8) % 2 == 0));
+    }
+    return b.build();
+  }
+
   /**
    * This merger is designed to save records whose record key is odd.
    * That means, if the record is not a delete record, and its record
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
index 3b3fc3c4359..237bf527be0 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
@@ -35,11 +35,16 @@ import org.apache.avro.generic.IndexedRecord;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Stream;
 
 import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
 import static 
org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.DELETE;
@@ -99,6 +104,7 @@ public class TestEventTimeMerging extends 
HoodieFileGroupReaderTestHarness {
     // Specify the instant time for each file.
     instantTimes = Arrays.asList(
         "001", "002", "003", "004", "005");
+    shouldWritePositions = Arrays.asList(false, false, false, false, false);
   }
 
   @BeforeEach
@@ -111,10 +117,12 @@ public class TestEventTimeMerging extends 
HoodieFileGroupReaderTestHarness {
     setUpMockCommits();
   }
 
-  @Test
-  public void testWithOneLogFile() throws IOException, InterruptedException {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testWithOneLogFile(boolean useRecordPositions) throws 
IOException, InterruptedException {
+    shouldWritePositions = Arrays.asList(useRecordPositions, 
useRecordPositions);
     // The FileSlice contains a base file and a log file.
-    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(2);
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(2, 
useRecordPositions);
     List<String> leftKeysExpected = Arrays.asList("6", "7", "8", "9", "10");
     List<Long> leftTimestampsExpected = Arrays.asList(2L, 2L, 2L, 2L, 2L);
     List<String> leftKeysActual = new ArrayList<>();
@@ -128,10 +136,12 @@ public class TestEventTimeMerging extends 
HoodieFileGroupReaderTestHarness {
     assertEquals(leftTimestampsExpected, leftTimestampsActual);
   }
 
-  @Test
-  public void testWithTwoLogFiles() throws IOException, InterruptedException {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testWithTwoLogFiles(boolean useRecordPositions) throws 
IOException, InterruptedException {
+    shouldWritePositions = Arrays.asList(useRecordPositions, 
useRecordPositions, useRecordPositions);
     // The FileSlice contains a base file and two log files.
-    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(3);
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(3, 
useRecordPositions);
     List<String> leftKeysExpected = Arrays.asList("6", "7", "8", "9", "10");
     List<Long> leftTimestampsExpected = Arrays.asList(2L, 2L, 2L, 2L, 2L);
     List<String> leftKeysActual = new ArrayList<>();
@@ -145,10 +155,12 @@ public class TestEventTimeMerging extends 
HoodieFileGroupReaderTestHarness {
     assertEquals(leftTimestampsExpected, leftTimestampsActual);
   }
 
-  @Test
-  public void testWithThreeLogFiles() throws IOException, InterruptedException 
{
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testWithThreeLogFiles(boolean useRecordPositions) throws 
IOException, InterruptedException {
+    shouldWritePositions = Arrays.asList(useRecordPositions, 
useRecordPositions, useRecordPositions, useRecordPositions);
     // The FileSlice contains a base file and three log files.
-    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(4);
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(4, 
useRecordPositions);
     List<String> leftKeysExpected = Arrays.asList("6", "7", "8", "9", "10");
     List<Long> leftTimestampsExpected = Arrays.asList(2L, 2L, 2L, 2L, 2L);
     List<String> leftKeysActual = new ArrayList<>();
@@ -178,4 +190,33 @@ public class TestEventTimeMerging extends 
HoodieFileGroupReaderTestHarness {
     assertEquals(leftKeysExpected, leftKeysActual);
     assertEquals(leftTimestampsExpected, leftTimestampsActual);
   }
+
+  @ParameterizedTest
+  @MethodSource("testArgs")
+  public void testPositionMergeFallback(boolean log1haspositions, boolean 
log2haspositions,
+                                        boolean log3haspositions, boolean 
log4haspositions) throws IOException, InterruptedException {
+    shouldWritePositions = Arrays.asList(true, log1haspositions, 
log2haspositions, log3haspositions, log4haspositions);
+    // The FileSlice contains a base file and four log files.
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(5, true);
+    List<String> leftKeysExpected = Arrays.asList("1", "2", "6", "7", "8", 
"9", "10");
+    List<Long> leftTimestampsExpected = Arrays.asList(4L, 4L, 2L, 2L, 2L, 2L, 
2L);
+    List<String> leftKeysActual = new ArrayList<>();
+    List<Long> leftTimestampsActual = new ArrayList<>();
+    while (iterator.hasNext()) {
+      IndexedRecord record = iterator.next();
+      
leftKeysActual.add(record.get(AVRO_SCHEMA.getField(ROW_KEY).pos()).toString());
+      leftTimestampsActual.add((Long) 
record.get(AVRO_SCHEMA.getField("timestamp").pos()));
+    }
+    assertEquals(leftKeysExpected, leftKeysActual);
+    assertEquals(leftTimestampsExpected, leftTimestampsActual);
+  }
+
+  // Generates all 16 combinations of the four per-log-file position flags.
+  private static Stream<Arguments> testArgs() {
+    Stream.Builder<Arguments> b = Stream.builder();
+    for (int i = 0; i < 16; i++) {
+      b.add(Arguments.of(i % 2 == 0, (i / 2) % 2 == 0,  (i / 4) % 2 == 0, (i / 
8) % 2 == 0));
+    }
+    return b.build();
+  }
 }
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestOverwriteWithLatestMerger.java
similarity index 63%
copy from 
hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
copy to 
hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestOverwriteWithLatestMerger.java
index 3b3fc3c4359..66b5f11460a 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestOverwriteWithLatestMerger.java
@@ -21,12 +21,12 @@ package org.apache.hudi.common.table.read;
 
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.RecordMergeMode;
-import org.apache.hudi.common.model.HoodieAvroRecordMerger;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.model.OverwriteWithLatestMerger;
 import org.apache.hudi.common.testutils.HoodieTestTable;
-import org.apache.hudi.common.testutils.reader.HoodieAvroRecordTestMerger;
 import 
org.apache.hudi.common.testutils.reader.HoodieFileGroupReaderTestHarness;
 import org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils;
-import org.apache.hudi.common.testutils.reader.HoodieRecordTestPayload;
 import org.apache.hudi.common.testutils.reader.HoodieTestReaderContext;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
@@ -35,11 +35,16 @@ import org.apache.avro.generic.IndexedRecord;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Stream;
 
 import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
 import static 
org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.DELETE;
@@ -48,17 +53,15 @@ import static 
org.apache.hudi.common.testutils.reader.DataGenerationPlan.Operati
 import static 
org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils.ROW_KEY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
-public class TestEventTimeMerging extends HoodieFileGroupReaderTestHarness {
+public class TestOverwriteWithLatestMerger extends 
HoodieFileGroupReaderTestHarness {
   @BeforeAll
   public static void setUp() throws IOException {
-    // Create dedicated merger to avoid current delete logic holes.
-    // TODO: Unify delete logic (HUDI-7240).
-    HoodieAvroRecordMerger merger = new HoodieAvroRecordTestMerger();
+    HoodieRecordMerger merger = new OverwriteWithLatestMerger();
     readerContext = new HoodieTestReaderContext(
         Option.of(merger),
-        Option.of(HoodieRecordTestPayload.class.getName()));
+        Option.of(OverwriteWithLatestAvroPayload.class.getName()));
     properties.setProperty(
-        HoodieCommonConfig.RECORD_MERGE_MODE.key(), 
RecordMergeMode.EVENT_TIME_ORDERING.name());
+        HoodieCommonConfig.RECORD_MERGE_MODE.key(), 
RecordMergeMode.OVERWRITE_WITH_LATEST.name());
 
     // -------------------------------------------------------------
     // The test logic is as follows:
@@ -71,16 +74,14 @@ public class TestEventTimeMerging extends 
HoodieFileGroupReaderTestHarness {
     //    Current existing keys: [6, 7, 8, 9, 10]
     // 3. After adding the second log file,
     //    we tried to add the records with keys from 1 to 3 back,
-    //    but we cannot since their ordering value is 1 < 3.
-    //    Current existing keys: [6, 7, 8, 9, 10]
+    //    Current existing keys: [1, 2, 3, 6, 7, 8, 9, 10]
     // 4. After adding the third log file,
     //    we tried to delete records with keys from 6 to 8,
-    //    but we cannot since their ordering value is 1 < 2.
+    //    and it worked since the latest write wins regardless of ordering value.
-    //    Current existing keys: [6, 7, 8, 9, 10]
+    //    Current existing keys: [1, 2, 3, 9, 10]
     // 5. After adding the fourth log file,
-    //    we tried to add the records with keys from 1 to 2 back,
-    //    and it worked since their ordering value is 4 > 3.
-    //    Current existing keys: [1, 2, 6, 7, 8, 9, 10]
+    //    we tried to add the records with keys from 2 to 4
+    //    Current existing keys: [1, 2, 3, 4, 9, 10]
     // -------------------------------------------------------------
 
     // Specify the key column values for each file.
@@ -89,7 +90,7 @@ public class TestEventTimeMerging extends 
HoodieFileGroupReaderTestHarness {
         new HoodieFileSliceTestUtils.KeyRange(1, 5),
         new HoodieFileSliceTestUtils.KeyRange(1, 3),
         new HoodieFileSliceTestUtils.KeyRange(6, 8),
-        new HoodieFileSliceTestUtils.KeyRange(1, 2));
+        new HoodieFileSliceTestUtils.KeyRange(2, 4));
     // Specify the value of `timestamp` column for each file.
     timestamps = Arrays.asList(
         2L, 3L, 1L, 1L, 4L);
@@ -99,11 +100,12 @@ public class TestEventTimeMerging extends 
HoodieFileGroupReaderTestHarness {
     // Specify the instant time for each file.
     instantTimes = Arrays.asList(
         "001", "002", "003", "004", "005");
+    shouldWritePositions = Arrays.asList(false, false, false, false, false);
   }
 
   @BeforeEach
   public void initialize() throws Exception {
-    setTableName(TestEventTimeMerging.class.getName());
+    setTableName(TestOverwriteWithLatestMerger.class.getName());
     initPath(tableName);
     initMetaClient();
     initTestDataGenerator(new String[]{PARTITION_PATH});
@@ -111,10 +113,12 @@ public class TestEventTimeMerging extends 
HoodieFileGroupReaderTestHarness {
     setUpMockCommits();
   }
 
-  @Test
-  public void testWithOneLogFile() throws IOException, InterruptedException {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testWithOneLogFile(boolean useRecordPositions) throws 
IOException, InterruptedException {
+    shouldWritePositions = Arrays.asList(useRecordPositions, 
useRecordPositions);
     // The FileSlice contains a base file and a log file.
-    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(2);
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(2, 
useRecordPositions);
     List<String> leftKeysExpected = Arrays.asList("6", "7", "8", "9", "10");
     List<Long> leftTimestampsExpected = Arrays.asList(2L, 2L, 2L, 2L, 2L);
     List<String> leftKeysActual = new ArrayList<>();
@@ -128,12 +132,14 @@ public class TestEventTimeMerging extends 
HoodieFileGroupReaderTestHarness {
     assertEquals(leftTimestampsExpected, leftTimestampsActual);
   }
 
-  @Test
-  public void testWithTwoLogFiles() throws IOException, InterruptedException {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testWithTwoLogFiles(boolean useRecordPositions) throws 
IOException, InterruptedException {
+    shouldWritePositions = Arrays.asList(useRecordPositions, 
useRecordPositions, useRecordPositions);
     // The FileSlice contains a base file and two log files.
-    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(3);
-    List<String> leftKeysExpected = Arrays.asList("6", "7", "8", "9", "10");
-    List<Long> leftTimestampsExpected = Arrays.asList(2L, 2L, 2L, 2L, 2L);
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(3, 
useRecordPositions);
+    List<String> leftKeysExpected = Arrays.asList("1", "2", "3", "6", "7", 
"8", "9", "10");
+    List<Long> leftTimestampsExpected = Arrays.asList(1L, 1L, 1L, 2L, 2L, 2L, 
2L, 2L);
     List<String> leftKeysActual = new ArrayList<>();
     List<Long> leftTimestampsActual = new ArrayList<>();
     while (iterator.hasNext()) {
@@ -145,12 +151,14 @@ public class TestEventTimeMerging extends 
HoodieFileGroupReaderTestHarness {
     assertEquals(leftTimestampsExpected, leftTimestampsActual);
   }
 
-  @Test
-  public void testWithThreeLogFiles() throws IOException, InterruptedException 
{
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testWithThreeLogFiles(boolean useRecordPositions) throws 
IOException, InterruptedException {
+    shouldWritePositions = Arrays.asList(useRecordPositions, 
useRecordPositions, useRecordPositions, useRecordPositions);
     // The FileSlice contains a base file and three log files.
-    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(4);
-    List<String> leftKeysExpected = Arrays.asList("6", "7", "8", "9", "10");
-    List<Long> leftTimestampsExpected = Arrays.asList(2L, 2L, 2L, 2L, 2L);
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(4, 
useRecordPositions);
+    List<String> leftKeysExpected = Arrays.asList("1", "2", "3", "9", "10");
+    List<Long> leftTimestampsExpected = Arrays.asList(1L, 1L, 1L, 2L, 2L);
     List<String> leftKeysActual = new ArrayList<>();
     List<Long> leftTimestampsActual = new ArrayList<>();
     while (iterator.hasNext()) {
@@ -166,8 +174,28 @@ public class TestEventTimeMerging extends 
HoodieFileGroupReaderTestHarness {
   public void testWithFourLogFiles() throws IOException, InterruptedException {
     // The FileSlice contains a base file and three log files.
     ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(5);
-    List<String> leftKeysExpected = Arrays.asList("1", "2", "6", "7", "8", 
"9", "10");
-    List<Long> leftTimestampsExpected = Arrays.asList(4L, 4L, 2L, 2L, 2L, 2L, 
2L);
+    List<String> leftKeysExpected = Arrays.asList("1", "2", "3", "4", "9", 
"10");
+    List<Long> leftTimestampsExpected = Arrays.asList(1L, 4L, 4L, 4L, 2L, 2L);
+    List<String> leftKeysActual = new ArrayList<>();
+    List<Long> leftTimestampsActual = new ArrayList<>();
+    while (iterator.hasNext()) {
+      IndexedRecord record = iterator.next();
+      
leftKeysActual.add(record.get(AVRO_SCHEMA.getField(ROW_KEY).pos()).toString());
+      leftTimestampsActual.add((Long) 
record.get(AVRO_SCHEMA.getField("timestamp").pos()));
+    }
+    assertEquals(leftKeysExpected, leftKeysActual);
+    assertEquals(leftTimestampsExpected, leftTimestampsActual);
+  }
+
+  @ParameterizedTest
+  @MethodSource("testArgs")
+  public void testPositionMergeFallback(boolean log1haspositions, boolean 
log2haspositions,
+                                        boolean log3haspositions, boolean 
log4haspositions) throws IOException, InterruptedException {
+    shouldWritePositions = Arrays.asList(true, log1haspositions, 
log2haspositions, log3haspositions, log4haspositions);
+    // The FileSlice contains a base file and four log files.
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(5, true);
+    List<String> leftKeysExpected = Arrays.asList("1", "2", "3", "4", "9", 
"10");
+    List<Long> leftTimestampsExpected = Arrays.asList(1L, 4L, 4L, 4L, 2L, 2L);
     List<String> leftKeysActual = new ArrayList<>();
     List<Long> leftTimestampsActual = new ArrayList<>();
     while (iterator.hasNext()) {
@@ -178,4 +206,13 @@ public class TestEventTimeMerging extends 
HoodieFileGroupReaderTestHarness {
     assertEquals(leftKeysExpected, leftKeysActual);
     assertEquals(leftTimestampsExpected, leftTimestampsActual);
   }
+
+  // Generates all 16 combinations of the four per-log-file position flags.
+  private static Stream<Arguments> testArgs() {
+    Stream.Builder<Arguments> b = Stream.builder();
+    for (int i = 0; i < 16; i++) {
+      b.add(Arguments.of(i % 2 == 0, (i / 2) % 2 == 0,  (i / 4) % 2 == 0, (i / 
8) % 2 == 0));
+    }
+    return b.build();
+  }
 }
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java
index a2ef6ae3906..ac9fa2d3130 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java
@@ -54,6 +54,7 @@ public class HoodieFileGroupReaderTestHarness extends 
HoodieCommonTestHarness {
   protected static List<DataGenerationPlan.OperationType> operationTypes;
   // Set the instantTime for each record set.
   protected static List<String> instantTimes;
+  protected static List<Boolean> shouldWritePositions;
 
   // Environmental variables.
   protected static StorageConfiguration<?> storageConf;
@@ -92,6 +93,11 @@ public class HoodieFileGroupReaderTestHarness extends 
HoodieCommonTestHarness {
 
   protected ClosableIterator<IndexedRecord> getFileGroupIterator(int numFiles)
       throws IOException, InterruptedException {
+    return getFileGroupIterator(numFiles, false);
+  }
+
+  protected ClosableIterator<IndexedRecord> getFileGroupIterator(int numFiles, 
boolean shouldReadPositions)
+      throws IOException, InterruptedException {
     assert (numFiles >= 1 && numFiles <= keyRanges.size());
 
     Option<FileSlice> fileSliceOpt =
@@ -101,6 +107,7 @@ public class HoodieFileGroupReaderTestHarness extends 
HoodieCommonTestHarness {
             timestamps.subList(0, numFiles),
             operationTypes.subList(0, numFiles),
             instantTimes.subList(0, numFiles),
+            shouldWritePositions.subList(0, numFiles),
             basePath,
             PARTITION_PATH,
             FILE_ID
@@ -114,7 +121,7 @@ public class HoodieFileGroupReaderTestHarness extends 
HoodieCommonTestHarness {
             basePath,
             "1000", // Not used internally.
             AVRO_SCHEMA,
-            false,
+            shouldReadPositions,
             0L,
             Long.MAX_VALUE,
             properties,
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
index c98a9a9c0f4..5fa0cc8d1ff 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
@@ -27,9 +27,9 @@ import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.hudi.keygen.{BaseKeyGenerator, KeyGenUtils, 
SparkKeyGeneratorInterface}
-
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.common.util.StringUtils
 import org.apache.spark.TaskContext
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.rdd.RDD
@@ -123,6 +123,8 @@ object HoodieCreateRecordUtils {
           val consistentLogicalTimestampEnabled = parameters.getOrElse(
             
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
             
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean
+          val precombine = config.getString(PRECOMBINE_FIELD)
+          val precombineEmpty = StringUtils.isNullOrEmpty(precombine)
 
           // handle dropping partition columns
           it.map { avroRec =>
@@ -140,8 +142,8 @@ object HoodieCreateRecordUtils {
               avroRecWithoutMeta
             }
 
-            val hoodieRecord = if (shouldCombine) {
-              val orderingVal = HoodieAvroUtils.getNestedFieldVal(avroRec, 
config.getString(PRECOMBINE_FIELD),
+            val hoodieRecord = if (shouldCombine && !precombineEmpty) {
+              val orderingVal = HoodieAvroUtils.getNestedFieldVal(avroRec, 
precombine,
                 false, 
consistentLogicalTimestampEnabled).asInstanceOf[Comparable[_]]
               DataSourceUtils.createHoodieRecord(processedRecord, orderingVal, 
hoodieKey,
                 config.getString(PAYLOAD_CLASS_NAME), recordLocation)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
index f61db4ee247..c4ff0e7b814 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
@@ -42,7 +42,6 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 
 import org.apache.avro.Schema;
@@ -66,8 +65,6 @@ import static 
org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrin
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestHoodiePositionBasedFileGroupRecordBuffer extends 
TestHoodieFileGroupReaderOnSpark {
   private final HoodieTestDataGenerator dataGen = new 
HoodieTestDataGenerator(0xDEEF);
@@ -76,12 +73,12 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer 
extends TestHoodieFile
   private HoodiePositionBasedFileGroupRecordBuffer<InternalRow> buffer;
   private String partitionPath;
 
-  public void prepareBuffer(boolean useCustomMerger) throws Exception {
+  public void prepareBuffer(RecordMergeMode mergeMode) throws Exception {
     Map<String, String> writeConfigs = new HashMap<>();
     writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), 
"parquet");
     writeConfigs.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), 
"_row_key");
     writeConfigs.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), 
"partition_path");
-    writeConfigs.put("hoodie.datasource.write.precombine.field", "timestamp");
+    
writeConfigs.put("hoodie.datasource.write.precombine.field",mergeMode.equals(RecordMergeMode.OVERWRITE_WITH_LATEST)
 ? "" : "timestamp");
     writeConfigs.put("hoodie.payload.ordering.field", "timestamp");
     writeConfigs.put(HoodieTableConfig.HOODIE_TABLE_NAME_KEY, "hoodie_test");
     writeConfigs.put("hoodie.insert.shuffle.parallelism", "4");
@@ -91,6 +88,7 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer 
extends TestHoodieFile
     writeConfigs.put("hoodie.merge.small.file.group.candidates.limit", "0");
     writeConfigs.put("hoodie.compact.inline", "false");
     writeConfigs.put(HoodieWriteConfig.WRITE_RECORD_POSITIONS.key(), "true");
+    writeConfigs.put(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key(), 
getRecordPayloadForMergeMode(mergeMode));
     commitToTable(recordsToStrings(dataGen.generateInserts("001", 100)), 
INSERT.value(), writeConfigs);
 
     String[] partitionPaths = dataGen.getPartitionPaths();
@@ -111,11 +109,22 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer 
extends TestHoodieFile
     ctx.setHasBootstrapBaseFile(false);
     ctx.setHasLogFiles(true);
     ctx.setNeedsBootstrapMerge(false);
-    ctx.setRecordMerger(useCustomMerger ? new CustomMerger() : new 
HoodieSparkRecordMerger());
+    switch (mergeMode) {
+      case CUSTOM:
+        ctx.setRecordMerger(new CustomMerger());
+        break;
+      case EVENT_TIME_ORDERING:
+        ctx.setRecordMerger(new HoodieSparkRecordMerger());
+        break;
+      case OVERWRITE_WITH_LATEST:
+      default:
+        ctx.setRecordMerger(new OverwriteWithLatestSparkMerger());
+        break;
+    }
     ctx.setSchemaHandler(new HoodiePositionBasedSchemaHandler<>(ctx, 
avroSchema, avroSchema,
         Option.empty(), metaClient.getTableConfig()));
     TypedProperties props = new TypedProperties();
-    props.put(HoodieCommonConfig.RECORD_MERGE_MODE.key(), 
RecordMergeMode.CUSTOM.name());
+    props.put(HoodieCommonConfig.RECORD_MERGE_MODE.key(), mergeMode.name());
     buffer = new HoodiePositionBasedFileGroupRecordBuffer<>(
         ctx,
         metaClient,
@@ -170,7 +179,7 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer 
extends TestHoodieFile
 
   @Test
   public void testProcessDeleteBlockWithPositions() throws Exception {
-    prepareBuffer(false);
+    prepareBuffer(RecordMergeMode.OVERWRITE_WITH_LATEST);
     HoodieDeleteBlock deleteBlock = getDeleteBlockWithPositions();
     buffer.processDeleteBlock(deleteBlock);
     assertEquals(50, buffer.getLogRecords().size());
@@ -180,7 +189,7 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer 
extends TestHoodieFile
 
   @Test
   public void testProcessDeleteBlockWithCustomMerger() throws Exception {
-    prepareBuffer(true);
+    prepareBuffer(RecordMergeMode.CUSTOM);
     HoodieDeleteBlock deleteBlock = getDeleteBlockWithPositions();
     buffer.processDeleteBlock(deleteBlock);
     assertEquals(50, buffer.getLogRecords().size());
@@ -189,11 +198,10 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer 
extends TestHoodieFile
 
   @Test
   public void testProcessDeleteBlockWithoutPositions() throws Exception {
-    prepareBuffer(false);
+    prepareBuffer(RecordMergeMode.OVERWRITE_WITH_LATEST);
     HoodieDeleteBlock deleteBlock = getDeleteBlockWithoutPositions();
-    Exception exception = assertThrows(
-        HoodieValidationException.class, () -> 
buffer.processDeleteBlock(deleteBlock));
-    assertTrue(exception.getMessage().contains("No record position info is 
found"));
+    buffer.processDeleteBlock(deleteBlock);
+    assertEquals(50, buffer.getLogRecords().size());
   }
 
   public static class CustomMerger implements HoodieRecordMerger {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index e20104858b6..c2dc9091f45 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -21,14 +21,14 @@ package org.apache.hudi.common.table.read
 
 import 
org.apache.hudi.common.config.HoodieReaderConfig.FILE_GROUP_READER_ENABLED
 import org.apache.hudi.common.engine.HoodieReaderContext
-import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
+import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord, 
HoodieRecordMerger, OverwriteWithLatestAvroPayload, WriteOperationType}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.testutils.HoodieTestUtils
 import org.apache.hudi.storage.StorageConfiguration
-import org.apache.hudi.{HoodieSparkRecordMerger, SparkAdapterSupport, 
SparkFileFormatInternalRowReaderContext}
-
+import org.apache.hudi.{HoodieSparkRecordMerger, 
OverwriteWithLatestSparkMerger, SparkAdapterSupport, 
SparkFileFormatInternalRowReaderContext}
 import org.apache.avro.Schema
 import org.apache.hadoop.conf.Configuration
+import org.apache.hudi.common.config.RecordMergeMode
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.{Dataset, HoodieInternalRowUtils, 
HoodieUnsafeUtils, Row, SaveMode, SparkSession}
@@ -38,7 +38,6 @@ import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
 
 import java.util
-
 import scala.collection.JavaConverters._
 
 /**
@@ -48,6 +47,8 @@ import scala.collection.JavaConverters._
 class TestHoodieFileGroupReaderOnSpark extends 
TestHoodieFileGroupReaderBase[InternalRow] with SparkAdapterSupport {
   var spark: SparkSession = _
 
+  var customPayloadName: String = classOf[CustomPayloadForTesting].getName
+
   @BeforeEach
   def setup() {
     val sparkConf = new SparkConf
@@ -121,4 +122,12 @@ class TestHoodieFileGroupReaderOnSpark extends 
TestHoodieFileGroupReaderBase[Int
   override def getComparableUTF8String(value: String): Comparable[_] = {
     UTF8String.fromString(value)
   }
+
+  override def getRecordPayloadForMergeMode(mergeMode: RecordMergeMode): 
String = {
+    mergeMode match {
+      case RecordMergeMode.EVENT_TIME_ORDERING => 
classOf[DefaultHoodieRecordPayload].getName
+      case RecordMergeMode.OVERWRITE_WITH_LATEST => 
classOf[OverwriteWithLatestAvroPayload].getName
+      case RecordMergeMode.CUSTOM => customPayloadName
+    }
+  }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 9778301f2f6..1581eb873b8 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -344,9 +344,7 @@ public class StreamSync implements Serializable, Closeable {
             .setConf(HadoopFSUtils.getStorageConfWithCopy(conf))
             .setBasePath(cfg.targetBasePath)
             .setPayloadClassName(cfg.payloadClassName)
-            .setRecordMergerStrategy(
-                
props.getProperty(HoodieWriteConfig.RECORD_MERGER_STRATEGY.key(),
-                    HoodieWriteConfig.RECORD_MERGER_STRATEGY.defaultValue()))
+            .setRecordMergerStrategy(null)
             .build();
         switch (meta.getTableType()) {
           case COPY_ON_WRITE:
@@ -440,7 +438,7 @@ public class StreamSync implements Serializable, Closeable {
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
         .setConf(HadoopFSUtils.getStorageConfWithCopy(conf))
         .setBasePath(cfg.targetBasePath)
-        
.setRecordMergerStrategy(props.getProperty(HoodieWriteConfig.RECORD_MERGER_STRATEGY.key(),
 HoodieWriteConfig.RECORD_MERGER_STRATEGY.defaultValue()))
+        .setRecordMergerStrategy(null)
         
.setTimeGeneratorConfig(HoodieTimeGeneratorConfig.newBuilder().fromProperties(props).withPath(cfg.targetBasePath).build())
         .build();
     String instantTime = metaClient.createNewInstantTime();


Reply via email to