This is an automated email from the ASF dual-hosted git repository. xushiyan pushed a commit to branch rc3-patched-for-test in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 5bbd47f22214dc1c911688ebb04f75b8e9d7cbcb Author: xiarixiaoyao <mengtao0...@qq.com> AuthorDate: Wed Apr 20 20:29:54 2022 +0800 [HUDI-3921] Fixed schema evolution cannot work with HUDI-3855 --- .../java/org/apache/hudi/io/HoodieWriteHandle.java | 12 +++++- .../table/action/commit/HoodieMergeHelper.java | 12 +++++- .../java/org/apache/hudi/avro/HoodieAvroUtils.java | 46 +++++++++++++++++----- .../table/log/AbstractHoodieLogRecordReader.java | 3 +- .../schema/action/InternalSchemaMerger.java | 26 +++++++++++- .../internal/schema/utils/InternalSchemaUtils.java | 16 ++++++++ .../schema/utils/TestAvroSchemaEvolutionUtils.java | 4 +- 7 files changed, 101 insertions(+), 18 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java index 89babc7725..ab8a3d7033 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java @@ -46,6 +46,9 @@ import org.apache.log4j.Logger; import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.HashMap; + +import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty; /** * Base class for all write operations logically performed at the file group level. @@ -98,6 +101,8 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O> protected final String fileId; protected final String writeToken; protected final TaskContextSupplier taskContextSupplier; + // For full schema evolution + protected final boolean schemaOnReadEnable; public HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String partitionPath, String fileId, HoodieTable<T, I, K, O> hoodieTable, TaskContextSupplier taskContextSupplier) { @@ -120,6 +125,7 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O> !hoodieTable.getIndex().isImplicitWithStorage(), config.getWriteStatusFailureFraction()); this.taskContextSupplier = taskContextSupplier; this.writeToken = makeWriteToken(); + schemaOnReadEnable = !isNullOrEmpty(hoodieTable.getConfig().getInternalSchema()); } /** @@ -224,11 +230,13 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O> * Rewrite the GenericRecord with the Schema containing the Hoodie Metadata fields. */ protected GenericRecord rewriteRecord(GenericRecord record) { - return HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields); + return schemaOnReadEnable ? HoodieAvroUtils.rewriteRecordWithNewSchema(record, writeSchemaWithMetaFields, new HashMap<>()) + : HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields); } protected GenericRecord rewriteRecordWithMetadata(GenericRecord record, String fileName) { - return HoodieAvroUtils.rewriteRecordWithMetadata(record, writeSchemaWithMetaFields, fileName); + return schemaOnReadEnable ? HoodieAvroUtils.rewriteEvolutionRecordWithMetadata(record, writeSchemaWithMetaFields, fileName) + : HoodieAvroUtils.rewriteRecordWithMetadata(record, writeSchemaWithMetaFields, fileName); } public abstract List<WriteStatus> close(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java index 578cdf0bc7..e964cfc9b3 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java @@ -36,6 +36,7 @@ import org.apache.hudi.internal.schema.action.InternalSchemaMerger; import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter; import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils; import org.apache.hudi.internal.schema.utils.SerDeHelper; +import org.apache.hudi.internal.schema.utils.InternalSchemaUtils; import org.apache.hudi.io.HoodieMergeHandle; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; @@ -52,6 +53,8 @@ import org.apache.hadoop.conf.Configuration; import java.io.IOException; import java.util.Iterator; import java.util.List; +import java.util.HashMap; +import java.util.Map; import java.util.stream.Collectors; public class HoodieMergeHelper<T extends HoodieRecordPayload> extends @@ -93,6 +96,7 @@ public class HoodieMergeHelper<T extends HoodieRecordPayload> extends Option<InternalSchema> querySchemaOpt = SerDeHelper.fromJson(table.getConfig().getInternalSchema()); boolean needToReWriteRecord = false; + Map<String, String> renameCols = new HashMap<>(); // TODO support bootstrap if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) { // check implicitly add columns, and position reorder(spark sql may change cols order) @@ -109,10 +113,14 @@ public class HoodieMergeHelper<T extends HoodieRecordPayload> extends && writeInternalSchema.findIdByName(f) == querySchema.findIdByName(f) && writeInternalSchema.findIdByName(f) != -1 && writeInternalSchema.findType(writeInternalSchema.findIdByName(f)).equals(querySchema.findType(writeInternalSchema.findIdByName(f)))).collect(Collectors.toList()); - readSchema = AvroInternalSchemaConverter.convert(new InternalSchemaMerger(writeInternalSchema, querySchema, true, false).mergeSchema(), readSchema.getName()); + readSchema = AvroInternalSchemaConverter + .convert(new InternalSchemaMerger(writeInternalSchema, querySchema, true, false, false).mergeSchema(), readSchema.getName()); Schema writeSchemaFromFile = AvroInternalSchemaConverter.convert(writeInternalSchema, readSchema.getName()); needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size() || SchemaCompatibility.checkReaderWriterCompatibility(writeSchemaFromFile, readSchema).getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE; + if (needToReWriteRecord) { + renameCols = InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema); + } } try { @@ -121,7 +129,7 @@ public class HoodieMergeHelper<T extends HoodieRecordPayload> extends readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation); } else { if (needToReWriteRecord) { - readerIterator = HoodieAvroUtils.rewriteRecordWithNewSchema(reader.getRecordIterator(), readSchema); + readerIterator = HoodieAvroUtils.rewriteRecordWithNewSchema(reader.getRecordIterator(), readSchema, renameCols); } else { readerIterator = reader.getRecordIterator(readSchema); } diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index edfecec515..37d84a2895 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -65,6 +65,7 @@ import java.sql.Date; import java.sql.Timestamp; import java.time.LocalDate; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -406,6 +407,18 @@ public class HoodieAvroUtils { return newRecord; } + // TODO Unify the logical of rewriteRecordWithMetadata and rewriteEvolutionRecordWithMetadata, and delete this function. + public static GenericRecord rewriteEvolutionRecordWithMetadata(GenericRecord genericRecord, Schema newSchema, String fileName) { + GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(genericRecord, newSchema, new HashMap<>()); + // do not preserve FILENAME_METADATA_FIELD + newRecord.put(HoodieRecord.FILENAME_METADATA_FIELD_POS, fileName); + if (!GenericData.get().validate(newSchema, newRecord)) { + throw new SchemaCompatibilityException( + "Unable to validate the rewritten record " + genericRecord + " against schema " + newSchema); + } + return newRecord; + } + /** * Converts list of {@link GenericRecord} provided into the {@link GenericRecord} adhering to the * provided {@code newSchema}. @@ -731,14 +744,15 @@ public class HoodieAvroUtils { * * @param oldRecord oldRecord to be rewritten * @param newSchema newSchema used to rewrite oldRecord + * @param renameCols a map store all rename cols, (k, v)-> (colNameFromNewSchema, colNameFromOldSchema) * @return newRecord for new Schema */ - public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord oldRecord, Schema newSchema) { - Object newRecord = rewriteRecordWithNewSchema(oldRecord, oldRecord.getSchema(), newSchema); + public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord oldRecord, Schema newSchema, Map<String, String> renameCols) { + Object newRecord = rewriteRecordWithNewSchema(oldRecord, oldRecord.getSchema(), newSchema, renameCols); return (GenericData.Record) newRecord; } - private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSchema, Schema newSchema) { + private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSchema, Schema newSchema, Map<String, String> renameCols) { if (oldRecord == null) { return null; } @@ -755,7 +769,20 @@ public class HoodieAvroUtils { Schema.Field field = fields.get(i); if (oldSchema.getField(field.name()) != null) { Schema.Field oldField = oldSchema.getField(field.name()); - helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema())); + helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols)); + } + // deal with rename + if (!renameCols.isEmpty() && oldSchema.getField(field.name()) == null) { + String fieldName = field.name(); + for (Map.Entry<String, String> entry : renameCols.entrySet()) { + List<String> nameParts = Arrays.asList(entry.getKey().split("\\.")); + List<String> namePartsOld = Arrays.asList(entry.getValue().split("\\.")); + if (nameParts.get(nameParts.size() - 1).equals(fieldName) && oldSchema.getField(namePartsOld.get(namePartsOld.size() - 1)) != null) { + // find rename + Schema.Field oldField = oldSchema.getField(namePartsOld.get(namePartsOld.size() - 1)); + helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols)); + } + } } } GenericData.Record newRecord = new GenericData.Record(newSchema); @@ -778,7 +805,7 @@ public class HoodieAvroUtils { Collection array = (Collection)oldRecord; List<Object> newArray = new ArrayList(); for (Object element : array) { - newArray.add(rewriteRecordWithNewSchema(element, oldSchema.getElementType(), newSchema.getElementType())); + newArray.add(rewriteRecordWithNewSchema(element, oldSchema.getElementType(), newSchema.getElementType(), renameCols)); } return newArray; case MAP: @@ -788,11 +815,11 @@ public class HoodieAvroUtils { Map<Object, Object> map = (Map<Object, Object>) oldRecord; Map<Object, Object> newMap = new HashMap<>(); for (Map.Entry<Object, Object> entry : map.entrySet()) { - newMap.put(entry.getKey(), rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(), newSchema.getValueType())); + newMap.put(entry.getKey(), rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(), newSchema.getValueType(), renameCols)); } return newMap; case UNION: - return rewriteRecordWithNewSchema(oldRecord, getActualSchemaFromUnion(oldSchema, oldRecord), getActualSchemaFromUnion(newSchema, oldRecord)); + return rewriteRecordWithNewSchema(oldRecord, getActualSchemaFromUnion(oldSchema, oldRecord), getActualSchemaFromUnion(newSchema, oldRecord), renameCols); default: return rewritePrimaryType(oldRecord, oldSchema, newSchema); } @@ -970,9 +997,10 @@ public class HoodieAvroUtils { * * @param oldRecords oldRecords to be rewrite * @param newSchema newSchema used to rewrite oldRecord + * @param renameCols a map store all rename cols, (k, v)-> (colNameFromNewSchema, colNameFromOldSchema) * @return a iterator of rewrote GeneriRcords */ - public static Iterator<GenericRecord> rewriteRecordWithNewSchema(Iterator<GenericRecord> oldRecords, Schema newSchema) { + public static Iterator<GenericRecord> rewriteRecordWithNewSchema(Iterator<GenericRecord> oldRecords, Schema newSchema, Map<String, String> renameCols) { if (oldRecords == null || newSchema == null) { return Collections.emptyIterator(); } @@ -984,7 +1012,7 @@ public class HoodieAvroUtils { @Override public GenericRecord next() { - return rewriteRecordWithNewSchema(oldRecords.next(), newSchema); + return rewriteRecordWithNewSchema(oldRecords.next(), newSchema, renameCols); } }; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java index 9e56083b26..9687136444 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java @@ -58,6 +58,7 @@ import java.io.IOException; import java.util.ArrayDeque; import java.util.Arrays; import java.util.Deque; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -379,7 +380,7 @@ public abstract class AbstractHoodieLogRecordReader { Option<Schema> schemaOption = getMergedSchema(dataBlock); while (recordIterator.hasNext()) { IndexedRecord currentRecord = recordIterator.next(); - IndexedRecord record = schemaOption.isPresent() ? HoodieAvroUtils.rewriteRecordWithNewSchema(currentRecord, schemaOption.get()) : currentRecord; + IndexedRecord record = schemaOption.isPresent() ? HoodieAvroUtils.rewriteRecordWithNewSchema(currentRecord, schemaOption.get(), new HashMap<>()) : currentRecord; processNextRecord(createHoodieRecord(record, this.hoodieTableMetaClient.getTableConfig(), this.payloadClassFQN, this.preCombineField, this.withOperationField, this.simpleKeyGenFields, this.partitionName)); totalLogRecords.incrementAndGet(); diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java index 0d93ab170b..bcea9b957b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java +++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java @@ -48,6 +48,25 @@ public class InternalSchemaMerger { // we can pass decimalType to reWriteRecordWithNewSchema directly, everything is ok. private boolean useColumnTypeFromFileSchema = true; + // deal with rename + // Whether to use column name from file schema to read files when we find some column name has changed. + // spark parquetReader need the original column name to read data, otherwise the parquetReader will read nothing. + // eg: current column name is colOldName, now we rename it to colNewName, + // we should not pass colNewName to parquetReader, we must pass colOldName to it; when we read out the data. + // for log reader + // since our reWriteRecordWithNewSchema function support rewrite directly, so we no need this parameter + // eg: current column name is colOldName, now we rename it to colNewName, + // we can pass colNewName to reWriteRecordWithNewSchema directly, everything is ok. + private boolean useColNameFromFileSchema = true; + + public InternalSchemaMerger(InternalSchema fileSchema, InternalSchema querySchema, boolean ignoreRequiredAttribute, boolean useColumnTypeFromFileSchema, boolean useColNameFromFileSchema) { + this.fileSchema = fileSchema; + this.querySchema = querySchema; + this.ignoreRequiredAttribute = ignoreRequiredAttribute; + this.useColumnTypeFromFileSchema = useColumnTypeFromFileSchema; + this.useColNameFromFileSchema = useColNameFromFileSchema; + } + public InternalSchemaMerger(InternalSchema fileSchema, InternalSchema querySchema, boolean ignoreRequiredAttribute, boolean useColumnTypeFromFileSchema) { this.fileSchema = fileSchema; this.querySchema = querySchema; @@ -131,12 +150,15 @@ public class InternalSchemaMerger { private Types.Field dealWithRename(int fieldId, Type newType, Types.Field oldField) { Types.Field fieldFromFileSchema = fileSchema.findField(fieldId); String nameFromFileSchema = fieldFromFileSchema.name(); + String nameFromQuerySchema = querySchema.findField(fieldId).name(); Type typeFromFileSchema = fieldFromFileSchema.type(); // Current design mechanism guarantees nestedType change is not allowed, so no need to consider. if (newType.isNestedType()) { - return Types.Field.get(oldField.fieldId(), oldField.isOptional(), nameFromFileSchema, newType, oldField.doc()); + return Types.Field.get(oldField.fieldId(), oldField.isOptional(), + useColNameFromFileSchema ? nameFromFileSchema : nameFromQuerySchema, newType, oldField.doc()); } else { - return Types.Field.get(oldField.fieldId(), oldField.isOptional(), nameFromFileSchema, useColumnTypeFromFileSchema ? typeFromFileSchema : newType, oldField.doc()); + return Types.Field.get(oldField.fieldId(), oldField.isOptional(), + useColNameFromFileSchema ? nameFromFileSchema : nameFromQuerySchema, useColumnTypeFromFileSchema ? typeFromFileSchema : newType, oldField.doc()); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java index 3c0877f6f5..a784b409b8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java @@ -267,4 +267,20 @@ public class InternalSchemaUtils { } return result; } + + /** + * Try to find all renamed cols between oldSchema and newSchema. + * + * @param oldSchema oldSchema + * @param newSchema newSchema which modified from oldSchema + * @return renameCols Map. (k, v) -> (colNameFromNewSchema, colNameFromOldSchema) + */ + public static Map<String, String> collectRenameCols(InternalSchema oldSchema, InternalSchema newSchema) { + List<String> colNamesFromWriteSchema = oldSchema.getAllColsFullName(); + return colNamesFromWriteSchema.stream().filter(f -> { + int filedIdFromWriteSchema = oldSchema.findIdByName(f); + // try to find the cols which has the same id, but have different colName; + return newSchema.getAllIds().contains(filedIdFromWriteSchema) && !newSchema.findfullName(filedIdFromWriteSchema).equalsIgnoreCase(f); + }).collect(Collectors.toMap(e -> newSchema.findfullName(oldSchema.findIdByName(e)), e -> e)); + } } diff --git a/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java b/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java index d116697b8d..3850ef07b9 100644 --- a/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java @@ -284,7 +284,7 @@ public class TestAvroSchemaEvolutionUtils { .updateColumnType("col6", Types.StringType.get()); InternalSchema newSchema = SchemaChangeUtils.applyTableChanges2Schema(internalSchema, updateChange); Schema newAvroSchema = AvroInternalSchemaConverter.convert(newSchema, avroSchema.getName()); - GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema); + GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema, new HashMap<>()); Assertions.assertEquals(GenericData.get().validate(newAvroSchema, newRecord), true); } @@ -349,7 +349,7 @@ public class TestAvroSchemaEvolutionUtils { ); Schema newAvroSchema = AvroInternalSchemaConverter.convert(newRecord, schema.getName()); - GenericRecord newAvroRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema); + GenericRecord newAvroRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema, new HashMap<>()); // test the correctly of rewrite Assertions.assertEquals(GenericData.get().validate(newAvroSchema, newAvroRecord), true); }