This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch upstream/release-feature-rfc46
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit ec486855e5cd9ccc42c9863d5b8b15a612f0a51e
Author: wulei <[email protected]>
AuthorDate: Tue Jul 5 02:09:12 2022 +0800

    [HUDI-3350][HUDI-3351] Support HoodieMerge API and Spark engine-specific HoodieRecord (#5627)
    
    Co-authored-by: wangzixuan.wzxuan <[email protected]>
---
 .../apache/hudi/config/HoodieCompactionConfig.java |  12 +
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  11 +
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |   5 +-
 .../java/org/apache/hudi/io/HoodieWriteHandle.java |   4 +
 .../hudi/table/action/commit/BaseWriteHelper.java  |   7 +-
 .../table/action/commit/HoodieWriteHelper.java     |  11 +-
 .../hudi/testutils/HoodieWriteableTestTable.java   |  13 +-
 .../org/apache/hudi/table/HoodieFlinkTable.java    |   2 -
 .../hudi/table/action/commit/FlinkWriteHelper.java |   6 +-
 .../hudi/table/action/commit/JavaWriteHelper.java  |   7 +-
 .../hudi/io/storage/row/HoodieRowCreateHandle.java |   1 -
 .../spark/sql/HoodieCatalystExpressionUtils.scala  |  21 +-
 .../TestHoodieClientOnCopyOnWriteStorage.java      |   7 +-
 .../io/storage/row/TestHoodieRowCreateHandle.java  |   1 -
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java |  20 +-
 .../hudi/common/model/HoodieAvroIndexedRecord.java |  17 +-
 .../apache/hudi/common/model/HoodieAvroRecord.java |  49 +---
 .../hudi/common/model/HoodieAvroRecordMerge.java   |  57 +++++
 .../org/apache/hudi/common/model/HoodieMerge.java  |  38 +++
 .../org/apache/hudi/common/model/HoodieRecord.java |  36 +--
 .../hudi/common/table/HoodieTableConfig.java       |  15 ++
 .../hudi/common/table/HoodieTableMetaClient.java   |  10 +
 .../hudi/common/table/TableSchemaResolver.java     |   1 -
 .../table/log/AbstractHoodieLogRecordReader.java   |   7 +
 .../table/log/HoodieMergedLogRecordScanner.java    |   7 +-
 .../table/log/block/HoodieParquetDataBlock.java    |   2 +-
 .../apache/hudi/common/util/HoodieRecordUtils.java |  68 +++++
 .../apache/hudi/common/util/ReflectionUtils.java   |  13 -
 .../apache/hudi/common/util/SpillableMapUtils.java |   4 +-
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  41 +--
 .../hudi/common/util/HoodieRecordUtilsTest.java    |  47 ++++
 .../apache/hudi/configuration/FlinkOptions.java    |   8 +
 .../org/apache/hudi/sink/StreamWriteFunction.java  |  15 +-
 .../apache/hudi/streamer/FlinkStreamerConfig.java  |   6 +
 .../org/apache/hudi/table/HoodieTableSource.java   |   3 +-
 .../table/format/mor/MergeOnReadInputFormat.java   |  16 +-
 .../table/format/mor/MergeOnReadTableState.java    |   9 +-
 .../java/org/apache/hudi/util/StreamerUtil.java    |   2 +
 .../apache/hudi/source/TestStreamReadOperator.java |   3 +-
 .../test/java/org/apache/hudi/utils/TestData.java  |   4 +-
 .../BulkInsertDataInternalWriterHelper.java        |   2 +-
 .../scala/org/apache/hudi/DataSourceOptions.scala  |   6 +
 .../scala/org/apache/hudi/HoodieBaseRelation.scala |  10 +-
 .../org/apache/hudi/HoodieMergeOnReadRDD.scala     |  21 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |   2 +
 .../spark/sql/hudi/HoodieInternalRowUtils.scala    | 282 +++++++++++++++++++++
 .../apache/spark/sql/hudi/HoodieSparkRecord.java   | 190 ++++++++++++++
 .../spark/sql/hudi/HoodieSparkRecordMerge.java     |  48 ++++
 .../org/apache/spark/sql/hudi/SparkHelpers.scala   |   2 +-
 .../apache/hudi/TestHoodieInternalRowUtils.scala   | 114 +++++++++
 .../hudi/TestStructTypeSchemaEvolutionUtils.scala  | 222 ++++++++++++++++
 .../hudi/utilities/deltastreamer/DeltaSync.java    |   2 +
 .../deltastreamer/HoodieDeltaStreamer.java         |   5 +
 53 files changed, 1325 insertions(+), 187 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index d1d0e67261..0b7311b701 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.ConfigClassProperty;
 import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.model.HoodieAvroRecordMerge;
 import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
 import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
 import 
org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
@@ -113,6 +114,12 @@ public class HoodieCompactionConfig extends HoodieConfig {
           + "compaction during each compaction run. By default. Hudi picks the 
log file "
           + "with most accumulated unmerged data");
 
+  public static final ConfigProperty<String> MERGE_CLASS_NAME = ConfigProperty
+      .key("hoodie.compaction.merge.class")
+      .defaultValue(HoodieAvroRecordMerge.class.getName())
+      .withDocumentation("Merge class provide stateless component interface 
for merging records, and support various HoodieRecord "
+          + "types, such as Spark records or Flink records.");
+
   public static final ConfigProperty<String> COMPACTION_LAZY_BLOCK_READ_ENABLE 
= ConfigProperty
       .key("hoodie.compaction.lazy.block.read")
       .defaultValue("true")
@@ -346,6 +353,11 @@ public class HoodieCompactionConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withMergeClass(String mergeClass) {
+      compactionConfig.setValue(MERGE_CLASS_NAME, mergeClass);
+      return this;
+    }
+
     public Builder withTargetIOPerCompactionInMB(long 
targetIOPerCompactionInMB) {
       compactionConfig.setValue(TARGET_IO_PER_COMPACTION_IN_MB, 
String.valueOf(targetIOPerCompactionInMB));
       return this;
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 2cb9e8ce82..8d903cffac 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FileSystemRetryConfig;
+import org.apache.hudi.common.model.HoodieAvroRecordMerge;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -125,6 +126,12 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Payload class used. Override this, if you like to 
roll your own merge logic, when upserting/inserting. "
           + "This will render any value set for PRECOMBINE_FIELD_OPT_VAL 
in-effective");
 
+  public static final ConfigProperty<String> MERGE_CLASS_NAME = ConfigProperty
+      .key("hoodie.datasource.write.merge.class")
+      .defaultValue(HoodieAvroRecordMerge.class.getName())
+      .withDocumentation("Merge class provide stateless component interface 
for merging records, and support various HoodieRecord "
+          + "types, such as Spark records or Flink records.");
+
   public static final ConfigProperty<String> KEYGENERATOR_CLASS_NAME = 
ConfigProperty
       .key("hoodie.datasource.write.keygenerator.class")
       .noDefaultValue()
@@ -1321,6 +1328,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getString(HoodiePayloadConfig.PAYLOAD_CLASS_NAME);
   }
 
+  public String getMergeClass() {
+    return getString(HoodieCompactionConfig.MERGE_CLASS_NAME);
+  }
+
   public int getTargetPartitionsPerDayBasedCompaction() {
     return 
getInt(HoodieCompactionConfig.TARGET_PARTITIONS_PER_DAYBASED_COMPACTION);
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 3b6bda7877..fff4aa6d05 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -338,10 +338,7 @@ public class HoodieMergeHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O>
       // writing the first record. So make a copy of the record to be merged
       HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key).newInstance();
       try {
-        Option<HoodieRecord> combinedRecord =
-            hoodieRecord.combineAndGetUpdateValue(oldRecord,
-                schema,
-                props);
+        Option<HoodieRecord> combinedRecord = 
merge.combineAndGetUpdateValue(oldRecord, hoodieRecord, schema, props);
 
         if (combinedRecord.isPresent() && 
combinedRecord.get().shouldIgnore(schema, props)) {
           // If it is an IGNORE_RECORD, just copy the old record, and do not 
update the new record.
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 09f8831f8b..9ee6e0884d 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -27,7 +27,9 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieMerge;
 import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
@@ -59,6 +61,7 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends 
HoodieIOHandle<T, I,
    */
   protected final Schema tableSchema;
   protected final Schema tableSchemaWithMetaFields;
+  protected final HoodieMerge merge;
 
   /**
    * The write schema. In most case the write schema is the same to the
@@ -103,6 +106,7 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends 
HoodieIOHandle<T, I,
     this.taskContextSupplier = taskContextSupplier;
     this.writeToken = makeWriteToken();
     schemaOnReadEnabled = 
!isNullOrEmpty(hoodieTable.getConfig().getInternalSchema());
+    this.merge = HoodieRecordUtils.loadMerge(config.getMergeClass());
   }
 
   /**
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
index d8682152fa..1efe3d9641 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
@@ -19,7 +19,9 @@
 package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieMerge;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
@@ -81,9 +83,10 @@ public abstract class BaseWriteHelper<T, I, K, O, R> {
    */
   public I deduplicateRecords(
       I records, HoodieTable<T, I, K, O> table, int parallelism) {
-    return deduplicateRecords(records, table.getIndex(), parallelism);
+    HoodieMerge merge = 
HoodieRecordUtils.loadMerge(table.getConfig().getMergeClass());
+    return deduplicateRecords(records, table.getIndex(), parallelism, merge);
   }
 
   public abstract I deduplicateRecords(
-      I records, HoodieIndex<?, ?> index, int parallelism);
+      I records, HoodieIndex<?, ?> index, int parallelism, HoodieMerge merge);
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
index e24cd71ab6..57bb511c63 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
@@ -23,13 +23,13 @@ import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieMerge;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
 
 public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, 
HoodieData<HoodieRecord<T>>,
     HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> {
-
   private HoodieWriteHelper() {
   }
 
@@ -49,7 +49,7 @@ public class HoodieWriteHelper<T, R> extends 
BaseWriteHelper<T, HoodieData<Hoodi
 
   @Override
   public HoodieData<HoodieRecord<T>> deduplicateRecords(
-      HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int 
parallelism) {
+      HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int 
parallelism, HoodieMerge merge) {
     boolean isIndexingGlobal = index.isGlobal();
     return records.mapToPair(record -> {
       HoodieKey hoodieKey = record.getKey();
@@ -58,10 +58,9 @@ public class HoodieWriteHelper<T, R> extends 
BaseWriteHelper<T, HoodieData<Hoodi
       return Pair.of(key, record);
     }).reduceByKey((rec1, rec2) -> {
       @SuppressWarnings("unchecked")
-      HoodieRecord reducedRec = rec2.preCombine(rec1);
-      HoodieKey reducedKey = rec1.getData().equals(reducedRec) ? rec1.getKey() 
: rec2.getKey();
-
-      return (HoodieRecord<T>) reducedRec.newInstance(reducedKey);
+      HoodieRecord<T> reducedRecord =  merge.preCombine(rec1, rec2);
+      HoodieKey reducedKey = rec1.getData().equals(reducedRecord.getData()) ? 
rec1.getKey() : rec2.getKey();
+      return reducedRecord.newInstance(reducedKey);
     }, parallelism).map(Pair::getRight);
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
index d2ecd09a23..a952576026 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
@@ -19,6 +19,12 @@
 
 package org.apache.hudi.testutils;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.HoodieAvroWriteSupport;
 import org.apache.hudi.common.bloom.BloomFilter;
@@ -44,13 +50,6 @@ import org.apache.hudi.io.storage.HoodieAvroParquetWriter;
 import org.apache.hudi.io.storage.HoodieOrcConfig;
 import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.orc.CompressionKind;
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
index 1ae12780f0..e3e25fa9a9 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
@@ -39,8 +39,6 @@ import org.apache.avro.specific.SpecificRecordBase;
 
 import java.util.List;
 
-import static org.apache.hudi.common.data.HoodieList.getList;
-
 public abstract class HoodieFlinkTable<T>
     extends HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, 
List<WriteStatus>>
     implements ExplicitWriteHandleTable<T> {
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
index 20b8b9fcf3..ce6107714d 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieMerge;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.index.HoodieIndex;
@@ -86,13 +87,14 @@ public class FlinkWriteHelper<T, R> extends 
BaseWriteHelper<T, List<HoodieRecord
 
   @Override
   public List<HoodieRecord<T>> deduplicateRecords(
-      List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) 
{
+      List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, 
HoodieMerge merge) {
     // If index used is global, then records are expected to differ in their 
partitionPath
     Map<Object, List<HoodieRecord<T>>> keyedRecords = records.stream()
         .collect(Collectors.groupingBy(record -> 
record.getKey().getRecordKey()));
 
     return keyedRecords.values().stream().map(x -> x.stream().reduce((rec1, 
rec2) -> {
-      @SuppressWarnings("unchecked") final HoodieRecord reducedRec = 
rec2.preCombine(rec1);
+      @SuppressWarnings("unchecked")
+      final HoodieRecord reducedRec =  merge.preCombine(rec1, rec2);
       // we cannot allow the user to change the key or partitionPath, since 
that will affect
       // everything
       // so pick it from one of the records.
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
index f002c208d1..39c60447c5 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieMerge;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
@@ -53,7 +54,7 @@ public class JavaWriteHelper<T,R> extends BaseWriteHelper<T, 
List<HoodieRecord<T
 
   @Override
   public List<HoodieRecord<T>> deduplicateRecords(
-      List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) 
{
+      List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, 
HoodieMerge merge) {
     boolean isIndexingGlobal = index.isGlobal();
     Map<Object, List<Pair<Object, HoodieRecord<T>>>> keyedRecords = 
records.stream().map(record -> {
       HoodieKey hoodieKey = record.getKey();
@@ -64,11 +65,11 @@ public class JavaWriteHelper<T,R> extends 
BaseWriteHelper<T, List<HoodieRecord<T
 
     return keyedRecords.values().stream().map(x -> 
x.stream().map(Pair::getRight).reduce((rec1, rec2) -> {
       @SuppressWarnings("unchecked")
-      HoodieRecord reducedRec = rec2.preCombine(rec1);
+      HoodieRecord<T> reducedRecord =  merge.preCombine(rec1,rec2);
       // we cannot allow the user to change the key or partitionPath, since 
that will affect
       // everything
       // so pick it from one of the records.
-      return (HoodieRecord<T>) reducedRec.newInstance(rec1.getKey());
+      return reducedRecord.newInstance(rec1.getKey());
     }).orElse(null)).filter(Objects::nonNull).collect(Collectors.toList());
   }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
index 2b581004dc..e7c6ccd6fa 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
@@ -23,7 +23,6 @@ import org.apache.hudi.client.model.HoodieInternalRow;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.util.HoodieTimer;
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala
index a3b9c210b9..636dd299fe 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala
@@ -18,8 +18,8 @@
 package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, 
UnresolvedFunction}
-import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression, SubqueryExpression, UnsafeProjection}
+import 
org.apache.spark.sql.catalyst.expressions.codegen.{GenerateMutableProjection, 
GenerateUnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression, MutableProjection, SubqueryExpression, UnsafeProjection}
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, 
LogicalPlan}
 import org.apache.spark.sql.types.StructType
 
@@ -42,6 +42,23 @@ trait HoodieCatalystExpressionUtils {
     GenerateUnsafeProjection.generate(targetExprs, attrs)
   }
 
+  /**
+   * Generates instance of [[MutableProjection]] projecting row of one 
[[StructType]] into another [[StructType]]
+   *
+   * NOTE: No safety checks are executed to validate that this projection is 
actually feasible,
+   *       it's up to the caller to make sure that such projection is possible.
+   *
+   * NOTE: Projection of the row from [[StructType]] A to [[StructType]] B is 
only possible, if
+   *       B is a subset of A
+   */
+  def generateMutableProjection(from: StructType, to: StructType): 
MutableProjection = {
+    val attrs = from.toAttributes
+    val attrsMap = attrs.map(attr => (attr.name, attr)).toMap
+    val targetExprs = to.fields.map(f => attrsMap(f.name))
+
+    GenerateMutableProjection.generate(targetExprs, attrs)
+  }
+
   /**
    * Parses and resolves expression against the attributes of the given table 
schema.
    *
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 8ba459b772..05588a80ba 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -38,12 +38,14 @@ import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieAvroRecordMerge;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieMerge;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.IOType;
@@ -462,7 +464,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
     // Global dedup should be done based on recordKey only
     HoodieIndex index = mock(HoodieIndex.class);
     when(index.isGlobal()).thenReturn(true);
-    List<HoodieRecord<RawTripTestPayload>> dedupedRecs = 
HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 
1).collectAsList();
+    HoodieMerge merge = new HoodieAvroRecordMerge();
+    List<HoodieRecord<RawTripTestPayload>> dedupedRecs = 
HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1, 
merge).collectAsList();
     assertEquals(1, dedupedRecs.size());
     assertEquals(dedupedRecs.get(0).getPartitionPath(), 
recordThree.getPartitionPath());
     assertNodupesWithinPartition(dedupedRecs);
@@ -470,7 +473,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
     // non-Global dedup should be done based on both recordKey and 
partitionPath
     index = mock(HoodieIndex.class);
     when(index.isGlobal()).thenReturn(false);
-    dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, 
index, 1).collectAsList();
+    dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, 
index, 1, merge).collectAsList();
     assertEquals(2, dedupedRecs.size());
     assertNodupesWithinPartition(dedupedRecs);
 
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java
index b4e75a02b2..ad73a256a6 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java
@@ -21,7 +21,6 @@ package org.apache.hudi.io.storage.row;
 import org.apache.hudi.client.HoodieInternalWriteStatus;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.StringUtils;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 463ea82f96..81e0032157 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -22,9 +22,9 @@ import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
@@ -768,9 +768,7 @@ public class HoodieAvroUtils {
     Schema oldSchema = getActualSchemaFromUnion(oldAvroSchema, oldRecord);
     switch (newSchema.getType()) {
       case RECORD:
-        if (!(oldRecord instanceof IndexedRecord)) {
-          throw new IllegalArgumentException("cannot rewrite record with 
different type");
-        }
+        ValidationUtils.checkArgument(oldRecord instanceof IndexedRecord, 
"cannot rewrite record with different type");
         IndexedRecord indexedRecord = (IndexedRecord) oldRecord;
         List<Schema.Field> fields = newSchema.getFields();
         GenericData.Record newRecord = new GenericData.Record(newSchema);
@@ -802,9 +800,7 @@ public class HoodieAvroUtils {
         }
         return newRecord;
       case ARRAY:
-        if (!(oldRecord instanceof Collection)) {
-          throw new IllegalArgumentException("cannot rewrite record with 
different type");
-        }
+        ValidationUtils.checkArgument(oldRecord instanceof Collection, "cannot 
rewrite record with different type");
         Collection array = (Collection)oldRecord;
         List<Object> newArray = new ArrayList();
         fieldNames.push("element");
@@ -814,9 +810,7 @@ public class HoodieAvroUtils {
         fieldNames.pop();
         return newArray;
       case MAP:
-        if (!(oldRecord instanceof Map)) {
-          throw new IllegalArgumentException("cannot rewrite record with 
different type");
-        }
+        ValidationUtils.checkArgument(oldRecord instanceof Map, "cannot 
rewrite record with different type");
         Map<Object, Object> map = (Map<Object, Object>) oldRecord;
         Map<Object, Object> newMap = new HashMap<>();
         fieldNames.push("value");
@@ -832,7 +826,7 @@ public class HoodieAvroUtils {
     }
   }
 
-  private static String createFullName(Deque<String> fieldNames) {
+  public static String createFullName(Deque<String> fieldNames) {
     String result = "";
     if (!fieldNames.isEmpty()) {
       List<String> parentNames = new ArrayList<>();
@@ -967,7 +961,7 @@ public class HoodieAvroUtils {
   }
 
   // convert days to Date
-  private static java.sql.Date toJavaDate(int days) {
+  public static java.sql.Date toJavaDate(int days) {
     long localMillis = Math.multiplyExact(days, MILLIS_PER_DAY);
     int timeZoneOffset;
     TimeZone defaultTimeZone = TimeZone.getDefault();
@@ -980,7 +974,7 @@ public class HoodieAvroUtils {
   }
 
   // convert Date to days
-  private static int fromJavaDate(Date date) {
+  public static int fromJavaDate(Date date) {
     long millisUtc = date.getTime();
     long millisLocal = millisUtc + TimeZone.getDefault().getOffset(millisUtc);
     int julianDays = Math.toIntExact(Math.floorDiv(millisLocal, 
MILLIS_PER_DAY));
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
index ac2df00151..daec2fee03 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
@@ -48,7 +48,7 @@ public class HoodieAvroIndexedRecord extends 
HoodieRecord<IndexedRecord> {
   }
 
   public HoodieAvroIndexedRecord(HoodieKey key, IndexedRecord data, 
HoodieOperation operation) {
-    super(key, data, operation);
+    super(key, data, operation, null);
   }
 
   public HoodieAvroIndexedRecord(HoodieRecord<IndexedRecord> record) {
@@ -67,11 +67,6 @@ public class HoodieAvroIndexedRecord extends 
HoodieRecord<IndexedRecord> {
     return Option.of(data);
   }
 
-  @Override
-  public Comparable<?> getOrderingValue() {
-    throw new UnsupportedOperationException();
-  }
-
   @Override
   public HoodieRecord newInstance() {
     throw new UnsupportedOperationException();
@@ -99,16 +94,6 @@ public class HoodieAvroIndexedRecord extends 
HoodieRecord<IndexedRecord> {
         .map(Object::toString).orElse(null);
   }
 
-  @Override
-  public HoodieRecord preCombine(HoodieRecord<IndexedRecord> previousRecord) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord 
previousRecord, Schema schema, Properties props) throws IOException {
-    return Option.empty();
-  }
-
   @Override
   public HoodieRecord mergeWith(HoodieRecord other, Schema readerSchema, 
Schema writerSchema) throws IOException {
     ValidationUtils.checkState(other instanceof HoodieAvroIndexedRecord);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
index 9a9011da37..7bc6af89f8 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
@@ -21,11 +21,10 @@ package org.apache.hudi.common.model;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.keygen.BaseKeyGenerator;
-import org.apache.hudi.metadata.HoodieMetadataPayload;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -39,7 +38,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
-import static org.apache.hudi.TypeUtils.unsafeCast;
+import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
 
 public class HoodieAvroRecord<T extends HoodieRecordPayload> extends 
HoodieRecord<T> {
   public HoodieAvroRecord(HoodieKey key, T data) {
@@ -47,7 +46,7 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> 
extends HoodieRecor
   }
 
   public HoodieAvroRecord(HoodieKey key, T data, HoodieOperation operation) {
-    super(key, data, operation);
+    super(key, data, operation, null);
   }
 
   public HoodieAvroRecord(HoodieRecord<T> record) {
@@ -106,34 +105,6 @@ public class HoodieAvroRecord<T extends 
HoodieRecordPayload> extends HoodieRecor
   // NOTE: This method duplicates those ones of the HoodieRecordPayload and 
are placed here
   //       for the duration of RFC-46 implementation, until migration off 
`HoodieRecordPayload`
   //       is complete
-  //
-  // TODO cleanup
-
-  // NOTE: This method is assuming semantic that `preCombine` operation is 
bound to pick one or the other
-  //       object, and may not create a new one
-  @Override
-  public HoodieRecord<T> preCombine(HoodieRecord<T> previousRecord) {
-    T picked = unsafeCast(getData().preCombine(previousRecord.getData()));
-    if (picked instanceof HoodieMetadataPayload) {
-      // NOTE: HoodieMetadataPayload return a new payload
-      return new HoodieAvroRecord<>(getKey(), picked, getOperation());
-    }
-    return picked.equals(getData()) ? this : previousRecord;
-  }
-
-  // NOTE: This method is assuming semantic that only records bearing the same 
(partition, key) could
-  //       be combined
-  @Override
-  public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord 
previousRecord, Schema schema, Properties props) throws IOException {
-    Option<IndexedRecord> previousRecordAvroPayload = 
previousRecord.toIndexedRecord(schema, props);
-    if (!previousRecordAvroPayload.isPresent()) {
-      return Option.empty();
-    }
-
-    return getData().combineAndGetUpdateValue(previousRecordAvroPayload.get(), 
schema, props)
-        .map(combinedAvroPayload -> new 
HoodieAvroIndexedRecord((IndexedRecord) combinedAvroPayload));
-  }
-
   @Override
   public HoodieRecord mergeWith(HoodieRecord other, Schema readerSchema, 
Schema writerSchema) throws IOException {
     ValidationUtils.checkState(other instanceof HoodieAvroRecord);
@@ -141,7 +112,7 @@ public class HoodieAvroRecord<T extends 
HoodieRecordPayload> extends HoodieRecor
         (GenericRecord) toIndexedRecord(readerSchema, new Properties()).get(),
         (GenericRecord) other.toIndexedRecord(readerSchema, new 
Properties()).get(),
         writerSchema);
-    return new HoodieAvroRecord(getKey(), 
instantiateRecordPayloadWrapper(mergedPayload, getPrecombineValue(getData())), 
getOperation());
+    return new HoodieAvroRecord(getKey(), 
instantiateRecordPayloadWrapper(mergedPayload, getOrderingValue()), 
getOperation());
   }
 
   @Override
@@ -234,20 +205,10 @@ public class HoodieAvroRecord<T extends 
HoodieRecordPayload> extends HoodieRecor
   @Nonnull
   private T instantiateRecordPayloadWrapper(Object combinedAvroPayload, 
Comparable newPreCombineVal) {
     return unsafeCast(
-        ReflectionUtils.loadPayload(
+        HoodieRecordUtils.loadPayload(
             getData().getClass().getCanonicalName(),
             new Object[]{combinedAvroPayload, newPreCombineVal},
             GenericRecord.class,
             Comparable.class));
   }
-
-  private static <T extends HoodieRecordPayload> Comparable 
getPrecombineValue(T data) {
-    if (data instanceof BaseAvroPayload) {
-      return ((BaseAvroPayload) data).orderingVal;
-    }
-
-    return -1;
-  }
-
-  
//////////////////////////////////////////////////////////////////////////////
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerge.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerge.java
new file mode 100644
index 0000000000..bea89efdc1
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerge.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
+
+/** Avro-based {@link HoodieMerge} that delegates merging to the records' {@link HoodieRecordPayload}. */
+public class HoodieAvroRecordMerge implements HoodieMerge {
+  @Override
+  public HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer) {
+    HoodieRecordPayload newerPayload = ((HoodieAvroRecord) newer).getData();
+    HoodieRecordPayload winner = unsafeCast(newerPayload.preCombine(((HoodieAvroRecord) older).getData()));
+    // HoodieMetadataPayload hands back a brand-new payload, so it is wrapped in a fresh record
+    if (winner instanceof HoodieMetadataPayload) {
+      return new HoodieAvroRecord(newer.getKey(), winner, newer.getOperation());
+    }
+    return winner.equals(newerPayload) ? newer : older;
+  }
+
+  @Override
+  public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
+    // An indexed record already carries its Avro form; a payload has to produce it via getInsertValue
+    Option<IndexedRecord> olderAvro = older instanceof HoodieAvroIndexedRecord
+        ? Option.ofNullable(((HoodieAvroIndexedRecord) older).getData())
+        : ((HoodieRecordPayload) older.getData()).getInsertValue(schema, props);
+    if (!olderAvro.isPresent()) {
+      return Option.empty();
+    }
+    HoodieRecordPayload newerPayload = ((HoodieAvroRecord) newer).getData();
+    return newerPayload.combineAndGetUpdateValue(olderAvro.get(), schema, props)
+        .map(merged -> new HoodieAvroIndexedRecord((IndexedRecord) merged));
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieMerge.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieMerge.java
new file mode 100644
index 0000000000..6becf35591
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieMerge.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.common.util.Option;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Properties;
+
+/**
+ * Strategy for merging two {@link HoodieRecord}s; implementations are expected to be stateless.
+ * Engine-specific implementations can merge their own record types directly and so avoid the cost of
+ * serializing/deserializing Avro payloads.
+ */
+public interface HoodieMerge extends Serializable {
+  /** Resolves two versions of a record, {@code older} and {@code newer}, into the single record to keep. */
+  HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer);
+
+  /** Merges {@code newer} with {@code older}; may be empty (exact meaning is implementation-defined, TODO confirm). */
+  Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, 
HoodieRecord newer, Schema schema, Properties props) throws IOException;
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index 6b54933d6a..b26f1d296e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -135,25 +135,35 @@ public abstract class HoodieRecord<T> implements 
Serializable {
    */
   private HoodieOperation operation;
 
+  /**
+   * For purposes of preCombining.
+   */
+  private Comparable<?> orderingVal;
+
   public HoodieRecord(HoodieKey key, T data) {
-    this(key, data, null);
+    this(key, data, null, null);
   }
 
-  public HoodieRecord(HoodieKey key, T data, HoodieOperation operation) {
+  public HoodieRecord(HoodieKey key, T data, Comparable<?> orderingVal) {
+    this(key, data, null, orderingVal);
+  }
+
+  public HoodieRecord(HoodieKey key, T data, HoodieOperation operation, 
Comparable<?> orderingVal) {
     this.key = key;
     this.data = data;
     this.currentLocation = null;
     this.newLocation = null;
     this.sealed = false;
     this.operation = operation;
+    // default natural order is 0
+    this.orderingVal = orderingVal == null ? 0 : orderingVal;
   }
 
   public HoodieRecord(HoodieRecord<T> record) {
-    this(record.key, record.data);
+    this(record.key, record.data, record.operation, record.orderingVal);
     this.currentLocation = record.currentLocation;
     this.newLocation = record.newLocation;
     this.sealed = record.sealed;
-    this.operation = record.operation;
   }
 
   public HoodieRecord() {
@@ -173,15 +183,17 @@ public abstract class HoodieRecord<T> implements 
Serializable {
     return operation;
   }
 
+  public Comparable<?> getOrderingValue() {
+    return orderingVal;
+  }
+
   public T getData() {
     if (data == null) {
-      throw new IllegalStateException("Payload already deflated for record.");
+      throw new IllegalStateException("HoodieRecord already deflated for 
record.");
     }
     return data;
   }
 
-  public abstract Comparable<?> getOrderingValue();
-
   /**
    * Release the actual payload, to ease memory pressure. To be called after 
the record has been written to storage.
    * Once deflated, cannot be inflated.
@@ -285,16 +297,6 @@ public abstract class HoodieRecord<T> implements 
Serializable {
   //       for the duration of RFC-46 implementation, until migration off 
`HoodieRecordPayload`
   //       is complete
   //
-  // TODO cleanup
-
-  // NOTE: This method is assuming semantic that `preCombine` operation is 
bound to pick one or the other
-  //       object, and may not create a new one
-  public abstract HoodieRecord<T> preCombine(HoodieRecord<T> previousRecord);
-
-  // NOTE: This method is assuming semantic that only records bearing the same 
(partition, key) could
-  //       be combined
-  public abstract Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord 
previousRecord, Schema schema, Properties props) throws IOException;
-
   public abstract HoodieRecord mergeWith(HoodieRecord other, Schema 
readerSchema, Schema writerSchema) throws IOException;
 
   public abstract HoodieRecord rewriteRecord(Schema recordSchema, Schema 
targetSchema, TypedProperties props) throws IOException;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 6b64ec4897..aae29a21bb 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.OrderedProperties;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecordMerge;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -155,6 +156,12 @@ public class HoodieTableConfig extends HoodieConfig {
       .withDocumentation("Payload class to use for performing compactions, i.e 
merge delta logs with current base file and then "
           + " produce a new base file.");
 
+  public static final ConfigProperty<String> MERGE_CLASS_NAME = ConfigProperty
+      .key("hoodie.compaction.merge.class")
+      .defaultValue(HoodieAvroRecordMerge.class.getName())
+      .withDocumentation("Merge class provide stateless component interface 
for merging records, and support various HoodieRecord "
+          + "types, such as Spark records or Flink records.");
+
   public static final ConfigProperty<String> ARCHIVELOG_FOLDER = ConfigProperty
       .key("hoodie.archivelog.folder")
       .defaultValue("archived")
@@ -481,6 +488,14 @@ public class HoodieTableConfig extends HoodieConfig {
         "org.apache.hudi");
   }
 
+  /**
+   * Returns the configured (or default) HoodieMerge class name, with any "com.uber.hoodie" rewritten to "org.apache.hudi".
+   */
+  public String getMergeClass() {
+    String configuredMergeClass = getStringOrDefault(MERGE_CLASS_NAME);
+    return configuredMergeClass.replace("com.uber.hoodie", "org.apache.hudi");
+  }
+
   public String getPreCombineField() {
     return getString(PRECOMBINE_FIELD);
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 529b0e8c99..b0da7a9203 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -725,6 +725,7 @@ public class HoodieTableMetaClient implements Serializable {
     private String recordKeyFields;
     private String archiveLogFolder;
     private String payloadClassName;
+    private String mergeClassName;
     private Integer timelineLayoutVersion;
     private String baseFileFormat;
     private String preCombineField;
@@ -791,6 +792,11 @@ public class HoodieTableMetaClient implements Serializable 
{
       return this;
     }
 
+    public PropertyBuilder setMergeClassName(String mergeClassName) {
+      this.mergeClassName = mergeClassName;
+      return this;
+    }
+
     public PropertyBuilder setPayloadClass(Class<? extends 
HoodieRecordPayload> payloadClass) {
       return setPayloadClassName(payloadClass.getName());
     }
@@ -1004,6 +1010,10 @@ public class HoodieTableMetaClient implements 
Serializable {
         tableConfig.setValue(HoodieTableConfig.PAYLOAD_CLASS_NAME, 
payloadClassName);
       }
 
+      if (mergeClassName != null) {
+        tableConfig.setValue(HoodieTableConfig.MERGE_CLASS_NAME, 
mergeClassName);
+      }
+
       if (null != tableCreateSchema) {
         tableConfig.setValue(HoodieTableConfig.CREATE_SCHEMA, 
tableCreateSchema);
       }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index f7373fde3b..1c31e0120c 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -22,7 +22,6 @@ import org.apache.avro.JsonProperties;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.SchemaCompatibility;
-import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 1f27028e26..afad95c4aa 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -96,6 +96,8 @@ public abstract class AbstractHoodieLogRecordReader {
   private final String payloadClassFQN;
   // preCombine field
   private final String preCombineField;
+  // Stateless component for merging records
+  private final String mergeClassFQN;
   // simple key gen fields
   private Option<Pair<String, String>> simpleKeyGenFields = Option.empty();
   // Log File Paths
@@ -160,6 +162,7 @@ public abstract class AbstractHoodieLogRecordReader {
     HoodieTableConfig tableConfig = 
this.hoodieTableMetaClient.getTableConfig();
     this.payloadClassFQN = tableConfig.getPayloadClass();
     this.preCombineField = tableConfig.getPreCombineField();
+    this.mergeClassFQN = tableConfig.getMergeClass();
     this.totalLogFiles.addAndGet(logFilePaths.size());
     this.logFilePaths = logFilePaths;
     this.reverseReader = reverseReader;
@@ -525,6 +528,10 @@ public abstract class AbstractHoodieLogRecordReader {
     return payloadClassFQN;
   }
 
+  protected String getMergeClassFQN() {
+    return mergeClassFQN;
+  }
+
   public Option<String> getPartitionName() {
     return partitionName;
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index 6d313b64f9..dfc3c14b5b 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -24,10 +24,12 @@ import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieMerge;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
+import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
@@ -78,6 +80,8 @@ public class HoodieMergedLogRecordScanner extends 
AbstractHoodieLogRecordReader
   // Stores the total time taken to perform reading and merging of log blocks
   private long totalTimeTakenToReadAndMergeBlocks;
 
+  private final HoodieMerge merge;
+
   @SuppressWarnings("unchecked")
   protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, 
List<String> logFilePaths, Schema readerSchema,
                                          String latestInstantTime, Long 
maxMemorySizeInBytes, boolean readBlocksLazily,
@@ -95,6 +99,7 @@ public class HoodieMergedLogRecordScanner extends 
AbstractHoodieLogRecordReader
       this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, 
spillableMapBasePath, new DefaultSizeEstimator(),
           new HoodieRecordSizeEstimator(readerSchema), diskMapType, 
isBitCaskDiskMapCompressionEnabled);
       this.maxMemorySizeInBytes = maxMemorySizeInBytes;
+      this.merge = HoodieRecordUtils.loadMerge(getMergeClassFQN());
     } catch (IOException e) {
       throw new HoodieIOException("IOException when creating 
ExternalSpillableMap at " + spillableMapBasePath, e);
     }
@@ -150,7 +155,7 @@ public class HoodieMergedLogRecordScanner extends 
AbstractHoodieLogRecordReader
 
       HoodieRecord<? extends HoodieRecordPayload> oldRecord = records.get(key);
       HoodieRecordPayload oldValue = oldRecord.getData();
-      HoodieRecordPayload combinedValue = 
(HoodieRecordPayload)hoodieRecord.preCombine(oldRecord).getData();
+      HoodieRecordPayload combinedValue = (HoodieRecordPayload) 
merge.preCombine(oldRecord, hoodieRecord).getData();
       // If combinedValue is oldValue, no need rePut oldRecord
       if (combinedValue != oldValue) {
         HoodieOperation operation = hoodieRecord.getOperation();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
index ebe53fe471..98f5dcf3a0 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
@@ -31,8 +31,8 @@ import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ParquetReaderIterator;
-import org.apache.hudi.io.storage.HoodieParquetConfig;
 import 
org.apache.hudi.io.storage.HoodieAvroFileReader.HoodieRecordTransformIterator;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.io.storage.HoodieParquetStreamWriter;
 import org.apache.parquet.avro.AvroParquetReader;
 import org.apache.parquet.avro.AvroReadSupport;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
new file mode 100644
index 0000000000..075d117fe2
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.model.HoodieMerge;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.exception.HoodieException;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Reflection helpers for instantiating {@link HoodieMerge} implementations and record payloads.
+ */
+public class HoodieRecordUtils {
+
+  // One shared instance per merge class name; HashMap is not thread-safe, so all access (reads too) holds its monitor.
+  private static final Map<String, Object> INSTANCE_CACHE = new HashMap<>();
+
+  /**
+   * Returns the cached merge instance for {@code mergeClass}, creating and caching it on first request.
+   * @throws HoodieException if {@code ReflectionUtils.loadClass} fails with a HoodieException
+   */
+  public static HoodieMerge loadMerge(String mergeClass) {
+    try {
+      synchronized (INSTANCE_CACHE) {
+        HoodieMerge merge = (HoodieMerge) INSTANCE_CACHE.get(mergeClass);
+        if (null == merge) {
+          merge = (HoodieMerge) ReflectionUtils.loadClass(mergeClass, new Object[]{});
+          INSTANCE_CACHE.put(mergeClass, merge);
+        }
+        return merge;
+      }
+    } catch (HoodieException e) {
+      throw new HoodieException("Unable to instantiate hoodie merge class " + mergeClass, e);
+    }
+  }
+
+  /**
+   * Instantiates {@code recordPayloadClass} via the constructor matching {@code constructorArgTypes}.
+   * @throws HoodieException if that constructor is missing, inaccessible, or fails
+   */
+  public static <T extends HoodieRecordPayload> T loadPayload(String recordPayloadClass, Object[] payloadArgs,
+                                                              Class<?>... constructorArgTypes) {
+    try {
+      return (T) ReflectionUtils.getClass(recordPayloadClass).getConstructor(constructorArgTypes).newInstance(payloadArgs);
+    } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
+      throw new HoodieException("Unable to instantiate payload class " + recordPayloadClass, e);
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
index 6ee7928c75..b3e178320b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.common.util;
 
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.exception.HoodieException;
 
 import org.apache.log4j.LogManager;
@@ -69,18 +68,6 @@ public class ReflectionUtils {
     }
   }
 
-  /**
-   * Instantiate a given class with a generic record payload.
-   */
-  public static <T extends HoodieRecordPayload> T loadPayload(String 
recordPayloadClass, Object[] payloadArgs,
-      Class<?>... constructorArgTypes) {
-    try {
-      return (T) 
getClass(recordPayloadClass).getConstructor(constructorArgTypes).newInstance(payloadArgs);
-    } catch (InstantiationException | IllegalAccessException | 
InvocationTargetException | NoSuchMethodException e) {
-      throw new HoodieException("Unable to instantiate payload class ", e);
-    }
-  }
-
   /**
    * Creates an instance of the given class. Use this version when dealing 
with interface types as constructor args.
    */
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java
index d4bafd9c9f..d2d91bbfb6 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java
@@ -137,7 +137,7 @@ public class SpillableMapUtils {
     HoodieOperation operation = withOperationField
         ? HoodieOperation.fromName(getNullableValAsString(record, 
HoodieRecord.OPERATION_METADATA_FIELD)) : null;
     HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = new 
HoodieAvroRecord<>(new HoodieKey(recKey, partitionPath),
-        ReflectionUtils.loadPayload(payloadClazz, new Object[]{record, 
preCombineVal}, GenericRecord.class,
+        HoodieRecordUtils.loadPayload(payloadClazz, new Object[]{record, 
preCombineVal}, GenericRecord.class,
             Comparable.class), operation);
 
     return (R) hoodieRecord;
@@ -163,7 +163,7 @@ public class SpillableMapUtils {
    */
   public static <R> R generateEmptyPayload(String recKey, String 
partitionPath, Comparable orderingVal, String payloadClazz) {
     HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = new 
HoodieAvroRecord<>(new HoodieKey(recKey, partitionPath),
-        ReflectionUtils.loadPayload(payloadClazz, new Object[] {null, 
orderingVal}, GenericRecord.class, Comparable.class));
+        HoodieRecordUtils.loadPayload(payloadClazz, new Object[] {null, 
orderingVal}, GenericRecord.class, Comparable.class));
     return (R) hoodieRecord;
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index c67f0e1b98..360af469c2 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -157,9 +157,10 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     List<FileSlice> partitionFileSlices =
         
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, 
partitionName);
 
-    return engineContext.parallelize(partitionFileSlices)
+    return (shouldLoadInMemory ? HoodieListData.lazy(partitionFileSlices) :
+        engineContext.parallelize(partitionFileSlices))
         .flatMap(
-            (SerializableFunction<FileSlice, Iterator<Pair<String, 
Option<HoodieRecord<HoodieMetadataPayload>>>>>) fileSlice -> {
+            (SerializableFunction<FileSlice, 
Iterator<HoodieRecord<HoodieMetadataPayload>>>) fileSlice -> {
               // NOTE: Since this will be executed by executors, we can't 
access previously cached
               //       readers, and therefore have to always open new ones
               Pair<HoodieAvroFileReader, HoodieMetadataMergedLogRecordReader> 
readers =
@@ -170,31 +171,31 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
                 HoodieAvroFileReader baseFileReader = readers.getKey();
                 HoodieMetadataMergedLogRecordReader logRecordScanner = 
readers.getRight();
 
-            if (baseFileReader == null && logRecordScanner == null) {
-              // TODO: what do we do if both does not exist? should we throw 
an exception and let caller do the fallback ?
-              return Collections.emptyIterator();
-            }
+                if (baseFileReader == null && logRecordScanner == null) {
+                  // TODO: what do we do if both does not exist? should we 
throw an exception and let caller do the fallback ?
+                  return Collections.emptyIterator();
+                }
 
-            boolean fullKeys = false;
+              boolean fullKeys = false;
 
-            Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> 
logRecords =
-                readLogRecords(logRecordScanner, sortedKeyPrefixes, fullKeys, 
timings);
+              Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> 
logRecords =
+                  readLogRecords(logRecordScanner, sortedKeyPrefixes, 
fullKeys, timings);
 
-            List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> 
mergedRecords =
-                readFromBaseAndMergeWithLogRecords(baseFileReader, 
sortedKeyPrefixes, fullKeys, logRecords, timings, partitionName);
+              List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> 
mergedRecords =
+                  readFromBaseAndMergeWithLogRecords(baseFileReader, 
sortedKeyPrefixes, fullKeys, logRecords, timings, partitionName);
 
-            LOG.debug(String.format("Metadata read for %s keys took 
[baseFileRead, logMerge] %s ms",
-                sortedKeyPrefixes.size(), timings));
+              LOG.debug(String.format("Metadata read for %s keys took 
[baseFileRead, logMerge] %s ms",
+                  sortedKeyPrefixes.size(), timings));
 
-            return mergedRecords.stream()
+              return mergedRecords.stream()
                 .map(keyRecordPair -> keyRecordPair.getValue().orElse(null))
                 .iterator();
-          } catch (IOException ioe) {
-            throw new HoodieIOException("Error merging records from metadata 
table for  " + sortedKeyPrefixes.size() + " key : ", ioe);
-          } finally {
-            closeReader(readers);
-          }
-        })
+            } catch (IOException ioe) {
+              throw new HoodieIOException("Error merging records from metadata 
table for  " + sortedKeyPrefixes.size() + " key : ", ioe);
+            } finally {
+              closeReader(readers);
+            }
+          })
         .filter(Objects::nonNull);
   }
 
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/util/HoodieRecordUtilsTest.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/util/HoodieRecordUtilsTest.java
new file mode 100644
index 0000000000..0c51571c9e
--- /dev/null
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/util/HoodieRecordUtilsTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieAvroRecordMerge;
+import org.apache.hudi.common.model.HoodieMerge;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests for the class-name based loaders in {@link HoodieRecordUtils}.
+ */
+class HoodieRecordUtilsTest {
+
+  /**
+   * Loading the same merge class name twice must yield an instance of that class,
+   * and the two loaded instances must compare equal.
+   */
+  @Test
+  void loadHoodieMerge() {
+    String mergeClassName = HoodieAvroRecordMerge.class.getName();
+    HoodieMerge merge1 = HoodieRecordUtils.loadMerge(mergeClassName);
+    HoodieMerge merge2 = HoodieRecordUtils.loadMerge(mergeClassName);
+    // JUnit convention: expected value first, actual value second.
+    assertEquals(mergeClassName, merge1.getClass().getName());
+    assertEquals(merge1, merge2);
+  }
+
+  /**
+   * Loading a payload by class name, passing a null GenericRecord and an ordering
+   * value of 0 as constructor arguments, must yield an instance of that class.
+   */
+  @Test
+  void loadPayload() {
+    String payloadClassName = DefaultHoodieRecordPayload.class.getName();
+    HoodieRecordPayload payload = HoodieRecordUtils.loadPayload(
+        payloadClassName, new Object[]{null, 0}, GenericRecord.class, Comparable.class);
+    assertEquals(payloadClassName, payload.getClass().getName());
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 933c112312..118ff5ccc7 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -22,6 +22,7 @@ import 
org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringP
 import org.apache.hudi.common.config.ConfigClassProperty;
 import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.model.HoodieAvroRecordMerge;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
@@ -291,6 +292,13 @@ public class FlinkOptions extends HoodieConfig {
       .withDescription("Payload class used. Override this, if you like to roll 
your own merge logic, when upserting/inserting.\n"
           + "This will render any value set for the option in-effective");
 
+  public static final ConfigOption<String> MERGE_CLASS_NAME = ConfigOptions
+      .key("write.merge.class")
+      .stringType()
+      .defaultValue(HoodieAvroRecordMerge.class.getName())
+      .withDescription("Merge class provide stateless component interface for 
merging records, and support various HoodieRecord "
+          + "types, such as Spark records or Flink records.");
+
   /**
    * Flag to indicate whether to drop duplicates before insert/upsert.
    * By default false to gain extra performance.
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 2748af5290..b6b65bb8f1 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -23,9 +23,11 @@ import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieMerge;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.ObjectSizeCalculator;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -102,6 +104,8 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
 
   private transient BiFunction<List<HoodieRecord>, String, List<WriteStatus>> 
writeFunction;
 
+  private transient HoodieMerge merge;
+
   /**
    * Total size tracer.
    */
@@ -121,6 +125,7 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
     this.tracer = new TotalSizeTracer(this.config);
     initBuffer();
     initWriteFunction();
+    initMergeClass();
   }
 
   @Override
@@ -195,6 +200,12 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
     }
   }
 
+  private void initMergeClass() {
+    String mergeClassName = metaClient.getTableConfig().getMergeClass();
+    LOG.info("init hoodie merge with class [{}]", mergeClassName);
+    merge = HoodieRecordUtils.loadMerge(mergeClassName);
+  }
+
   /**
    * Represents a data item in the buffer, this is needed to reduce the
    * memory footprint.
@@ -421,7 +432,7 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
     List<HoodieRecord> records = bucket.writeBuffer();
     ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has 
no buffering records");
     if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
-      records = FlinkWriteHelper.newInstance().deduplicateRecords(records, 
(HoodieIndex) null, -1);
+      records = FlinkWriteHelper.newInstance().deduplicateRecords(records, 
(HoodieIndex) null, -1, merge);
     }
     bucket.preWrite(records);
     final List<WriteStatus> writeStatus = new 
ArrayList<>(writeFunction.apply(records, instant));
@@ -456,7 +467,7 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
             List<HoodieRecord> records = bucket.writeBuffer();
             if (records.size() > 0) {
               if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
-                records = 
FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, 
-1);
+                records = 
FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, 
-1, merge);
               }
               bucket.preWrite(records);
               writeStatus.addAll(writeFunction.apply(records, currentInstant));
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index 3a69e5d080..712bd9b782 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.streamer;
 
 import org.apache.hudi.client.utils.OperationConverter;
+import org.apache.hudi.common.model.HoodieAvroRecordMerge;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -118,6 +119,10 @@ public class FlinkStreamerConfig extends Configuration {
       + "a GenericRecord. Implement your own, if you want to do something 
other than overwriting existing value.")
   public String payloadClassName = 
OverwriteWithLatestAvroPayload.class.getName();
 
+  @Parameter(names = {"--merge-class"}, description = "Implements of 
HoodieMerge, that defines how to merge two records. "
+      + "Implement your own, if you want to implement specific record merge 
logic.")
+  public String mergeClassName = HoodieAvroRecordMerge.class.getName();
+
   @Parameter(names = {"--op"}, description = "Takes one of these values : 
UPSERT (default), INSERT (use when input "
       + "is purely new data/inserts to gain speed).", converter = 
OperationConverter.class)
   public WriteOperationType operation = WriteOperationType.UPSERT;
@@ -357,6 +362,7 @@ public class FlinkStreamerConfig extends Configuration {
     conf.setString(FlinkOptions.OPERATION, config.operation.value());
     conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField);
     conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, config.payloadClassName);
+    conf.setString(FlinkOptions.MERGE_CLASS_NAME, config.mergeClassName);
     conf.setBoolean(FlinkOptions.PRE_COMBINE, config.preCombine);
     conf.setInteger(FlinkOptions.RETRY_TIMES, 
Integer.parseInt(config.instantRetryTimes));
     conf.setLong(FlinkOptions.RETRY_INTERVAL_MS, 
Long.parseLong(config.instantRetryInterval));
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 2034cb322e..c63b98b553 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -421,7 +421,8 @@ public class HoodieTableSource implements
         tableAvroSchema.toString(),
         AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
         inputSplits,
-        conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","));
+        conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","),
+        conf.getString(FlinkOptions.MERGE_CLASS_NAME));
     return MergeOnReadInputFormat.builder()
         .config(this.conf)
         .tableState(hoodieTableState)
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 76e9e60ee0..6dc6c76082 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -18,12 +18,15 @@
 
 package org.apache.hudi.table.format.mor;
 
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieMerge;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.util.ClosableIterator;
+import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
@@ -63,6 +66,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Properties;
 import java.util.Set;
 import java.util.stream.IntStream;
 
@@ -200,7 +204,8 @@ public class MergeOnReadInputFormat
           this.requiredPos,
           this.emitDelete,
           this.tableState.getOperationPos(),
-          getFullSchemaReader(split.getBasePath().get()));
+          getFullSchemaReader(split.getBasePath().get()),
+          tableState.getMergeClass());
     } else {
       throw new HoodieException("Unable to select an Iterator to read the 
Hoodie MOR File Split for "
           + "file path: " + split.getBasePath()
@@ -627,6 +632,8 @@ public class MergeOnReadInputFormat
 
     private final InstantRange instantRange;
 
+    private final HoodieMerge merge;
+
     // add the flag because the flink ParquetColumnarRowSplitReader is buggy:
     // method #reachedEnd() returns false after it returns true.
     // refactor it out once FLINK-22370 is resolved.
@@ -647,7 +654,8 @@ public class MergeOnReadInputFormat
         int[] requiredPos,
         boolean emitDelete,
         int operationPos,
-        ParquetColumnarRowSplitReader reader) { // the reader should be with 
full schema
+        ParquetColumnarRowSplitReader reader, // the reader should be with 
full schema
+        String mergeClass) {
       this.tableSchema = tableSchema;
       this.reader = reader;
       this.scanner = FormatUtils.logScanner(split, tableSchema, finkConf, 
hadoopConf);
@@ -661,6 +669,7 @@ public class MergeOnReadInputFormat
       this.avroToRowDataConverter = 
AvroToRowDataConverters.createRowConverter(requiredRowType);
       this.projection = RowDataProjection.instance(requiredRowType, 
requiredPos);
       this.instantRange = split.getInstantRange().orElse(null);
+      this.merge = HoodieRecordUtils.loadMerge(mergeClass);
     }
 
     @Override
@@ -751,7 +760,8 @@ public class MergeOnReadInputFormat
         String curKey) throws IOException {
       final HoodieAvroRecord<?> record = (HoodieAvroRecord) 
scanner.getRecords().get(curKey);
       GenericRecord historyAvroRecord = (GenericRecord) 
rowDataToAvroConverter.convert(tableSchema, curRow);
-      return record.getData().combineAndGetUpdateValue(historyAvroRecord, 
tableSchema);
+      Option<HoodieRecord> resultRecord = merge.combineAndGetUpdateValue(new 
HoodieAvroIndexedRecord(historyAvroRecord), record, tableSchema, new 
Properties());
+      return ((HoodieAvroIndexedRecord) resultRecord.get()).toIndexedRecord();
     }
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
index 36dfecbb79..bbb21db7f8 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
@@ -41,6 +41,7 @@ public class MergeOnReadTableState implements Serializable {
   private final List<MergeOnReadInputSplit> inputSplits;
   private final String[] pkFields;
   private final int operationPos;
+  private final String mergeClass;
 
   public MergeOnReadTableState(
       RowType rowType,
@@ -48,7 +49,8 @@ public class MergeOnReadTableState implements Serializable {
       String avroSchema,
       String requiredAvroSchema,
       List<MergeOnReadInputSplit> inputSplits,
-      String[] pkFields) {
+      String[] pkFields,
+      String mergeClass) {
     this.rowType = rowType;
     this.requiredRowType = requiredRowType;
     this.avroSchema = avroSchema;
@@ -56,6 +58,7 @@ public class MergeOnReadTableState implements Serializable {
     this.inputSplits = inputSplits;
     this.pkFields = pkFields;
     this.operationPos = 
rowType.getFieldIndex(HoodieRecord.OPERATION_METADATA_FIELD);
+    this.mergeClass = mergeClass;
   }
 
   public RowType getRowType() {
@@ -82,6 +85,10 @@ public class MergeOnReadTableState implements Serializable {
     return operationPos;
   }
 
+  public String getMergeClass() {
+    return mergeClass;
+  }
+
   public int[] getRequiredPositions() {
     final List<String> fieldNames = rowType.getFieldNames();
     return requiredRowType.getFieldNames().stream()
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 4e819ecd7b..4e481ab602 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -197,6 +197,7 @@ public class StreamerUtil {
                 
.archiveCommitsWith(conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), 
conf.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS))
                 .build())
             .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+                .withMergeClass(conf.getString(FlinkOptions.MERGE_CLASS_NAME))
                 
.withTargetIOPerCompactionInMB(conf.getLong(FlinkOptions.COMPACTION_TARGET_IO))
                 .withInlineCompactionTriggerStrategy(
                     
CompactionTriggerStrategy.valueOf(conf.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY).toUpperCase(Locale.ROOT)))
@@ -304,6 +305,7 @@ public class StreamerUtil {
           .setTableName(conf.getString(FlinkOptions.TABLE_NAME))
           .setRecordKeyFields(conf.getString(FlinkOptions.RECORD_KEY_FIELD, 
null))
           .setPayloadClassName(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
+          .setMergeClassName(conf.getString(FlinkOptions.MERGE_CLASS_NAME))
           .setPreCombineField(OptionsResolver.getPreCombineField(conf))
           .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
           
.setPartitionFields(conf.getString(FlinkOptions.PARTITION_PATH_FIELD, null))
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
index 63d5c1f6bd..f2095c1844 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
@@ -263,7 +263,8 @@ public class TestStreamReadOperator {
         tableAvroSchema.toString(),
         
AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE).toString(),
         Collections.emptyList(),
-        new String[0]);
+        new String[0],
+        metaClient.getTableConfig().getMergeClass());
     MergeOnReadInputFormat inputFormat = MergeOnReadInputFormat.builder()
         .config(conf)
         .tableState(hoodieTableState)
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index d0cf143318..1f51113e13 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -733,8 +733,8 @@ public class TestData {
         if (scanner != null) {
           for (String curKey : scanner.getRecords().keySet()) {
             if (!keyToSkip.contains(curKey)) {
-              Option<GenericRecord> record = (Option<GenericRecord>) 
scanner.getRecords()
-                      .get(curKey).getData()
+              Option<GenericRecord> record = (Option<GenericRecord>) 
((HoodieAvroRecord) scanner.getRecords()
+                      .get(curKey)).getData()
                       .getInsertValue(schema, config.getProps());
               if (record.isPresent()) {
                 readBuffer.add(filterOutVariables(record.get()));
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java
 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java
index 8ad1c3412e..f7918cf3fd 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java
@@ -21,7 +21,7 @@ package org.apache.hudi.internal;
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.client.HoodieInternalWriteStatus;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 16f52f33b1..0e95ce8973 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -301,6 +301,12 @@ object DataSourceWriteOptions {
    */
   val PAYLOAD_CLASS_NAME = HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME
 
+  /**
+   * Class name of the HoodieMerge implementation. HoodieMerge is intended to replace
+   * the payload for merging records while providing the same capabilities.
+   */
+  val MERGE_CLASS_NAME = HoodieWriteConfig.MERGE_CLASS_NAME
+
   /**
    * Record key field. Value to be used as the `recordKey` component of 
`HoodieKey`. Actual value
    * will be obtained by invoking .toString() on the field value. Nested 
fields can be specified using
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index e28d36d4c4..3c3e01ce78 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -40,9 +40,7 @@ import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
-import org.apache.hudi.io.storage.HoodieHFileReader
 import org.apache.hudi.io.storage.HoodieAvroHFileReader
-import org.apache.spark.SerializableWritable
 import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
@@ -74,7 +72,8 @@ case class HoodieTableState(tablePath: String,
                             preCombineFieldOpt: Option[String],
                             usesVirtualKeys: Boolean,
                             recordPayloadClassName: String,
-                            metadataConfig: HoodieMetadataConfig)
+                            metadataConfig: HoodieMetadataConfig,
+                            mergeClass: String)
 
 /**
  * Hoodie BaseRelation which extends [[PrunedFilteredScan]].
@@ -458,7 +457,8 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
       preCombineFieldOpt = preCombineFieldOpt,
       usesVirtualKeys = !tableConfig.populateMetaFields(),
       recordPayloadClassName = tableConfig.getPayloadClass,
-      metadataConfig = fileIndex.metadataConfig
+      metadataConfig = fileIndex.metadataConfig,
+      mergeClass = tableConfig.getMergeClass
     )
   }
 
@@ -737,7 +737,7 @@ object HoodieBaseRelation extends SparkAdapterSupport {
 
       reader.getRecordIterator(requiredAvroSchema).asScala
         .map(record => {
-          avroToRowConverter.apply(record).get
+          avroToRowConverter.apply(record.asInstanceOf[GenericRecord]).get
         })
     }
   }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index 927dbcfba7..d0032088a6 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -19,7 +19,7 @@
 package org.apache.hudi
 
 import org.apache.avro.Schema
-import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, 
IndexedRecord}
+import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.JobConf
@@ -31,8 +31,9 @@ import org.apache.hudi.common.config.{HoodieCommonConfig, 
HoodieMetadataConfig}
 import org.apache.hudi.common.engine.HoodieLocalEngineContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
-import org.apache.hudi.common.model.{HoodieLogFile, HoodieRecord, 
HoodieRecordPayload, OverwriteWithLatestAvroPayload}
+import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieLogFile, 
HoodieRecord, HoodieRecordPayload, OverwriteWithLatestAvroPayload}
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner
+import org.apache.hudi.common.util.HoodieRecordUtils
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.config.HoodiePayloadConfig
 import org.apache.hudi.exception.HoodieException
@@ -203,10 +204,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
     protected lazy val logRecordsIterator: Iterator[Option[GenericRecord]] =
       logRecords.iterator.map {
         case (_, record) =>
-          val avroRecordOpt = 
toScalaOption(record.toIndexedRecord(logFileReaderAvroSchema, payloadProps))
-          avroRecordOpt.map {
-            avroRecord => projectAvroUnsafe(avroRecord, requiredAvroSchema, 
requiredSchemaFieldOrdinals, recordBuilder)
-          }
+          
toScalaOption(record.getData.asInstanceOf[HoodieRecordPayload[_]].getInsertValue(logFileReaderAvroSchema,
 payloadProps))
+            .map(_.asInstanceOf[GenericRecord])
       }
 
     protected def removeLogRecord(key: String): Option[HoodieRecord[_]] =
@@ -295,6 +294,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
 
     private val baseFileIterator = baseFileReader(split.dataFile.get)
 
+    private val merger = HoodieRecordUtils.loadMerge(tableState.mergeClass)
+
     override def hasNext: Boolean = hasNextInternal
 
     // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to 
make sure
@@ -310,12 +311,12 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
           recordToLoad = requiredSchemaUnsafeProjection(curRow)
           true
         } else {
-          val mergedAvroRecordOpt = merge(serialize(curRowRecord), 
updatedRecordOpt.get)
+          val mergedAvroRecordOpt = merge(serialize(curRow), 
updatedRecordOpt.get)
           if (mergedAvroRecordOpt.isEmpty) {
             // Record has been deleted, skipping
             this.hasNextInternal
           } else {
-            val projectedAvroRecord = 
projectAvroUnsafe(mergedAvroRecordOpt.get.asInstanceOf[GenericRecord],
+            val projectedAvroRecord = 
projectAvroUnsafe(mergedAvroRecordOpt.get.getData.asInstanceOf[GenericRecord],
               requiredAvroSchema, reusableRecordBuilder)
             recordToLoad = deserialize(projectedAvroRecord)
             true
@@ -329,10 +330,10 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
     private def serialize(curRowRecord: InternalRow): GenericRecord =
       serializer.serialize(curRowRecord).asInstanceOf[GenericRecord]
 
-    private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_ 
<: HoodieRecordPayload[_]]): Option[IndexedRecord] = {
+    private def merge(curAvroRecord: GenericRecord, newRecord: 
HoodieRecord[_]): Option[HoodieRecord[_]] = {
       // NOTE: We have to pass in Avro Schema used to read from Delta Log file 
since we invoke combining API
       //       on the record from the Delta Log
-      toScalaOption(newRecord.getData.combineAndGetUpdateValue(curAvroRecord, 
logFileReaderAvroSchema, payloadProps))
+      toScalaOption(merger.combineAndGetUpdateValue(new 
HoodieAvroIndexedRecord(curAvroRecord), newRecord, logFileReaderAvroSchema, 
payloadProps))
     }
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 3fa18b122a..b935ce1b13 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -150,6 +150,7 @@ object HoodieSparkSqlWriter {
           .setBaseFileFormat(baseFileFormat)
           .setArchiveLogFolder(archiveLogFolder)
           .setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_NAME))
+          .setMergeClassName(hoodieConfig.getString(MERGE_CLASS_NAME))
           // we can't fetch preCombine field from hoodieConfig object, since 
it falls back to "ts" as default value,
           // but we are interested in what user has set, hence fetching from 
optParams.
           .setPreCombineField(optParams.getOrElse(PRECOMBINE_FIELD.key(), 
null))
@@ -484,6 +485,7 @@ object HoodieSparkSqlWriter {
           .setRecordKeyFields(recordKeyFields)
           .setArchiveLogFolder(archiveLogFolder)
           
.setPayloadClassName(hoodieConfig.getStringOrDefault(PAYLOAD_CLASS_NAME))
+          .setMergeClassName(hoodieConfig.getStringOrDefault(MERGE_CLASS_NAME))
           
.setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD, null))
           .setBootstrapIndexClass(bootstrapIndexClass)
           .setBaseFileFormat(baseFileFormat)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieInternalRowUtils.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieInternalRowUtils.scala
new file mode 100644
index 0000000000..4ff5cceef7
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieInternalRowUtils.scala
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import java.nio.charset.StandardCharsets
+import java.util
+import java.util.concurrent.ConcurrentHashMap
+import org.apache.avro.Schema
+import org.apache.hudi.AvroConversionUtils
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.avro.HoodieAvroUtils.{createFullName, fromJavaDate, 
toJavaDate}
+import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField
+import org.apache.hudi.common.util.ValidationUtils
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, 
JoinedRow, MutableProjection, Projection}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, 
GenericArrayData, MapData}
+import 
org.apache.spark.sql.hudi.ColumnStatsExpressionUtils.AllowedTransformationExpression.exprUtils.generateMutableProjection
+import org.apache.spark.sql.types._
+
+import scala.collection.mutable
+
+/**
+ * Helper methods for working with Spark Catalyst [[InternalRow]]s.
+ * Provides row-based counterparts of the record utilities in {@link HoodieAvroUtils}:
+ * stitching two rows together, rewriting a row into another schema (with or without
+ * renamed columns / type promotion) and filling in the file-name meta field.
+ *
+ * Schema conversions, projections and field-position lookups are memoized in the
+ * concurrent maps below.
+ */
+object HoodieInternalRowUtils {
+
+  // (source schema, target schema) -> projection between the two
+  val projectionMap = new ConcurrentHashMap[(StructType, StructType), MutableProjection]
+  // Avro schema -> converted Catalyst struct type
+  val schemaMap = new ConcurrentHashMap[Schema, StructType]
+  // struct type -> (field name -> (field, ordinal))
+  val SchemaPosMap = new ConcurrentHashMap[StructType, Map[String, (StructField, Int)]]
+
+  /**
+   * Joins `left` and `right` side by side and projects the joined row onto `stitchedSchema`.
+   *
+   * NOTE(review): the projection comes from a shared cache of MutableProjection instances;
+   * such projections presumably reuse their output row, so callers may need to copy the
+   * result before retaining it - confirm.
+   *
+   * @param left           first row
+   * @param leftSchema     schema of `left`
+   * @param right          second row
+   * @param rightSchema    schema of `right`
+   * @param stitchedSchema schema of the row to produce
+   * @return the projected row
+   */
+  def stitchRecords(left: InternalRow, leftSchema: StructType, right: InternalRow, rightSchema: StructType, stitchedSchema: StructType): InternalRow = {
+    val mergeSchema = StructType(leftSchema.fields ++ rightSchema.fields)
+    val row = new JoinedRow(left, right)
+    val projection = getCachedProjection(mergeSchema, stitchedSchema)
+    projection(row)
+  }
+
+  /**
+   * Copies `oldRecord` into a new row laid out according to `newSchema`, matching fields by name.
+   * Fields of `newSchema` that are absent from `oldSchema` (or hold null) are left null.
+   * Nested structs are rewritten recursively and decimals are rescaled when the
+   * precision/scale of the target field differs from the source field.
+   *
+   * @param oldRecord row to rewrite
+   * @param oldSchema schema `oldRecord` was written with
+   * @param newSchema schema of the row to produce
+   * @return a new [[GenericInternalRow]] with `newSchema.fields.length` slots
+   */
+  def rewriteRecord(oldRecord: InternalRow, oldSchema: StructType, newSchema: StructType): InternalRow = {
+    val newRow = new GenericInternalRow(new Array[Any](newSchema.fields.length))
+
+    val oldFieldMap = getCachedSchemaPosMap(oldSchema)
+    for ((field, pos) <- newSchema.fields.zipWithIndex) {
+      var oldValue: AnyRef = null
+      var oldType: DataType = null
+      if (oldFieldMap.contains(field.name)) {
+        val (oldField, oldPos) = oldFieldMap(field.name)
+        oldType = oldField.dataType
+        oldValue = oldRecord.get(oldPos, oldType)
+      }
+      if (oldValue != null) {
+        (field.dataType, oldType) match {
+          case (newStruct: StructType, oldStruct: StructType) =>
+            // Recurse with the *data type* of the old field and store the rewritten nested row.
+            newRow.update(pos, rewriteRecord(oldValue.asInstanceOf[InternalRow], oldStruct, newStruct))
+          case (newDecimal: DecimalType, oldDecimal: DecimalType)
+            if newDecimal.scale != oldDecimal.scale || newDecimal.precision != oldDecimal.precision =>
+            // Rescale to the scale of the target *field* (not of the enclosing struct).
+            newRow.update(pos, Decimal.fromDecimal(oldValue.asInstanceOf[Decimal].toBigDecimal.setScale(newDecimal.scale)))
+          case _ =>
+            newRow.update(pos, oldValue)
+        }
+      } else {
+        // TODO default value in newSchema
+      }
+    }
+
+    newRow
+  }
+
+  /**
+   * Rewrites `oldRecord` into `newSchema`, supporting renamed columns and primitive type changes.
+   *
+   * @param oldRecord  row to rewrite
+   * @param oldSchema  schema `oldRecord` was written with
+   * @param newSchema  schema of the row to produce
+   * @param renameCols mapping from the full name of a column in `newSchema` to its
+   *                   (dot-separated) name in `oldSchema`
+   * @return the rewritten row
+   */
+  def rewriteRecordWithNewSchema(oldRecord: InternalRow, oldSchema: StructType, newSchema: StructType, renameCols: util.Map[String, String]): InternalRow = {
+    rewriteRecordWithNewSchema(oldRecord, oldSchema, newSchema, renameCols, new util.LinkedList[String]).asInstanceOf[InternalRow]
+  }
+
+  /**
+   * Recursive worker behind the public `rewriteRecordWithNewSchema`.
+   *
+   * @param oldRecord  value to rewrite (row, array, map or primitive); null is returned as null
+   * @param oldSchema  data type of `oldRecord`
+   * @param newSchema  data type to produce
+   * @param renameCols new full column name -> old column name
+   * @param fieldNames stack of field names leading to the value being rewritten; used to build
+   *                   the full name looked up in `renameCols`
+   */
+  private def rewriteRecordWithNewSchema(oldRecord: Any, oldSchema: DataType, newSchema: DataType, renameCols: util.Map[String, String], fieldNames: util.Deque[String]): Any = {
+    if (oldRecord == null) {
+      null
+    } else {
+      newSchema match {
+        case targetSchema: StructType =>
+          ValidationUtils.checkArgument(oldRecord.isInstanceOf[InternalRow], "cannot rewrite record with different type")
+          val oldRow = oldRecord.asInstanceOf[InternalRow]
+          val oldSchemaPos = getCachedSchemaPosMap(oldSchema.asInstanceOf[StructType])
+          // Slots without a matching source column stay null.
+          // TODO add default val
+          val newRow = new GenericInternalRow(new Array[Any](targetSchema.length))
+
+          targetSchema.fields.zipWithIndex.foreach { case (field, i) =>
+            fieldNames.push(field.name)
+            if (oldSchemaPos.contains(field.name)) {
+              val (oldField, oldPos) = oldSchemaPos(field.name)
+              newRow.update(i, rewriteRecordWithNewSchema(oldRow.get(oldPos, oldField.dataType), oldField.dataType, field.dataType, renameCols, fieldNames))
+            } else {
+              // deal with rename: look the column up under its old (last-segment) name
+              val fieldFullName = createFullName(fieldNames)
+              val colNamePartsFromOldSchema = renameCols.getOrDefault(fieldFullName, "").split("\\.")
+              val lastColNameFromOldSchema = colNamePartsFromOldSchema(colNamePartsFromOldSchema.length - 1)
+              if (oldSchemaPos.contains(lastColNameFromOldSchema)) {
+                // find rename
+                val (oldField, oldPos) = oldSchemaPos(lastColNameFromOldSchema)
+                newRow.update(i, rewriteRecordWithNewSchema(oldRow.get(oldPos, oldField.dataType), oldField.dataType, field.dataType, renameCols, fieldNames))
+              }
+            }
+            fieldNames.pop()
+          }
+
+          newRow
+        case targetSchema: ArrayType =>
+          ValidationUtils.checkArgument(oldRecord.isInstanceOf[ArrayData], "cannot rewrite record with different type")
+          val oldElementType = oldSchema.asInstanceOf[ArrayType].elementType
+          val oldArray = oldRecord.asInstanceOf[ArrayData]
+          val newElementType = targetSchema.elementType
+          val newArray = new GenericArrayData(new Array[Any](oldArray.numElements()))
+          fieldNames.push("element")
+          oldArray.toSeq[Any](oldElementType).zipWithIndex.foreach { case (value, i) =>
+            newArray.update(i, rewriteRecordWithNewSchema(value.asInstanceOf[AnyRef], oldElementType, newElementType, renameCols, fieldNames))
+          }
+          fieldNames.pop()
+
+          newArray
+        case targetSchema: MapType =>
+          ValidationUtils.checkArgument(oldRecord.isInstanceOf[MapData], "cannot rewrite record with different type")
+          val oldValueType = oldSchema.asInstanceOf[MapType].valueType
+          val oldKeyType = oldSchema.asInstanceOf[MapType].keyType
+          val oldMap = oldRecord.asInstanceOf[MapData]
+          val newValueType = targetSchema.valueType
+          val newKeyArray = new GenericArrayData(new Array[Any](oldMap.keyArray().numElements()))
+          val newValueArray = new GenericArrayData(new Array[Any](oldMap.valueArray().numElements()))
+          val newMap = new ArrayBasedMapData(newKeyArray, newValueArray)
+          fieldNames.push("value")
+          // Keys are copied verbatim; only values are rewritten.
+          oldMap.keyArray().toSeq[Any](oldKeyType).zipWithIndex.foreach { case (value, i) => newKeyArray.update(i, value) }
+          oldMap.valueArray().toSeq[Any](oldValueType).zipWithIndex.foreach { case (value, i) =>
+            newValueArray.update(i, rewriteRecordWithNewSchema(value.asInstanceOf[AnyRef], oldValueType, newValueType, renameCols, fieldNames))
+          }
+          fieldNames.pop()
+
+          newMap
+        case _ => rewritePrimaryType(oldRecord, oldSchema, newSchema)
+      }
+    }
+  }
+
+  /**
+   * Rewrites `record` into `newSchema` via `rewriteRecord` and stores `fileName` at the
+   * ordinal of the file-name meta field.
+   */
+  def rewriteRecordWithMetadata(record: InternalRow, oldSchema: StructType, newSchema: StructType, fileName: String): InternalRow = {
+    val newRecord = rewriteRecord(record, oldSchema, newSchema)
+    newRecord.update(HoodieMetadataField.FILENAME_METADATA_FIELD.ordinal, fileName)
+
+    newRecord
+  }
+
+  /**
+   * Same as `rewriteRecordWithMetadata`, but rewrites through `rewriteRecordWithNewSchema`
+   * (with no renamed columns) so that schema-evolution type changes are applied.
+   */
+  def rewriteEvolutionRecordWithMetadata(record: InternalRow, oldSchema: StructType, newSchema: StructType, fileName: String): InternalRow = {
+    val newRecord = rewriteRecordWithNewSchema(record, oldSchema, newSchema, new util.HashMap[String, String]())
+    newRecord.update(HoodieMetadataField.FILENAME_METADATA_FIELD.ordinal, fileName)
+
+    newRecord
+  }
+
+  /**
+   * Returns the Catalyst struct type for an Avro schema, converting it on first use.
+   */
+  def getCachedSchema(schema: Schema): StructType = {
+    // Look the entry up by key: ConcurrentHashMap.contains(x) tests *values*, so using it
+    // here would never hit the cache.
+    var structType = schemaMap.get(schema)
+    if (structType == null) {
+      schemaMap.synchronized {
+        structType = schemaMap.get(schema)
+        if (structType == null) {
+          structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+          schemaMap.put(schema, structType)
+        }
+      }
+    }
+    structType
+  }
+
+  /**
+   * Returns the projection from `from` to `to`, generating it on first use.
+   */
+  private def getCachedProjection(from: StructType, to: StructType): Projection = {
+    val schemaPair = (from, to)
+    var projection = projectionMap.get(schemaPair)
+    if (projection == null) {
+      projectionMap.synchronized {
+        projection = projectionMap.get(schemaPair)
+        if (projection == null) {
+          projection = generateMutableProjection(from, to)
+          projectionMap.put(schemaPair, projection)
+        }
+      }
+    }
+    projection
+  }
+
+  /**
+   * Returns the `field name -> (field, ordinal)` lookup for `schema`, building it on first use.
+   */
+  def getCachedSchemaPosMap(schema: StructType): Map[String, (StructField, Int)] = {
+    var fieldMap = SchemaPosMap.get(schema)
+    if (fieldMap == null) {
+      SchemaPosMap.synchronized {
+        fieldMap = SchemaPosMap.get(schema)
+        if (fieldMap == null) {
+          fieldMap = schema.fields.zipWithIndex.map { case (field, i) => (field.name, (field, i)) }.toMap
+          SchemaPosMap.put(schema, fieldMap)
+        }
+      }
+    }
+    fieldMap
+  }
+
+  /**
+   * Rewrites a non-nested value. Values whose type is unchanged are returned as-is, decimals
+   * are rescaled to the target scale, and any other combination is delegated to
+   * `rewritePrimaryTypeWithDiffSchemaType`.
+   *
+   * @throws HoodieException if the (unchanged) type is not one of the supported primitives
+   */
+  private def rewritePrimaryType(oldValue: Any, oldSchema: DataType, newSchema: DataType): Any = {
+    if (oldSchema.equals(newSchema) || (oldSchema.isInstanceOf[DecimalType] && newSchema.isInstanceOf[DecimalType])) {
+      oldSchema match {
+        case NullType | BooleanType | IntegerType | LongType | FloatType | DoubleType | StringType | DateType | TimestampType | BinaryType =>
+          oldValue
+        case DecimalType() =>
+          Decimal.fromDecimal(oldValue.asInstanceOf[Decimal].toBigDecimal.setScale(newSchema.asInstanceOf[DecimalType].scale))
+        case _ =>
+          throw new HoodieException("Unknown schema type: " + newSchema)
+      }
+    } else {
+      rewritePrimaryTypeWithDiffSchemaType(oldValue, oldSchema, newSchema)
+    }
+  }
+
+  /**
+   * Converts `oldValue` from `oldSchema` to a different primitive type `newSchema`.
+   *
+   * NOTE(review): conversions targeting StringType return a java.lang.String; Catalyst rows
+   * presumably expect UTF8String here - confirm against the consumers of the rewritten row.
+   *
+   * @throws HoodieException if the (new type, old type) pair is not a supported conversion
+   */
+  private def rewritePrimaryTypeWithDiffSchemaType(oldValue: Any, oldSchema: DataType, newSchema: DataType): Any = {
+    // None marks an unsupported conversion; a bare fall-through would evaluate to Unit,
+    // which can never be detected by comparing against None.
+    val converted: Option[Any] = (newSchema, oldSchema) match {
+      case (DateType, StringType) =>
+        Some(fromJavaDate(java.sql.Date.valueOf(oldValue.toString)))
+      case (LongType, IntegerType) =>
+        Some(oldValue.asInstanceOf[Int].longValue())
+      case (FloatType, IntegerType) =>
+        Some(oldValue.asInstanceOf[Int].floatValue())
+      case (FloatType, LongType) =>
+        Some(oldValue.asInstanceOf[Long].floatValue())
+      case (DoubleType, IntegerType) =>
+        Some(oldValue.asInstanceOf[Int].doubleValue())
+      case (DoubleType, LongType) =>
+        Some(oldValue.asInstanceOf[Long].doubleValue())
+      case (DoubleType, FloatType) =>
+        // Go through the decimal string so that e.g. 0.1f becomes 0.1 rather than 0.10000000149...
+        Some(java.lang.Double.valueOf(oldValue.asInstanceOf[Float] + ""))
+      case (BinaryType, StringType) =>
+        // toString works for both java.lang.String and Catalyst string values.
+        Some(oldValue.toString.getBytes(StandardCharsets.UTF_8))
+      case (StringType, BinaryType) =>
+        Some(new String(oldValue.asInstanceOf[Array[Byte]], StandardCharsets.UTF_8))
+      case (StringType, DateType) =>
+        Some(toJavaDate(oldValue.asInstanceOf[Integer]).toString)
+      case (StringType, IntegerType | LongType | FloatType | DoubleType | DecimalType()) =>
+        Some(oldValue.toString)
+      case (DecimalType(), IntegerType | LongType | FloatType | DoubleType | StringType) =>
+        val scale = newSchema.asInstanceOf[DecimalType].scale
+        Some(Decimal.fromDecimal(BigDecimal(oldValue.toString).setScale(scale)))
+      case _ =>
+        None
+    }
+    converted.getOrElse {
+      throw new HoodieException(String.format("cannot support rewrite value for schema type: %s since the old schema type is: %s", newSchema, oldSchema))
+    }
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkRecord.java
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkRecord.java
new file mode 100644
index 0000000000..a22e78af21
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkRecord.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.spark.sql.types.DataTypes.BooleanType;
+
+/**
+ * Spark engine-specific implementation of {@link HoodieRecord} whose payload is a
+ * Catalyst {@link InternalRow}.
+ *
+ * <p>Schema arguments are Avro schemas; they are converted to Spark {@link StructType}s
+ * through {@link HoodieInternalRowUtils#getCachedSchema(Schema)} before the row is touched.
+ */
+public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
+
+  public HoodieSparkRecord(HoodieKey key, InternalRow data, Comparable orderingVal) {
+    super(key, data, orderingVal);
+  }
+
+  public HoodieSparkRecord(HoodieKey key, InternalRow data, HoodieOperation operation, Comparable orderingVal) {
+    super(key, data, operation, orderingVal);
+  }
+
+  public HoodieSparkRecord(HoodieRecord<InternalRow> record) {
+    super(record);
+  }
+
+  public HoodieSparkRecord() {
+  }
+
+  /**
+   * Builds a record that carries {@code row} but keeps this record's key, operation and
+   * ordering value. The four-argument constructor must be used here: with three arguments
+   * the operation would bind to the {@code Comparable orderingVal} parameter.
+   */
+  private HoodieSparkRecord copyWithData(InternalRow row) {
+    return new HoodieSparkRecord(getKey(), row, getOperation(), getOrderingValue());
+  }
+
+  @Override
+  public HoodieRecord<InternalRow> newInstance() {
+    return new HoodieSparkRecord(this);
+  }
+
+  @Override
+  public HoodieRecord<InternalRow> newInstance(HoodieKey key, HoodieOperation op) {
+    return new HoodieSparkRecord(key, data, op, getOrderingValue());
+  }
+
+  @Override
+  public HoodieRecord<InternalRow> newInstance(HoodieKey key) {
+    return new HoodieSparkRecord(key, data, getOrderingValue());
+  }
+
+  /** Returns the key held by this record; the key generator is not consulted. */
+  @Override
+  public String getRecordKey(Option<BaseKeyGenerator> keyGeneratorOpt) {
+    return getRecordKey();
+  }
+
+  /** Returns the key held by this record; {@code keyFieldName} is not consulted. */
+  @Override
+  public String getRecordKey(String keyFieldName) {
+    return getRecordKey();
+  }
+
+  /**
+   * Stitches this record's row with {@code other}'s row and projects the result onto
+   * {@code writerSchema}. Both input rows are interpreted with {@code readerSchema}.
+   */
+  @Override
+  public HoodieRecord mergeWith(HoodieRecord other, Schema readerSchema, Schema writerSchema) throws IOException {
+    StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(readerSchema);
+    StructType writerStructType = HoodieInternalRowUtils.getCachedSchema(writerSchema);
+    InternalRow mergeRow = HoodieInternalRowUtils.stitchRecords(data, readerStructType, (InternalRow) other.getData(), readerStructType, writerStructType);
+    return copyWithData(mergeRow);
+  }
+
+  /** Rewrites the row from {@code recordSchema} into {@code targetSchema}, matching fields by name. */
+  @Override
+  public HoodieRecord rewriteRecord(Schema recordSchema, Schema targetSchema, TypedProperties props) throws IOException {
+    StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
+    StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
+    InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecord(data, readerStructType, targetStructType);
+    return copyWithData(rewriteRow);
+  }
+
+  /**
+   * Rewrites the row into {@code writeSchemaWithMetaFields}; when {@code schemaOnReadEnabled}
+   * is set the schema-evolution aware rewrite is used (with no renamed columns).
+   */
+  @Override
+  public HoodieRecord rewriteRecord(Schema recordSchema, Properties prop, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields) throws IOException {
+    StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
+    StructType writeSchemaWithMetaFieldsStructType = HoodieInternalRowUtils.getCachedSchema(writeSchemaWithMetaFields);
+    InternalRow rewriteRow = schemaOnReadEnabled
+        ? HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, readerStructType, writeSchemaWithMetaFieldsStructType, new HashMap<>())
+        : HoodieInternalRowUtils.rewriteRecord(data, readerStructType, writeSchemaWithMetaFieldsStructType);
+    return copyWithData(rewriteRow);
+  }
+
+  /**
+   * Same as {@link #rewriteRecord(Schema, Properties, boolean, Schema)} and additionally
+   * stores {@code fileName} in the file-name meta field of the rewritten row.
+   */
+  @Override
+  public HoodieRecord rewriteRecordWithMetadata(Schema recordSchema, Properties prop, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields, String fileName) throws IOException {
+    StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
+    StructType writeSchemaWithMetaFieldsStructType = HoodieInternalRowUtils.getCachedSchema(writeSchemaWithMetaFields);
+    InternalRow rewriteRow = schemaOnReadEnabled
+        ? HoodieInternalRowUtils.rewriteEvolutionRecordWithMetadata(data, readerStructType, writeSchemaWithMetaFieldsStructType, fileName)
+        : HoodieInternalRowUtils.rewriteRecordWithMetadata(data, readerStructType, writeSchemaWithMetaFieldsStructType, fileName);
+    return copyWithData(rewriteRow);
+  }
+
+  /** Rewrites the row into {@code newSchema}, resolving renamed columns through {@code renameCols}. */
+  @Override
+  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema, Map<String, String> renameCols) throws IOException {
+    StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
+    StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema);
+    InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, readerStructType, newStructType, renameCols);
+    return copyWithData(rewriteRow);
+  }
+
+  /**
+   * Rewrites the row into {@code newSchema} and hands the result to {@code mapper}.
+   *
+   * <p>NOTE(review): the mapper is fed the rewritten {@link InternalRow} cast to
+   * {@link IndexedRecord}; unless the row implementation also implements that interface this
+   * cast fails at runtime. See the TODO below.
+   */
+  @Override
+  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema, Map<String, String> renameCols, Mapper mapper) throws IOException {
+    StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
+    StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema);
+    InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, readerStructType, newStructType, renameCols);
+    // TODO change mapper type
+    return mapper.apply((IndexedRecord) rewriteRow);
+  }
+
+  /** Rewrites the row into {@code newSchema}, matching fields by name (no rename support). */
+  @Override
+  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema) throws IOException {
+    StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
+    StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema);
+    InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecord(data, readerStructType, newStructType);
+    return copyWithData(rewriteRow);
+  }
+
+  /** Overwrites the value at ordinal {@code pos} of the underlying row in place. */
+  @Override
+  public HoodieRecord overrideMetadataFieldValue(Schema recordSchema, Properties prop, int pos, String newValue) throws IOException {
+    data.update(pos, newValue);
+    return this;
+  }
+
+  /**
+   * Writes every non-null entry of {@code metadataValues} into the underlying row in place,
+   * at the position the corresponding meta field has in {@code recordSchema}.
+   */
+  @Override
+  public HoodieRecord addMetadataValues(Schema recordSchema, Properties prop, Map<HoodieMetadataField, String> metadataValues) throws IOException {
+    Arrays.stream(HoodieMetadataField.values()).forEach(metadataField -> {
+      String value = metadataValues.get(metadataField);
+      if (value != null) {
+        data.update(recordSchema.getField(metadataField.getFieldName()).pos(), value);
+      }
+    });
+    return this;
+  }
+
+  @Override
+  public Option<Map<String, String>> getMetadata() {
+    return Option.empty();
+  }
+
+  /**
+   * Tells whether this record carries live data.
+   *
+   * @return {@code false} when there is no row or the row's delete-marker field is
+   *         {@code true}; {@code true} otherwise, including when {@code schema} has no
+   *         delete-marker field at all
+   */
+  @Override
+  public boolean isPresent(Schema schema, Properties prop) throws IOException {
+    if (null == data) {
+      return false;
+    }
+    Schema.Field deleteMarkerField = schema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD);
+    if (deleteMarkerField == null) {
+      // Schema has no delete marker, so the record cannot be flagged as deleted.
+      return true;
+    }
+    Object deleteMarker = data.get(deleteMarkerField.pos(), BooleanType);
+    return !(deleteMarker instanceof Boolean && (boolean) deleteMarker);
+  }
+
+  /** Returns {@code true} when the underlying row equals the {@code SENTINEL} marker. */
+  @Override
+  public boolean shouldIgnore(Schema schema, Properties prop) throws IOException {
+    // TODO SENTINEL should refactor without Avro(GenericRecord)
+    return null != data && data.equals(SENTINEL);
+  }
+
+  /** Not supported for Spark records. */
+  @Override
+  public Option<IndexedRecord> toIndexedRecord(Schema schema, Properties prop) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkRecordMerge.java
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkRecordMerge.java
new file mode 100644
index 0000000000..88ae7c13df
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkRecordMerge.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieMerge;
+import org.apache.hudi.common.util.Option;
+
+import java.io.IOException;
+import java.util.Properties;
+
+public class HoodieSparkRecordMerge implements HoodieMerge {
+
+  @Override
+  public HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer) {
+    if (older.getData() == null) {
+      // use natural order for delete record
+      return older;
+    }
+    if (older.getOrderingValue().compareTo(newer.getOrderingValue()) > 0) {
+      return older;
+    } else {
+      return newer;
+    }
+  }
+
+  @Override
+  public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, 
HoodieRecord newer, Schema schema, Properties props) throws IOException {
+    return Option.of(newer);
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
index 1ed0e5e1a4..2421face72 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
@@ -48,7 +48,7 @@ object SparkHelpers {
     // Add current classLoad for config, if not will throw classNotFound of 
'HoodieWrapperFileSystem'.
     
parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader)
 
-    val writer = new HoodieAvroParquetWriter[IndexedRecord](destinationFile, 
parquetConfig, instantTime, new SparkTaskContextSupplier(), true)
+    val writer = new HoodieAvroParquetWriter(destinationFile, parquetConfig, 
instantTime, new SparkTaskContextSupplier(), true)
     for (rec <- sourceRecords) {
       val key: String = 
rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
       if (!keysToSkip.contains(key)) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieInternalRowUtils.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieInternalRowUtils.scala
new file mode 100644
index 0000000000..7a08ee64bf
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieInternalRowUtils.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.testutils.HoodieClientTestUtils
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.hudi.HoodieInternalRowUtils
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{Row, SparkSession}
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+
+/**
+ * Exercises the stitch / rewrite helpers of HoodieInternalRowUtils against rows produced
+ * by a local Spark session.
+ */
+class TestHoodieInternalRowUtils extends FunSuite with Matchers with BeforeAndAfterAll {
+
+  private var sparkSession: SparkSession = _
+
+  // Two disjoint two-column schemas and their concatenation.
+  private val schema1 = StructType(Array(
+    StructField("name", StringType),
+    StructField("age", IntegerType)
+  ))
+  private val schema2 = StructType(Array(
+    StructField("name1", StringType),
+    StructField("age1", IntegerType)
+  ))
+  private val schemaMerge = StructType(schema1.fields ++ schema2.fields)
+
+  // schema1 prefixed with the Hudi meta columns.
+  private val schema1WithMetaData = StructType(Array(
+    StructField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, StringType),
+    StructField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, StringType),
+    StructField(HoodieRecord.RECORD_KEY_METADATA_FIELD, StringType),
+    StructField(HoodieRecord.PARTITION_PATH_METADATA_FIELD, StringType),
+    StructField(HoodieRecord.FILENAME_METADATA_FIELD, StringType),
+    StructField(HoodieRecord.OPERATION_METADATA_FIELD, StringType),
+    StructField(HoodieRecord.HOODIE_IS_DELETED_FIELD, BooleanType)
+  ) ++ schema1.fields)
+
+  override protected def beforeAll(): Unit = {
+    // Initialize a local spark env
+    val sparkConf = HoodieClientTestUtils.getSparkConfForTest(classOf[TestHoodieInternalRowUtils].getName)
+    val jsc = new JavaSparkContext(sparkConf)
+    jsc.setLogLevel("ERROR")
+    sparkSession = SparkSession.builder.config(jsc.getConf).getOrCreate
+  }
+
+  override protected def afterAll(): Unit = {
+    sparkSession.close()
+  }
+
+  /** Turns a single external Row into its Catalyst representation under `schema`. */
+  private def firstInternalRow(values: Row, schema: StructType) = {
+    val rdd = sparkSession.sparkContext.parallelize(Seq(values))
+    sparkSession.createDataFrame(rdd, schema).queryExecution.toRdd.first()
+  }
+
+  test("test merge") {
+    val left = firstInternalRow(Row("like", 18), schema1)
+    val right = firstInternalRow(Row("like1", 181), schema2)
+    val stitched = HoodieInternalRowUtils.stitchRecords(left, schema1, right, schema2, schemaMerge)
+    assert(stitched.get(0, StringType).toString.equals("like"))
+    assert(stitched.get(1, IntegerType) == 18)
+    assert(stitched.get(2, StringType).toString.equals("like1"))
+    assert(stitched.get(3, IntegerType) == 181)
+  }
+
+  test("test rewrite") {
+    val wideRow = firstInternalRow(Row("like", 18, "like1", 181), schemaMerge)
+    val firstHalf = HoodieInternalRowUtils.rewriteRecord(wideRow, schemaMerge, schema1)
+    val secondHalf = HoodieInternalRowUtils.rewriteRecord(wideRow, schemaMerge, schema2)
+    assert(firstHalf.get(0, StringType).toString.equals("like"))
+    assert(firstHalf.get(1, IntegerType) == 18)
+    assert(secondHalf.get(0, StringType).toString.equals("like1"))
+    assert(secondHalf.get(1, IntegerType) == 181)
+  }
+
+  test("test rewrite with nullable value") {
+    val narrowRow = firstInternalRow(Row("like", 18), schema1)
+    val widened = HoodieInternalRowUtils.rewriteRecord(narrowRow, schema1, schemaMerge)
+    assert(widened.get(0, StringType).toString.equals("like"))
+    assert(widened.get(1, IntegerType) == 18)
+    // Columns missing from the source schema are left null.
+    assert(widened.get(2, StringType) == null)
+    assert(widened.get(3, IntegerType) == null)
+  }
+
+  test("test rewrite with metaDataFiled value") {
+    val narrowRow = firstInternalRow(Row("like", 18), schema1)
+    val withMeta = HoodieInternalRowUtils.rewriteRecordWithMetadata(narrowRow, schema1, schema1WithMetaData, "file1")
+    // Only the file-name meta column is populated; the other meta columns stay null.
+    assert(withMeta.get(0, StringType) == null)
+    assert(withMeta.get(1, StringType) == null)
+    assert(withMeta.get(2, StringType) == null)
+    assert(withMeta.get(3, StringType) == null)
+    assert(withMeta.get(4, StringType).toString.equals("file1"))
+    assert(withMeta.get(5, StringType) == null)
+    assert(withMeta.get(6, BooleanType) == null)
+    assert(withMeta.get(7, StringType).toString.equals("like"))
+    assert(withMeta.get(8, IntegerType) == 18)
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestStructTypeSchemaEvolutionUtils.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestStructTypeSchemaEvolutionUtils.scala
new file mode 100644
index 0000000000..cb5529721c
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestStructTypeSchemaEvolutionUtils.scala
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import java.nio.ByteBuffer
+import java.util.{ArrayList, HashMap, Objects}
+import org.apache.avro.generic.GenericData
+import org.apache.avro.{LogicalTypes, Schema}
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.internal.schema.Types
+import org.apache.hudi.internal.schema.action.TableChanges
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
+import org.apache.hudi.internal.schema.utils.SchemaChangeUtils
+import org.apache.hudi.testutils.HoodieClientTestUtils
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
+import org.apache.spark.sql.hudi.HoodieInternalRowUtils
+import org.apache.spark.sql.types._
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+
+class TestStructTypeSchemaEvolutionUtils extends FunSuite with Matchers with 
BeforeAndAfterAll {
+  private var sparkSession: SparkSession = _
+
+  /**
+   * Creates the local Spark session used by every test in this suite.
+   *
+   * The name handed to `getSparkConfForTest` is derived from this suite's own class
+   * (it previously referenced `TestHoodieInternalRowUtils`, a copy-paste leftover), so the
+   * Spark context is attributed to the suite that actually owns it.
+   */
+  override protected def beforeAll(): Unit = {
+    // Initialize a local spark env
+    val sparkConf = HoodieClientTestUtils.getSparkConfForTest(classOf[TestStructTypeSchemaEvolutionUtils].getName)
+    val jsc = new JavaSparkContext(sparkConf)
+    // Keep test output readable: only errors are logged.
+    jsc.setLogLevel("ERROR")
+    sparkSession = SparkSession.builder.config(jsc.getConf).getOrCreate
+  }
+
+  // Releases the Spark session created in beforeAll once the suite has finished.
+  override protected def afterAll(): Unit = sparkSession.close()
+
+  /**
+   * test record data type changes.
+   * int => long/float/double/string
+   * long => float/double/string
+   * float => double/String
+   * double => String/Decimal
+   * Decimal => Decimal/String
+   * String => date/decimal
+   * date => String
+   */
+  test("test rewrite record with type changed") {
+    val avroSchema = new 
Schema.Parser().parse("{\"type\":\"record\",\"name\":\"h0_record\",\"namespace\":\"hoodie.h0\",\"fields\""
+      + ":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"default\":null},"
+      + "{\"name\":\"comb\",\"type\":[\"null\",\"int\"],\"default\":null},"
+      + "{\"name\":\"com1\",\"type\":[\"null\",\"int\"],\"default\":null},"
+      + "{\"name\":\"col0\",\"type\":[\"null\",\"int\"],\"default\":null},"
+      + "{\"name\":\"col1\",\"type\":[\"null\",\"long\"],\"default\":null},"
+      + "{\"name\":\"col11\",\"type\":[\"null\",\"long\"],\"default\":null},"
+      + "{\"name\":\"col12\",\"type\":[\"null\",\"long\"],\"default\":null},"
+      + "{\"name\":\"col2\",\"type\":[\"null\",\"float\"],\"default\":null},"
+      + "{\"name\":\"col21\",\"type\":[\"null\",\"float\"],\"default\":null},"
+      + "{\"name\":\"col3\",\"type\":[\"null\",\"double\"],\"default\":null},"
+      + "{\"name\":\"col31\",\"type\":[\"null\",\"double\"],\"default\":null},"
+      + 
"{\"name\":\"col4\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"fixed\",\"namespace\":\"hoodie.h0.h0_record.col4\","
+      + 
"\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":4}],\"default\":null},"
+      + 
"{\"name\":\"col41\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"fixed\",\"namespace\":\"hoodie.h0.h0_record.col41\","
+      + 
"\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":4}],\"default\":null},"
+      + "{\"name\":\"col5\",\"type\":[\"null\",\"string\"],\"default\":null},"
+      + "{\"name\":\"col51\",\"type\":[\"null\",\"string\"],\"default\":null},"
+      + 
"{\"name\":\"col6\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"date\"}],\"default\":null},"
+      + 
"{\"name\":\"col7\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}],\"default\":null},"
+      + "{\"name\":\"col8\",\"type\":[\"null\",\"boolean\"],\"default\":null},"
+      + 
"{\"name\":\"col9\",\"type\":[\"null\",\"bytes\"],\"default\":null},{\"name\":\"par\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"date\"}],\"default\":null}]}")
+    // create a test record with avroSchema
+    val avroRecord = new GenericData.Record(avroSchema)
+    avroRecord.put("id", 1)
+    avroRecord.put("comb", 100)
+    avroRecord.put("com1", -100)
+    avroRecord.put("col0", 256)
+    avroRecord.put("col1", 1000L)
+    avroRecord.put("col11", -100L)
+    avroRecord.put("col12", 2000L)
+    avroRecord.put("col2", -5.001f)
+    avroRecord.put("col21", 5.001f)
+    avroRecord.put("col3", 12.999d)
+    avroRecord.put("col31", 9999.999d)
+    val currentDecimalType = avroSchema.getField("col4").schema.getTypes.get(1)
+    val bd = new 
java.math.BigDecimal("123.456").setScale(currentDecimalType.getLogicalType.asInstanceOf[LogicalTypes.Decimal].getScale)
+    avroRecord.put("col4", HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd, 
currentDecimalType, currentDecimalType.getLogicalType))
+    val currentDecimalType1 = 
avroSchema.getField("col41").schema.getTypes.get(1)
+    val bd1 = new 
java.math.BigDecimal("7890.456").setScale(currentDecimalType1.getLogicalType.asInstanceOf[LogicalTypes.Decimal].getScale)
+    avroRecord.put("col41", HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd1, 
currentDecimalType1, currentDecimalType1.getLogicalType))
+    avroRecord.put("col5", "2011-01-01")
+    avroRecord.put("col51", "199.342")
+    avroRecord.put("col6", 18987)
+    avroRecord.put("col7", 1640491505000000L)
+    avroRecord.put("col8", false)
+    val bb = ByteBuffer.wrap(Array[Byte](97, 48, 53))
+    avroRecord.put("col9", bb)
+    assert(GenericData.get.validate(avroSchema, avroRecord))
+    val internalSchema = AvroInternalSchemaConverter.convert(avroSchema)
+    // do change type operation
+    val updateChange = TableChanges.ColumnUpdateChange.get(internalSchema)
+    updateChange.updateColumnType("id", 
Types.LongType.get).updateColumnType("comb", 
Types.FloatType.get).updateColumnType("com1", 
Types.DoubleType.get).updateColumnType("col0", 
Types.StringType.get).updateColumnType("col1", 
Types.FloatType.get).updateColumnType("col11", 
Types.DoubleType.get).updateColumnType("col12", 
Types.StringType.get).updateColumnType("col2", 
Types.DoubleType.get).updateColumnType("col21", 
Types.StringType.get).updateColumnType("col3", Types.StringType.get).updateCo 
[...]
+    val newSchema = SchemaChangeUtils.applyTableChanges2Schema(internalSchema, 
updateChange)
+    val newAvroSchema = AvroInternalSchemaConverter.convert(newSchema, 
avroSchema.getName)
+    val newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, 
newAvroSchema, new HashMap[String, String])
+    assert(GenericData.get.validate(newAvroSchema, newRecord))
+    // Convert avro to internalRow
+    val structTypeSchema = HoodieInternalRowUtils.getCachedSchema(avroSchema)
+    val newStructTypeSchema = 
HoodieInternalRowUtils.getCachedSchema(newAvroSchema)
+    val row = AvroConversionUtils.createAvroToInternalRowConverter(avroSchema, 
structTypeSchema).apply(avroRecord).get
+    val newRowExpected = 
AvroConversionUtils.createAvroToInternalRowConverter(newAvroSchema, 
newStructTypeSchema)
+      .apply(newRecord).get
+    val newRowActual = HoodieInternalRowUtils.rewriteRecordWithNewSchema(row, 
structTypeSchema, newStructTypeSchema, new HashMap[String, String])
+    internalRowCompare(newRowExpected, newRowActual, newStructTypeSchema)
+  }
+
+  test("test rewrite nest record") {
+    val record = Types.RecordType.get(Types.Field.get(0, false, "id", 
Types.IntType.get()),
+      Types.Field.get(1, true, "data", Types.StringType.get()),
+      Types.Field.get(2, true, "preferences",
+        Types.RecordType.get(Types.Field.get(5, false, "feature1",
+          Types.BooleanType.get()), Types.Field.get(6, true, "feature2", 
Types.BooleanType.get()))),
+      Types.Field.get(3, false, "doubles", Types.ArrayType.get(7, false, 
Types.DoubleType.get())),
+      Types.Field.get(4, false, "locations", Types.MapType.get(8, 9, 
Types.StringType.get(),
+        Types.RecordType.get(Types.Field.get(10, false, "lat", 
Types.FloatType.get()), Types.Field.get(11, false, "long", 
Types.FloatType.get())), false))
+    )
+    val schema = AvroInternalSchemaConverter.convert(record, "test1")
+    val avroRecord = new GenericData.Record(schema)
+    GenericData.get.validate(schema, avroRecord)
+    avroRecord.put("id", 2)
+    avroRecord.put("data", "xs")
+    // fill record type
+    val preferencesRecord = new 
GenericData.Record(AvroInternalSchemaConverter.convert(record.fieldType("preferences"),
 "test1_preferences"))
+    preferencesRecord.put("feature1", false)
+    preferencesRecord.put("feature2", true)
+    
assert(GenericData.get.validate(AvroInternalSchemaConverter.convert(record.fieldType("preferences"),
 "test1_preferences"), preferencesRecord))
+    avroRecord.put("preferences", preferencesRecord)
+    // fill mapType
+    val locations = new HashMap[String, GenericData.Record]
+    val mapSchema = 
AvroInternalSchemaConverter.convert(record.field("locations").`type`.asInstanceOf[Types.MapType].valueType,
 "test1_locations")
+    val locationsValue: GenericData.Record = new GenericData.Record(mapSchema)
+    locationsValue.put("lat", 1.2f)
+    locationsValue.put("long", 1.4f)
+    val locationsValue1: GenericData.Record = new GenericData.Record(mapSchema)
+    locationsValue1.put("lat", 2.2f)
+    locationsValue1.put("long", 2.4f)
+    locations.put("key1", locationsValue)
+    locations.put("key2", locationsValue1)
+    avroRecord.put("locations", locations)
+    val doubles = new ArrayList[Double]
+    doubles.add(2.0d)
+    doubles.add(3.0d)
+    avroRecord.put("doubles", doubles)
+    // do check
+    assert(GenericData.get.validate(schema, avroRecord))
+    // create newSchema
+    val newRecord = Types.RecordType.get(Types.Field.get(0, false, "id", 
Types.IntType.get), Types.Field.get(1, true, "data", Types.StringType.get), 
Types.Field.get(2, true, "preferences", Types.RecordType.get(Types.Field.get(5, 
false, "feature1", Types.BooleanType.get), Types.Field.get(5, true, "featurex", 
Types.BooleanType.get), Types.Field.get(6, true, "feature2", 
Types.BooleanType.get))), Types.Field.get(3, false, "doubles", 
Types.ArrayType.get(7, false, Types.DoubleType.get)), Types [...]
+    val newAvroSchema = AvroInternalSchemaConverter.convert(newRecord, 
schema.getName)
+    val newAvroRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, 
newAvroSchema, new HashMap[String, String])
+    // verify the correctness of the rewrite
+    assert(GenericData.get.validate(newAvroSchema, newAvroRecord))
+    // Convert avro to internalRow
+    val structTypeSchema = HoodieInternalRowUtils.getCachedSchema(schema)
+    val newStructTypeSchema = 
HoodieInternalRowUtils.getCachedSchema(newAvroSchema)
+    val row = AvroConversionUtils.createAvroToInternalRowConverter(schema, 
structTypeSchema).apply(avroRecord).get
+    val newRowExpected = 
AvroConversionUtils.createAvroToInternalRowConverter(newAvroSchema, 
newStructTypeSchema).apply(newAvroRecord).get
+    val newRowActual = HoodieInternalRowUtils.rewriteRecordWithNewSchema(row, 
structTypeSchema, newStructTypeSchema, new HashMap[String, String])
+    internalRowCompare(newRowExpected, newRowActual, newStructTypeSchema)
+  }
+
+  /**
+   * Recursively compares two Catalyst values according to the given Spark data type.
+   *
+   * Structs are compared field by field, arrays element by element (after a size check), and
+   * maps by comparing their key arrays and their value arrays positionally. Strings are
+   * compared through their `toString` form.
+   *
+   * Null handling for struct, array, map and string values: two nulls are considered equal,
+   * exactly one null is a mismatch. Previously these cases ended in a NullPointerException
+   * instead of a meaningful result.
+   *
+   * Mismatches for binary and all remaining types are evaluated but intentionally NOT
+   * reported yet; see the TODO below.
+   *
+   * @param expected the reference value
+   * @param actual   the value under test
+   * @param schema   the Spark data type describing both values
+   * @throws AssertionError if a reported mismatch is found
+   */
+  private def internalRowCompare(expected: Any, actual: Any, schema: DataType): Unit = {
+    schema match {
+      // Null short-circuit for the types whose comparison below dereferences the values.
+      case _: StructType | _: ArrayType | _: MapType | StringType if expected == null || actual == null =>
+        if (expected != null || actual != null) {
+          throw new AssertionError(s"$expected is not equals $actual")
+        }
+      case StructType(fields) =>
+        val expectedRow = expected.asInstanceOf[InternalRow]
+        val actualRow = actual.asInstanceOf[InternalRow]
+        fields.zipWithIndex.foreach { case (field, i) =>
+          internalRowCompare(expectedRow.get(i, field.dataType), actualRow.get(i, field.dataType), field.dataType)
+        }
+      case ArrayType(elementType, _) =>
+        val expectedArray = expected.asInstanceOf[ArrayData].toSeq[Any](elementType)
+        val actualArray = actual.asInstanceOf[ArrayData].toSeq[Any](elementType)
+        if (expectedArray.size != actualArray.size) {
+          throw new AssertionError(s"array size ${expectedArray.size} is not equals ${actualArray.size}")
+        }
+        expectedArray.zip(actualArray).foreach { case (e1, e2) => internalRowCompare(e1, e2, elementType) }
+      case MapType(keyType, valueType, _) =>
+        val expectedMap = expected.asInstanceOf[MapData]
+        val actualMap = actual.asInstanceOf[MapData]
+        // Keys and values are compared positionally, i.e. both maps must share the same entry order.
+        internalRowCompare(expectedMap.keyArray(), actualMap.keyArray(), ArrayType(keyType))
+        internalRowCompare(expectedMap.valueArray(), actualMap.valueArray(), ArrayType(valueType))
+      case StringType =>
+        // Both values are non-null here (nulls were handled by the first case).
+        if (!expected.toString.equals(actual.toString)) {
+          throw new AssertionError(s"$expected is not equals $actual")
+        }
+      // TODO Verify after 'https://github.com/apache/hudi/pull/5907' merge
+      case BinaryType =>
+        // Null-safe evaluation; sameElements is only invoked when both arrays are present.
+        val binaryMismatch = checkNull(expected, actual) ||
+          (expected != null && actual != null &&
+            !expected.asInstanceOf[Array[Byte]].sameElements(actual.asInstanceOf[Array[Byte]]))
+        if (binaryMismatch) {
+          // throw new AssertionError(String.format("%s is not equals %s", expected.toString, actual.toString))
+        }
+      case _ =>
+        if (!Objects.equals(expected, actual)) {
+          // throw new AssertionError(String.format("%s is not equals %s", expected.toString, actual.toString))
+        }
+    }
+  }
+
+  /**
+   * Tells whether exactly one of the two values is null.
+   *
+   * The previous implementation repeated the same condition on both sides of the `||`, so the
+   * case "left is non-null, right is null" was never detected.
+   *
+   * @param left  first value, may be null
+   * @param right second value, may be null
+   * @return true if one value is null and the other is not; false if both or neither are null
+   */
+  private def checkNull(left: Any, right: Any): Boolean = {
+    (left == null) != (right == null)
+  }
+}
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index badd3ab627..dbe5285e9d 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -275,6 +275,7 @@ public class DeltaSync implements Serializable {
           .setTableName(cfg.targetTableName)
           .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
           .setPayloadClassName(cfg.payloadClassName)
+          .setMergeClassName(cfg.mergeClassName)
           .setBaseFileFormat(cfg.baseFileFormat)
           .setPartitionFields(partitionColumns)
           
.setRecordKeyFields(props.getProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key()))
@@ -372,6 +373,7 @@ public class DeltaSync implements Serializable {
           .setTableName(cfg.targetTableName)
           .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
           .setPayloadClassName(cfg.payloadClassName)
+          .setMergeClassName(cfg.mergeClassName)
           .setBaseFileFormat(cfg.baseFileFormat)
           .setPartitionFields(partitionColumns)
           
.setRecordKeyFields(props.getProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key()))
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index a22a3581ae..dcf1581216 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -32,6 +32,7 @@ import 
org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroRecordMerge;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -270,6 +271,10 @@ public class HoodieDeltaStreamer implements Serializable {
         + "a GenericRecord. Implement your own, if you want to do something 
other than overwriting existing value")
     public String payloadClassName = 
OverwriteWithLatestAvroPayload.class.getName();
 
+    // Fully-qualified class name of the HoodieMerge implementation used to merge two records;
+    // defaults to HoodieAvroRecordMerge. The help text ends its first sentence with a space so
+    // the concatenated description does not run the two sentences together.
+    @Parameter(names = {"--merge-class"}, description = "Implementation of HoodieMerge, that defines how to merge two records. "
+        + "Implement your own, if you want to implement specific record merge logic.")
+    public String mergeClassName = HoodieAvroRecordMerge.class.getName();
+
     @Parameter(names = {"--schemaprovider-class"}, description = "subclass of 
org.apache.hudi.utilities.schema"
         + ".SchemaProvider to attach schemas to input & target table data, 
built in options: "
         + "org.apache.hudi.utilities.schema.FilebasedSchemaProvider."

Reply via email to