This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch rc3-patched-for-test
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit f25f79c27c27043bbc1a9ee53c245cec7847a128
Author: Raymond Xu <xu.shiyan.raym...@gmail.com>
AuthorDate: Thu Apr 21 18:14:43 2022 +0800

    schema evolution patch apply from https://patch-diff.githubusercontent.com/raw/apache/hudi/pull/5376.patch
---
 .../java/org/apache/hudi/io/HoodieWriteHandle.java | 12 +++-
 .../table/action/commit/HoodieMergeHelper.java     | 12 +++-
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java | 74 +++++++++++++++++++---
 .../table/log/AbstractHoodieLogRecordReader.java   |  3 +-
 .../schema/action/InternalSchemaMerger.java        | 26 +++++++-
 .../internal/schema/utils/InternalSchemaUtils.java | 16 +++++
 .../schema/utils/TestAvroSchemaEvolutionUtils.java |  4 +-
 .../org/apache/spark/sql/hudi/TestSpark3DDL.scala  | 31 ++++-----
 8 files changed, 140 insertions(+), 38 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 89babc7725..5d5760961a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -46,6 +46,9 @@ import org.apache.log4j.Logger;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.HashMap;
+
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
 
 /**
  * Base class for all write operations logically performed at the file group level.
@@ -98,6 +101,8 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
   protected final String fileId;
   protected final String writeToken;
   protected final TaskContextSupplier taskContextSupplier;
+  // For full schema evolution
+  protected final boolean schemaOnReadEnabled;
 
   public HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String partitionPath, String fileId,
                            HoodieTable<T, I, K, O> hoodieTable, TaskContextSupplier taskContextSupplier) {
@@ -120,6 +125,7 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
         !hoodieTable.getIndex().isImplicitWithStorage(), config.getWriteStatusFailureFraction());
     this.taskContextSupplier = taskContextSupplier;
     this.writeToken = makeWriteToken();
+    schemaOnReadEnabled = !isNullOrEmpty(hoodieTable.getConfig().getInternalSchema());
   }
 
   /**
@@ -224,11 +230,13 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
    * Rewrite the GenericRecord with the Schema containing the Hoodie Metadata fields.
    */
   protected GenericRecord rewriteRecord(GenericRecord record) {
-    return HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields);
+    return schemaOnReadEnabled ? HoodieAvroUtils.rewriteRecordWithNewSchema(record, writeSchemaWithMetaFields, new HashMap<>())
+        : HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields);
   }
 
   protected GenericRecord rewriteRecordWithMetadata(GenericRecord record, String fileName) {
-    return HoodieAvroUtils.rewriteRecordWithMetadata(record, writeSchemaWithMetaFields, fileName);
+    return schemaOnReadEnabled ? HoodieAvroUtils.rewriteEvolutionRecordWithMetadata(record, writeSchemaWithMetaFields, fileName)
+        : HoodieAvroUtils.rewriteRecordWithMetadata(record, writeSchemaWithMetaFields, fileName);
   }
 
   public abstract List<WriteStatus> close();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
index 578cdf0bc7..e964cfc9b3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
@@ -36,6 +36,7 @@ import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
 import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils;
 import org.apache.hudi.internal.schema.utils.SerDeHelper;
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -52,6 +53,8 @@ import org.apache.hadoop.conf.Configuration;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 public class HoodieMergeHelper<T extends HoodieRecordPayload> extends
@@ -93,6 +96,7 @@ public class HoodieMergeHelper<T extends HoodieRecordPayload> extends
     Option<InternalSchema> querySchemaOpt = SerDeHelper.fromJson(table.getConfig().getInternalSchema());
     boolean needToReWriteRecord = false;
+    Map<String, String> renameCols = new HashMap<>();
     // TODO support bootstrap
     if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) {
       // check implicitly add columns, and position reorder(spark sql may change cols order)
@@ -109,10 +113,14 @@ public class HoodieMergeHelper<T extends HoodieRecordPayload> extends
           && writeInternalSchema.findIdByName(f) == querySchema.findIdByName(f)
           && writeInternalSchema.findIdByName(f) != -1
           && writeInternalSchema.findType(writeInternalSchema.findIdByName(f)).equals(querySchema.findType(writeInternalSchema.findIdByName(f)))).collect(Collectors.toList());
-      readSchema = AvroInternalSchemaConverter.convert(new InternalSchemaMerger(writeInternalSchema, querySchema, true, false).mergeSchema(), readSchema.getName());
+      readSchema = AvroInternalSchemaConverter
+          .convert(new InternalSchemaMerger(writeInternalSchema, querySchema, true, false, false).mergeSchema(), readSchema.getName());
       Schema writeSchemaFromFile = AvroInternalSchemaConverter.convert(writeInternalSchema, readSchema.getName());
       needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size()
           || SchemaCompatibility.checkReaderWriterCompatibility(writeSchemaFromFile, readSchema).getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
+      if (needToReWriteRecord) {
+        renameCols = InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema);
+      }
     }
 
     try {
@@ -121,7 +129,7 @@ public class HoodieMergeHelper<T extends HoodieRecordPayload> extends
         readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation);
       } else {
         if (needToReWriteRecord) {
-          readerIterator = HoodieAvroUtils.rewriteRecordWithNewSchema(reader.getRecordIterator(), readSchema);
+          readerIterator = HoodieAvroUtils.rewriteRecordWithNewSchema(reader.getRecordIterator(), readSchema, renameCols);
        } else {
           readerIterator = reader.getRecordIterator(readSchema);
         }
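The renameCols map built above via InternalSchemaUtils.collectRenameCols and handed to HoodieAvroUtils.rewriteRecordWithNewSchema is keyed by a column's full name in the evolved (query) schema, and the value is its full name in the on-disk (write) schema. A minimal sketch of that shape, using hypothetical column names that are not taken from this patch:

    // key   = column full name in the evolved (query) schema
    // value = column full name in the file (write) schema
    Map<String, String> renameCols = new HashMap<>();
    renameCols.put("colNewName", "colOldName");        // top-level rename
    renameCols.put("address.zipCode", "address.zip");  // rename nested inside a struct

An empty map, as passed by the non-evolution call sites with new HashMap<>(), leaves the rewrite behavior unchanged.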
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index bf540a302e..41be0b00c0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -70,6 +70,8 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Deque;
+import java.util.LinkedList;
 import java.util.TimeZone;
 import java.util.stream.Collectors;
 
@@ -405,6 +407,18 @@ public class HoodieAvroUtils {
     return newRecord;
   }
 
+  // TODO Unify the logic of rewriteRecordWithMetadata and rewriteEvolutionRecordWithMetadata, and delete this function.
+  public static GenericRecord rewriteEvolutionRecordWithMetadata(GenericRecord genericRecord, Schema newSchema, String fileName) {
+    GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(genericRecord, newSchema, new HashMap<>());
+    // do not preserve FILENAME_METADATA_FIELD
+    newRecord.put(HoodieRecord.FILENAME_METADATA_FIELD_POS, fileName);
+    if (!GenericData.get().validate(newSchema, newRecord)) {
+      throw new SchemaCompatibilityException(
+          "Unable to validate the rewritten record " + genericRecord + " against schema " + newSchema);
+    }
+    return newRecord;
+  }
+
   /**
    * Converts list of {@link GenericRecord} provided into the {@link GenericRecord} adhering to the
    * provided {@code newSchema}.
@@ -719,14 +733,28 @@ public class HoodieAvroUtils {
    *
    * @param oldRecord oldRecord to be rewritten
    * @param newSchema newSchema used to rewrite oldRecord
+   * @param renameCols a map storing all renamed cols, (k, v) -> (colNameFromNewSchema, colNameFromOldSchema)
    * @return newRecord for new Schema
    */
-  public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord oldRecord, Schema newSchema) {
-    Object newRecord = rewriteRecordWithNewSchema(oldRecord, oldRecord.getSchema(), newSchema);
+  public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord oldRecord, Schema newSchema, Map<String, String> renameCols) {
+    Object newRecord = rewriteRecordWithNewSchema(oldRecord, oldRecord.getSchema(), newSchema, renameCols, new LinkedList<>());
     return (GenericData.Record) newRecord;
   }
 
-  private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSchema, Schema newSchema) {
+  /**
+   * Given an avro record with a given schema, rewrites it into the new schema while setting fields only from the new schema.
+   * Supports deep rewrites for nested records and adjusts for rename operations.
+   * This particular method does the following things:
+   * a) Create a new empty GenericRecord with the new schema.
+   * b) For GenericRecord, copy over the data from the old schema to the new schema or set default values for all fields of this transformed schema.
+   *
+   * @param oldRecord oldRecord to be rewritten
+   * @param newSchema newSchema used to rewrite oldRecord
+   * @param renameCols a map storing all renamed cols, (k, v) -> (colNameFromNewSchema, colNameFromOldSchema)
+   * @param fieldNames tracks the full name of the visited field as we traverse the new schema
+   * @return newRecord for new Schema
+   */
+  private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSchema, Schema newSchema, Map<String, String> renameCols, Deque<String> fieldNames) {
     if (oldRecord == null) {
       return null;
     }
@@ -741,10 +769,23 @@ public class HoodieAvroUtils {
 
       for (int i = 0; i < fields.size(); i++) {
         Schema.Field field = fields.get(i);
+        String fieldName = field.name();
+        fieldNames.push(fieldName);
         if (oldSchema.getField(field.name()) != null) {
           Schema.Field oldField = oldSchema.getField(field.name());
-          helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema()));
+          helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
+        } else {
+          String fieldFullName = createFullName(fieldNames);
+          String[] colNamePartsFromOldSchema = renameCols.getOrDefault(fieldFullName, "").split("\\.");
+          String lastColNameFromOldSchema = colNamePartsFromOldSchema[colNamePartsFromOldSchema.length - 1];
+          // deal with rename
+          if (oldSchema.getField(field.name()) == null && oldSchema.getField(lastColNameFromOldSchema) != null) {
+            // find rename
+            Schema.Field oldField = oldSchema.getField(lastColNameFromOldSchema);
+            helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
+          }
         }
+        fieldNames.pop();
       }
       GenericData.Record newRecord = new GenericData.Record(newSchema);
       for (int i = 0; i < fields.size(); i++) {
@@ -765,9 +806,11 @@ public class HoodieAvroUtils {
       }
       Collection array = (Collection)oldRecord;
       List<Object> newArray = new ArrayList();
+      fieldNames.push("element");
       for (Object element : array) {
-        newArray.add(rewriteRecordWithNewSchema(element, oldSchema.getElementType(), newSchema.getElementType()));
+        newArray.add(rewriteRecordWithNewSchema(element, oldSchema.getElementType(), newSchema.getElementType(), renameCols, fieldNames));
       }
+      fieldNames.pop();
       return newArray;
     case MAP:
       if (!(oldRecord instanceof Map)) {
@@ -775,17 +818,29 @@ public class HoodieAvroUtils {
       }
       Map<Object, Object> map = (Map<Object, Object>) oldRecord;
       Map<Object, Object> newMap = new HashMap<>();
+      fieldNames.push("value");
       for (Map.Entry<Object, Object> entry : map.entrySet()) {
-        newMap.put(entry.getKey(), rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(), newSchema.getValueType()));
+        newMap.put(entry.getKey(), rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(), newSchema.getValueType(), renameCols, fieldNames));
       }
+      fieldNames.pop();
       return newMap;
     case UNION:
-      return rewriteRecordWithNewSchema(oldRecord, getActualSchemaFromUnion(oldSchema, oldRecord), getActualSchemaFromUnion(newSchema, oldRecord));
+      return rewriteRecordWithNewSchema(oldRecord, getActualSchemaFromUnion(oldSchema, oldRecord), getActualSchemaFromUnion(newSchema, oldRecord), renameCols, fieldNames);
     default:
       return rewritePrimaryType(oldRecord, oldSchema, newSchema);
     }
   }
 
+  private static String createFullName(Deque<String> fieldNames) {
+    String result = "";
+    if (!fieldNames.isEmpty()) {
+      List<String> parentNames = new ArrayList<>();
+      fieldNames.descendingIterator().forEachRemaining(parentNames::add);
+      result = parentNames.stream().collect(Collectors.joining("."));
+    }
+    return result;
+  }
+
   private static Object rewritePrimaryType(Object oldValue, Schema oldSchema, Schema newSchema) {
     Schema realOldSchema = oldSchema;
     if (realOldSchema.getType() == UNION) {
@@ -958,9 +1013,10 @@ public class HoodieAvroUtils {
    *
    * @param oldRecords oldRecords to be rewrite
    * @param newSchema newSchema used to rewrite oldRecord
+   * @param renameCols a map storing all renamed cols, (k, v) -> (colNameFromNewSchema, colNameFromOldSchema)
    * @return a iterator of rewrote GeneriRcords
    */
-  public static Iterator<GenericRecord> rewriteRecordWithNewSchema(Iterator<GenericRecord> oldRecords, Schema newSchema) {
+  public static Iterator<GenericRecord> rewriteRecordWithNewSchema(Iterator<GenericRecord> oldRecords, Schema newSchema, Map<String, String> renameCols) {
     if (oldRecords == null || newSchema == null) {
       return Collections.emptyIterator();
     }
@@ -972,7 +1028,7 @@ public class HoodieAvroUtils {
 
       @Override
       public GenericRecord next() {
-        return rewriteRecordWithNewSchema(oldRecords.next(), newSchema);
+        return rewriteRecordWithNewSchema(oldRecords.next(), newSchema, renameCols);
       }
     };
   }
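A self-contained sketch of how the new three-argument rewriteRecordWithNewSchema overload resolves a renamed column; the schemas, field names, and values below are made up for illustration and are not part of this commit:

    import java.util.Collections;
    import java.util.Map;

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hudi.avro.HoodieAvroUtils;

    public class RewriteWithRenameSketch {
      public static void main(String[] args) {
        // "File" schema: the record was written with a column named colOldName.
        Schema oldSchema = SchemaBuilder.record("rec").fields()
            .requiredInt("id")
            .optionalString("colOldName")
            .endRecord();
        // "Query" schema: the same column is now called colNewName.
        Schema newSchema = SchemaBuilder.record("rec").fields()
            .requiredInt("id")
            .optionalString("colNewName")
            .endRecord();

        GenericRecord oldRecord = new GenericData.Record(oldSchema);
        oldRecord.put("id", 1);
        oldRecord.put("colOldName", "value");

        // (colNameFromNewSchema -> colNameFromOldSchema), per the new @param renameCols doc.
        Map<String, String> renameCols = Collections.singletonMap("colNewName", "colOldName");

        GenericRecord rewritten = HoodieAvroUtils.rewriteRecordWithNewSchema(oldRecord, newSchema, renameCols);
        // The old value is carried over under the new column name,
        // e.g. {"id": 1, "colNewName": "value"}.
        System.out.println(rewritten);
      }
    }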
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 9e56083b26..9687136444 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -58,6 +58,7 @@ import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Arrays;
 import java.util.Deque;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -379,7 +380,7 @@ public abstract class AbstractHoodieLogRecordReader {
       Option<Schema> schemaOption = getMergedSchema(dataBlock);
       while (recordIterator.hasNext()) {
         IndexedRecord currentRecord = recordIterator.next();
-        IndexedRecord record = schemaOption.isPresent() ? HoodieAvroUtils.rewriteRecordWithNewSchema(currentRecord, schemaOption.get()) : currentRecord;
+        IndexedRecord record = schemaOption.isPresent() ? HoodieAvroUtils.rewriteRecordWithNewSchema(currentRecord, schemaOption.get(), new HashMap<>()) : currentRecord;
         processNextRecord(createHoodieRecord(record, this.hoodieTableMetaClient.getTableConfig(), this.payloadClassFQN,
             this.preCombineField, this.withOperationField, this.simpleKeyGenFields, this.partitionName));
         totalLogRecords.incrementAndGet();
diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java
index 0d93ab170b..bcea9b957b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java
@@ -48,6 +48,25 @@ public class InternalSchemaMerger {
   // we can pass decimalType to reWriteRecordWithNewSchema directly, everything is ok.
   private boolean useColumnTypeFromFileSchema = true;
 
+  // deal with rename
+  // Whether to use the column name from the file schema to read files when we find some column name has changed.
+  // Spark's parquetReader needs the original column name to read data; otherwise the parquetReader will read nothing.
+  // e.g.: the current column name is colOldName and we rename it to colNewName;
+  // we should not pass colNewName to the parquetReader, we must pass colOldName to it when we read out the data.
+  // For the log reader,
+  // since our reWriteRecordWithNewSchema function supports the rewrite directly, we do not need this parameter;
+  // e.g.: the current column name is colOldName and we rename it to colNewName;
+  // we can pass colNewName to reWriteRecordWithNewSchema directly, everything is ok.
+  private boolean useColNameFromFileSchema = true;
+
+  public InternalSchemaMerger(InternalSchema fileSchema, InternalSchema querySchema, boolean ignoreRequiredAttribute, boolean useColumnTypeFromFileSchema, boolean useColNameFromFileSchema) {
+    this.fileSchema = fileSchema;
+    this.querySchema = querySchema;
+    this.ignoreRequiredAttribute = ignoreRequiredAttribute;
+    this.useColumnTypeFromFileSchema = useColumnTypeFromFileSchema;
+    this.useColNameFromFileSchema = useColNameFromFileSchema;
+  }
+
   public InternalSchemaMerger(InternalSchema fileSchema, InternalSchema querySchema, boolean ignoreRequiredAttribute, boolean useColumnTypeFromFileSchema) {
     this.fileSchema = fileSchema;
     this.querySchema = querySchema;
@@ -131,12 +150,15 @@ public class InternalSchemaMerger {
   private Types.Field dealWithRename(int fieldId, Type newType, Types.Field oldField) {
     Types.Field fieldFromFileSchema = fileSchema.findField(fieldId);
     String nameFromFileSchema = fieldFromFileSchema.name();
+    String nameFromQuerySchema = querySchema.findField(fieldId).name();
     Type typeFromFileSchema = fieldFromFileSchema.type();
     // Current design mechanism guarantees nestedType change is not allowed, so no need to consider.
     if (newType.isNestedType()) {
-      return Types.Field.get(oldField.fieldId(), oldField.isOptional(), nameFromFileSchema, newType, oldField.doc());
+      return Types.Field.get(oldField.fieldId(), oldField.isOptional(),
+          useColNameFromFileSchema ? nameFromFileSchema : nameFromQuerySchema, newType, oldField.doc());
     } else {
-      return Types.Field.get(oldField.fieldId(), oldField.isOptional(), nameFromFileSchema, useColumnTypeFromFileSchema ? typeFromFileSchema : newType, oldField.doc());
+      return Types.Field.get(oldField.fieldId(), oldField.isOptional(),
+          useColNameFromFileSchema ? nameFromFileSchema : nameFromQuerySchema, useColumnTypeFromFileSchema ? typeFromFileSchema : newType, oldField.doc());
     }
   }
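The new useColNameFromFileSchema flag only controls which name a renamed column carries in the merged schema; field ids and the type handling are unchanged. A brief sketch of the two constructor forms (illustrative only; the wrapper class and method names are made up, and the package of InternalSchema is assumed from the imports in this diff):

    import org.apache.hudi.internal.schema.InternalSchema;
    import org.apache.hudi.internal.schema.action.InternalSchemaMerger;

    class MergedSchemaSketch {
      // Merge-for-rewrite path (as in HoodieMergeHelper above): emit query-schema names so the
      // rewritten records already match the evolved schema.
      static InternalSchema mergeForRewrite(InternalSchema fileSchema, InternalSchema querySchema) {
        return new InternalSchemaMerger(fileSchema, querySchema, true, false, false).mergeSchema();
      }

      // Readers that must request columns by their on-disk names keep the original
      // four-argument constructor, which leaves useColNameFromFileSchema at its default of true.
      static InternalSchema mergeForFileRead(InternalSchema fileSchema, InternalSchema querySchema) {
        return new InternalSchemaMerger(fileSchema, querySchema, true, false).mergeSchema();
      }
    }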
diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java
index 3c0877f6f5..a784b409b8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java
@@ -267,4 +267,20 @@ public class InternalSchemaUtils {
     }
     return result;
   }
+
+  /**
+   * Try to find all renamed cols between oldSchema and newSchema.
+   *
+   * @param oldSchema oldSchema
+   * @param newSchema newSchema which is modified from oldSchema
+   * @return renameCols Map. (k, v) -> (colNameFromNewSchema, colNameFromOldSchema)
+   */
+  public static Map<String, String> collectRenameCols(InternalSchema oldSchema, InternalSchema newSchema) {
+    List<String> colNamesFromWriteSchema = oldSchema.getAllColsFullName();
+    return colNamesFromWriteSchema.stream().filter(f -> {
+      int filedIdFromWriteSchema = oldSchema.findIdByName(f);
+      // try to find the cols which have the same id but a different colName
+      return newSchema.getAllIds().contains(filedIdFromWriteSchema) && !newSchema.findfullName(filedIdFromWriteSchema).equalsIgnoreCase(f);
+    }).collect(Collectors.toMap(e -> newSchema.findfullName(oldSchema.findIdByName(e)), e -> e));
+  }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java b/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java
index d116697b8d..3850ef07b9 100644
--- a/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java
@@ -284,7 +284,7 @@ public class TestAvroSchemaEvolutionUtils {
         .updateColumnType("col6", Types.StringType.get());
     InternalSchema newSchema = SchemaChangeUtils.applyTableChanges2Schema(internalSchema, updateChange);
     Schema newAvroSchema = AvroInternalSchemaConverter.convert(newSchema, avroSchema.getName());
-    GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema);
+    GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema, new HashMap<>());
     Assertions.assertEquals(GenericData.get().validate(newAvroSchema, newRecord), true);
   }
 
@@ -349,7 +349,7 @@ public class TestAvroSchemaEvolutionUtils {
     );
 
     Schema newAvroSchema = AvroInternalSchemaConverter.convert(newRecord, schema.getName());
-    GenericRecord newAvroRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema);
+    GenericRecord newAvroRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema, new HashMap<>());
     // test the correctly of rewrite
     Assertions.assertEquals(GenericData.get().validate(newAvroSchema, newAvroRecord), true);
   }
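Putting the two new utilities together, the pattern introduced in HoodieMergeHelper can be expressed roughly as the following sketch (the wrapper class and method are hypothetical; only collectRenameCols and the three-argument rewriteRecordWithNewSchema come from this patch):

    import java.util.Iterator;
    import java.util.Map;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hudi.avro.HoodieAvroUtils;
    import org.apache.hudi.internal.schema.InternalSchema;
    import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;

    class RenameAwareRewriteSketch {
      // Derive the rename mapping from the two InternalSchemas (columns matched by field id),
      // then let the Avro rewriter resolve renamed columns while iterating the base file.
      static Iterator<GenericRecord> rewriteAll(Iterator<GenericRecord> records,
                                                Schema readSchema,
                                                InternalSchema writeInternalSchema,
                                                InternalSchema querySchema) {
        Map<String, String> renameCols = InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema);
        return HoodieAvroUtils.rewriteRecordWithNewSchema(records, readSchema, renameCols);
      }
    }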
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index ae828ed9f7..5416363598 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -445,28 +445,19 @@ class TestSpark3DDL extends TestHoodieSqlBase {
           Seq(null),
           Seq(Map("t1" -> 10.0d))
         )
+        spark.sql(s"alter table ${tableName} rename column members to mem")
+        spark.sql(s"alter table ${tableName} rename column mem.value.n to nn")
+        spark.sql(s"alter table ${tableName} rename column userx to us")
+        spark.sql(s"alter table ${tableName} rename column us.age to age1")
+
+        spark.sql(s"insert into ${tableName} values(2 , map('k1', struct('v1', 100), 'k2', struct('v2', 200)), struct('jackStructNew', 291 , 101), 'jacknew', 1000, map('t1', 10))")
+        spark.sql(s"select mem.value.nn, us.age1 from $tableName order by id").show()
+        checkAnswer(spark.sql(s"select mem.value.nn, us.age1 from $tableName order by id").collect())(
+          Seq(null, 29),
+          Seq(null, 291)
+        )
       }
     }
   }
 }
-
-  private def performClustering(writeDf: DataFrame, basePath: String, tableName: String, tableType: String): Unit = {
-    writeDf.write.format("org.apache.hudi")
-      .option(DataSourceWriteOptions.TABLE_TYPE.key(), tableType)
-      .option("hoodie.upsert.shuffle.parallelism", "1")
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "id")
-      .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "comb")
-      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "par")
-      .option(HoodieWriteConfig.TBL_NAME.key, tableName)
-      .option("hoodie.schema.on.read.enable", "true")
-      // option for clustering
-      .option("hoodie.clustering.inline", "true")
-      .option("hoodie.clustering.inline.max.commits", "1")
-      .option("hoodie.clustering.plan.strategy.small.file.limit", String.valueOf(2*1024*1024L))
-      .option("hoodie.clustering.plan.strategy.max.bytes.per.group", String.valueOf(10*1024*1024L))
-      .option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(4 * 1024* 1024L))
-      .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "col1, col2")
-      .mode(SaveMode.Append)
-      .save(basePath)
-  }
 }