This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e7fba4f22ac [HUDI-9576] Test schema evolution in fg reader (#13549)
9e7fba4f22ac is described below

commit 9e7fba4f22ac9125d15a79c28c39ed3633febcd1
Author: Jon Vexler <[email protected]>
AuthorDate: Fri Jul 18 18:41:52 2025 -0400

    [HUDI-9576] Test schema evolution in fg reader (#13549)
    
    - Test schema evolution with File group reader across engines. For spark, 
all cases are green. For other record formats, some tests are disabled, but 
follow up patches will fix them.
    
    ---------
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../read/HoodieFileGroupReaderOnJavaTestBase.java  |  10 +-
 .../read/TestHoodieFileGroupReaderOnJava.java      |  15 +
 .../hadoop/TestHoodieFileGroupReaderOnHive.java    |  49 ++-
 .../hudi/testutils/ArrayWritableTestUtil.java      | 103 +++++++
 .../table/read/TestHoodieFileGroupReaderBase.java  | 334 ++++++++++++++++++++-
 .../common/testutils/HoodieTestDataGenerator.java  | 238 ++++++++++++++-
 .../table/TestHoodieFileGroupReaderOnFlink.java    |  52 +++-
 .../hadoop/hive/serde2/avro/HiveTypeUtils.java     | 328 ++++++++++++++++++++
 .../hudi/hadoop/utils/ObjectInspectorCache.java    |   4 +-
 .../read/TestHoodieFileGroupReaderOnSpark.scala    | 119 +++++++-
 10 files changed, 1194 insertions(+), 58 deletions(-)

diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderOnJavaTestBase.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderOnJavaTestBase.java
index 6f66b6cb2d0f..fb72b328aa5a 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderOnJavaTestBase.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderOnJavaTestBase.java
@@ -27,7 +27,6 @@ import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -60,13 +59,13 @@ public abstract class 
HoodieFileGroupReaderOnJavaTestBase<T> extends TestHoodieF
   }
 
   @Override
-  public void commitToTable(List<HoodieRecord> recordList, String operation, 
Map<String, String> writeConfigs) {
+  public void commitToTable(List<HoodieRecord> recordList, String operation, 
boolean firstCommit, Map<String, String> writeConfigs, String schemaStr) {
     HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
         .withEngineType(EngineType.JAVA)
         .withEmbeddedTimelineServerEnabled(false)
         .withProps(writeConfigs)
         .withPath(getBasePath())
-        .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+        .withSchema(schemaStr)
         .build();
 
     HoodieJavaClientTestHarness.TestJavaTaskContextSupplier 
taskContextSupplier = new 
HoodieJavaClientTestHarness.TestJavaTaskContextSupplier();
@@ -75,8 +74,7 @@ public abstract class HoodieFileGroupReaderOnJavaTestBase<T> 
extends TestHoodieF
     StoragePath basePath = new StoragePath(getBasePath());
     try (HoodieStorage storage = new HoodieHadoopStorage(basePath, 
getStorageConf())) {
       boolean basepathExists = storage.exists(basePath);
-      boolean operationIsInsert = operation.equalsIgnoreCase("insert");
-      if (!basepathExists || operationIsInsert) {
+      if (!basepathExists || firstCommit) {
         if (basepathExists) {
           storage.deleteDirectory(basePath);
         }
@@ -108,6 +106,8 @@ public abstract class 
HoodieFileGroupReaderOnJavaTestBase<T> extends TestHoodieF
       recordList.forEach(hoodieRecord -> recordsCopy.add(new 
HoodieAvroRecord<>(hoodieRecord.getKey(), (HoodieRecordPayload) 
hoodieRecord.getData())));
       if (operation.toLowerCase().equals("insert")) {
         writeClient.commit(instantTime, writeClient.insert(recordsCopy, 
instantTime), Option.empty(), DELTA_COMMIT_ACTION, Collections.emptyMap());
+      } else if (operation.toLowerCase().equals("bulkInsert")) {
+        writeClient.commit(instantTime, writeClient.bulkInsert(recordsCopy, 
instantTime), Option.empty(), DELTA_COMMIT_ACTION, Collections.emptyMap());
       } else {
         writeClient.commit(instantTime, writeClient.upsert(recordsCopy, 
instantTime), Option.empty(), DELTA_COMMIT_ACTION, Collections.emptyMap());
       }
diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnJava.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnJava.java
index 0f3e1ad59329..bdcf4f54009f 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnJava.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnJava.java
@@ -19,9 +19,11 @@
 
 package org.apache.hudi.common.table.read;
 
+import org.apache.hudi.avro.ConvertingGenericData;
 import org.apache.hudi.avro.HoodieAvroReaderContext;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
@@ -30,6 +32,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestHoodieFileGroupReaderOnJava extends 
HoodieFileGroupReaderOnJavaTestBase<IndexedRecord> {
   private static final StorageConfiguration<?> STORAGE_CONFIGURATION = new 
HadoopStorageConfiguration(false);
@@ -49,4 +52,16 @@ public class TestHoodieFileGroupReaderOnJava extends 
HoodieFileGroupReaderOnJava
   public void assertRecordsEqual(Schema schema, IndexedRecord expected, 
IndexedRecord actual) {
     assertEquals(expected, actual);
   }
+
+  @Override
+  public void assertRecordMatchesSchema(Schema schema, IndexedRecord record) {
+    assertTrue(ConvertingGenericData.INSTANCE.validate(schema, record));
+  }
+
+  @Override
+  public HoodieTestDataGenerator.SchemaEvolutionConfigs 
getSchemaEvolutionConfigs() {
+    HoodieTestDataGenerator.SchemaEvolutionConfigs configs = new 
HoodieTestDataGenerator.SchemaEvolutionConfigs();
+    configs.addNewFieldSupport = false;
+    return configs;
+  }
 }
diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
index c01ed5a7a33e..5206a3fa1d1d 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
@@ -38,8 +38,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.avro.HiveTypeUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.junit.jupiter.api.AfterAll;
@@ -47,7 +48,7 @@ import org.junit.jupiter.api.BeforeAll;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
+import java.util.Locale;
 import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.hadoop.HoodieFileGroupReaderBasedRecordReader.getStoredPartitionFieldNames;
@@ -105,23 +106,53 @@ public class TestHoodieFileGroupReaderOnHive extends 
HoodieFileGroupReaderOnJava
     ArrayWritableTestUtil.assertArrayWritableEqual(schema, expected, actual, 
false);
   }
 
+  @Override
+  public void assertRecordMatchesSchema(Schema schema, ArrayWritable record) {
+    ArrayWritableTestUtil.assertArrayWritableMatchesSchema(schema, record);
+  }
+
+  @Override
+  public HoodieTestDataGenerator.SchemaEvolutionConfigs 
getSchemaEvolutionConfigs() {
+    HoodieTestDataGenerator.SchemaEvolutionConfigs configs = new 
HoodieTestDataGenerator.SchemaEvolutionConfigs();
+    configs.nestedSupport = false;
+    configs.arraySupport = false;
+    configs.mapSupport = false;
+    configs.addNewFieldSupport = false;
+    configs.intToLongSupport = false;
+    configs.intToFloatSupport = false;
+    configs.intToDoubleSupport = false;
+    configs.intToStringSupport = false;
+    configs.longToFloatSupport = false;
+    configs.longToDoubleSupport = false;
+    configs.longToStringSupport = false;
+    configs.floatToDoubleSupport = false;
+    configs.floatToStringSupport = false;
+    configs.doubleToStringSupport = false;
+    configs.stringToBytesSupport = false;
+    configs.bytesToStringSupport = false;
+    return configs;
+  }
+
   private void setupJobconf(JobConf jobConf, Schema schema) {
     List<Schema.Field> fields = schema.getFields();
     setHiveColumnNameProps(fields, jobConf, USE_FAKE_PARTITION);
-    List<TypeInfo> types = 
TypeInfoUtils.getTypeInfosFromTypeString(HoodieTestDataGenerator.TRIP_HIVE_COLUMN_TYPES);
-    Map<String, String> typeMappings = 
HoodieTestDataGenerator.AVRO_SCHEMA.getFields().stream().collect(Collectors.toMap(Schema.Field::name,
 field -> types.get(field.pos()).getTypeName()));
-    String columnTypes = fields.stream().map(field -> 
typeMappings.getOrDefault(field.name(), 
"string")).collect(Collectors.joining(","));
-    jobConf.set("columns.types", columnTypes + ",string");
+    try {
+      String columnTypes = 
HiveTypeUtils.generateColumnTypes(schema).stream().map(TypeInfo::getTypeName).collect(Collectors.joining(","));
+      jobConf.set("columns.types", columnTypes + ",string");
+    } catch (SerDeException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   private void setHiveColumnNameProps(List<Schema.Field> fields, JobConf 
jobConf, boolean isPartitioned) {
-    String names = 
fields.stream().map(Schema.Field::name).collect(Collectors.joining(","));
+    String names = fields.stream().map(Schema.Field::name).map(s -> 
s.toLowerCase(Locale.ROOT)).collect(Collectors.joining(","));
     String positions = fields.stream().map(f -> 
String.valueOf(f.pos())).collect(Collectors.joining(","));
     jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
     jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions);
 
-    String hiveOrderedColumnNames = fields.stream().filter(field -> 
!field.name().equalsIgnoreCase(PARTITION_COLUMN))
-        .map(Schema.Field::name).collect(Collectors.joining(","));
+    String hiveOrderedColumnNames = fields.stream().map(Schema.Field::name)
+        .filter(name -> !name.equalsIgnoreCase(PARTITION_COLUMN))
+        .map(s -> s.toLowerCase(Locale.ROOT)).collect(Collectors.joining(","));
     if (isPartitioned) {
       hiveOrderedColumnNames += "," + PARTITION_COLUMN;
       jobConf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, 
PARTITION_COLUMN);
diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/ArrayWritableTestUtil.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/ArrayWritableTestUtil.java
index bbca26711279..d32da6179661 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/ArrayWritableTestUtil.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/ArrayWritableTestUtil.java
@@ -191,4 +191,107 @@ public class ArrayWritableTestUtil {
         assertEquals(expected, actual);
     }
   }
+
+  public static void assertArrayWritableMatchesSchema(Schema schema, Writable 
writable) {
+    switch (schema.getType()) {
+      case RECORD: {
+        assertInstanceOf(ArrayWritable.class, writable);
+        ArrayWritable arrayWritable = (ArrayWritable) writable;
+        assertEquals(schema.getFields().size(), arrayWritable.get().length);
+        for (Schema.Field field : schema.getFields()) {
+          assertArrayWritableMatchesSchema(field.schema(), 
arrayWritable.get()[field.pos()]);
+        }
+        break;
+      }
+      case ARRAY: {
+        assertInstanceOf(ArrayWritable.class, writable);
+        ArrayWritable arrayWritable = (ArrayWritable) writable;
+        for (int i = 0; i < arrayWritable.get().length; i++) {
+          assertArrayWritableMatchesSchema(schema.getElementType(), 
arrayWritable.get()[i]);
+        }
+        break;
+      }
+      case MAP: {
+        assertInstanceOf(ArrayWritable.class, writable);
+        ArrayWritable arrayWritable = (ArrayWritable) writable;
+        for (int i = 0; i < arrayWritable.get().length; i++) {
+          Writable expectedKV = arrayWritable.get()[i];
+          assertInstanceOf(ArrayWritable.class, expectedKV);
+          ArrayWritable kv = (ArrayWritable) expectedKV;
+          assertEquals(2, kv.get().length);
+          assertNotNull(kv.get()[0]);
+          assertArrayWritableMatchesSchema(schema.getValueType(), kv.get()[1]);
+        }
+        break;
+      }
+      case UNION:
+        if (schema.getTypes().size() == 2
+            && schema.getTypes().get(0).getType() == Schema.Type.NULL) {
+          assertArrayWritableMatchesSchema(schema.getTypes().get(1), writable);
+        } else if (schema.getTypes().size() == 2
+            && schema.getTypes().get(1).getType() == Schema.Type.NULL) {
+          assertArrayWritableMatchesSchema(schema.getTypes().get(0), writable);
+        } else if (schema.getTypes().size() == 1) {
+          assertArrayWritableMatchesSchema(schema.getTypes().get(0), writable);
+        } else {
+          throw new IllegalStateException("Union has more than 2 types or one 
type is not null: " + schema);
+        }
+        break;
+
+      default:
+        assertWritablePrimaryTypeMatchesSchema(schema, writable);
+    }
+  }
+
+  private static void assertWritablePrimaryTypeMatchesSchema(Schema schema, 
Writable writable) {
+    switch (schema.getType()) {
+      case NULL:
+        assertInstanceOf(NullWritable.class, writable);
+        break;
+
+      case BOOLEAN:
+        assertInstanceOf(BooleanWritable.class, writable);
+        break;
+
+      case INT:
+        if (schema.getLogicalType() instanceof LogicalTypes.Date) {
+          assertInstanceOf(DateWritable.class, writable);
+        } else {
+          assertInstanceOf(IntWritable.class, writable);
+        }
+        break;
+
+      case LONG:
+        assertInstanceOf(LongWritable.class, writable);
+        break;
+
+      case FLOAT:
+        assertInstanceOf(FloatWritable.class, writable);
+        break;
+
+      case DOUBLE:
+        assertInstanceOf(DoubleWritable.class, writable);
+        break;
+
+      case BYTES:
+      case ENUM:
+        assertInstanceOf(BytesWritable.class, writable);
+        break;
+
+      case STRING:
+        assertInstanceOf(Text.class, writable);
+        break;
+
+      case FIXED:
+        if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
+          assertInstanceOf(HiveDecimalWritable.class, writable);
+        } else {
+          throw new IllegalStateException("Unexpected schema type: " + schema);
+        }
+        break;
+
+      default:
+        throw new IllegalStateException("Unexpected schema type: " + schema);
+    }
+  }
 }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index af0889c289b7..4d2679e9dc29 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.common.table.read;
 
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieMemoryConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
@@ -56,6 +57,8 @@ import org.apache.hudi.storage.StorageConfiguration;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -90,6 +93,7 @@ import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTITION_FIELDS;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
 import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.getLogFileListFromFileSlice;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -116,11 +120,25 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
 
   public abstract String getCustomPayload();
 
-  public abstract void commitToTable(List<HoodieRecord> recordList, String 
operation,
-                                     Map<String, String> writeConfigs);
+  public abstract void commitToTable(List<HoodieRecord> recordList,
+                                     String operation,
+                                     boolean firstCommit,
+                                     Map<String, String> writeConfigs,
+                                     String schemaStr);
+
+  public void commitToTable(List<HoodieRecord> recordList,
+                            String operation,
+                            boolean firstCommit,
+                            Map<String, String> writeConfigs) {
+    commitToTable(recordList, operation, firstCommit, writeConfigs, 
TRIP_EXAMPLE_SCHEMA);
+  }
 
   public abstract void assertRecordsEqual(Schema schema, T expected, T actual);
 
+  public abstract void assertRecordMatchesSchema(Schema schema, T record);
+
+  public abstract HoodieTestDataGenerator.SchemaEvolutionConfigs 
getSchemaEvolutionConfigs();
+
   private static Stream<Arguments> testArguments() {
     return Stream.of(
         arguments(RecordMergeMode.COMMIT_TIME_ORDERING, "avro", false),
@@ -141,7 +159,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
     try (HoodieTestDataGenerator dataGen = new 
HoodieTestDataGenerator(0xDEEF)) {
       // One commit; reading one file group containing a base file only
       List<HoodieRecord> initialRecords = dataGen.generateInserts("001", 100);
-      commitToTable(initialRecords, INSERT.value(), writeConfigs);
+      commitToTable(initialRecords, INSERT.value(), true, writeConfigs);
       validateOutputFromFileGroupReader(
           getStorageConf(), getBasePath(), true, 0, recordMergeMode,
           initialRecords, initialRecords);
@@ -150,7 +168,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
       List<HoodieRecord> updates = dataGen.generateUniqueUpdates("002", 50);
       List<HoodieRecord> allRecords = mergeRecordLists(updates, 
initialRecords);
       List<HoodieRecord> unmergedRecords = 
CollectionUtils.combine(initialRecords, updates);
-      commitToTable(updates, UPSERT.value(), writeConfigs);
+      commitToTable(updates, UPSERT.value(), false, writeConfigs);
       validateOutputFromFileGroupReader(
           getStorageConf(), getBasePath(), true, 1, recordMergeMode,
           allRecords, unmergedRecords);
@@ -158,7 +176,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
       // Three commits; reading one file group containing a base file and two 
log files
       List<HoodieRecord> updates2 = dataGen.generateUniqueUpdates("003", 100);
       List<HoodieRecord> finalRecords = mergeRecordLists(updates2, allRecords);
-      commitToTable(updates2, UPSERT.value(), writeConfigs);
+      commitToTable(updates2, UPSERT.value(), false, writeConfigs);
       validateOutputFromFileGroupReader(
           getStorageConf(), getBasePath(), true, 2, recordMergeMode,
           finalRecords, CollectionUtils.combine(unmergedRecords, updates2));
@@ -183,7 +201,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
     try (HoodieTestDataGenerator dataGen = new 
HoodieTestDataGenerator(0xDEEF)) {
       // One commit; reading one file group containing a log file only
       List<HoodieRecord> initialRecords = dataGen.generateInserts("001", 100);
-      commitToTable(initialRecords, INSERT.value(), writeConfigs);
+      commitToTable(initialRecords, INSERT.value(), true, writeConfigs);
       validateOutputFromFileGroupReader(
           getStorageConf(), getBasePath(), false, 1, recordMergeMode,
           initialRecords, initialRecords);
@@ -191,13 +209,251 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
       // Two commits; reading one file group containing two log files
       List<HoodieRecord> updates = dataGen.generateUniqueUpdates("002", 50);
       List<HoodieRecord> allRecords = mergeRecordLists(updates, 
initialRecords);
-      commitToTable(updates, UPSERT.value(), writeConfigs);
+      commitToTable(updates, INSERT.value(), false, writeConfigs);
       validateOutputFromFileGroupReader(
           getStorageConf(), getBasePath(), false, 2, recordMergeMode,
           allRecords, CollectionUtils.combine(initialRecords, updates));
     }
   }
 
+  private static List<Pair<String, IndexedRecord>> 
hoodieRecordsToIndexedRecords(List<HoodieRecord> hoodieRecords, Schema schema) {
+    return hoodieRecords.stream().map(r -> {
+      try {
+        return r.toIndexedRecord(schema, CollectionUtils.emptyProps());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }).filter(Option::isPresent).map(Option::get).map(r -> 
Pair.of(r.getRecordKey(), r.getData())).collect(Collectors.toList());
+  }
+
+  /**
+   * Write a base file with schema A, then write another base file with schema 
B.
+   */
+  @Test
+  public void testSchemaEvolutionWhenBaseFilesWithDifferentSchema() throws 
Exception {
+    Map<String, String> writeConfigs = new HashMap<>(
+        getCommonConfigs(RecordMergeMode.EVENT_TIME_ORDERING, true));
+
+    try (HoodieTestDataGenerator dataGen = new 
HoodieTestDataGenerator(TRIP_EXAMPLE_SCHEMA, 0xDEEF)) {
+      dataGen.extendSchemaBeforeEvolution(getSchemaEvolutionConfigs());
+
+      // Write a base file with schema A
+      List<HoodieRecord> firstRecords = 
dataGen.generateInsertsForPartition("001", 5, "any_partition");
+      List<Pair<String, IndexedRecord>> firstIndexedRecords = 
hoodieRecordsToIndexedRecords(firstRecords, dataGen.getExtendedSchema());
+      commitToTable(firstRecords, INSERT.value(), true, writeConfigs, 
dataGen.getExtendedSchema().toString());
+      validateOutputFromFileGroupReaderWithNativeRecords(
+          getStorageConf(), getBasePath(),
+          true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+          firstIndexedRecords);
+
+      // Evolve schema
+      dataGen.extendSchemaAfterEvolution(getSchemaEvolutionConfigs());
+
+      // Write another base file with schema B
+      List<HoodieRecord> secondRecords = 
dataGen.generateInsertsForPartition("002", 5, "new_partition");
+      List<Pair<String, IndexedRecord>> secondIndexedRecords = 
hoodieRecordsToIndexedRecords(secondRecords, dataGen.getExtendedSchema());
+      commitToTable(secondRecords, INSERT.value(), false, writeConfigs, 
dataGen.getExtendedSchema().toString());
+      List<Pair<String, IndexedRecord>> mergedRecords = 
CollectionUtils.combine(firstIndexedRecords, secondIndexedRecords);
+      validateOutputFromFileGroupReaderWithNativeRecords(
+          getStorageConf(), getBasePath(),
+          true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+          mergedRecords);
+    }
+  }
+
+  /**
+   * Write a base file with schema A, then write a log file with schema A, 
then write another base file with schema B.
+   */
+  @Test
+  public void testSchemaEvolutionWhenBaseFileHasDifferentSchemaThanLogFiles() 
throws Exception {
+    Map<String, String> writeConfigs = new HashMap<>(
+        getCommonConfigs(RecordMergeMode.EVENT_TIME_ORDERING, true));
+
+    try (HoodieTestDataGenerator dataGen = new 
HoodieTestDataGenerator(TRIP_EXAMPLE_SCHEMA, 0xDEEF)) {
+      dataGen.extendSchemaBeforeEvolution(getSchemaEvolutionConfigs());
+
+      // Write a base file with schema A
+      List<HoodieRecord> firstRecords = 
dataGen.generateInsertsForPartition("001", 10, "any_partition");
+      List<Pair<String, IndexedRecord>> firstIndexedRecords = 
hoodieRecordsToIndexedRecords(firstRecords, dataGen.getExtendedSchema());
+      commitToTable(firstRecords, INSERT.value(), true, writeConfigs, 
dataGen.getExtendedSchema().toString());
+      validateOutputFromFileGroupReaderWithNativeRecords(
+          getStorageConf(), getBasePath(),
+          true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+          firstIndexedRecords);
+
+      // Write a log file with schema A
+      List<HoodieRecord> secondRecords = dataGen.generateUniqueUpdates("002", 
5);
+      List<Pair<String, IndexedRecord>> secondIndexedRecords = 
hoodieRecordsToIndexedRecords(secondRecords, dataGen.getExtendedSchema());
+      commitToTable(secondRecords, UPSERT.value(), false, writeConfigs, 
dataGen.getExtendedSchema().toString());
+      List<Pair<String, IndexedRecord>> mergedRecords = 
mergeIndexedRecordLists(secondIndexedRecords, firstIndexedRecords);
+      validateOutputFromFileGroupReaderWithNativeRecords(
+          getStorageConf(), getBasePath(),
+          true, 1, RecordMergeMode.EVENT_TIME_ORDERING,
+          mergedRecords);
+
+      // Evolve schema
+      dataGen.extendSchemaAfterEvolution(getSchemaEvolutionConfigs());
+
+      // Write another base file with schema B
+      List<HoodieRecord> thirdRecords = 
dataGen.generateInsertsForPartition("003", 5, "new_partition");
+      List<Pair<String, IndexedRecord>> thirdIndexedRecords = 
hoodieRecordsToIndexedRecords(thirdRecords, dataGen.getExtendedSchema());
+      commitToTable(thirdRecords, INSERT.value(), false, writeConfigs, 
dataGen.getExtendedSchema().toString());
+      mergedRecords = CollectionUtils.combine(mergedRecords, 
thirdIndexedRecords);
+      validateOutputFromFileGroupReaderWithNativeRecords(
+          getStorageConf(), getBasePath(),
+          // use -1 to prevent validation of numlogfiles because one fg has a 
log file but the other doesn't
+          true, -1, RecordMergeMode.EVENT_TIME_ORDERING,
+          mergedRecords);
+    }
+  }
+
+  /**
+   * Write a base file with schema A, then write a log file with schema A, 
then write another log file with schema B.
+   */
+  @Test
+  public void testSchemaEvolutionWhenLogFilesWithDifferentSchema() throws 
Exception {
+    Map<String, String> writeConfigs = new HashMap<>(
+        getCommonConfigs(RecordMergeMode.EVENT_TIME_ORDERING, true));
+
+    try (HoodieTestDataGenerator baseFileDataGen =
+             new HoodieTestDataGenerator(TRIP_EXAMPLE_SCHEMA, 0xDEEF)) {
+      baseFileDataGen.extendSchemaBeforeEvolution(getSchemaEvolutionConfigs());
+
+      // Write base file with schema A
+      List<HoodieRecord> firstRecords = baseFileDataGen.generateInserts("001", 
100);
+      List<Pair<String, IndexedRecord>> firstIndexedRecords = 
hoodieRecordsToIndexedRecords(firstRecords, 
baseFileDataGen.getExtendedSchema());
+      commitToTable(firstRecords, INSERT.value(), true, writeConfigs, 
baseFileDataGen.getExtendedSchema().toString());
+      validateOutputFromFileGroupReaderWithNativeRecords(
+          getStorageConf(), getBasePath(),
+          true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+          firstIndexedRecords);
+
+      // Write log file with schema A
+      List<HoodieRecord> secondRecords = 
baseFileDataGen.generateUniqueUpdates("002", 50);
+      List<Pair<String, IndexedRecord>> secondIndexedRecords = 
hoodieRecordsToIndexedRecords(secondRecords, 
baseFileDataGen.getExtendedSchema());
+      commitToTable(secondRecords, UPSERT.value(), false, writeConfigs, 
baseFileDataGen.getExtendedSchema().toString());
+      List<Pair<String, IndexedRecord>> mergedRecords = 
mergeIndexedRecordLists(secondIndexedRecords, firstIndexedRecords);
+      validateOutputFromFileGroupReaderWithNativeRecords(
+          getStorageConf(), getBasePath(),
+          true, 1, RecordMergeMode.EVENT_TIME_ORDERING,
+          mergedRecords);
+
+      // Evolve schema
+      baseFileDataGen.extendSchemaAfterEvolution(getSchemaEvolutionConfigs());
+
+      // Write log file with schema B
+      List<HoodieRecord> thirdRecords = 
baseFileDataGen.generateUniqueUpdates("003", 50);
+      List<Pair<String, IndexedRecord>> thirdIndexedRecords = 
hoodieRecordsToIndexedRecords(thirdRecords, 
baseFileDataGen.getExtendedSchema());
+      commitToTable(thirdRecords, UPSERT.value(), false, writeConfigs, 
baseFileDataGen.getExtendedSchema().toString());
+      mergedRecords = mergeIndexedRecordLists(thirdIndexedRecords, 
mergedRecords);
+      validateOutputFromFileGroupReaderWithNativeRecords(
+          getStorageConf(), getBasePath(),
+          true, 2, RecordMergeMode.EVENT_TIME_ORDERING,
+          mergedRecords);
+    }
+  }
+
+  /**
+   * Write a base file with schema A, then write a log file with schema A, 
then write another log file with schema B. Then write a different base file 
with schema C.
+   */
+  @Test
+  public void 
testSchemaEvolutionWhenLogFilesWithDifferentSchemaAndTableSchemaDiffers() 
throws Exception {
+    Map<String, String> writeConfigs = new HashMap<>(
+        getCommonConfigs(RecordMergeMode.EVENT_TIME_ORDERING, true));
+
+    try (HoodieTestDataGenerator baseFileDataGen =
+             new HoodieTestDataGenerator(TRIP_EXAMPLE_SCHEMA, 0xDEEF)) {
+      baseFileDataGen.extendSchemaBeforeEvolution(getSchemaEvolutionConfigs());
+
+      // Write base file with schema A
+      List<HoodieRecord> firstRecords = baseFileDataGen.generateInserts("001", 
100);
+      List<Pair<String, IndexedRecord>> firstIndexedRecords = 
hoodieRecordsToIndexedRecords(firstRecords, 
baseFileDataGen.getExtendedSchema());
+      commitToTable(firstRecords, INSERT.value(), true, writeConfigs, 
baseFileDataGen.getExtendedSchema().toString());
+      validateOutputFromFileGroupReaderWithNativeRecords(
+          getStorageConf(), getBasePath(),
+          true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+          firstIndexedRecords);
+
+      // Write log file with schema A
+      List<HoodieRecord> secondRecords = 
baseFileDataGen.generateUniqueUpdates("002", 50);
+      List<Pair<String, IndexedRecord>> secondIndexedRecords = 
hoodieRecordsToIndexedRecords(secondRecords, 
baseFileDataGen.getExtendedSchema());
+      commitToTable(secondRecords, UPSERT.value(), false, writeConfigs, 
baseFileDataGen.getExtendedSchema().toString());
+      List<Pair<String, IndexedRecord>> mergedRecords = 
mergeIndexedRecordLists(secondIndexedRecords, firstIndexedRecords);
+      validateOutputFromFileGroupReaderWithNativeRecords(
+          getStorageConf(), getBasePath(),
+          true, 1, RecordMergeMode.EVENT_TIME_ORDERING,
+          mergedRecords);
+
+      // Evolve schema
+      HoodieTestDataGenerator.SchemaEvolutionConfigs schemaEvolutionConfigs = 
getSchemaEvolutionConfigs();
+      boolean addNewFieldSupport = schemaEvolutionConfigs.addNewFieldSupport;
+      schemaEvolutionConfigs.addNewFieldSupport = false;
+      baseFileDataGen.extendSchemaAfterEvolution(schemaEvolutionConfigs);
+
+      // Write log file with schema B
+      List<HoodieRecord> thirdRecords = 
baseFileDataGen.generateUniqueUpdates("003", 50);
+      List<Pair<String, IndexedRecord>> thirdIndexedRecords = 
hoodieRecordsToIndexedRecords(thirdRecords, 
baseFileDataGen.getExtendedSchema());
+      commitToTable(thirdRecords, UPSERT.value(), false, writeConfigs, 
baseFileDataGen.getExtendedSchema().toString());
+      mergedRecords = mergeIndexedRecordLists(thirdIndexedRecords, 
mergedRecords);
+      validateOutputFromFileGroupReaderWithNativeRecords(
+          getStorageConf(), getBasePath(),
+          true, 2, RecordMergeMode.EVENT_TIME_ORDERING,
+          mergedRecords);
+
+      // Evolve schema again
+      schemaEvolutionConfigs = getSchemaEvolutionConfigs();
+      schemaEvolutionConfigs.addNewFieldSupport = addNewFieldSupport;
+      baseFileDataGen.extendSchemaAfterEvolution(schemaEvolutionConfigs);
+
+      // Write another base file with schema C
+      List<HoodieRecord> fourthRecords = 
baseFileDataGen.generateInsertsForPartition("004", 5, "new_partition");
+      List<Pair<String, IndexedRecord>> fourthIndexedRecords = 
hoodieRecordsToIndexedRecords(fourthRecords, 
baseFileDataGen.getExtendedSchema());
+      commitToTable(fourthRecords, INSERT.value(), false, writeConfigs, 
baseFileDataGen.getExtendedSchema().toString());
+      mergedRecords = CollectionUtils.combine(mergedRecords, 
fourthIndexedRecords);
+      validateOutputFromFileGroupReaderWithNativeRecords(
+          getStorageConf(), getBasePath(),
+          // use -1 to prevent validation of numlogfiles because one fg has 
log files but the other doesn't
+          true, -1, RecordMergeMode.EVENT_TIME_ORDERING,
+          mergedRecords);
+    }
+  }
+
+  /**
+   * Write a base file with schema A, then write a log file with schema B
+   */
+  @Test
+  public void 
testSchemaEvolutionWhenBaseFilesWithDifferentSchemaFromLogFiles() throws 
Exception {
+    Map<String, String> writeConfigs = new HashMap<>(
+        getCommonConfigs(RecordMergeMode.EVENT_TIME_ORDERING, true));
+
+    try (HoodieTestDataGenerator baseFileDataGen =
+             new HoodieTestDataGenerator(TRIP_EXAMPLE_SCHEMA, 0xDEEF)) {
+      baseFileDataGen.extendSchemaBeforeEvolution(getSchemaEvolutionConfigs());
+
+      // Write base file with schema A
+      List<HoodieRecord> firstRecords = baseFileDataGen.generateInserts("001", 
100);
+      List<Pair<String, IndexedRecord>> firstIndexedRecords = 
hoodieRecordsToIndexedRecords(firstRecords, 
baseFileDataGen.getExtendedSchema());
+      commitToTable(firstRecords, INSERT.value(), true, writeConfigs, 
baseFileDataGen.getExtendedSchema().toString());
+      validateOutputFromFileGroupReaderWithNativeRecords(
+          getStorageConf(), getBasePath(),
+          true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+          firstIndexedRecords);
+
+      //Evolve schema
+      baseFileDataGen.extendSchemaAfterEvolution(getSchemaEvolutionConfigs());
+
+      // Write log file with schema B
+      List<HoodieRecord> secondRecords = 
baseFileDataGen.generateUniqueUpdates("002", 50);
+      List<Pair<String, IndexedRecord>> secondIndexedRecords = 
hoodieRecordsToIndexedRecords(secondRecords, 
baseFileDataGen.getExtendedSchema());
+      commitToTable(secondRecords, UPSERT.value(), false, writeConfigs, 
baseFileDataGen.getExtendedSchema().toString());
+      List<Pair<String, IndexedRecord>> mergedRecords = 
mergeIndexedRecordLists(secondIndexedRecords, firstIndexedRecords);
+      validateOutputFromFileGroupReaderWithNativeRecords(
+          getStorageConf(), getBasePath(),
+          true, 1, RecordMergeMode.EVENT_TIME_ORDERING,
+          mergedRecords);
+    }
+  }
+
   @Test
   public void testReadFileGroupInBootstrapMergeOnReadTable() throws Exception {
     Path zipOutput = Paths.get(new URI(getBasePath()));
@@ -219,7 +475,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
   public void testSpillableMapUsage(ExternalSpillableMap.DiskMapType 
diskMapType) throws Exception {
     Map<String, String> writeConfigs = new 
HashMap<>(getCommonConfigs(RecordMergeMode.COMMIT_TIME_ORDERING, true));
     try (HoodieTestDataGenerator dataGen = new 
HoodieTestDataGenerator(0xDEEF)) {
-      commitToTable(dataGen.generateInserts("001", 100), INSERT.value(), 
writeConfigs);
+      commitToTable(dataGen.generateInserts("001", 100), INSERT.value(), true, 
writeConfigs);
       String baseMapPath = Files.createTempDirectory(null).toString();
       HoodieTableMetaClient metaClient = 
HoodieTestUtils.createMetaClient(getStorageConf(), getBasePath());
       Schema avroSchema = new 
TableSchemaResolver(metaClient).getTableAvroSchema();
@@ -285,6 +541,52 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
     return configMapping;
   }
 
+  private void 
validateOutputFromFileGroupReaderWithNativeRecords(StorageConfiguration<?> 
storageConf,
+                                                                    String 
tablePath,
+                                                                    boolean 
containsBaseFile,
+                                                                    int 
expectedLogFileNum,
+                                                                    
RecordMergeMode recordMergeMode,
+                                                                    
List<Pair<String, IndexedRecord>> expectedRecords) throws Exception {
+    Set<String> metaCols = new HashSet<>(HoodieRecord.HOODIE_META_COLUMNS);
+    HoodieTableMetaClient metaClient = 
HoodieTestUtils.createMetaClient(storageConf, tablePath);
+    TableSchemaResolver resolver = new TableSchemaResolver(metaClient);
+    Schema avroSchema = resolver.getTableAvroSchema();
+    Schema avroSchemaWithoutMeta = resolver.getTableAvroSchema(false);
+    // use reader context for conversion to engine specific objects
+    HoodieReaderContext<T> readerContext = getHoodieReaderContext(tablePath, 
avroSchema, getStorageConf(), metaClient);
+    List<FileSlice> fileSlices = getFileSlicesToRead(storageConf, tablePath, 
metaClient, containsBaseFile, expectedLogFileNum);
+    boolean sortOutput = !containsBaseFile;
+    List<T> actualRecordList =
+        readRecordsFromFileGroup(storageConf, tablePath, metaClient, 
fileSlices, avroSchema, recordMergeMode, false, sortOutput);
+    assertEquals(expectedRecords.size(), actualRecordList.size());
+    actualRecordList.forEach(r -> assertRecordMatchesSchema(avroSchema, r));
+    Set<GenericRecord> actualRecordSet = actualRecordList.stream().map(r ->  
readerContext.convertToAvroRecord(r, avroSchema))
+        .map(r -> HoodieAvroUtils.removeFields(r, metaCols))
+        .collect(Collectors.toSet());
+    Set<GenericRecord> expectedRecordSet = expectedRecords.stream()
+        .map(r -> (GenericRecord) r.getRight())
+        .map(r -> HoodieAvroUtils.rewriteRecordWithNewSchema(r, 
avroSchemaWithoutMeta))
+        .collect(Collectors.toSet());
+    compareRecordSets(expectedRecordSet, actualRecordSet);
+  }
+
+  private void compareRecordSets(Set<GenericRecord> expectedRecordSet, 
Set<GenericRecord> actualRecordSet) {
+    Map<String, GenericRecord> expectedMap = new 
HashMap<>(expectedRecordSet.size());
+    for (GenericRecord expectedRecord : expectedRecordSet) {
+      expectedMap.put(expectedRecord.get("_row_key").toString(), 
expectedRecord);
+    }
+    Map<String, GenericRecord> actualMap = new 
HashMap<>(actualRecordSet.size());
+    for (GenericRecord actualRecord : actualRecordSet) {
+      actualMap.put(actualRecord.get("_row_key").toString(), actualRecord);
+    }
+    assertEquals(expectedMap.keySet(), actualMap.keySet());
+    for (String key : actualMap.keySet()) {
+      GenericRecord expectedRecord = expectedMap.get(key);
+      GenericRecord actualRecord = actualMap.get(key);
+      assertEquals(expectedRecord, actualRecord);
+    }
+  }
+
   protected void validateOutputFromFileGroupReader(StorageConfiguration<?> 
storageConf,
                                                  String tablePath,
                                                  boolean containsBaseFile,
@@ -364,7 +666,9 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
         baseFile.setBootstrapBaseFile(new BaseFile(newBootstrapPath));
       }
       List<String> logFilePathList = getLogFileListFromFileSlice(fileSlice);
-      assertEquals(expectedLogFileNum, logFilePathList.size());
+      if (expectedLogFileNum >= 0) {
+        assertEquals(expectedLogFileNum, logFilePathList.size());
+      }
       assertEquals(containsBaseFile, fileSlice.getBaseFile().isPresent());
     });
     return fileSlices;
@@ -397,7 +701,11 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
     return actualRecordList;
   }
 
-  private HoodieFileGroupReader<T> 
getHoodieFileGroupReader(StorageConfiguration<?> storageConf, String tablePath, 
HoodieTableMetaClient metaClient, Schema avroSchema, FileSlice fileSlice,
+  private HoodieFileGroupReader<T> 
getHoodieFileGroupReader(StorageConfiguration<?> storageConf,
+                                                            String tablePath,
+                                                            
HoodieTableMetaClient metaClient,
+                                                            Schema avroSchema,
+                                                            FileSlice 
fileSlice,
                                                             int start, 
TypedProperties props, boolean sortOutput) {
     return HoodieFileGroupReader.<T>newBuilder()
         .withReaderContext(getHoodieReaderContext(tablePath, avroSchema, 
storageConf, metaClient))
@@ -488,6 +796,12 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
     return false;
   }
 
+  protected List<Pair<String, IndexedRecord>> 
mergeIndexedRecordLists(List<Pair<String, IndexedRecord>> updates, 
List<Pair<String, IndexedRecord>> existing) {
+    Set<String> updatedKeys = 
updates.stream().map(Pair::getLeft).collect(Collectors.toSet());
+    return Stream.concat(updates.stream(), existing.stream().filter(record -> 
!updatedKeys.contains(record.getLeft())))
+        .collect(Collectors.toList());
+  }
+
   protected List<HoodieRecord> mergeRecordLists(List<HoodieRecord> updates, 
List<HoodieRecord> existing) {
     Set<String> updatedKeys = 
updates.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toSet());
     return Stream.concat(updates.stream(), existing.stream().filter(record -> 
!updatedKeys.contains(record.getRecordKey())))
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 3549a0ba3e83..97810a21d9f0 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.common.testutils;
 
+import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.common.model.HoodieAvroPayload;
@@ -78,6 +79,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -184,6 +186,7 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
   public static final Schema AVRO_TRIP_ENCODED_DECIMAL_SCHEMA = new 
Schema.Parser().parse(TRIP_ENCODED_DECIMAL_SCHEMA);
   public static final Schema AVRO_TRIP_SCHEMA = new 
Schema.Parser().parse(TRIP_SCHEMA);
   public static final Schema FLATTENED_AVRO_SCHEMA = new 
Schema.Parser().parse(TRIP_FLATTENED_SCHEMA);
+
   private final Random rand;
 
   //Maintains all the existing keys schema wise
@@ -191,6 +194,7 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
   private final String[] partitionPaths;
   //maintains the count of existing keys schema wise
   private Map<String, Integer> numKeysBySchema;
+  private Option<Schema> extendedSchema = Option.empty();
 
   public HoodieTestDataGenerator(long seed) {
     this(seed, DEFAULT_PARTITION_PATHS, new HashMap<>());
@@ -356,7 +360,7 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
     GenericRecord rec = generateGenericRecord(
         key.getRecordKey(), key.getPartitionPath(), "rider-" + instantTime, 
"driver-" + instantTime, timestamp,
         false, isFlattened);
-    return new RawTripTestPayload(rec.toString(), key.getRecordKey(), 
key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA);
+    return new RawTripTestPayload(rec.toString(), key.getRecordKey(), 
key.getPartitionPath(), 
extendedSchema.map(Schema::toString).orElse(TRIP_EXAMPLE_SCHEMA));
   }
 
   private RawTripTestPayload generateNestedExampleRandomValue(
@@ -471,9 +475,12 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
    * Populate rec with values for FARE_NESTED_SCHEMA
    */
   private void generateFareNestedValues(GenericRecord rec) {
-    GenericRecord fareRecord = new 
GenericData.Record(AVRO_SCHEMA.getField("fare").schema());
+    GenericRecord fareRecord = new 
GenericData.Record(extendedSchema.orElse(AVRO_SCHEMA).getField("fare").schema());
     fareRecord.put("amount", rand.nextDouble() * 100);
     fareRecord.put("currency", "USD");
+    if (extendedSchema.isPresent()) {
+      generateCustomValues(fareRecord, "customFare");
+    }
     rec.put("fare", fareRecord);
   }
 
@@ -481,6 +488,12 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
    * Populate rec with values for TIP_NESTED_SCHEMA
    */
   private void generateTipNestedValues(GenericRecord rec) {
+    // TODO [HUDI-9603] remove this check
+    if (extendedSchema.isPresent()) {
+      if (extendedSchema.get().getField("tip_history") == null) {
+        return;
+      }
+    }
     GenericArray<GenericRecord> tipHistoryArray = new GenericData.Array<>(1, 
AVRO_SCHEMA.getField("tip_history").schema());
     Schema tipSchema = new 
Schema.Parser().parse(AVRO_SCHEMA.getField("tip_history").schema().toString()).getElementType();
     GenericRecord tipRecord = new GenericData.Record(tipSchema);
@@ -507,7 +520,7 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
   public GenericRecord generateGenericRecord(String rowKey, String 
partitionPath, String riderName, String driverName,
                                                     long timestamp, boolean 
isDeleteRecord,
                                                     boolean isFlattened) {
-    GenericRecord rec = new GenericData.Record(isFlattened ? 
FLATTENED_AVRO_SCHEMA : AVRO_SCHEMA);
+    GenericRecord rec = new GenericData.Record(extendedSchema.orElseGet(() -> 
isFlattened ? FLATTENED_AVRO_SCHEMA : AVRO_SCHEMA));
     generateTripPrefixValues(rec, rowKey, partitionPath, riderName, 
driverName, timestamp);
     if (isFlattened) {
       generateFareFlattenedValues(rec);
@@ -517,6 +530,7 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
       generateFareNestedValues(rec);
       generateTipNestedValues(rec);
     }
+    generateCustomValues(rec, "customField");
     generateTripSuffixValues(rec, isDeleteRecord);
     return rec;
   }
@@ -524,7 +538,7 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
   /**
    * Generate record conforming to TRIP_NESTED_EXAMPLE_SCHEMA
    */
-  public  GenericRecord generateNestedExampleGenericRecord(String rowKey, 
String partitionPath, String riderName, String driverName,
+  public GenericRecord generateNestedExampleGenericRecord(String rowKey, 
String partitionPath, String riderName, String driverName,
                                                         long timestamp, 
boolean isDeleteRecord) {
     GenericRecord rec = new GenericData.Record(NESTED_AVRO_SCHEMA);
     generateTripPrefixValues(rec, rowKey, partitionPath, riderName, 
driverName, timestamp);
@@ -1276,5 +1290,221 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
     public String getRiderValue() {
       return riderValue;
     }
+
+    @Override
+    public String toString() {
+      return "RowKey: " + recordKey + ", PartitionPath: " + partitionPath
+          + ", OrderingVal: " + orderingVal + ", RiderValue: " + riderValue;
+    }
+  }
+
+  public static class SchemaEvolutionConfigs {
+    public Schema schema = AVRO_SCHEMA;
+    public boolean nestedSupport = true;
+    public boolean mapSupport = true;
+    public boolean arraySupport = true;
+    public boolean addNewFieldSupport = true;
+    // TODO: [HUDI-9603] Flink 1.18 array values incorrect in fg reader test
+    public boolean anyArraySupport = true;
+
+    // Int
+    public boolean intToLongSupport = true;
+    public boolean intToFloatSupport = true;
+    public boolean intToDoubleSupport = true;
+    public boolean intToStringSupport = true;
+
+    // Long
+    public boolean longToFloatSupport = true;
+    public boolean longToDoubleSupport = true;
+    public boolean longToStringSupport = true;
+
+    // Float
+    public boolean floatToDoubleSupport = true;
+    public boolean floatToStringSupport = true;
+
+    // Double
+    public boolean doubleToStringSupport = true;
+
+    // String
+    public boolean stringToBytesSupport = true;
+
+    // Bytes
+    public boolean bytesToStringSupport = true;
+  }
+
+  private enum SchemaEvolutionTypePromotionCase {
+    INT_TO_INT(Schema.Type.INT, Schema.Type.INT, config -> true),
+    INT_TO_LONG(Schema.Type.INT, Schema.Type.LONG, config -> 
config.intToLongSupport),
+    INT_TO_FLOAT(Schema.Type.INT, Schema.Type.FLOAT, config -> 
config.intToFloatSupport),
+    INT_TO_DOUBLE(Schema.Type.INT, Schema.Type.DOUBLE, config -> 
config.intToDoubleSupport),
+    INT_TO_STRING(Schema.Type.INT, Schema.Type.STRING, config -> 
config.intToStringSupport),
+    LONG_TO_LONG(Schema.Type.LONG, Schema.Type.LONG, config -> true),
+    LONG_TO_FLOAT(Schema.Type.LONG, Schema.Type.FLOAT, config -> 
config.longToFloatSupport),
+    LONG_TO_DOUBLE(Schema.Type.LONG, Schema.Type.DOUBLE, config -> 
config.longToDoubleSupport),
+    LONG_TO_STRING(Schema.Type.LONG, Schema.Type.STRING, config -> 
config.longToStringSupport),
+    FLOAT_TO_FLOAT(Schema.Type.FLOAT, Schema.Type.FLOAT, config -> true),
+    FLOAT_TO_DOUBLE(Schema.Type.FLOAT, Schema.Type.DOUBLE, config -> 
config.floatToDoubleSupport),
+    FLOAT_TO_STRING(Schema.Type.FLOAT, Schema.Type.STRING, config -> 
config.floatToStringSupport),
+    DOUBLE_TO_DOUBLE(Schema.Type.DOUBLE, Schema.Type.DOUBLE, config -> true),
+    DOUBLE_TO_STRING(Schema.Type.DOUBLE, Schema.Type.STRING, config -> 
config.doubleToStringSupport),
+    STRING_TO_STRING(Schema.Type.STRING, Schema.Type.STRING, config -> true),
+    STRING_TO_BYTES(Schema.Type.STRING, Schema.Type.BYTES, config -> 
config.stringToBytesSupport),
+    BYTES_TO_BYTES(Schema.Type.BYTES, Schema.Type.BYTES, config -> true),
+    BYTES_TO_STRING(Schema.Type.BYTES, Schema.Type.STRING, config -> 
config.bytesToStringSupport);
+
+    public final Schema.Type before;
+    public final Schema.Type after;
+    public final Predicate<SchemaEvolutionConfigs> isEnabled;
+
+    SchemaEvolutionTypePromotionCase(Schema.Type before, Schema.Type after, 
Predicate<SchemaEvolutionConfigs> isEnabled) {
+      this.before = before;
+      this.after = after;
+      this.isEnabled = isEnabled;
+    }
+  }
+
+  public void extendSchema(SchemaEvolutionConfigs configs, boolean isBefore) {
+    List<Schema.Type> baseFields = new ArrayList<>();
+    for (SchemaEvolutionTypePromotionCase evolution : 
SchemaEvolutionTypePromotionCase.values()) {
+      if (evolution.isEnabled.test(configs)) {
+        baseFields.add(isBefore ? evolution.before : evolution.after);
+      }
+    }
+
+    // Add new field if we are testing adding new fields
+    if (!isBefore && configs.addNewFieldSupport) {
+      baseFields.add(Schema.Type.BOOLEAN);
+    }
+
+    this.extendedSchema = Option.of(generateExtendedSchema(configs, new 
ArrayList<>(baseFields)));
+  }
+
+  public void extendSchemaBeforeEvolution(SchemaEvolutionConfigs configs) {
+    extendSchema(configs, true);
+  }
+
+  public void extendSchemaAfterEvolution(SchemaEvolutionConfigs configs) {
+    extendSchema(configs, false);
+  }
+
+  public Schema getExtendedSchema() {
+    return extendedSchema.orElseThrow(IllegalArgumentException::new);
+  }
+
+  private static Schema generateExtendedSchema(SchemaEvolutionConfigs configs, 
List<Schema.Type> baseFields) {
+    return generateExtendedSchema(configs.schema, configs, baseFields, 
"customField", true);
+  }
+
+  private static Schema generateExtendedSchema(Schema baseSchema, 
SchemaEvolutionConfigs configs, List<Schema.Type> baseFields, String 
fieldPrefix, boolean toplevel) {
+    List<Schema.Field> fields =  baseSchema.getFields();
+    List<Schema.Field> finalFields = new ArrayList<>(fields.size() + 
baseFields.size());
+    boolean addedFields = false;
+    for (Schema.Field field : fields) {
+      if (configs.nestedSupport && field.name().equals("fare") && 
field.schema().getType() == Schema.Type.RECORD) {
+        finalFields.add(new Schema.Field(field.name(), 
generateExtendedSchema(field.schema(), configs, baseFields, "customFare", 
false), field.doc(), field.defaultVal()));
+      } else if (configs.anyArraySupport || 
!field.name().equals("tip_history")) {
+        //TODO: [HUDI-9603] remove the if condition when the issue is fixed
+        if (field.name().equals("_hoodie_is_deleted")) {
+          addedFields = true;
+          addFields(configs, finalFields, baseFields, fieldPrefix, 
baseSchema.getNamespace(), toplevel);
+        }
+        finalFields.add(new Schema.Field(field.name(), field.schema(), 
field.doc(), field.defaultVal()));
+      }
+    }
+    if (!addedFields) {
+      addFields(configs, finalFields, baseFields, fieldPrefix, 
baseSchema.getNamespace(), toplevel);
+    }
+    Schema finalSchema = Schema.createRecord(baseSchema.getName(), 
baseSchema.getDoc(),
+        baseSchema.getNamespace(), baseSchema.isError());
+    finalSchema.setFields(finalFields);
+    return finalSchema;
+  }
+
+  private static void addFields(SchemaEvolutionConfigs configs, 
List<Schema.Field> finalFields, List<Schema.Type> baseFields, String 
fieldPrefix, String namespace, boolean toplevel) {
+    if (toplevel) {
+      if (configs.mapSupport) {
+        List<Schema.Field> mapFields = new ArrayList<>(baseFields.size());
+        addFieldsHelper(mapFields, baseFields, fieldPrefix + "Map");
+        finalFields.add(new Schema.Field(fieldPrefix + "Map", 
Schema.createMap(Schema.createRecord("customMapRecord", "", namespace, false, 
mapFields)), "", null));
+      }
+
+      if (configs.arraySupport) {
+        List<Schema.Field> arrayFields = new ArrayList<>(baseFields.size());
+        addFieldsHelper(arrayFields, baseFields, fieldPrefix + "Array");
+        finalFields.add(new Schema.Field(fieldPrefix + "Array", 
Schema.createArray(Schema.createRecord("customArrayRecord", "", namespace, 
false, arrayFields)), "", null));
+      }
+    }
+    addFieldsHelper(finalFields, baseFields, fieldPrefix);
+  }
+
+  private static void addFieldsHelper(List<Schema.Field> finalFields, 
List<Schema.Type> baseFields, String fieldPrefix) {
+    for (int i = 0; i < baseFields.size(); i++) {
+      if (baseFields.get(i) == Schema.Type.BOOLEAN) {
+        // boolean fields are added fields
+        finalFields.add(new Schema.Field(fieldPrefix + i, 
AvroSchemaUtils.createNullableSchema(Schema.Type.BOOLEAN), "", null));
+      } else {
+        finalFields.add(new Schema.Field(fieldPrefix + i, 
Schema.create(baseFields.get(i)), "", null));
+      }
+    }
+  }
+
+  private void generateCustomValues(GenericRecord rec, String customPrefix) {
+    for (Schema.Field field : rec.getSchema().getFields()) {
+      if (field.name().startsWith(customPrefix)) {
+        switch (field.schema().getType()) {
+          case INT:
+            rec.put(field.name(), rand.nextInt());
+            break;
+          case LONG:
+            rec.put(field.name(), rand.nextLong());
+            break;
+          case FLOAT:
+            rec.put(field.name(), rand.nextFloat());
+            break;
+          case DOUBLE:
+            rec.put(field.name(), rand.nextDouble());
+            break;
+          case STRING:
+            rec.put(field.name(), genPseudoRandomUUID(rand).toString());
+            break;
+          case BYTES:
+            rec.put(field.name(), 
ByteBuffer.wrap(getUTF8Bytes(genPseudoRandomUUID(rand).toString())));
+            break;
+          case UNION:
+            if 
(!AvroSchemaUtils.resolveNullableSchema(field.schema()).getType().equals(Schema.Type.BOOLEAN))
 {
+              throw new IllegalStateException("Union should only be boolean");
+            }
+            rec.put(field.name(), rand.nextBoolean());
+            break;
+          case BOOLEAN:
+            rec.put(field.name(), rand.nextBoolean());
+            break;
+          case MAP:
+            rec.put(field.name(), genMap(field.schema(), field.name()));
+            break;
+          case ARRAY:
+            rec.put(field.name(), genArray(field.schema(), field.name()));
+            break;
+          default:
+            throw new UnsupportedOperationException("Unsupported type: " + 
field.schema().getType());
+        }
+      }
+    }
+  }
+
+  private GenericArray<GenericRecord> genArray(Schema arraySchema, String 
customPrefix) {
+    GenericArray<GenericRecord> customArray = new GenericData.Array<>(1, 
arraySchema);
+    Schema arrayElementSchema = arraySchema.getElementType();
+    GenericRecord customRecord = new GenericData.Record(arrayElementSchema);
+    generateCustomValues(customRecord, customPrefix);
+    customArray.add(customRecord);
+    return customArray;
+  }
+
+  private Map<String,GenericRecord> genMap(Schema mapSchema, String 
customPrefix) {
+    Schema mapElementSchema = mapSchema.getValueType();
+    GenericRecord customRecord = new GenericData.Record(mapElementSchema);
+    generateCustomValues(customRecord, customPrefix);
+    return Collections.singletonMap("customMapKey", customRecord);
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
index 6f6b19f2444b..5ac306937996 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
@@ -81,15 +81,11 @@ import static org.mockito.Mockito.when;
 public class TestHoodieFileGroupReaderOnFlink extends 
TestHoodieFileGroupReaderBase<RowData> {
   private Configuration conf;
   private Option<InstantRange> instantRangeOpt = Option.empty();
-  private static final Schema RECORD_SCHEMA = getRecordAvroSchema();
-  private static final AvroToRowDataConverters.AvroToRowDataConverter 
AVRO_CONVERTER =
-      
RowDataAvroQueryContexts.fromAvroSchema(RECORD_SCHEMA).getAvroToRowDataConverter();
 
   @BeforeEach
   public void setup() {
     conf = new Configuration();
     conf.set(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
-    conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, RECORD_SCHEMA.toString());
     conf.set(FlinkOptions.PATH, getBasePath());
     conf.set(FlinkOptions.TABLE_NAME, "TestHoodieTable");
     // use hive style partition as a workaround for HUDI-9396
@@ -142,12 +138,16 @@ public class TestHoodieFileGroupReaderOnFlink extends 
TestHoodieFileGroupReaderB
   }
 
   @Override
-  public void commitToTable(List<HoodieRecord> recordList, String operation, 
Map<String, String> writeConfigs) {
+  public void commitToTable(List<HoodieRecord> recordList, String operation, 
boolean firstCommit, Map<String, String> writeConfigs, String schemaStr) {
     writeConfigs.forEach((key, value) -> conf.setString(key, value));
     conf.set(FlinkOptions.OPERATION, operation);
+    Schema localSchema = getRecordAvroSchema(schemaStr);
+    conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, localSchema.toString());
+    AvroToRowDataConverters.AvroToRowDataConverter avroConverter =
+        
RowDataAvroQueryContexts.fromAvroSchema(localSchema).getAvroToRowDataConverter();
     List<RowData> rowDataList = recordList.stream().map(record -> {
       try {
-        return (RowData) 
AVRO_CONVERTER.convert(record.toIndexedRecord(RECORD_SCHEMA, 
CollectionUtils.emptyProps()).get().getData());
+        return (RowData) 
avroConverter.convert(record.toIndexedRecord(localSchema, 
CollectionUtils.emptyProps()).get().getData());
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
@@ -168,6 +168,34 @@ public class TestHoodieFileGroupReaderOnFlink extends 
TestHoodieFileGroupReaderB
         RowDataAvroQueryContexts.fromAvroSchema(schema).getRowType());
   }
 
+  @Override
+  public void assertRecordMatchesSchema(Schema schema, RowData record) {
+    // TODO: Add support for RowData
+  }
+
+  @Override
+  public HoodieTestDataGenerator.SchemaEvolutionConfigs 
getSchemaEvolutionConfigs() {
+    HoodieTestDataGenerator.SchemaEvolutionConfigs configs = new 
HoodieTestDataGenerator.SchemaEvolutionConfigs();
+    configs.nestedSupport = false;
+    configs.arraySupport = false;
+    configs.mapSupport = false;
+    configs.anyArraySupport = false;
+    configs.addNewFieldSupport = false;
+    configs.intToLongSupport = false;
+    configs.intToFloatSupport = false;
+    configs.intToDoubleSupport = false;
+    configs.intToStringSupport = false;
+    configs.longToFloatSupport = false;
+    configs.longToDoubleSupport = false;
+    configs.longToStringSupport = false;
+    configs.floatToDoubleSupport = false;
+    configs.floatToStringSupport = false;
+    configs.doubleToStringSupport = false;
+    configs.stringToBytesSupport = false;
+    configs.bytesToStringSupport = false;
+    return configs;
+  }
+
   @Test
   public void testGetOrderingValue() {
     HoodieTableConfig tableConfig = Mockito.mock(HoodieTableConfig.class);
@@ -250,7 +278,7 @@ public class TestHoodieFileGroupReaderOnFlink extends 
TestHoodieFileGroupReaderB
     try (HoodieTestDataGenerator dataGen = new 
HoodieTestDataGenerator(0xDEEF)) {
       // One commit; reading one file group containing a log file only
       List<HoodieRecord> initialRecords = dataGen.generateInserts("001", 100);
-      commitToTable(initialRecords, UPSERT.value(), writeConfigs);
+      commitToTable(initialRecords, UPSERT.value(), true, writeConfigs, 
TRIP_EXAMPLE_SCHEMA);
       validateOutputFromFileGroupReader(
           getStorageConf(), getBasePath(), false, 1, recordMergeMode,
           initialRecords, initialRecords);
@@ -258,7 +286,7 @@ public class TestHoodieFileGroupReaderOnFlink extends 
TestHoodieFileGroupReaderB
       // Two commits; reading one file group containing two log files
       List<HoodieRecord> updates = dataGen.generateUniqueUpdates("002", 50);
       List<HoodieRecord> allRecords = mergeRecordLists(updates, 
initialRecords);
-      commitToTable(updates, UPSERT.value(), writeConfigs);
+      commitToTable(updates, UPSERT.value(), false, writeConfigs, 
TRIP_EXAMPLE_SCHEMA);
       validateOutputFromFileGroupReader(
           getStorageConf(), getBasePath(), false, 2, recordMergeMode,
           allRecords, CollectionUtils.combine(initialRecords, updates));
@@ -275,7 +303,7 @@ public class TestHoodieFileGroupReaderOnFlink extends 
TestHoodieFileGroupReaderB
     try (HoodieTestDataGenerator dataGen = new 
HoodieTestDataGenerator(0xDEEF)) {
       // One commit; reading one file group containing a log file only
       List<HoodieRecord> initialRecords = dataGen.generateInserts("001", 100);
-      commitToTable(initialRecords, firstCommitOperation.value(), 
writeConfigs);
+      commitToTable(initialRecords, firstCommitOperation.value(), true, 
writeConfigs, TRIP_EXAMPLE_SCHEMA);
       validateOutputFromFileGroupReader(
           getStorageConf(), getBasePath(), isFirstCommitInsert,
           isFirstCommitInsert ? 0 : 1, recordMergeMode, initialRecords, 
initialRecords);
@@ -286,7 +314,7 @@ public class TestHoodieFileGroupReaderOnFlink extends 
TestHoodieFileGroupReaderB
 
       // the base/log file of the first commit is filtered out by the instant 
range.
       List<HoodieRecord> updates = dataGen.generateUniqueUpdates("002", 50);
-      commitToTable(updates, UPSERT.value(), writeConfigs);
+      commitToTable(updates, UPSERT.value(), false, writeConfigs, 
TRIP_EXAMPLE_SCHEMA);
       validateOutputFromFileGroupReader(getStorageConf(), getBasePath(),
           isFirstCommitInsert, isFirstCommitInsert ? 1 : 2, recordMergeMode, 
updates, updates);
       // reset instant range
@@ -294,8 +322,8 @@ public class TestHoodieFileGroupReaderOnFlink extends 
TestHoodieFileGroupReaderB
     }
   }
 
-  private static Schema getRecordAvroSchema() {
-    Schema recordSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
+  private static Schema getRecordAvroSchema(String schemaStr) {
+    Schema recordSchema = new Schema.Parser().parse(schemaStr);
     return 
AvroSchemaConverter.convertToSchema(RowDataAvroQueryContexts.fromAvroSchema(recordSchema).getRowType().getLogicalType());
   }
 }
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hadoop/hive/serde2/avro/HiveTypeUtils.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hadoop/hive/serde2/avro/HiveTypeUtils.java
new file mode 100644
index 000000000000..5022f0cef926
--- /dev/null
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hadoop/hive/serde2/avro/HiveTypeUtils.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.avro;
+
+import static org.apache.avro.Schema.Type.BOOLEAN;
+import static org.apache.avro.Schema.Type.BYTES;
+import static org.apache.avro.Schema.Type.DOUBLE;
+import static org.apache.avro.Schema.Type.FIXED;
+import static org.apache.avro.Schema.Type.FLOAT;
+import static org.apache.avro.Schema.Type.INT;
+import static org.apache.avro.Schema.Type.LONG;
+import static org.apache.avro.Schema.Type.NULL;
+import static org.apache.avro.Schema.Type.STRING;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Hashtable;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+/**
+ * Convert an Avro Schema to a Hive TypeInfo
+ * Taken from 
https://github.com/apache/hive/blob/rel/release-2.3.4/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java
+ */
+public class HiveTypeUtils {
+  // Conversion of Avro primitive types to Hive primitive types
+  // Avro             Hive
+  // Null
+  // boolean          boolean    check
+  // int              int        check
+  // long             bigint     check
+  // float            double     check
+  // double           double     check
+  // bytes            binary     check
+  // fixed            binary     check
+  // string           string     check
+  //                  tinyint
+  //                  smallint
+
+  // Map of Avro's primitive types to Hives (for those that are supported by 
both)
+  private static final Map<Schema.Type, TypeInfo> PRIMITIVE_TYPE_TO_TYPE_INFO 
= initTypeMap();
+  private static Map<Schema.Type, TypeInfo> initTypeMap() {
+    Map<Schema.Type, TypeInfo> theMap = new Hashtable<Schema.Type, TypeInfo>();
+    theMap.put(NULL, TypeInfoFactory.getPrimitiveTypeInfo("void"));
+    theMap.put(BOOLEAN, TypeInfoFactory.getPrimitiveTypeInfo("boolean"));
+    theMap.put(INT, TypeInfoFactory.getPrimitiveTypeInfo("int"));
+    theMap.put(LONG, TypeInfoFactory.getPrimitiveTypeInfo("bigint"));
+    theMap.put(FLOAT, TypeInfoFactory.getPrimitiveTypeInfo("float"));
+    theMap.put(DOUBLE, TypeInfoFactory.getPrimitiveTypeInfo("double"));
+    theMap.put(BYTES, TypeInfoFactory.getPrimitiveTypeInfo("binary"));
+    theMap.put(FIXED, TypeInfoFactory.getPrimitiveTypeInfo("binary"));
+    theMap.put(STRING, TypeInfoFactory.getPrimitiveTypeInfo("string"));
+    return Collections.unmodifiableMap(theMap);
+  }
+
+  /**
+   * Generate a list of of TypeInfos from an Avro schema.  This method is
+   * currently public due to some weirdness in deserializing unions, but
+   * will be made private once that is resolved.
+   * @param schema Schema to generate field types for
+   * @return List of TypeInfos, each element of which is a TypeInfo derived
+   *         from the schema.
+   * @throws AvroSerdeException for problems during conversion.
+   */
+  public static List<TypeInfo> generateColumnTypes(Schema schema) throws 
AvroSerdeException {
+    return generateColumnTypes(schema, null);
+  }
+
+  /**
+   * Generate a list of of TypeInfos from an Avro schema.  This method is
+   * currently public due to some weirdness in deserializing unions, but
+   * will be made private once that is resolved.
+   * @param schema Schema to generate field types for
+   * @param seenSchemas stores schemas processed in the parsing done so far,
+   *         helping to resolve circular references in the schema
+   * @return List of TypeInfos, each element of which is a TypeInfo derived
+   *         from the schema.
+   * @throws AvroSerdeException for problems during conversion.
+   */
+  public static List<TypeInfo> generateColumnTypes(Schema schema,
+                                                   Set<Schema> seenSchemas) 
throws AvroSerdeException {
+    List<Schema.Field> fields = schema.getFields();
+
+    List<TypeInfo> types = new ArrayList<TypeInfo>(fields.size());
+
+    for (Schema.Field field : fields) {
+      types.add(generateTypeInfo(field.schema(), seenSchemas));
+    }
+
+    return types;
+  }
+
+  static InstanceCache<Schema, TypeInfo> typeInfoCache = new 
InstanceCache<Schema, TypeInfo>() {
+    @Override
+    protected TypeInfo makeInstance(Schema s,
+                                    Set<Schema> seenSchemas)
+        throws AvroSerdeException {
+      return generateTypeInfoWorker(s, seenSchemas);
+    }
+  };
+  /**
+   * Convert an Avro Schema into an equivalent Hive TypeInfo.
+   * @param schema to record. Must be of record type.
+   * @param seenSchemas stores schemas processed in the parsing done so far,
+   *         helping to resolve circular references in the schema
+   * @return TypeInfo matching the Avro schema
+   * @throws AvroSerdeException for any problems during conversion.
+   */
+  public static TypeInfo generateTypeInfo(Schema schema,
+                                          Set<Schema> seenSchemas) throws 
AvroSerdeException {
+    // For bytes type, it can be mapped to decimal.
+    Schema.Type type = schema.getType();
+    // HUDI MODIFICATION ADDED "|| type == FIXED"
+    if ((type == BYTES || type == FIXED) && AvroSerDe.DECIMAL_TYPE_NAME
+        .equalsIgnoreCase(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
+      int precision = 0;
+      int scale = 0;
+      try {
+        precision = 
getIntValue(schema.getObjectProp(AvroSerDe.AVRO_PROP_PRECISION));
+        scale = getIntValue(schema.getObjectProp(AvroSerDe.AVRO_PROP_SCALE));
+      } catch (Exception ex) {
+        throw new AvroSerdeException("Failed to obtain scale value from file 
schema: " + schema, ex);
+      }
+
+      try {
+        HiveDecimalUtils.validateParameter(precision, scale);
+      } catch (Exception ex) {
+        throw new AvroSerdeException("Invalid precision or scale for decimal 
type", ex);
+      }
+
+      return TypeInfoFactory.getDecimalTypeInfo(precision, scale);
+    }
+
+    if (type == STRING
+        && 
AvroSerDe.CHAR_TYPE_NAME.equalsIgnoreCase(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE)))
 {
+      int maxLength = 0;
+      try {
+        maxLength = getIntFromSchema(schema, AvroSerDe.AVRO_PROP_MAX_LENGTH);
+      } catch (Exception ex) {
+        throw new AvroSerdeException("Failed to obtain maxLength value from 
file schema: " + schema, ex);
+      }
+      return TypeInfoFactory.getCharTypeInfo(maxLength);
+    }
+
+    if (type == STRING && AvroSerDe.VARCHAR_TYPE_NAME
+        .equalsIgnoreCase(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
+      int maxLength = 0;
+      try {
+        maxLength = getIntFromSchema(schema, AvroSerDe.AVRO_PROP_MAX_LENGTH);
+      } catch (Exception ex) {
+        throw new AvroSerdeException("Failed to obtain maxLength value from 
file schema: " + schema, ex);
+      }
+      return TypeInfoFactory.getVarcharTypeInfo(maxLength);
+    }
+
+    if (type == INT
+        && 
AvroSerDe.DATE_TYPE_NAME.equals(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE)))
 {
+      return TypeInfoFactory.dateTypeInfo;
+    }
+
+    if (type == LONG
+        && 
AvroSerDe.TIMESTAMP_TYPE_NAME.equals(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE)))
 {
+      return TypeInfoFactory.timestampTypeInfo;
+    }
+
+    return typeInfoCache.retrieve(schema, seenSchemas);
+  }
+
+  private static boolean isEmpty(final CharSequence cs) {
+    return cs == null || cs.length() == 0;
+  }
+
+  // added this from StringUtils
+  private static boolean isNumeric(final CharSequence cs) {
+    if (isEmpty(cs)) {
+      return false;
+    }
+    final int sz = cs.length();
+    for (int i = 0; i < sz; i++) {
+      if (!Character.isDigit(cs.charAt(i))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  // added this from hive latest
+  private static int getIntValue(Object obj) {
+    int value = 0;
+    if (obj instanceof Integer) {
+      value = (int) obj;
+    } else if (obj instanceof String && isNumeric((String)obj)) {
+      value = Integer.parseInt((String)obj);
+    }
+    return value;
+  }
+
+  // added this from AvroSerdeUtils in hive latest
+  public static int getIntFromSchema(Schema schema, String name) {
+    Object obj = schema.getObjectProp(name);
+    if (obj instanceof String) {
+      return Integer.parseInt((String) obj);
+    } else if (obj instanceof Integer) {
+      return (int) obj;
+    } else {
+      throw new IllegalArgumentException("Expect integer or string value from 
property " + name
+          + " but found type " + obj.getClass().getName());
+    }
+  }
+
+  private static TypeInfo generateTypeInfoWorker(Schema schema,
+                                                 Set<Schema> seenSchemas) 
throws AvroSerdeException {
+    // Avro requires NULLable types to be defined as unions of some type T
+    // and NULL.  This is annoying and we're going to hide it from the user.
+    if (AvroSerdeUtils.isNullableType(schema)) {
+      return generateTypeInfo(
+          AvroSerdeUtils.getOtherTypeFromNullableType(schema), seenSchemas);
+    }
+
+    Schema.Type type = schema.getType();
+    if (PRIMITIVE_TYPE_TO_TYPE_INFO.containsKey(type)) {
+      return PRIMITIVE_TYPE_TO_TYPE_INFO.get(type);
+    }
+
+    switch (type) {
+      case RECORD: return generateRecordTypeInfo(schema, seenSchemas);
+      case MAP:    return generateMapTypeInfo(schema, seenSchemas);
+      case ARRAY:  return generateArrayTypeInfo(schema, seenSchemas);
+      case UNION:  return generateUnionTypeInfo(schema, seenSchemas);
+      case ENUM:   return generateEnumTypeInfo(schema);
+      default:     throw new AvroSerdeException("Do not yet support: " + 
schema);
+    }
+  }
+
+  private static TypeInfo generateRecordTypeInfo(Schema schema,
+                                                 Set<Schema> seenSchemas) 
throws AvroSerdeException {
+    assert schema.getType().equals(Schema.Type.RECORD);
+
+    if (seenSchemas == null) {
+      seenSchemas = Collections.newSetFromMap(new IdentityHashMap<Schema, 
Boolean>());
+    } else if (seenSchemas.contains(schema)) {
+      throw new AvroSerdeException(
+          "Recursive schemas are not supported. Recursive schema was " + schema
+              .getFullName());
+    }
+    seenSchemas.add(schema);
+
+    List<Schema.Field> fields = schema.getFields();
+    List<String> fieldNames = new ArrayList<String>(fields.size());
+    List<TypeInfo> typeInfos = new ArrayList<TypeInfo>(fields.size());
+
+    for (int i = 0; i < fields.size(); i++) {
+      fieldNames.add(i, fields.get(i).name());
+      typeInfos.add(i, generateTypeInfo(fields.get(i).schema(), seenSchemas));
+    }
+
+    return TypeInfoFactory.getStructTypeInfo(fieldNames, typeInfos);
+  }
+
+  /**
+   * Generate a TypeInfo for an Avro Map.  This is made slightly simpler in 
that
+   * Avro only allows maps with strings for keys.
+   */
+  private static TypeInfo generateMapTypeInfo(Schema schema,
+                                              Set<Schema> seenSchemas) throws 
AvroSerdeException {
+    assert schema.getType().equals(Schema.Type.MAP);
+    Schema valueType = schema.getValueType();
+    TypeInfo ti = generateTypeInfo(valueType, seenSchemas);
+
+    return 
TypeInfoFactory.getMapTypeInfo(TypeInfoFactory.getPrimitiveTypeInfo("string"), 
ti);
+  }
+
+  private static TypeInfo generateArrayTypeInfo(Schema schema,
+                                                Set<Schema> seenSchemas) 
throws AvroSerdeException {
+    assert schema.getType().equals(Schema.Type.ARRAY);
+    Schema itemsType = schema.getElementType();
+    TypeInfo itemsTypeInfo = generateTypeInfo(itemsType, seenSchemas);
+
+    return TypeInfoFactory.getListTypeInfo(itemsTypeInfo);
+  }
+
+  private static TypeInfo generateUnionTypeInfo(Schema schema,
+                                                Set<Schema> seenSchemas) 
throws AvroSerdeException {
+    assert schema.getType().equals(Schema.Type.UNION);
+    List<Schema> types = schema.getTypes();
+
+
+    List<TypeInfo> typeInfos = new ArrayList<TypeInfo>(types.size());
+
+    for (Schema type : types) {
+      typeInfos.add(generateTypeInfo(type, seenSchemas));
+    }
+
+    return TypeInfoFactory.getUnionTypeInfo(typeInfos);
+  }
+
+  // Hive doesn't have an Enum type, so we're going to treat them as Strings.
+  // During the deserialize/serialize stage we'll check for enumness and
+  // convert as such.
+  private static TypeInfo generateEnumTypeInfo(Schema schema) {
+    assert schema.getType().equals(Schema.Type.ENUM);
+
+    return TypeInfoFactory.getPrimitiveTypeInfo("string");
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java
index fd637490db8f..f743ad81e51f 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java
@@ -101,7 +101,7 @@ public class ObjectInspectorCache {
 
   public ArrayWritableObjectInspector getObjectInspector(Schema schema) {
     return objectInspectorCache.computeIfAbsent(schema, s -> {
-      List<String> columnNameList = 
s.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
+      List<String> columnNameList = 
s.getFields().stream().map(Schema.Field::name).map(String::toLowerCase).collect(Collectors.toList());
       List<TypeInfo> columnTypeList = 
columnNameList.stream().map(columnTypeMap::get).collect(Collectors.toList());
       StructTypeInfo rowTypeInfo = (StructTypeInfo) 
TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
       return new ArrayWritableObjectInspector(rowTypeInfo);
@@ -110,7 +110,7 @@ public class ObjectInspectorCache {
 
   public GenericRecord serialize(ArrayWritable record, Schema schema) {
     return serializerCache.computeIfAbsent(schema, s -> {
-      List<String> columnNameList = 
s.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
+      List<String> columnNameList = 
s.getFields().stream().map(Schema.Field::name).map(String::toLowerCase).collect(Collectors.toList());
       List<TypeInfo> columnTypeList = 
columnNameList.stream().map(columnTypeMap::get).collect(Collectors.toList());
       StructTypeInfo rowTypeInfo = (StructTypeInfo) 
TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
       return new HiveAvroSerializer(new 
ArrayWritableObjectInspector(rowTypeInfo), columnNameList, columnTypeList);
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index 9158961b40c9..a351665bf9e5 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -19,29 +19,31 @@
 
 package org.apache.hudi.common.table.read
 
-import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport, 
SparkFileFormatInternalRowReaderContext}
+import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, 
SparkAdapterSupport, SparkFileFormatInternalRowReaderContext}
 import org.apache.hudi.DataSourceWriteOptions.{OPERATION, PRECOMBINE_FIELD, 
RECORDKEY_FIELD, TABLE_TYPE}
 import org.apache.hudi.common.config.{HoodieReaderConfig, RecordMergeMode, 
TypedProperties}
 import org.apache.hudi.common.engine.HoodieReaderContext
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload.{DELETE_KEY, 
DELETE_MARKER}
-import org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE
+import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import 
org.apache.hudi.common.table.read.TestHoodieFileGroupReaderOnSpark.getFileCount
-import org.apache.hudi.common.testutils.{HoodieTestUtils, RawTripTestPayload}
-import org.apache.hudi.common.util.{Option => HOption}
+import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, 
HoodieTestUtils}
+import org.apache.hudi.common.util.{CollectionUtils, Option => HOption}
 import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
 import org.apache.hudi.storage.{StorageConfiguration, StoragePath}
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
 
 import org.apache.avro.{Schema, SchemaBuilder}
+import org.apache.avro.generic.GenericRecord
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.{HoodieSparkKryoRegistrar, SparkConf}
-import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.{Dataset, HoodieInternalRowUtils, Row, SaveMode, 
SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
 import org.apache.spark.sql.execution.datasources.parquet.SparkParquetReader
-import org.apache.spark.sql.types.StructType
+import 
org.apache.spark.sql.internal.SQLConf.LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION
+import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, 
DataType, DoubleType, FloatType, IntegerType, LongType, MapType, StringType, 
StructType}
 import org.apache.spark.unsafe.types.UTF8String
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
@@ -77,6 +79,7 @@ class TestHoodieFileGroupReaderOnSpark extends 
TestHoodieFileGroupReaderBase[Int
     sparkConf.set("spark.kryo.registrator", 
"org.apache.spark.HoodieSparkKryoRegistrar")
     sparkConf.set("spark.sql.extensions", 
"org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
     sparkConf.set("spark.sql.parquet.enableVectorizedReader", "false")
+    sparkConf.set(LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION.key, 
"true")
     HoodieSparkKryoRegistrar.register(sparkConf)
     spark = SparkSession.builder.config(sparkConf).getOrCreate
   }
@@ -101,17 +104,22 @@ class TestHoodieFileGroupReaderOnSpark extends 
TestHoodieFileGroupReaderBase[Int
     new SparkFileFormatInternalRowReaderContext(reader, Seq.empty, Seq.empty, 
getStorageConf, metaClient.getTableConfig)
   }
 
-  override def commitToTable(recordList: util.List[HoodieRecord[_]], 
operation: String, options: util.Map[String, String]): Unit = {
-    val recs = RawTripTestPayload.recordsToStrings(recordList)
-    val inputDF: Dataset[Row] = 
spark.read.json(spark.sparkContext.parallelize(recs.asScala.toList, 2))
+  override def commitToTable(recordList: util.List[HoodieRecord[_]],
+                             operation: String,
+                             firstCommit: Boolean,
+                             options: util.Map[String, String],
+                             schemaStr: String): Unit = {
+    val schema = new Schema.Parser().parse(schemaStr)
+    val genericRecords = 
spark.sparkContext.parallelize(recordList.asScala.map(_.toIndexedRecord(schema, 
CollectionUtils.emptyProps))
+      .filter(r => r.isPresent).map(r => 
r.get.getData.asInstanceOf[GenericRecord]).toSeq, 2)
+    val inputDF: Dataset[Row] = 
AvroConversionUtils.createDataFrame(genericRecords, schemaStr, spark);
 
     inputDF.write.format("hudi")
       .options(options)
       .option("hoodie.compact.inline", "false") // else fails due to 
compaction & deltacommit instant times being same
       .option("hoodie.datasource.write.operation", operation)
       .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
-      .mode(if (operation.equalsIgnoreCase(WriteOperationType.INSERT.value())) 
SaveMode.Overwrite
-      else SaveMode.Append)
+      .mode(if (firstCommit) SaveMode.Overwrite else SaveMode.Append)
       .save(getBasePath)
   }
 
@@ -120,9 +128,46 @@ class TestHoodieFileGroupReaderOnSpark extends 
TestHoodieFileGroupReaderBase[Int
   override def assertRecordsEqual(schema: Schema, expected: InternalRow, 
actual: InternalRow): Unit = {
     assertEquals(expected.numFields, actual.numFields)
     val expectedStruct = 
sparkAdapter.getAvroSchemaConverters.toSqlType(schema)._1.asInstanceOf[StructType]
-    expected.toSeq(expectedStruct).zip(actual.toSeq(expectedStruct)).foreach( 
converted => {
-      assertEquals(converted._1, converted._2)
-    })
+
+    
expected.toSeq(expectedStruct).zip(actual.toSeq(expectedStruct)).zipWithIndex.foreach
 {
+      case ((v1, v2), i) =>
+        val fieldType = expectedStruct(i).dataType
+
+        (v1, v2, fieldType) match {
+          case (a1: Array[Byte], a2: Array[Byte], _) =>
+            assert(java.util.Arrays.equals(a1, a2), s"Mismatch at field $i: 
expected ${a1.mkString(",")} but got ${a2.mkString(",")}")
+
+          case (m1: MapData, m2: MapData, MapType(keyType, valueType, _)) =>
+            val map1 = mapDataToScalaMap(m1, keyType, valueType)
+            val map2 = mapDataToScalaMap(m2, keyType, valueType)
+            assertEquals(map1, map2, s"Mismatch at field $i: maps not equal")
+
+          case _ =>
+            assertEquals(v1, v2, s"Mismatch at field $i")
+        }
+    }
+  }
+
+  def mapDataToScalaMap(mapData: MapData, keyType: DataType, valueType: 
DataType): Map[Any, Any] = {
+    val keys = mapData.keyArray()
+    val values = mapData.valueArray()
+    (0 until mapData.numElements()).map { i =>
+      val k = extractValue(keys, i, keyType)
+      val v = extractValue(values, i, valueType)
+      k -> v
+    }.toMap
+  }
+
+  def extractValue(array: ArrayData, index: Int, dt: DataType): Any = dt match 
{
+    case IntegerType => array.getInt(index)
+    case LongType    => array.getLong(index)
+    case StringType  => array.getUTF8String(index).toString
+    case DoubleType  => array.getDouble(index)
+    case FloatType   => array.getFloat(index)
+    case BooleanType => array.getBoolean(index)
+    case BinaryType  => array.getBinary(index)
+    // Extend this to support StructType, ArrayType, etc. if needed
+    case other       => throw new UnsupportedOperationException(s"Unsupported 
type: $other")
   }
 
   @Test
@@ -142,7 +187,7 @@ class TestHoodieFileGroupReaderOnSpark extends 
TestHoodieFileGroupReaderBase[Int
     testGetOrderingValue(
       sparkReaderContext, row, avroSchema, "col3", 
UTF8String.fromString("blue"))
     testGetOrderingValue(
-      sparkReaderContext, row, avroSchema, "non_existent_col", 
DEFAULT_ORDERING_VALUE)
+      sparkReaderContext, row, avroSchema, "non_existent_col", 
HoodieRecord.DEFAULT_ORDERING_VALUE)
   }
 
   val expectedEventTimeBased: Seq[(Int, String, String, String, Double, 
String)] = Seq(
@@ -342,6 +387,48 @@ class TestHoodieFileGroupReaderOnSpark extends 
TestHoodieFileGroupReaderBase[Int
     ))
     schema
   }
+
+  override def assertRecordMatchesSchema(schema: Schema, record: InternalRow): 
Unit = {
+    val structType = HoodieInternalRowUtils.getCachedSchema(schema)
+    assertRecordMatchesSchema(structType, record)
+  }
+
+  private def assertRecordMatchesSchema(structType: StructType, record: 
InternalRow): Unit = {
+    val values = record.toSeq(structType)
+    structType.zip(values).foreach { r =>
+      r._1.dataType match {
+        case struct: StructType => assertRecordMatchesSchema(struct, 
r._2.asInstanceOf[InternalRow])
+        case array: ArrayType => assertArrayMatchesSchema(array.elementType, 
r._2.asInstanceOf[ArrayData])
+        case map: MapType => asserMapMatchesSchema(map, 
r._2.asInstanceOf[MapData])
+        case _ =>
+      }
+    }
+  }
+
+  private def assertArrayMatchesSchema(schema: DataType, array: ArrayData): 
Unit = {
+    val arrayValues = array.toSeq[Any](schema)
+    schema match {
+      case structType: StructType =>
+        arrayValues.foreach(v => assertRecordMatchesSchema(structType, 
v.asInstanceOf[InternalRow]))
+      case arrayType: ArrayType =>
+        arrayValues.foreach(v => 
assertArrayMatchesSchema(arrayType.elementType, v.asInstanceOf[ArrayData]))
+      case mapType: MapType =>
+        arrayValues.foreach(v => asserMapMatchesSchema(mapType, 
v.asInstanceOf[MapData]))
+      case _ =>
+    }
+  }
+
+  private def asserMapMatchesSchema(schema: MapType, map: MapData): Unit = {
+    assertArrayMatchesSchema(schema.keyType, map.keyArray())
+    assertArrayMatchesSchema(schema.valueType, map.valueArray())
+  }
+
+  override def getSchemaEvolutionConfigs: 
HoodieTestDataGenerator.SchemaEvolutionConfigs = {
+    val configs = new HoodieTestDataGenerator.SchemaEvolutionConfigs()
+    configs.floatToDoubleSupport = false
+    configs
+  }
+
 }
 
 object TestHoodieFileGroupReaderOnSpark {


Reply via email to