This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch rc3-patched-for-test
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit f25f79c27c27043bbc1a9ee53c245cec7847a128
Author: Raymond Xu <xu.shiyan.raym...@gmail.com>
AuthorDate: Thu Apr 21 18:14:43 2022 +0800

    schema evolution patch
    
    apply from https://patch-diff.githubusercontent.com/raw/apache/hudi/pull/5376.patch
---
 .../java/org/apache/hudi/io/HoodieWriteHandle.java | 12 +++-
 .../table/action/commit/HoodieMergeHelper.java     | 12 +++-
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java | 74 +++++++++++++++++++---
 .../table/log/AbstractHoodieLogRecordReader.java   |  3 +-
 .../schema/action/InternalSchemaMerger.java        | 26 +++++++-
 .../internal/schema/utils/InternalSchemaUtils.java | 16 +++++
 .../schema/utils/TestAvroSchemaEvolutionUtils.java |  4 +-
 .../org/apache/spark/sql/hudi/TestSpark3DDL.scala  | 31 ++++-----
 8 files changed, 140 insertions(+), 38 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 89babc7725..5d5760961a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -46,6 +46,9 @@ import org.apache.log4j.Logger;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.HashMap;
+
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
 
 /**
  * Base class for all write operations logically performed at the file group level.
@@ -98,6 +101,8 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
   protected final String fileId;
   protected final String writeToken;
   protected final TaskContextSupplier taskContextSupplier;
+  // For full schema evolution
+  protected final boolean schemaOnReadEnabled;
 
   public HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String partitionPath,
                            String fileId, HoodieTable<T, I, K, O> hoodieTable, TaskContextSupplier taskContextSupplier) {
@@ -120,6 +125,7 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
         !hoodieTable.getIndex().isImplicitWithStorage(), config.getWriteStatusFailureFraction());
     this.taskContextSupplier = taskContextSupplier;
     this.writeToken = makeWriteToken();
+    schemaOnReadEnabled = !isNullOrEmpty(hoodieTable.getConfig().getInternalSchema());
   }
 
   /**
@@ -224,11 +230,13 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
    * Rewrite the GenericRecord with the Schema containing the Hoodie Metadata fields.
    */
   protected GenericRecord rewriteRecord(GenericRecord record) {
-    return HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields);
+    return schemaOnReadEnabled ? HoodieAvroUtils.rewriteRecordWithNewSchema(record, writeSchemaWithMetaFields, new HashMap<>())
+        : HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields);
   }
 
   protected GenericRecord rewriteRecordWithMetadata(GenericRecord record, String fileName) {
-    return HoodieAvroUtils.rewriteRecordWithMetadata(record, writeSchemaWithMetaFields, fileName);
+    return schemaOnReadEnabled ? HoodieAvroUtils.rewriteEvolutionRecordWithMetadata(record, writeSchemaWithMetaFields, fileName)
+        : HoodieAvroUtils.rewriteRecordWithMetadata(record, writeSchemaWithMetaFields, fileName);
   }
 
   public abstract List<WriteStatus> close();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
index 578cdf0bc7..e964cfc9b3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
@@ -36,6 +36,7 @@ import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
 import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils;
 import org.apache.hudi.internal.schema.utils.SerDeHelper;
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -52,6 +53,8 @@ import org.apache.hadoop.conf.Configuration;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 public class HoodieMergeHelper<T extends HoodieRecordPayload> extends
@@ -93,6 +96,7 @@ public class HoodieMergeHelper<T extends HoodieRecordPayload> extends
 
     Option<InternalSchema> querySchemaOpt = SerDeHelper.fromJson(table.getConfig().getInternalSchema());
     boolean needToReWriteRecord = false;
+    Map<String, String> renameCols = new HashMap<>();
     // TODO support bootstrap
     if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) {
       // check implicitly add columns, and position reorder(spark sql may change cols order)
@@ -109,10 +113,14 @@ public class HoodieMergeHelper<T extends HoodieRecordPayload> extends
                       && writeInternalSchema.findIdByName(f) == querySchema.findIdByName(f)
                       && writeInternalSchema.findIdByName(f) != -1
                       && writeInternalSchema.findType(writeInternalSchema.findIdByName(f)).equals(querySchema.findType(writeInternalSchema.findIdByName(f)))).collect(Collectors.toList());
-      readSchema = AvroInternalSchemaConverter.convert(new InternalSchemaMerger(writeInternalSchema, querySchema, true, false).mergeSchema(), readSchema.getName());
+      readSchema = AvroInternalSchemaConverter
+          .convert(new InternalSchemaMerger(writeInternalSchema, querySchema, true, false, false).mergeSchema(), readSchema.getName());
       Schema writeSchemaFromFile = AvroInternalSchemaConverter.convert(writeInternalSchema, readSchema.getName());
       needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size()
               || SchemaCompatibility.checkReaderWriterCompatibility(writeSchemaFromFile, readSchema).getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
+      if (needToReWriteRecord) {
+        renameCols = InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema);
+      }
     }
 
     try {
@@ -121,7 +129,7 @@ public class HoodieMergeHelper<T extends HoodieRecordPayload> extends
         readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation);
       } else {
         if (needToReWriteRecord) {
-          readerIterator = HoodieAvroUtils.rewriteRecordWithNewSchema(reader.getRecordIterator(), readSchema);
+          readerIterator = HoodieAvroUtils.rewriteRecordWithNewSchema(reader.getRecordIterator(), readSchema, renameCols);
         } else {
           readerIterator = reader.getRecordIterator(readSchema);
         }
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index bf540a302e..41be0b00c0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -70,6 +70,8 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Deque;
+import java.util.LinkedList;
 import java.util.TimeZone;
 import java.util.stream.Collectors;
 
@@ -405,6 +407,18 @@ public class HoodieAvroUtils {
     return newRecord;
   }
 
+  // TODO Unify the logic of rewriteRecordWithMetadata and rewriteEvolutionRecordWithMetadata, and delete this function.
+  public static GenericRecord rewriteEvolutionRecordWithMetadata(GenericRecord genericRecord, Schema newSchema, String fileName) {
+    GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(genericRecord, newSchema, new HashMap<>());
+    // do not preserve FILENAME_METADATA_FIELD
+    newRecord.put(HoodieRecord.FILENAME_METADATA_FIELD_POS, fileName);
+    if (!GenericData.get().validate(newSchema, newRecord)) {
+      throw new SchemaCompatibilityException(
+          "Unable to validate the rewritten record " + genericRecord + " against schema " + newSchema);
+    }
+    return newRecord;
+  }
+
   /**
    * Converts list of {@link GenericRecord} provided into the {@link GenericRecord} adhering to the
    * provided {@code newSchema}.
@@ -719,14 +733,28 @@ public class HoodieAvroUtils {
    *
    * @param oldRecord oldRecord to be rewritten
    * @param newSchema newSchema used to rewrite oldRecord
+   * @param renameCols a map of all renamed columns, (k, v) -> (colNameFromNewSchema, colNameFromOldSchema)
    * @return newRecord for new Schema
    */
-  public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord oldRecord, Schema newSchema) {
-    Object newRecord = rewriteRecordWithNewSchema(oldRecord, oldRecord.getSchema(), newSchema);
+  public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord oldRecord, Schema newSchema, Map<String, String> renameCols) {
+    Object newRecord = rewriteRecordWithNewSchema(oldRecord, oldRecord.getSchema(), newSchema, renameCols, new LinkedList<>());
     return (GenericData.Record) newRecord;
   }
 
-  private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSchema, Schema newSchema) {
+  /**
+   * Given an Avro record with a given schema, rewrites it into the new schema while setting fields only from the new schema.
+   * Supports deep rewrites for nested records and adjusts for rename operations.
+   * This particular method does the following things:
+   * a) Creates a new empty GenericRecord with the new schema.
+   * b) For the GenericRecord, copies over the data from the old schema to the new schema, or sets default values for all fields of this transformed schema.
+   *
+   * @param oldRecord oldRecord to be rewritten
+   * @param newSchema newSchema used to rewrite oldRecord
+   * @param renameCols a map of all renamed columns, (k, v) -> (colNameFromNewSchema, colNameFromOldSchema)
+   * @param fieldNames tracks the full name of the visited field as we traverse the new schema.
+   * @return newRecord for the new Schema
+   */
+  private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSchema, Schema newSchema, Map<String, String> renameCols, Deque<String> fieldNames) {
     if (oldRecord == null) {
       return null;
     }
@@ -741,10 +769,23 @@ public class HoodieAvroUtils {
 
         for (int i = 0; i < fields.size(); i++) {
           Schema.Field field = fields.get(i);
+          String fieldName = field.name();
+          fieldNames.push(fieldName);
           if (oldSchema.getField(field.name()) != null) {
             Schema.Field oldField = oldSchema.getField(field.name());
-            helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema()));
+            helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
+          } else {
+            String fieldFullName = createFullName(fieldNames);
+            String[] colNamePartsFromOldSchema = renameCols.getOrDefault(fieldFullName, "").split("\\.");
+            String lastColNameFromOldSchema = colNamePartsFromOldSchema[colNamePartsFromOldSchema.length - 1];
+            // deal with rename
+            if (oldSchema.getField(field.name()) == null && oldSchema.getField(lastColNameFromOldSchema) != null) {
+              // find rename
+              Schema.Field oldField = oldSchema.getField(lastColNameFromOldSchema);
+              helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
+            }
           }
+          fieldNames.pop();
         }
         GenericData.Record newRecord = new GenericData.Record(newSchema);
         for (int i = 0; i < fields.size(); i++) {
@@ -765,9 +806,11 @@ public class HoodieAvroUtils {
         }
         Collection array = (Collection)oldRecord;
         List<Object> newArray = new ArrayList();
+        fieldNames.push("element");
         for (Object element : array) {
-          newArray.add(rewriteRecordWithNewSchema(element, oldSchema.getElementType(), newSchema.getElementType()));
+          newArray.add(rewriteRecordWithNewSchema(element, oldSchema.getElementType(), newSchema.getElementType(), renameCols, fieldNames));
         }
+        fieldNames.pop();
         return newArray;
       case MAP:
         if (!(oldRecord instanceof Map)) {
@@ -775,17 +818,29 @@ public class HoodieAvroUtils {
         }
         Map<Object, Object> map = (Map<Object, Object>) oldRecord;
         Map<Object, Object> newMap = new HashMap<>();
+        fieldNames.push("value");
         for (Map.Entry<Object, Object> entry : map.entrySet()) {
-          newMap.put(entry.getKey(), rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(), newSchema.getValueType()));
+          newMap.put(entry.getKey(), rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(), newSchema.getValueType(), renameCols, fieldNames));
         }
+        fieldNames.pop();
         return newMap;
       case UNION:
-        return rewriteRecordWithNewSchema(oldRecord, getActualSchemaFromUnion(oldSchema, oldRecord), getActualSchemaFromUnion(newSchema, oldRecord));
+        return rewriteRecordWithNewSchema(oldRecord, getActualSchemaFromUnion(oldSchema, oldRecord), getActualSchemaFromUnion(newSchema, oldRecord), renameCols, fieldNames);
       default:
         return rewritePrimaryType(oldRecord, oldSchema, newSchema);
     }
   }
 
+  private static String createFullName(Deque<String> fieldNames) {
+    String result = "";
+    if (!fieldNames.isEmpty()) {
+      List<String> parentNames = new ArrayList<>();
+      fieldNames.descendingIterator().forEachRemaining(parentNames::add);
+      result = parentNames.stream().collect(Collectors.joining("."));
+    }
+    return result;
+  }
+
   private static Object rewritePrimaryType(Object oldValue, Schema oldSchema, Schema newSchema) {
     Schema realOldSchema = oldSchema;
     if (realOldSchema.getType() == UNION) {
@@ -958,9 +1013,10 @@ public class HoodieAvroUtils {
    *
    * @param oldRecords oldRecords to be rewrite
    * @param newSchema newSchema used to rewrite oldRecord
+   * @param renameCols a map of all renamed columns, (k, v) -> (colNameFromNewSchema, colNameFromOldSchema)
    * @return a iterator of rewrote GeneriRcords
    */
-  public static Iterator<GenericRecord> rewriteRecordWithNewSchema(Iterator<GenericRecord> oldRecords, Schema newSchema) {
+  public static Iterator<GenericRecord> rewriteRecordWithNewSchema(Iterator<GenericRecord> oldRecords, Schema newSchema, Map<String, String> renameCols) {
     if (oldRecords == null || newSchema == null) {
       return Collections.emptyIterator();
     }
@@ -972,7 +1028,7 @@ public class HoodieAvroUtils {
 
       @Override
       public GenericRecord next() {
-        return rewriteRecordWithNewSchema(oldRecords.next(), newSchema);
+        return rewriteRecordWithNewSchema(oldRecords.next(), newSchema, renameCols);
       }
     };
   }
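
A note on the HoodieAvroUtils change above: the new renameCols argument maps a column's full name in the new schema to its full name in the old schema, which is how a record written under an old column name gets rewritten into a schema where that column was renamed. Below is a minimal, self-contained sketch of the intended usage; the schemas, field names, and wrapper class are illustrative only and not part of the patch.

import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;

public class RenameRewriteSketch {
  public static void main(String[] args) {
    // Schema the record was written with: {id: int, colOldName: string}.
    Schema oldSchema = SchemaBuilder.record("rec").fields()
        .requiredInt("id")
        .optionalString("colOldName")
        .endRecord();
    // Query schema after `alter table ... rename column colOldName to colNewName`.
    Schema newSchema = SchemaBuilder.record("rec").fields()
        .requiredInt("id")
        .optionalString("colNewName")
        .endRecord();

    GenericRecord oldRecord = new GenericData.Record(oldSchema);
    oldRecord.put("id", 1);
    oldRecord.put("colOldName", "value");

    // (k, v) -> (colNameFromNewSchema, colNameFromOldSchema), the shape produced by
    // InternalSchemaUtils.collectRenameCols in this patch.
    Map<String, String> renameCols = new HashMap<>();
    renameCols.put("colNewName", "colOldName");

    GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(oldRecord, newSchema, renameCols);
    System.out.println(newRecord.get("colNewName")); // prints "value"
  }
}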
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 9e56083b26..9687136444 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -58,6 +58,7 @@ import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Arrays;
 import java.util.Deque;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -379,7 +380,7 @@ public abstract class AbstractHoodieLogRecordReader {
       Option<Schema> schemaOption = getMergedSchema(dataBlock);
       while (recordIterator.hasNext()) {
         IndexedRecord currentRecord = recordIterator.next();
-        IndexedRecord record = schemaOption.isPresent() ? HoodieAvroUtils.rewriteRecordWithNewSchema(currentRecord, schemaOption.get()) : currentRecord;
+        IndexedRecord record = schemaOption.isPresent() ? HoodieAvroUtils.rewriteRecordWithNewSchema(currentRecord, schemaOption.get(), new HashMap<>()) : currentRecord;
         processNextRecord(createHoodieRecord(record, this.hoodieTableMetaClient.getTableConfig(), this.payloadClassFQN,
             this.preCombineField, this.withOperationField, this.simpleKeyGenFields, this.partitionName));
         totalLogRecords.incrementAndGet();
diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java
index 0d93ab170b..bcea9b957b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java
@@ -48,6 +48,25 @@ public class InternalSchemaMerger {
   // we can pass decimalType to reWriteRecordWithNewSchema directly, everything is ok.
   private boolean useColumnTypeFromFileSchema = true;
 
+  // Deal with renames.
+  // Whether to use the column name from the file schema to read files when we find a column name has changed.
+  // The Spark parquet reader needs the original column name to read data; otherwise it will read nothing.
+  // e.g. the current column name is colOldName and we rename it to colNewName:
+  // we must pass colOldName, not colNewName, to the parquet reader when reading the data out.
+  // For the log reader, our rewriteRecordWithNewSchema function supports the rewrite directly, so this parameter is not needed there:
+  // with the same rename from colOldName to colNewName, we can pass colNewName to rewriteRecordWithNewSchema directly and everything is ok.
+  private boolean useColNameFromFileSchema = true;
+
+  public InternalSchemaMerger(InternalSchema fileSchema, InternalSchema querySchema, boolean ignoreRequiredAttribute, boolean useColumnTypeFromFileSchema, boolean useColNameFromFileSchema) {
+    this.fileSchema = fileSchema;
+    this.querySchema = querySchema;
+    this.ignoreRequiredAttribute = ignoreRequiredAttribute;
+    this.useColumnTypeFromFileSchema = useColumnTypeFromFileSchema;
+    this.useColNameFromFileSchema = useColNameFromFileSchema;
+  }
+
   public InternalSchemaMerger(InternalSchema fileSchema, InternalSchema querySchema, boolean ignoreRequiredAttribute, boolean useColumnTypeFromFileSchema) {
     this.fileSchema = fileSchema;
     this.querySchema = querySchema;
@@ -131,12 +150,15 @@ public class InternalSchemaMerger {
   private Types.Field dealWithRename(int fieldId, Type newType, Types.Field oldField) {
     Types.Field fieldFromFileSchema = fileSchema.findField(fieldId);
     String nameFromFileSchema = fieldFromFileSchema.name();
+    String nameFromQuerySchema = querySchema.findField(fieldId).name();
     Type typeFromFileSchema = fieldFromFileSchema.type();
     // Current design mechanism guarantees nestedType change is not allowed, so no need to consider.
     if (newType.isNestedType()) {
-      return Types.Field.get(oldField.fieldId(), oldField.isOptional(), nameFromFileSchema, newType, oldField.doc());
+      return Types.Field.get(oldField.fieldId(), oldField.isOptional(),
+          useColNameFromFileSchema ? nameFromFileSchema : nameFromQuerySchema, newType, oldField.doc());
     } else {
-      return Types.Field.get(oldField.fieldId(), oldField.isOptional(), nameFromFileSchema, useColumnTypeFromFileSchema ? typeFromFileSchema : newType, oldField.doc());
+      return Types.Field.get(oldField.fieldId(), oldField.isOptional(),
+          useColNameFromFileSchema ? nameFromFileSchema : nameFromQuerySchema, useColumnTypeFromFileSchema ? typeFromFileSchema : newType, oldField.doc());
     }
   }
 
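
To make the new useColNameFromFileSchema flag concrete: with the flag left true, the merged schema keeps the on-file column name so the Spark parquet reader can still locate the physical column, while passing false surfaces the renamed query-schema column name for the record-rewrite path used in HoodieMergeHelper above. A small hedged sketch follows; the wrapper class and method names are illustrative, and the InternalSchema arguments are assumed to come from elsewhere (e.g. SerDeHelper.fromJson).

import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.action.InternalSchemaMerger;

public class SchemaMergeSketch {

  // Base-file (parquet) read path: keep the on-file column name (e.g. colOldName)
  // so the Spark parquet reader can still find the physical column.
  static InternalSchema mergeForParquetRead(InternalSchema fileSchema, InternalSchema querySchema) {
    return new InternalSchemaMerger(fileSchema, querySchema, true, false).mergeSchema();
  }

  // Record-rewrite path (see HoodieMergeHelper above): surface the renamed
  // query-schema column name (e.g. colNewName) by passing useColNameFromFileSchema = false.
  static InternalSchema mergeForRecordRewrite(InternalSchema fileSchema, InternalSchema querySchema) {
    return new InternalSchemaMerger(fileSchema, querySchema, true, false, false).mergeSchema();
  }
}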
diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java
index 3c0877f6f5..a784b409b8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java
@@ -267,4 +267,20 @@ public class InternalSchemaUtils {
     }
     return result;
   }
+
+  /**
+   * Try to find all renamed cols between oldSchema and newSchema.
+   *
+   * @param oldSchema oldSchema
+   * @param newSchema newSchema which was modified from oldSchema
+   * @return renameCols Map. (k, v) -> (colNameFromNewSchema, colNameFromOldSchema)
+   */
+  public static Map<String, String> collectRenameCols(InternalSchema oldSchema, InternalSchema newSchema) {
+    List<String> colNamesFromWriteSchema = oldSchema.getAllColsFullName();
+    return colNamesFromWriteSchema.stream().filter(f -> {
+      int filedIdFromWriteSchema = oldSchema.findIdByName(f);
+      // try to find the columns which have the same id but different column names
+      return newSchema.getAllIds().contains(filedIdFromWriteSchema) && !newSchema.findfullName(filedIdFromWriteSchema).equalsIgnoreCase(f);
+    }).collect(Collectors.toMap(e -> newSchema.findfullName(oldSchema.findIdByName(e)), e -> e));
+  }
 }
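
As a worked illustration of the map returned by collectRenameCols: keys are full column names from the new schema, values the corresponding full names from the old schema. The entries below are hand-written to mirror the renames in the Spark test further down; the exact full-name formatting for nested and map fields follows InternalSchema, so treat them as assumptions.

import java.util.HashMap;
import java.util.Map;

public class RenameColsIllustration {
  public static void main(String[] args) {
    // (k, v) -> (colNameFromNewSchema, colNameFromOldSchema), hand-written for illustration.
    Map<String, String> renameCols = new HashMap<>();
    renameCols.put("us.age1", "userx.age");            // alter table ... rename column us.age to age1
    renameCols.put("mem.value.nn", "members.value.n"); // alter table ... rename column mem.value.n to nn

    // Inside HoodieAvroUtils.rewriteRecordWithNewSchema, only the last segment of the
    // old full name is looked up in the old record's schema at the current nesting level.
    String oldFullName = renameCols.getOrDefault("mem.value.nn", "");
    String[] parts = oldFullName.split("\\.");
    System.out.println(parts[parts.length - 1]); // prints "n"
  }
}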
diff --git a/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java b/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java
index d116697b8d..3850ef07b9 100644
--- a/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java
@@ -284,7 +284,7 @@ public class TestAvroSchemaEvolutionUtils {
         .updateColumnType("col6", Types.StringType.get());
     InternalSchema newSchema = SchemaChangeUtils.applyTableChanges2Schema(internalSchema, updateChange);
     Schema newAvroSchema = AvroInternalSchemaConverter.convert(newSchema, avroSchema.getName());
-    GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema);
+    GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema, new HashMap<>());
 
     Assertions.assertEquals(GenericData.get().validate(newAvroSchema, newRecord), true);
   }
@@ -349,7 +349,7 @@ public class TestAvroSchemaEvolutionUtils {
     );
 
     Schema newAvroSchema = AvroInternalSchemaConverter.convert(newRecord, schema.getName());
-    GenericRecord newAvroRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema);
+    GenericRecord newAvroRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema, new HashMap<>());
     // test the correctly of rewrite
     Assertions.assertEquals(GenericData.get().validate(newAvroSchema, newAvroRecord), true);
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index ae828ed9f7..5416363598 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -445,28 +445,19 @@ class TestSpark3DDL extends TestHoodieSqlBase {
             Seq(null),
             Seq(Map("t1" -> 10.0d))
           )
+          spark.sql(s"alter table ${tableName} rename column members to mem")
+          spark.sql(s"alter table ${tableName} rename column mem.value.n to nn")
+          spark.sql(s"alter table ${tableName} rename column userx to us")
+          spark.sql(s"alter table ${tableName} rename column us.age to age1")
+
+          spark.sql(s"insert into ${tableName} values(2 , map('k1', struct('v1', 100), 'k2', struct('v2', 200)), struct('jackStructNew', 291 , 101), 'jacknew', 1000, map('t1', 10))")
+          spark.sql(s"select mem.value.nn, us.age1 from $tableName order by id").show()
+          checkAnswer(spark.sql(s"select mem.value.nn, us.age1 from $tableName order by id").collect())(
+            Seq(null, 29),
+            Seq(null, 291)
+          )
         }
       }
     }
   }
-
-  private def performClustering(writeDf: DataFrame, basePath: String, tableName: String, tableType: String): Unit = {
-    writeDf.write.format("org.apache.hudi")
-      .option(DataSourceWriteOptions.TABLE_TYPE.key(), tableType)
-      .option("hoodie.upsert.shuffle.parallelism", "1")
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "id")
-      .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "comb")
-      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "par")
-      .option(HoodieWriteConfig.TBL_NAME.key, tableName)
-      .option("hoodie.schema.on.read.enable", "true")
-      // option for clustering
-      .option("hoodie.clustering.inline", "true")
-      .option("hoodie.clustering.inline.max.commits", "1")
-      .option("hoodie.clustering.plan.strategy.small.file.limit", String.valueOf(2*1024*1024L))
-      .option("hoodie.clustering.plan.strategy.max.bytes.per.group", String.valueOf(10*1024*1024L))
-      .option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(4 * 1024* 1024L))
-      .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "col1, col2")
-      .mode(SaveMode.Append)
-      .save(basePath)
-  }
 }
