This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9e7fba4f22ac [HUDI-9576] Test schema evolution in fg reader (#13549)
9e7fba4f22ac is described below
commit 9e7fba4f22ac9125d15a79c28c39ed3633febcd1
Author: Jon Vexler <[email protected]>
AuthorDate: Fri Jul 18 18:41:52 2025 -0400
[HUDI-9576] Test schema evolution in fg reader (#13549)
- Test schema evolution with File group reader across engines. For spark,
all cases are green. For other record formats, some tests are disabled, but
follow up patches will fix them.
---------
Co-authored-by: Jonathan Vexler <=>
---
.../read/HoodieFileGroupReaderOnJavaTestBase.java | 10 +-
.../read/TestHoodieFileGroupReaderOnJava.java | 15 +
.../hadoop/TestHoodieFileGroupReaderOnHive.java | 49 ++-
.../hudi/testutils/ArrayWritableTestUtil.java | 103 +++++++
.../table/read/TestHoodieFileGroupReaderBase.java | 334 ++++++++++++++++++++-
.../common/testutils/HoodieTestDataGenerator.java | 238 ++++++++++++++-
.../table/TestHoodieFileGroupReaderOnFlink.java | 52 +++-
.../hadoop/hive/serde2/avro/HiveTypeUtils.java | 328 ++++++++++++++++++++
.../hudi/hadoop/utils/ObjectInspectorCache.java | 4 +-
.../read/TestHoodieFileGroupReaderOnSpark.scala | 119 +++++++-
10 files changed, 1194 insertions(+), 58 deletions(-)
diff --git
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderOnJavaTestBase.java
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderOnJavaTestBase.java
index 6f66b6cb2d0f..fb72b328aa5a 100644
---
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderOnJavaTestBase.java
+++
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderOnJavaTestBase.java
@@ -27,7 +27,6 @@ import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -60,13 +59,13 @@ public abstract class
HoodieFileGroupReaderOnJavaTestBase<T> extends TestHoodieF
}
@Override
- public void commitToTable(List<HoodieRecord> recordList, String operation,
Map<String, String> writeConfigs) {
+ public void commitToTable(List<HoodieRecord> recordList, String operation,
boolean firstCommit, Map<String, String> writeConfigs, String schemaStr) {
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
.withEngineType(EngineType.JAVA)
.withEmbeddedTimelineServerEnabled(false)
.withProps(writeConfigs)
.withPath(getBasePath())
- .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+ .withSchema(schemaStr)
.build();
HoodieJavaClientTestHarness.TestJavaTaskContextSupplier
taskContextSupplier = new
HoodieJavaClientTestHarness.TestJavaTaskContextSupplier();
@@ -75,8 +74,7 @@ public abstract class HoodieFileGroupReaderOnJavaTestBase<T>
extends TestHoodieF
StoragePath basePath = new StoragePath(getBasePath());
try (HoodieStorage storage = new HoodieHadoopStorage(basePath,
getStorageConf())) {
boolean basepathExists = storage.exists(basePath);
- boolean operationIsInsert = operation.equalsIgnoreCase("insert");
- if (!basepathExists || operationIsInsert) {
+ if (!basepathExists || firstCommit) {
if (basepathExists) {
storage.deleteDirectory(basePath);
}
@@ -108,6 +106,8 @@ public abstract class
HoodieFileGroupReaderOnJavaTestBase<T> extends TestHoodieF
recordList.forEach(hoodieRecord -> recordsCopy.add(new
HoodieAvroRecord<>(hoodieRecord.getKey(), (HoodieRecordPayload)
hoodieRecord.getData())));
if (operation.toLowerCase().equals("insert")) {
writeClient.commit(instantTime, writeClient.insert(recordsCopy,
instantTime), Option.empty(), DELTA_COMMIT_ACTION, Collections.emptyMap());
+ } else if (operation.toLowerCase().equals("bulk_insert")) {
+ writeClient.commit(instantTime, writeClient.bulkInsert(recordsCopy,
instantTime), Option.empty(), DELTA_COMMIT_ACTION, Collections.emptyMap());
} else {
writeClient.commit(instantTime, writeClient.upsert(recordsCopy,
instantTime), Option.empty(), DELTA_COMMIT_ACTION, Collections.emptyMap());
}
diff --git
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnJava.java
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnJava.java
index 0f3e1ad59329..bdcf4f54009f 100644
---
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnJava.java
+++
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnJava.java
@@ -19,9 +19,11 @@
package org.apache.hudi.common.table.read;
+import org.apache.hudi.avro.ConvertingGenericData;
import org.apache.hudi.avro.HoodieAvroReaderContext;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
@@ -30,6 +32,7 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHoodieFileGroupReaderOnJava extends
HoodieFileGroupReaderOnJavaTestBase<IndexedRecord> {
private static final StorageConfiguration<?> STORAGE_CONFIGURATION = new
HadoopStorageConfiguration(false);
@@ -49,4 +52,16 @@ public class TestHoodieFileGroupReaderOnJava extends
HoodieFileGroupReaderOnJava
public void assertRecordsEqual(Schema schema, IndexedRecord expected,
IndexedRecord actual) {
assertEquals(expected, actual);
}
+
+ @Override
+ public void assertRecordMatchesSchema(Schema schema, IndexedRecord record) {
+ assertTrue(ConvertingGenericData.INSTANCE.validate(schema, record));
+ }
+
+ @Override
+ public HoodieTestDataGenerator.SchemaEvolutionConfigs
getSchemaEvolutionConfigs() {
+ HoodieTestDataGenerator.SchemaEvolutionConfigs configs = new
HoodieTestDataGenerator.SchemaEvolutionConfigs();
+ configs.addNewFieldSupport = false;
+ return configs;
+ }
}
diff --git
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
index c01ed5a7a33e..5206a3fa1d1d 100644
---
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
+++
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
@@ -38,8 +38,9 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.avro.HiveTypeUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.mapred.JobConf;
import org.junit.jupiter.api.AfterAll;
@@ -47,7 +48,7 @@ import org.junit.jupiter.api.BeforeAll;
import java.io.IOException;
import java.util.List;
-import java.util.Map;
+import java.util.Locale;
import java.util.stream.Collectors;
import static
org.apache.hudi.hadoop.HoodieFileGroupReaderBasedRecordReader.getStoredPartitionFieldNames;
@@ -105,23 +106,53 @@ public class TestHoodieFileGroupReaderOnHive extends
HoodieFileGroupReaderOnJava
ArrayWritableTestUtil.assertArrayWritableEqual(schema, expected, actual,
false);
}
+ @Override
+ public void assertRecordMatchesSchema(Schema schema, ArrayWritable record) {
+ ArrayWritableTestUtil.assertArrayWritableMatchesSchema(schema, record);
+ }
+
+ @Override
+ public HoodieTestDataGenerator.SchemaEvolutionConfigs
getSchemaEvolutionConfigs() {
+ HoodieTestDataGenerator.SchemaEvolutionConfigs configs = new
HoodieTestDataGenerator.SchemaEvolutionConfigs();
+ configs.nestedSupport = false;
+ configs.arraySupport = false;
+ configs.mapSupport = false;
+ configs.addNewFieldSupport = false;
+ configs.intToLongSupport = false;
+ configs.intToFloatSupport = false;
+ configs.intToDoubleSupport = false;
+ configs.intToStringSupport = false;
+ configs.longToFloatSupport = false;
+ configs.longToDoubleSupport = false;
+ configs.longToStringSupport = false;
+ configs.floatToDoubleSupport = false;
+ configs.floatToStringSupport = false;
+ configs.doubleToStringSupport = false;
+ configs.stringToBytesSupport = false;
+ configs.bytesToStringSupport = false;
+ return configs;
+ }
+
private void setupJobconf(JobConf jobConf, Schema schema) {
List<Schema.Field> fields = schema.getFields();
setHiveColumnNameProps(fields, jobConf, USE_FAKE_PARTITION);
- List<TypeInfo> types =
TypeInfoUtils.getTypeInfosFromTypeString(HoodieTestDataGenerator.TRIP_HIVE_COLUMN_TYPES);
- Map<String, String> typeMappings =
HoodieTestDataGenerator.AVRO_SCHEMA.getFields().stream().collect(Collectors.toMap(Schema.Field::name,
field -> types.get(field.pos()).getTypeName()));
- String columnTypes = fields.stream().map(field ->
typeMappings.getOrDefault(field.name(),
"string")).collect(Collectors.joining(","));
- jobConf.set("columns.types", columnTypes + ",string");
+ try {
+ String columnTypes =
HiveTypeUtils.generateColumnTypes(schema).stream().map(TypeInfo::getTypeName).collect(Collectors.joining(","));
+ jobConf.set("columns.types", columnTypes + ",string");
+ } catch (SerDeException e) {
+ throw new RuntimeException(e);
+ }
}
private void setHiveColumnNameProps(List<Schema.Field> fields, JobConf
jobConf, boolean isPartitioned) {
- String names =
fields.stream().map(Schema.Field::name).collect(Collectors.joining(","));
+ String names = fields.stream().map(Schema.Field::name).map(s ->
s.toLowerCase(Locale.ROOT)).collect(Collectors.joining(","));
String positions = fields.stream().map(f ->
String.valueOf(f.pos())).collect(Collectors.joining(","));
jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions);
- String hiveOrderedColumnNames = fields.stream().filter(field ->
!field.name().equalsIgnoreCase(PARTITION_COLUMN))
- .map(Schema.Field::name).collect(Collectors.joining(","));
+ String hiveOrderedColumnNames = fields.stream().map(Schema.Field::name)
+ .filter(name -> !name.equalsIgnoreCase(PARTITION_COLUMN))
+ .map(s -> s.toLowerCase(Locale.ROOT)).collect(Collectors.joining(","));
if (isPartitioned) {
hiveOrderedColumnNames += "," + PARTITION_COLUMN;
jobConf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS,
PARTITION_COLUMN);
diff --git
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/ArrayWritableTestUtil.java
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/ArrayWritableTestUtil.java
index bbca26711279..d32da6179661 100644
---
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/ArrayWritableTestUtil.java
+++
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/ArrayWritableTestUtil.java
@@ -191,4 +191,107 @@ public class ArrayWritableTestUtil {
assertEquals(expected, actual);
}
}
+
+ public static void assertArrayWritableMatchesSchema(Schema schema, Writable
writable) {
+ switch (schema.getType()) {
+ case RECORD: {
+ assertInstanceOf(ArrayWritable.class, writable);
+ ArrayWritable arrayWritable = (ArrayWritable) writable;
+ assertEquals(schema.getFields().size(), arrayWritable.get().length);
+ for (Schema.Field field : schema.getFields()) {
+ assertArrayWritableMatchesSchema(field.schema(),
arrayWritable.get()[field.pos()]);
+ }
+ break;
+ }
+ case ARRAY: {
+ assertInstanceOf(ArrayWritable.class, writable);
+ ArrayWritable arrayWritable = (ArrayWritable) writable;
+ for (int i = 0; i < arrayWritable.get().length; i++) {
+ assertArrayWritableMatchesSchema(schema.getElementType(),
arrayWritable.get()[i]);
+ }
+ break;
+ }
+ case MAP: {
+ assertInstanceOf(ArrayWritable.class, writable);
+ ArrayWritable arrayWritable = (ArrayWritable) writable;
+ for (int i = 0; i < arrayWritable.get().length; i++) {
+ Writable expectedKV = arrayWritable.get()[i];
+ assertInstanceOf(ArrayWritable.class, expectedKV);
+ ArrayWritable kv = (ArrayWritable) expectedKV;
+ assertEquals(2, kv.get().length);
+ assertNotNull(kv.get()[0]);
+ assertArrayWritableMatchesSchema(schema.getValueType(), kv.get()[1]);
+ }
+ break;
+ }
+ case UNION:
+ if (schema.getTypes().size() == 2
+ && schema.getTypes().get(0).getType() == Schema.Type.NULL) {
+ assertArrayWritableMatchesSchema(schema.getTypes().get(1), writable);
+ } else if (schema.getTypes().size() == 2
+ && schema.getTypes().get(1).getType() == Schema.Type.NULL) {
+ assertArrayWritableMatchesSchema(schema.getTypes().get(0), writable);
+ } else if (schema.getTypes().size() == 1) {
+ assertArrayWritableMatchesSchema(schema.getTypes().get(0), writable);
+ } else {
+ throw new IllegalStateException("Union has more than 2 types or one
type is not null: " + schema);
+ }
+ break;
+
+ default:
+ assertWritablePrimaryTypeMatchesSchema(schema, writable);
+ }
+ }
+
+ private static void assertWritablePrimaryTypeMatchesSchema(Schema schema,
Writable writable) {
+ switch (schema.getType()) {
+ case NULL:
+ assertInstanceOf(NullWritable.class, writable);
+ break;
+
+ case BOOLEAN:
+ assertInstanceOf(BooleanWritable.class, writable);
+ break;
+
+ case INT:
+ if (schema.getLogicalType() instanceof LogicalTypes.Date) {
+ assertInstanceOf(DateWritable.class, writable);
+ } else {
+ assertInstanceOf(IntWritable.class, writable);
+ }
+ break;
+
+ case LONG:
+ assertInstanceOf(LongWritable.class, writable);
+ break;
+
+ case FLOAT:
+ assertInstanceOf(FloatWritable.class, writable);
+ break;
+
+ case DOUBLE:
+ assertInstanceOf(DoubleWritable.class, writable);
+ break;
+
+ case BYTES:
+ case ENUM:
+ assertInstanceOf(BytesWritable.class, writable);
+ break;
+
+ case STRING:
+ assertInstanceOf(Text.class, writable);
+ break;
+
+ case FIXED:
+ if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
+ assertInstanceOf(HiveDecimalWritable.class, writable);
+ } else {
+ throw new IllegalStateException("Unexpected schema type: " + schema);
+ }
+ break;
+
+ default:
+ throw new IllegalStateException("Unexpected schema type: " + schema);
+ }
+ }
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index af0889c289b7..4d2679e9dc29 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -19,6 +19,7 @@
package org.apache.hudi.common.table.read;
+import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMemoryConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
@@ -56,6 +57,8 @@ import org.apache.hudi.storage.StorageConfiguration;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
@@ -90,6 +93,7 @@ import static
org.apache.hudi.common.table.HoodieTableConfig.PARTITION_FIELDS;
import static
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
import static
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
+import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static
org.apache.hudi.common.testutils.HoodieTestUtils.getLogFileListFromFileSlice;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -116,11 +120,25 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
public abstract String getCustomPayload();
- public abstract void commitToTable(List<HoodieRecord> recordList, String
operation,
- Map<String, String> writeConfigs);
+ public abstract void commitToTable(List<HoodieRecord> recordList,
+ String operation,
+ boolean firstCommit,
+ Map<String, String> writeConfigs,
+ String schemaStr);
+
+ public void commitToTable(List<HoodieRecord> recordList,
+ String operation,
+ boolean firstCommit,
+ Map<String, String> writeConfigs) {
+ commitToTable(recordList, operation, firstCommit, writeConfigs,
TRIP_EXAMPLE_SCHEMA);
+ }
public abstract void assertRecordsEqual(Schema schema, T expected, T actual);
+ public abstract void assertRecordMatchesSchema(Schema schema, T record);
+
+ public abstract HoodieTestDataGenerator.SchemaEvolutionConfigs
getSchemaEvolutionConfigs();
+
private static Stream<Arguments> testArguments() {
return Stream.of(
arguments(RecordMergeMode.COMMIT_TIME_ORDERING, "avro", false),
@@ -141,7 +159,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
try (HoodieTestDataGenerator dataGen = new
HoodieTestDataGenerator(0xDEEF)) {
// One commit; reading one file group containing a base file only
List<HoodieRecord> initialRecords = dataGen.generateInserts("001", 100);
- commitToTable(initialRecords, INSERT.value(), writeConfigs);
+ commitToTable(initialRecords, INSERT.value(), true, writeConfigs);
validateOutputFromFileGroupReader(
getStorageConf(), getBasePath(), true, 0, recordMergeMode,
initialRecords, initialRecords);
@@ -150,7 +168,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
List<HoodieRecord> updates = dataGen.generateUniqueUpdates("002", 50);
List<HoodieRecord> allRecords = mergeRecordLists(updates,
initialRecords);
List<HoodieRecord> unmergedRecords =
CollectionUtils.combine(initialRecords, updates);
- commitToTable(updates, UPSERT.value(), writeConfigs);
+ commitToTable(updates, UPSERT.value(), false, writeConfigs);
validateOutputFromFileGroupReader(
getStorageConf(), getBasePath(), true, 1, recordMergeMode,
allRecords, unmergedRecords);
@@ -158,7 +176,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
// Three commits; reading one file group containing a base file and two
log files
List<HoodieRecord> updates2 = dataGen.generateUniqueUpdates("003", 100);
List<HoodieRecord> finalRecords = mergeRecordLists(updates2, allRecords);
- commitToTable(updates2, UPSERT.value(), writeConfigs);
+ commitToTable(updates2, UPSERT.value(), false, writeConfigs);
validateOutputFromFileGroupReader(
getStorageConf(), getBasePath(), true, 2, recordMergeMode,
finalRecords, CollectionUtils.combine(unmergedRecords, updates2));
@@ -183,7 +201,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
try (HoodieTestDataGenerator dataGen = new
HoodieTestDataGenerator(0xDEEF)) {
// One commit; reading one file group containing a log file only
List<HoodieRecord> initialRecords = dataGen.generateInserts("001", 100);
- commitToTable(initialRecords, INSERT.value(), writeConfigs);
+ commitToTable(initialRecords, INSERT.value(), true, writeConfigs);
validateOutputFromFileGroupReader(
getStorageConf(), getBasePath(), false, 1, recordMergeMode,
initialRecords, initialRecords);
@@ -191,13 +209,251 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
// Two commits; reading one file group containing two log files
List<HoodieRecord> updates = dataGen.generateUniqueUpdates("002", 50);
List<HoodieRecord> allRecords = mergeRecordLists(updates,
initialRecords);
- commitToTable(updates, UPSERT.value(), writeConfigs);
+ commitToTable(updates, INSERT.value(), false, writeConfigs);
validateOutputFromFileGroupReader(
getStorageConf(), getBasePath(), false, 2, recordMergeMode,
allRecords, CollectionUtils.combine(initialRecords, updates));
}
}
+ private static List<Pair<String, IndexedRecord>>
hoodieRecordsToIndexedRecords(List<HoodieRecord> hoodieRecords, Schema schema) {
+ return hoodieRecords.stream().map(r -> {
+ try {
+ return r.toIndexedRecord(schema, CollectionUtils.emptyProps());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }).filter(Option::isPresent).map(Option::get).map(r ->
Pair.of(r.getRecordKey(), r.getData())).collect(Collectors.toList());
+ }
+
+ /**
+ * Write a base file with schema A, then write another base file with schema
B.
+ */
+ @Test
+ public void testSchemaEvolutionWhenBaseFilesWithDifferentSchema() throws
Exception {
+ Map<String, String> writeConfigs = new HashMap<>(
+ getCommonConfigs(RecordMergeMode.EVENT_TIME_ORDERING, true));
+
+ try (HoodieTestDataGenerator dataGen = new
HoodieTestDataGenerator(TRIP_EXAMPLE_SCHEMA, 0xDEEF)) {
+ dataGen.extendSchemaBeforeEvolution(getSchemaEvolutionConfigs());
+
+ // Write a base file with schema A
+ List<HoodieRecord> firstRecords =
dataGen.generateInsertsForPartition("001", 5, "any_partition");
+ List<Pair<String, IndexedRecord>> firstIndexedRecords =
hoodieRecordsToIndexedRecords(firstRecords, dataGen.getExtendedSchema());
+ commitToTable(firstRecords, INSERT.value(), true, writeConfigs,
dataGen.getExtendedSchema().toString());
+ validateOutputFromFileGroupReaderWithNativeRecords(
+ getStorageConf(), getBasePath(),
+ true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+ firstIndexedRecords);
+
+ // Evolve schema
+ dataGen.extendSchemaAfterEvolution(getSchemaEvolutionConfigs());
+
+ // Write another base file with schema B
+ List<HoodieRecord> secondRecords =
dataGen.generateInsertsForPartition("002", 5, "new_partition");
+ List<Pair<String, IndexedRecord>> secondIndexedRecords =
hoodieRecordsToIndexedRecords(secondRecords, dataGen.getExtendedSchema());
+ commitToTable(secondRecords, INSERT.value(), false, writeConfigs,
dataGen.getExtendedSchema().toString());
+ List<Pair<String, IndexedRecord>> mergedRecords =
CollectionUtils.combine(firstIndexedRecords, secondIndexedRecords);
+ validateOutputFromFileGroupReaderWithNativeRecords(
+ getStorageConf(), getBasePath(),
+ true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+ mergedRecords);
+ }
+ }
+
+ /**
+ * Write a base file with schema A, then write a log file with schema A,
then write another base file with schema B.
+ */
+ @Test
+ public void testSchemaEvolutionWhenBaseFileHasDifferentSchemaThanLogFiles()
throws Exception {
+ Map<String, String> writeConfigs = new HashMap<>(
+ getCommonConfigs(RecordMergeMode.EVENT_TIME_ORDERING, true));
+
+ try (HoodieTestDataGenerator dataGen = new
HoodieTestDataGenerator(TRIP_EXAMPLE_SCHEMA, 0xDEEF)) {
+ dataGen.extendSchemaBeforeEvolution(getSchemaEvolutionConfigs());
+
+ // Write a base file with schema A
+ List<HoodieRecord> firstRecords =
dataGen.generateInsertsForPartition("001", 10, "any_partition");
+ List<Pair<String, IndexedRecord>> firstIndexedRecords =
hoodieRecordsToIndexedRecords(firstRecords, dataGen.getExtendedSchema());
+ commitToTable(firstRecords, INSERT.value(), true, writeConfigs,
dataGen.getExtendedSchema().toString());
+ validateOutputFromFileGroupReaderWithNativeRecords(
+ getStorageConf(), getBasePath(),
+ true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+ firstIndexedRecords);
+
+ // Write a log file with schema A
+ List<HoodieRecord> secondRecords = dataGen.generateUniqueUpdates("002",
5);
+ List<Pair<String, IndexedRecord>> secondIndexedRecords =
hoodieRecordsToIndexedRecords(secondRecords, dataGen.getExtendedSchema());
+ commitToTable(secondRecords, UPSERT.value(), false, writeConfigs,
dataGen.getExtendedSchema().toString());
+ List<Pair<String, IndexedRecord>> mergedRecords =
mergeIndexedRecordLists(secondIndexedRecords, firstIndexedRecords);
+ validateOutputFromFileGroupReaderWithNativeRecords(
+ getStorageConf(), getBasePath(),
+ true, 1, RecordMergeMode.EVENT_TIME_ORDERING,
+ mergedRecords);
+
+ // Evolve schema
+ dataGen.extendSchemaAfterEvolution(getSchemaEvolutionConfigs());
+
+ // Write another base file with schema B
+ List<HoodieRecord> thirdRecords =
dataGen.generateInsertsForPartition("003", 5, "new_partition");
+ List<Pair<String, IndexedRecord>> thirdIndexedRecords =
hoodieRecordsToIndexedRecords(thirdRecords, dataGen.getExtendedSchema());
+ commitToTable(thirdRecords, INSERT.value(), false, writeConfigs,
dataGen.getExtendedSchema().toString());
+ mergedRecords = CollectionUtils.combine(mergedRecords,
thirdIndexedRecords);
+ validateOutputFromFileGroupReaderWithNativeRecords(
+ getStorageConf(), getBasePath(),
+ // use -1 to prevent validation of num log files because one fg has a
log file but the other doesn't
+ true, -1, RecordMergeMode.EVENT_TIME_ORDERING,
+ mergedRecords);
+ }
+ }
+
+ /**
+ * Write a base file with schema A, then write a log file with schema A,
then write another log file with schema B.
+ */
+ @Test
+ public void testSchemaEvolutionWhenLogFilesWithDifferentSchema() throws
Exception {
+ Map<String, String> writeConfigs = new HashMap<>(
+ getCommonConfigs(RecordMergeMode.EVENT_TIME_ORDERING, true));
+
+ try (HoodieTestDataGenerator baseFileDataGen =
+ new HoodieTestDataGenerator(TRIP_EXAMPLE_SCHEMA, 0xDEEF)) {
+ baseFileDataGen.extendSchemaBeforeEvolution(getSchemaEvolutionConfigs());
+
+ // Write base file with schema A
+ List<HoodieRecord> firstRecords = baseFileDataGen.generateInserts("001",
100);
+ List<Pair<String, IndexedRecord>> firstIndexedRecords =
hoodieRecordsToIndexedRecords(firstRecords,
baseFileDataGen.getExtendedSchema());
+ commitToTable(firstRecords, INSERT.value(), true, writeConfigs,
baseFileDataGen.getExtendedSchema().toString());
+ validateOutputFromFileGroupReaderWithNativeRecords(
+ getStorageConf(), getBasePath(),
+ true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+ firstIndexedRecords);
+
+ // Write log file with schema A
+ List<HoodieRecord> secondRecords =
baseFileDataGen.generateUniqueUpdates("002", 50);
+ List<Pair<String, IndexedRecord>> secondIndexedRecords =
hoodieRecordsToIndexedRecords(secondRecords,
baseFileDataGen.getExtendedSchema());
+ commitToTable(secondRecords, UPSERT.value(), false, writeConfigs,
baseFileDataGen.getExtendedSchema().toString());
+ List<Pair<String, IndexedRecord>> mergedRecords =
mergeIndexedRecordLists(secondIndexedRecords, firstIndexedRecords);
+ validateOutputFromFileGroupReaderWithNativeRecords(
+ getStorageConf(), getBasePath(),
+ true, 1, RecordMergeMode.EVENT_TIME_ORDERING,
+ mergedRecords);
+
+ // Evolve schema
+ baseFileDataGen.extendSchemaAfterEvolution(getSchemaEvolutionConfigs());
+
+ // Write log file with schema B
+ List<HoodieRecord> thirdRecords =
baseFileDataGen.generateUniqueUpdates("003", 50);
+ List<Pair<String, IndexedRecord>> thirdIndexedRecords =
hoodieRecordsToIndexedRecords(thirdRecords,
baseFileDataGen.getExtendedSchema());
+ commitToTable(thirdRecords, UPSERT.value(), false, writeConfigs,
baseFileDataGen.getExtendedSchema().toString());
+ mergedRecords = mergeIndexedRecordLists(thirdIndexedRecords,
mergedRecords);
+ validateOutputFromFileGroupReaderWithNativeRecords(
+ getStorageConf(), getBasePath(),
+ true, 2, RecordMergeMode.EVENT_TIME_ORDERING,
+ mergedRecords);
+ }
+ }
+
+ /**
+ * Write a base file with schema A, then write a log file with schema A,
then write another log file with schema B. Then write a different base file
with schema C.
+ */
+ @Test
+ public void
testSchemaEvolutionWhenLogFilesWithDifferentSchemaAndTableSchemaDiffers()
throws Exception {
+ Map<String, String> writeConfigs = new HashMap<>(
+ getCommonConfigs(RecordMergeMode.EVENT_TIME_ORDERING, true));
+
+ try (HoodieTestDataGenerator baseFileDataGen =
+ new HoodieTestDataGenerator(TRIP_EXAMPLE_SCHEMA, 0xDEEF)) {
+ baseFileDataGen.extendSchemaBeforeEvolution(getSchemaEvolutionConfigs());
+
+ // Write base file with schema A
+ List<HoodieRecord> firstRecords = baseFileDataGen.generateInserts("001",
100);
+ List<Pair<String, IndexedRecord>> firstIndexedRecords =
hoodieRecordsToIndexedRecords(firstRecords,
baseFileDataGen.getExtendedSchema());
+ commitToTable(firstRecords, INSERT.value(), true, writeConfigs,
baseFileDataGen.getExtendedSchema().toString());
+ validateOutputFromFileGroupReaderWithNativeRecords(
+ getStorageConf(), getBasePath(),
+ true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+ firstIndexedRecords);
+
+ // Write log file with schema A
+ List<HoodieRecord> secondRecords =
baseFileDataGen.generateUniqueUpdates("002", 50);
+ List<Pair<String, IndexedRecord>> secondIndexedRecords =
hoodieRecordsToIndexedRecords(secondRecords,
baseFileDataGen.getExtendedSchema());
+ commitToTable(secondRecords, UPSERT.value(), false, writeConfigs,
baseFileDataGen.getExtendedSchema().toString());
+ List<Pair<String, IndexedRecord>> mergedRecords =
mergeIndexedRecordLists(secondIndexedRecords, firstIndexedRecords);
+ validateOutputFromFileGroupReaderWithNativeRecords(
+ getStorageConf(), getBasePath(),
+ true, 1, RecordMergeMode.EVENT_TIME_ORDERING,
+ mergedRecords);
+
+ // Evolve schema
+ HoodieTestDataGenerator.SchemaEvolutionConfigs schemaEvolutionConfigs =
getSchemaEvolutionConfigs();
+ boolean addNewFieldSupport = schemaEvolutionConfigs.addNewFieldSupport;
+ schemaEvolutionConfigs.addNewFieldSupport = false;
+ baseFileDataGen.extendSchemaAfterEvolution(schemaEvolutionConfigs);
+
+ // Write log file with schema B
+ List<HoodieRecord> thirdRecords =
baseFileDataGen.generateUniqueUpdates("003", 50);
+ List<Pair<String, IndexedRecord>> thirdIndexedRecords =
hoodieRecordsToIndexedRecords(thirdRecords,
baseFileDataGen.getExtendedSchema());
+ commitToTable(thirdRecords, UPSERT.value(), false, writeConfigs,
baseFileDataGen.getExtendedSchema().toString());
+ mergedRecords = mergeIndexedRecordLists(thirdIndexedRecords,
mergedRecords);
+ validateOutputFromFileGroupReaderWithNativeRecords(
+ getStorageConf(), getBasePath(),
+ true, 2, RecordMergeMode.EVENT_TIME_ORDERING,
+ mergedRecords);
+
+ // Evolve schema again
+ schemaEvolutionConfigs = getSchemaEvolutionConfigs();
+ schemaEvolutionConfigs.addNewFieldSupport = addNewFieldSupport;
+ baseFileDataGen.extendSchemaAfterEvolution(schemaEvolutionConfigs);
+
+ // Write another base file with schema C
+ List<HoodieRecord> fourthRecords =
baseFileDataGen.generateInsertsForPartition("004", 5, "new_partition");
+ List<Pair<String, IndexedRecord>> fourthIndexedRecords =
hoodieRecordsToIndexedRecords(fourthRecords,
baseFileDataGen.getExtendedSchema());
+ commitToTable(fourthRecords, INSERT.value(), false, writeConfigs,
baseFileDataGen.getExtendedSchema().toString());
+ mergedRecords = CollectionUtils.combine(mergedRecords,
fourthIndexedRecords);
+ validateOutputFromFileGroupReaderWithNativeRecords(
+ getStorageConf(), getBasePath(),
+ // use -1 to prevent validation of num log files because one fg has
log files but the other doesn't
+ true, -1, RecordMergeMode.EVENT_TIME_ORDERING,
+ mergedRecords);
+ }
+ }
+
+ /**
+ * Write a base file with schema A, then write a log file with schema B
+ */
+ @Test
+ public void
testSchemaEvolutionWhenBaseFilesWithDifferentSchemaFromLogFiles() throws
Exception {
+ Map<String, String> writeConfigs = new HashMap<>(
+ getCommonConfigs(RecordMergeMode.EVENT_TIME_ORDERING, true));
+
+ try (HoodieTestDataGenerator baseFileDataGen =
+ new HoodieTestDataGenerator(TRIP_EXAMPLE_SCHEMA, 0xDEEF)) {
+ baseFileDataGen.extendSchemaBeforeEvolution(getSchemaEvolutionConfigs());
+
+ // Write base file with schema A
+ List<HoodieRecord> firstRecords = baseFileDataGen.generateInserts("001",
100);
+ List<Pair<String, IndexedRecord>> firstIndexedRecords =
hoodieRecordsToIndexedRecords(firstRecords,
baseFileDataGen.getExtendedSchema());
+ commitToTable(firstRecords, INSERT.value(), true, writeConfigs,
baseFileDataGen.getExtendedSchema().toString());
+ validateOutputFromFileGroupReaderWithNativeRecords(
+ getStorageConf(), getBasePath(),
+ true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+ firstIndexedRecords);
+
+ // Evolve schema
+ baseFileDataGen.extendSchemaAfterEvolution(getSchemaEvolutionConfigs());
+
+ // Write log file with schema B
+ List<HoodieRecord> secondRecords =
baseFileDataGen.generateUniqueUpdates("002", 50);
+ List<Pair<String, IndexedRecord>> secondIndexedRecords =
hoodieRecordsToIndexedRecords(secondRecords,
baseFileDataGen.getExtendedSchema());
+ commitToTable(secondRecords, UPSERT.value(), false, writeConfigs,
baseFileDataGen.getExtendedSchema().toString());
+ List<Pair<String, IndexedRecord>> mergedRecords =
mergeIndexedRecordLists(secondIndexedRecords, firstIndexedRecords);
+ validateOutputFromFileGroupReaderWithNativeRecords(
+ getStorageConf(), getBasePath(),
+ true, 1, RecordMergeMode.EVENT_TIME_ORDERING,
+ mergedRecords);
+ }
+ }
+
@Test
public void testReadFileGroupInBootstrapMergeOnReadTable() throws Exception {
Path zipOutput = Paths.get(new URI(getBasePath()));
@@ -219,7 +475,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
public void testSpillableMapUsage(ExternalSpillableMap.DiskMapType
diskMapType) throws Exception {
Map<String, String> writeConfigs = new
HashMap<>(getCommonConfigs(RecordMergeMode.COMMIT_TIME_ORDERING, true));
try (HoodieTestDataGenerator dataGen = new
HoodieTestDataGenerator(0xDEEF)) {
- commitToTable(dataGen.generateInserts("001", 100), INSERT.value(),
writeConfigs);
+ commitToTable(dataGen.generateInserts("001", 100), INSERT.value(), true,
writeConfigs);
String baseMapPath = Files.createTempDirectory(null).toString();
HoodieTableMetaClient metaClient =
HoodieTestUtils.createMetaClient(getStorageConf(), getBasePath());
Schema avroSchema = new
TableSchemaResolver(metaClient).getTableAvroSchema();
@@ -285,6 +541,52 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
return configMapping;
}
  /**
   * Reads the file group through the engine-native reader path and compares the result,
   * converted back to Avro and stripped of meta columns, against the expected records.
   *
   * @param storageConf        storage configuration used to resolve the table
   * @param tablePath          base path of the table under test
   * @param containsBaseFile   whether the file slices should contain a base file
   * @param expectedLogFileNum expected number of log files per slice (negative to skip the check)
   * @param recordMergeMode    merge mode to read with
   * @param expectedRecords    expected (key, record) pairs in table schema
   */
  private void validateOutputFromFileGroupReaderWithNativeRecords(StorageConfiguration<?> storageConf,
                                                                  String tablePath,
                                                                  boolean containsBaseFile,
                                                                  int expectedLogFileNum,
                                                                  RecordMergeMode recordMergeMode,
                                                                  List<Pair<String, IndexedRecord>> expectedRecords) throws Exception {
    Set<String> metaCols = new HashSet<>(HoodieRecord.HOODIE_META_COLUMNS);
    HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(storageConf, tablePath);
    TableSchemaResolver resolver = new TableSchemaResolver(metaClient);
    Schema avroSchema = resolver.getTableAvroSchema();
    // Schema without the Hoodie meta columns, used to normalize the expected records.
    Schema avroSchemaWithoutMeta = resolver.getTableAvroSchema(false);
    // use reader context for conversion to engine specific objects
    HoodieReaderContext<T> readerContext = getHoodieReaderContext(tablePath, avroSchema, getStorageConf(), metaClient);
    List<FileSlice> fileSlices = getFileSlicesToRead(storageConf, tablePath, metaClient, containsBaseFile, expectedLogFileNum);
    // Log-file-only slices are read in sorted order so output ordering is deterministic.
    boolean sortOutput = !containsBaseFile;
    List<T> actualRecordList =
        readRecordsFromFileGroup(storageConf, tablePath, metaClient, fileSlices, avroSchema, recordMergeMode, false, sortOutput);
    assertEquals(expectedRecords.size(), actualRecordList.size());
    // Engine-specific schema check (no-op for engines that do not support it yet).
    actualRecordList.forEach(r -> assertRecordMatchesSchema(avroSchema, r));
    // Convert engine-native rows back to Avro and drop meta columns before comparing.
    Set<GenericRecord> actualRecordSet = actualRecordList.stream().map(r -> readerContext.convertToAvroRecord(r, avroSchema))
        .map(r -> HoodieAvroUtils.removeFields(r, metaCols))
        .collect(Collectors.toSet());
    Set<GenericRecord> expectedRecordSet = expectedRecords.stream()
        .map(r -> (GenericRecord) r.getRight())
        .map(r -> HoodieAvroUtils.rewriteRecordWithNewSchema(r, avroSchemaWithoutMeta))
        .collect(Collectors.toSet());
    compareRecordSets(expectedRecordSet, actualRecordSet);
  }
+
+ private void compareRecordSets(Set<GenericRecord> expectedRecordSet,
Set<GenericRecord> actualRecordSet) {
+ Map<String, GenericRecord> expectedMap = new
HashMap<>(expectedRecordSet.size());
+ for (GenericRecord expectedRecord : expectedRecordSet) {
+ expectedMap.put(expectedRecord.get("_row_key").toString(),
expectedRecord);
+ }
+ Map<String, GenericRecord> actualMap = new
HashMap<>(actualRecordSet.size());
+ for (GenericRecord actualRecord : actualRecordSet) {
+ actualMap.put(actualRecord.get("_row_key").toString(), actualRecord);
+ }
+ assertEquals(expectedMap.keySet(), actualMap.keySet());
+ for (String key : actualMap.keySet()) {
+ GenericRecord expectedRecord = expectedMap.get(key);
+ GenericRecord actualRecord = actualMap.get(key);
+ assertEquals(expectedRecord, actualRecord);
+ }
+ }
+
protected void validateOutputFromFileGroupReader(StorageConfiguration<?>
storageConf,
String tablePath,
boolean containsBaseFile,
@@ -364,7 +666,9 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
baseFile.setBootstrapBaseFile(new BaseFile(newBootstrapPath));
}
List<String> logFilePathList = getLogFileListFromFileSlice(fileSlice);
- assertEquals(expectedLogFileNum, logFilePathList.size());
+ if (expectedLogFileNum >= 0) {
+ assertEquals(expectedLogFileNum, logFilePathList.size());
+ }
assertEquals(containsBaseFile, fileSlice.getBaseFile().isPresent());
});
return fileSlices;
@@ -397,7 +701,11 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
return actualRecordList;
}
- private HoodieFileGroupReader<T>
getHoodieFileGroupReader(StorageConfiguration<?> storageConf, String tablePath,
HoodieTableMetaClient metaClient, Schema avroSchema, FileSlice fileSlice,
+ private HoodieFileGroupReader<T>
getHoodieFileGroupReader(StorageConfiguration<?> storageConf,
+ String tablePath,
+
HoodieTableMetaClient metaClient,
+ Schema avroSchema,
+ FileSlice
fileSlice,
int start,
TypedProperties props, boolean sortOutput) {
return HoodieFileGroupReader.<T>newBuilder()
.withReaderContext(getHoodieReaderContext(tablePath, avroSchema,
storageConf, metaClient))
@@ -488,6 +796,12 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
return false;
}
+ protected List<Pair<String, IndexedRecord>>
mergeIndexedRecordLists(List<Pair<String, IndexedRecord>> updates,
List<Pair<String, IndexedRecord>> existing) {
+ Set<String> updatedKeys =
updates.stream().map(Pair::getLeft).collect(Collectors.toSet());
+ return Stream.concat(updates.stream(), existing.stream().filter(record ->
!updatedKeys.contains(record.getLeft())))
+ .collect(Collectors.toList());
+ }
+
protected List<HoodieRecord> mergeRecordLists(List<HoodieRecord> updates,
List<HoodieRecord> existing) {
Set<String> updatedKeys =
updates.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toSet());
return Stream.concat(updates.stream(), existing.stream().filter(record ->
!updatedKeys.contains(record.getRecordKey())))
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 3549a0ba3e83..97810a21d9f0 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -19,6 +19,7 @@
package org.apache.hudi.common.testutils;
+import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.model.HoodieAvroPayload;
@@ -78,6 +79,7 @@ import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -184,6 +186,7 @@ public class HoodieTestDataGenerator implements
AutoCloseable {
public static final Schema AVRO_TRIP_ENCODED_DECIMAL_SCHEMA = new
Schema.Parser().parse(TRIP_ENCODED_DECIMAL_SCHEMA);
public static final Schema AVRO_TRIP_SCHEMA = new
Schema.Parser().parse(TRIP_SCHEMA);
public static final Schema FLATTENED_AVRO_SCHEMA = new
Schema.Parser().parse(TRIP_FLATTENED_SCHEMA);
+
private final Random rand;
//Maintains all the existing keys schema wise
@@ -191,6 +194,7 @@ public class HoodieTestDataGenerator implements
AutoCloseable {
private final String[] partitionPaths;
//maintains the count of existing keys schema wise
private Map<String, Integer> numKeysBySchema;
+ private Option<Schema> extendedSchema = Option.empty();
public HoodieTestDataGenerator(long seed) {
this(seed, DEFAULT_PARTITION_PATHS, new HashMap<>());
@@ -356,7 +360,7 @@ public class HoodieTestDataGenerator implements
AutoCloseable {
GenericRecord rec = generateGenericRecord(
key.getRecordKey(), key.getPartitionPath(), "rider-" + instantTime,
"driver-" + instantTime, timestamp,
false, isFlattened);
- return new RawTripTestPayload(rec.toString(), key.getRecordKey(),
key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA);
+ return new RawTripTestPayload(rec.toString(), key.getRecordKey(),
key.getPartitionPath(),
extendedSchema.map(Schema::toString).orElse(TRIP_EXAMPLE_SCHEMA));
}
private RawTripTestPayload generateNestedExampleRandomValue(
@@ -471,9 +475,12 @@ public class HoodieTestDataGenerator implements
AutoCloseable {
* Populate rec with values for FARE_NESTED_SCHEMA
*/
private void generateFareNestedValues(GenericRecord rec) {
- GenericRecord fareRecord = new
GenericData.Record(AVRO_SCHEMA.getField("fare").schema());
+ GenericRecord fareRecord = new
GenericData.Record(extendedSchema.orElse(AVRO_SCHEMA).getField("fare").schema());
fareRecord.put("amount", rand.nextDouble() * 100);
fareRecord.put("currency", "USD");
+ if (extendedSchema.isPresent()) {
+ generateCustomValues(fareRecord, "customFare");
+ }
rec.put("fare", fareRecord);
}
@@ -481,6 +488,12 @@ public class HoodieTestDataGenerator implements
AutoCloseable {
* Populate rec with values for TIP_NESTED_SCHEMA
*/
private void generateTipNestedValues(GenericRecord rec) {
+ // TODO [HUDI-9603] remove this check
+ if (extendedSchema.isPresent()) {
+ if (extendedSchema.get().getField("tip_history") == null) {
+ return;
+ }
+ }
GenericArray<GenericRecord> tipHistoryArray = new GenericData.Array<>(1,
AVRO_SCHEMA.getField("tip_history").schema());
Schema tipSchema = new
Schema.Parser().parse(AVRO_SCHEMA.getField("tip_history").schema().toString()).getElementType();
GenericRecord tipRecord = new GenericData.Record(tipSchema);
@@ -507,7 +520,7 @@ public class HoodieTestDataGenerator implements
AutoCloseable {
public GenericRecord generateGenericRecord(String rowKey, String
partitionPath, String riderName, String driverName,
long timestamp, boolean
isDeleteRecord,
boolean isFlattened) {
- GenericRecord rec = new GenericData.Record(isFlattened ?
FLATTENED_AVRO_SCHEMA : AVRO_SCHEMA);
+ GenericRecord rec = new GenericData.Record(extendedSchema.orElseGet(() ->
isFlattened ? FLATTENED_AVRO_SCHEMA : AVRO_SCHEMA));
generateTripPrefixValues(rec, rowKey, partitionPath, riderName,
driverName, timestamp);
if (isFlattened) {
generateFareFlattenedValues(rec);
@@ -517,6 +530,7 @@ public class HoodieTestDataGenerator implements
AutoCloseable {
generateFareNestedValues(rec);
generateTipNestedValues(rec);
}
+ generateCustomValues(rec, "customField");
generateTripSuffixValues(rec, isDeleteRecord);
return rec;
}
@@ -524,7 +538,7 @@ public class HoodieTestDataGenerator implements
AutoCloseable {
/**
* Generate record conforming to TRIP_NESTED_EXAMPLE_SCHEMA
*/
- public GenericRecord generateNestedExampleGenericRecord(String rowKey,
String partitionPath, String riderName, String driverName,
+ public GenericRecord generateNestedExampleGenericRecord(String rowKey,
String partitionPath, String riderName, String driverName,
long timestamp,
boolean isDeleteRecord) {
GenericRecord rec = new GenericData.Record(NESTED_AVRO_SCHEMA);
generateTripPrefixValues(rec, rowKey, partitionPath, riderName,
driverName, timestamp);
@@ -1276,5 +1290,221 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
public String getRiderValue() {
return riderValue;
}
+
+ @Override
+ public String toString() {
+ return "RowKey: " + recordKey + ", PartitionPath: " + partitionPath
+ + ", OrderingVal: " + orderingVal + ", RiderValue: " + riderValue;
+ }
+ }
+
  /**
   * Feature flags describing which schema-evolution capabilities the engine under test
   * supports. Defaults enable everything; engine-specific test subclasses turn off the
   * cases their record format cannot handle yet.
   */
  public static class SchemaEvolutionConfigs {
    // Base schema the extended schema is derived from.
    public Schema schema = AVRO_SCHEMA;
    public boolean nestedSupport = true;
    public boolean mapSupport = true;
    public boolean arraySupport = true;
    public boolean addNewFieldSupport = true;
    // TODO: [HUDI-9603] Flink 1.18 array values incorrect in fg reader test
    public boolean anyArraySupport = true;

    // Int promotions
    public boolean intToLongSupport = true;
    public boolean intToFloatSupport = true;
    public boolean intToDoubleSupport = true;
    public boolean intToStringSupport = true;

    // Long promotions
    public boolean longToFloatSupport = true;
    public boolean longToDoubleSupport = true;
    public boolean longToStringSupport = true;

    // Float promotions
    public boolean floatToDoubleSupport = true;
    public boolean floatToStringSupport = true;

    // Double promotions
    public boolean doubleToStringSupport = true;

    // String promotions
    public boolean stringToBytesSupport = true;

    // Bytes promotions
    public boolean bytesToStringSupport = true;
  }
+
  /**
   * Enumerates every tested type-promotion pair: the field type {@code before} evolution,
   * the type {@code after} evolution, and a predicate deciding whether the case is enabled
   * for a given engine's {@link SchemaEvolutionConfigs}. Identity pairs (e.g. INT_TO_INT)
   * are always enabled and act as controls.
   */
  private enum SchemaEvolutionTypePromotionCase {
    INT_TO_INT(Schema.Type.INT, Schema.Type.INT, config -> true),
    INT_TO_LONG(Schema.Type.INT, Schema.Type.LONG, config -> config.intToLongSupport),
    INT_TO_FLOAT(Schema.Type.INT, Schema.Type.FLOAT, config -> config.intToFloatSupport),
    INT_TO_DOUBLE(Schema.Type.INT, Schema.Type.DOUBLE, config -> config.intToDoubleSupport),
    INT_TO_STRING(Schema.Type.INT, Schema.Type.STRING, config -> config.intToStringSupport),
    LONG_TO_LONG(Schema.Type.LONG, Schema.Type.LONG, config -> true),
    LONG_TO_FLOAT(Schema.Type.LONG, Schema.Type.FLOAT, config -> config.longToFloatSupport),
    LONG_TO_DOUBLE(Schema.Type.LONG, Schema.Type.DOUBLE, config -> config.longToDoubleSupport),
    LONG_TO_STRING(Schema.Type.LONG, Schema.Type.STRING, config -> config.longToStringSupport),
    FLOAT_TO_FLOAT(Schema.Type.FLOAT, Schema.Type.FLOAT, config -> true),
    FLOAT_TO_DOUBLE(Schema.Type.FLOAT, Schema.Type.DOUBLE, config -> config.floatToDoubleSupport),
    FLOAT_TO_STRING(Schema.Type.FLOAT, Schema.Type.STRING, config -> config.floatToStringSupport),
    DOUBLE_TO_DOUBLE(Schema.Type.DOUBLE, Schema.Type.DOUBLE, config -> true),
    DOUBLE_TO_STRING(Schema.Type.DOUBLE, Schema.Type.STRING, config -> config.doubleToStringSupport),
    STRING_TO_STRING(Schema.Type.STRING, Schema.Type.STRING, config -> true),
    STRING_TO_BYTES(Schema.Type.STRING, Schema.Type.BYTES, config -> config.stringToBytesSupport),
    BYTES_TO_BYTES(Schema.Type.BYTES, Schema.Type.BYTES, config -> true),
    BYTES_TO_STRING(Schema.Type.BYTES, Schema.Type.STRING, config -> config.bytesToStringSupport);

    // Field type before schema evolution.
    public final Schema.Type before;
    // Field type after schema evolution.
    public final Schema.Type after;
    // Whether this promotion case is enabled for the engine's configuration.
    public final Predicate<SchemaEvolutionConfigs> isEnabled;

    SchemaEvolutionTypePromotionCase(Schema.Type before, Schema.Type after, Predicate<SchemaEvolutionConfigs> isEnabled) {
      this.before = before;
      this.after = after;
      this.isEnabled = isEnabled;
    }
  }
+
+ public void extendSchema(SchemaEvolutionConfigs configs, boolean isBefore) {
+ List<Schema.Type> baseFields = new ArrayList<>();
+ for (SchemaEvolutionTypePromotionCase evolution :
SchemaEvolutionTypePromotionCase.values()) {
+ if (evolution.isEnabled.test(configs)) {
+ baseFields.add(isBefore ? evolution.before : evolution.after);
+ }
+ }
+
+ // Add new field if we are testing adding new fields
+ if (!isBefore && configs.addNewFieldSupport) {
+ baseFields.add(Schema.Type.BOOLEAN);
+ }
+
+ this.extendedSchema = Option.of(generateExtendedSchema(configs, new
ArrayList<>(baseFields)));
+ }
+
  /** Installs the extended schema as it looks BEFORE evolution (original field types). */
  public void extendSchemaBeforeEvolution(SchemaEvolutionConfigs configs) {
    extendSchema(configs, true);
  }
+
  /** Installs the extended schema as it looks AFTER evolution (promoted/added field types). */
  public void extendSchemaAfterEvolution(SchemaEvolutionConfigs configs) {
    extendSchema(configs, false);
  }
+
+ public Schema getExtendedSchema() {
+ return extendedSchema.orElseThrow(IllegalArgumentException::new);
+ }
+
  // Entry point: extends the configured base schema at the top level, prefixing the added
  // fields with "customField".
  private static Schema generateExtendedSchema(SchemaEvolutionConfigs configs, List<Schema.Type> baseFields) {
    return generateExtendedSchema(configs.schema, configs, baseFields, "customField", true);
  }
+
  /**
   * Rebuilds {@code baseSchema} with the custom fields spliced in. The custom fields are
   * inserted just before {@code _hoodie_is_deleted} when that field is present, otherwise
   * appended at the end. When nested support is enabled the "fare" record is recursively
   * extended with its own custom fields.
   */
  private static Schema generateExtendedSchema(Schema baseSchema, SchemaEvolutionConfigs configs, List<Schema.Type> baseFields, String fieldPrefix, boolean toplevel) {
    List<Schema.Field> fields = baseSchema.getFields();
    List<Schema.Field> finalFields = new ArrayList<>(fields.size() + baseFields.size());
    // Tracks whether the custom fields were inserted before _hoodie_is_deleted.
    boolean addedFields = false;
    for (Schema.Field field : fields) {
      if (configs.nestedSupport && field.name().equals("fare") && field.schema().getType() == Schema.Type.RECORD) {
        // Recursively extend the nested fare record with "customFare"-prefixed fields.
        finalFields.add(new Schema.Field(field.name(), generateExtendedSchema(field.schema(), configs, baseFields, "customFare", false), field.doc(), field.defaultVal()));
      } else if (configs.anyArraySupport || !field.name().equals("tip_history")) {
        //TODO: [HUDI-9603] remove the if condition when the issue is fixed
        if (field.name().equals("_hoodie_is_deleted")) {
          addedFields = true;
          addFields(configs, finalFields, baseFields, fieldPrefix, baseSchema.getNamespace(), toplevel);
        }
        // Fields must be re-created because an Avro Schema.Field cannot be reused.
        finalFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()));
      }
    }
    // Fallback: schema had no _hoodie_is_deleted field, so append the custom fields.
    if (!addedFields) {
      addFields(configs, finalFields, baseFields, fieldPrefix, baseSchema.getNamespace(), toplevel);
    }
    Schema finalSchema = Schema.createRecord(baseSchema.getName(), baseSchema.getDoc(),
        baseSchema.getNamespace(), baseSchema.isError());
    finalSchema.setFields(finalFields);
    return finalSchema;
  }
+
  /**
   * Appends the generated custom fields to {@code finalFields}. At the top level this also
   * adds (when enabled) a map-typed and an array-typed field whose element records carry
   * the same set of custom fields.
   */
  private static void addFields(SchemaEvolutionConfigs configs, List<Schema.Field> finalFields, List<Schema.Type> baseFields, String fieldPrefix, String namespace, boolean toplevel) {
    if (toplevel) {
      if (configs.mapSupport) {
        // map<customMapRecord> field holding the custom fields inside each map value.
        List<Schema.Field> mapFields = new ArrayList<>(baseFields.size());
        addFieldsHelper(mapFields, baseFields, fieldPrefix + "Map");
        finalFields.add(new Schema.Field(fieldPrefix + "Map", Schema.createMap(Schema.createRecord("customMapRecord", "", namespace, false, mapFields)), "", null));
      }

      if (configs.arraySupport) {
        // array<customArrayRecord> field holding the custom fields inside each element.
        List<Schema.Field> arrayFields = new ArrayList<>(baseFields.size());
        addFieldsHelper(arrayFields, baseFields, fieldPrefix + "Array");
        finalFields.add(new Schema.Field(fieldPrefix + "Array", Schema.createArray(Schema.createRecord("customArrayRecord", "", namespace, false, arrayFields)), "", null));
      }
    }
    // Plain top-level (or nested) custom fields.
    addFieldsHelper(finalFields, baseFields, fieldPrefix);
  }
+
+ private static void addFieldsHelper(List<Schema.Field> finalFields,
List<Schema.Type> baseFields, String fieldPrefix) {
+ for (int i = 0; i < baseFields.size(); i++) {
+ if (baseFields.get(i) == Schema.Type.BOOLEAN) {
+ // boolean fields are added fields
+ finalFields.add(new Schema.Field(fieldPrefix + i,
AvroSchemaUtils.createNullableSchema(Schema.Type.BOOLEAN), "", null));
+ } else {
+ finalFields.add(new Schema.Field(fieldPrefix + i,
Schema.create(baseFields.get(i)), "", null));
+ }
+ }
+ }
+
  /**
   * Fills every field of {@code rec} whose name starts with {@code customPrefix} with a
   * random value matching the field's declared Avro type. Used to populate the synthetic
   * fields added by the schema-evolution extension.
   */
  private void generateCustomValues(GenericRecord rec, String customPrefix) {
    for (Schema.Field field : rec.getSchema().getFields()) {
      if (field.name().startsWith(customPrefix)) {
        switch (field.schema().getType()) {
          case INT:
            rec.put(field.name(), rand.nextInt());
            break;
          case LONG:
            rec.put(field.name(), rand.nextLong());
            break;
          case FLOAT:
            rec.put(field.name(), rand.nextFloat());
            break;
          case DOUBLE:
            rec.put(field.name(), rand.nextDouble());
            break;
          case STRING:
            rec.put(field.name(), genPseudoRandomUUID(rand).toString());
            break;
          case BYTES:
            rec.put(field.name(), ByteBuffer.wrap(getUTF8Bytes(genPseudoRandomUUID(rand).toString())));
            break;
          case UNION:
            // A nullable BOOLEAN (the newly-added field) is the only union the extended
            // schema is expected to produce.
            if (!AvroSchemaUtils.resolveNullableSchema(field.schema()).getType().equals(Schema.Type.BOOLEAN)) {
              throw new IllegalStateException("Union should only be boolean");
            }
            rec.put(field.name(), rand.nextBoolean());
            break;
          case BOOLEAN:
            rec.put(field.name(), rand.nextBoolean());
            break;
          case MAP:
            rec.put(field.name(), genMap(field.schema(), field.name()));
            break;
          case ARRAY:
            rec.put(field.name(), genArray(field.schema(), field.name()));
            break;
          default:
            throw new UnsupportedOperationException("Unsupported type: " + field.schema().getType());
        }
      }
    }
  }
+
+ private GenericArray<GenericRecord> genArray(Schema arraySchema, String
customPrefix) {
+ GenericArray<GenericRecord> customArray = new GenericData.Array<>(1,
arraySchema);
+ Schema arrayElementSchema = arraySchema.getElementType();
+ GenericRecord customRecord = new GenericData.Record(arrayElementSchema);
+ generateCustomValues(customRecord, customPrefix);
+ customArray.add(customRecord);
+ return customArray;
+ }
+
+ private Map<String,GenericRecord> genMap(Schema mapSchema, String
customPrefix) {
+ Schema mapElementSchema = mapSchema.getValueType();
+ GenericRecord customRecord = new GenericData.Record(mapElementSchema);
+ generateCustomValues(customRecord, customPrefix);
+ return Collections.singletonMap("customMapKey", customRecord);
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
index 6f6b19f2444b..5ac306937996 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
@@ -81,15 +81,11 @@ import static org.mockito.Mockito.when;
public class TestHoodieFileGroupReaderOnFlink extends
TestHoodieFileGroupReaderBase<RowData> {
private Configuration conf;
private Option<InstantRange> instantRangeOpt = Option.empty();
- private static final Schema RECORD_SCHEMA = getRecordAvroSchema();
- private static final AvroToRowDataConverters.AvroToRowDataConverter
AVRO_CONVERTER =
-
RowDataAvroQueryContexts.fromAvroSchema(RECORD_SCHEMA).getAvroToRowDataConverter();
@BeforeEach
public void setup() {
conf = new Configuration();
conf.set(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
- conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, RECORD_SCHEMA.toString());
conf.set(FlinkOptions.PATH, getBasePath());
conf.set(FlinkOptions.TABLE_NAME, "TestHoodieTable");
// use hive style partition as a workaround for HUDI-9396
@@ -142,12 +138,16 @@ public class TestHoodieFileGroupReaderOnFlink extends
TestHoodieFileGroupReaderB
}
@Override
- public void commitToTable(List<HoodieRecord> recordList, String operation,
Map<String, String> writeConfigs) {
+ public void commitToTable(List<HoodieRecord> recordList, String operation,
boolean firstCommit, Map<String, String> writeConfigs, String schemaStr) {
writeConfigs.forEach((key, value) -> conf.setString(key, value));
conf.set(FlinkOptions.OPERATION, operation);
+ Schema localSchema = getRecordAvroSchema(schemaStr);
+ conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, localSchema.toString());
+ AvroToRowDataConverters.AvroToRowDataConverter avroConverter =
+
RowDataAvroQueryContexts.fromAvroSchema(localSchema).getAvroToRowDataConverter();
List<RowData> rowDataList = recordList.stream().map(record -> {
try {
- return (RowData)
AVRO_CONVERTER.convert(record.toIndexedRecord(RECORD_SCHEMA,
CollectionUtils.emptyProps()).get().getData());
+ return (RowData)
avroConverter.convert(record.toIndexedRecord(localSchema,
CollectionUtils.emptyProps()).get().getData());
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -168,6 +168,34 @@ public class TestHoodieFileGroupReaderOnFlink extends
TestHoodieFileGroupReaderB
RowDataAvroQueryContexts.fromAvroSchema(schema).getRowType());
}
  @Override
  public void assertRecordMatchesSchema(Schema schema, RowData record) {
    // TODO: Add support for RowData — intentionally a no-op until RowData schema
    // validation is implemented for the Flink engine.
  }
+
  @Override
  public HoodieTestDataGenerator.SchemaEvolutionConfigs getSchemaEvolutionConfigs() {
    // All evolution capabilities are disabled for Flink for now; follow-up patches are
    // expected to re-enable them as the RowData reader gains support (see HUDI-9603).
    HoodieTestDataGenerator.SchemaEvolutionConfigs configs = new HoodieTestDataGenerator.SchemaEvolutionConfigs();
    configs.nestedSupport = false;
    configs.arraySupport = false;
    configs.mapSupport = false;
    configs.anyArraySupport = false;
    configs.addNewFieldSupport = false;
    configs.intToLongSupport = false;
    configs.intToFloatSupport = false;
    configs.intToDoubleSupport = false;
    configs.intToStringSupport = false;
    configs.longToFloatSupport = false;
    configs.longToDoubleSupport = false;
    configs.longToStringSupport = false;
    configs.floatToDoubleSupport = false;
    configs.floatToStringSupport = false;
    configs.doubleToStringSupport = false;
    configs.stringToBytesSupport = false;
    configs.bytesToStringSupport = false;
    return configs;
  }
+
@Test
public void testGetOrderingValue() {
HoodieTableConfig tableConfig = Mockito.mock(HoodieTableConfig.class);
@@ -250,7 +278,7 @@ public class TestHoodieFileGroupReaderOnFlink extends
TestHoodieFileGroupReaderB
try (HoodieTestDataGenerator dataGen = new
HoodieTestDataGenerator(0xDEEF)) {
// One commit; reading one file group containing a log file only
List<HoodieRecord> initialRecords = dataGen.generateInserts("001", 100);
- commitToTable(initialRecords, UPSERT.value(), writeConfigs);
+ commitToTable(initialRecords, UPSERT.value(), true, writeConfigs,
TRIP_EXAMPLE_SCHEMA);
validateOutputFromFileGroupReader(
getStorageConf(), getBasePath(), false, 1, recordMergeMode,
initialRecords, initialRecords);
@@ -258,7 +286,7 @@ public class TestHoodieFileGroupReaderOnFlink extends
TestHoodieFileGroupReaderB
// Two commits; reading one file group containing two log files
List<HoodieRecord> updates = dataGen.generateUniqueUpdates("002", 50);
List<HoodieRecord> allRecords = mergeRecordLists(updates,
initialRecords);
- commitToTable(updates, UPSERT.value(), writeConfigs);
+ commitToTable(updates, UPSERT.value(), false, writeConfigs,
TRIP_EXAMPLE_SCHEMA);
validateOutputFromFileGroupReader(
getStorageConf(), getBasePath(), false, 2, recordMergeMode,
allRecords, CollectionUtils.combine(initialRecords, updates));
@@ -275,7 +303,7 @@ public class TestHoodieFileGroupReaderOnFlink extends
TestHoodieFileGroupReaderB
try (HoodieTestDataGenerator dataGen = new
HoodieTestDataGenerator(0xDEEF)) {
// One commit; reading one file group containing a log file only
List<HoodieRecord> initialRecords = dataGen.generateInserts("001", 100);
- commitToTable(initialRecords, firstCommitOperation.value(),
writeConfigs);
+ commitToTable(initialRecords, firstCommitOperation.value(), true,
writeConfigs, TRIP_EXAMPLE_SCHEMA);
validateOutputFromFileGroupReader(
getStorageConf(), getBasePath(), isFirstCommitInsert,
isFirstCommitInsert ? 0 : 1, recordMergeMode, initialRecords,
initialRecords);
@@ -286,7 +314,7 @@ public class TestHoodieFileGroupReaderOnFlink extends
TestHoodieFileGroupReaderB
// the base/log file of the first commit is filtered out by the instant
range.
List<HoodieRecord> updates = dataGen.generateUniqueUpdates("002", 50);
- commitToTable(updates, UPSERT.value(), writeConfigs);
+ commitToTable(updates, UPSERT.value(), false, writeConfigs,
TRIP_EXAMPLE_SCHEMA);
validateOutputFromFileGroupReader(getStorageConf(), getBasePath(),
isFirstCommitInsert, isFirstCommitInsert ? 1 : 2, recordMergeMode,
updates, updates);
// reset instant range
@@ -294,8 +322,8 @@ public class TestHoodieFileGroupReaderOnFlink extends
TestHoodieFileGroupReaderB
}
}
  private static Schema getRecordAvroSchema(String schemaStr) {
    // Round-trips the schema through the Flink RowType so the returned Avro schema matches
    // the representation the Flink query context actually uses — presumably normalizing
    // nullability/logical types; TODO confirm against AvroSchemaConverter behavior.
    Schema recordSchema = new Schema.Parser().parse(schemaStr);
    return AvroSchemaConverter.convertToSchema(RowDataAvroQueryContexts.fromAvroSchema(recordSchema).getRowType().getLogicalType());
  }
}
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hadoop/hive/serde2/avro/HiveTypeUtils.java
b/hudi-hadoop-mr/src/main/java/org/apache/hadoop/hive/serde2/avro/HiveTypeUtils.java
new file mode 100644
index 000000000000..5022f0cef926
--- /dev/null
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hadoop/hive/serde2/avro/HiveTypeUtils.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.avro;
+
+import static org.apache.avro.Schema.Type.BOOLEAN;
+import static org.apache.avro.Schema.Type.BYTES;
+import static org.apache.avro.Schema.Type.DOUBLE;
+import static org.apache.avro.Schema.Type.FIXED;
+import static org.apache.avro.Schema.Type.FLOAT;
+import static org.apache.avro.Schema.Type.INT;
+import static org.apache.avro.Schema.Type.LONG;
+import static org.apache.avro.Schema.Type.NULL;
+import static org.apache.avro.Schema.Type.STRING;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Hashtable;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+/**
+ * Convert an Avro Schema to a Hive TypeInfo
+ * Taken from
https://github.com/apache/hive/blob/rel/release-2.3.4/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java
+ */
+public class HiveTypeUtils {
+ // Conversion of Avro primitive types to Hive primitive types
+ // Avro Hive
+ // Null
+ // boolean boolean check
+ // int int check
+ // long bigint check
+ // float double check
+ // double double check
+ // bytes binary check
+ // fixed binary check
+ // string string check
+ // tinyint
+ // smallint
+
+ // Map of Avro's primitive types to Hive's (for those that are supported by
both)
+ private static final Map<Schema.Type, TypeInfo> PRIMITIVE_TYPE_TO_TYPE_INFO
= initTypeMap();
+ private static Map<Schema.Type, TypeInfo> initTypeMap() {
+ Map<Schema.Type, TypeInfo> theMap = new Hashtable<Schema.Type, TypeInfo>();
+ theMap.put(NULL, TypeInfoFactory.getPrimitiveTypeInfo("void"));
+ theMap.put(BOOLEAN, TypeInfoFactory.getPrimitiveTypeInfo("boolean"));
+ theMap.put(INT, TypeInfoFactory.getPrimitiveTypeInfo("int"));
+ theMap.put(LONG, TypeInfoFactory.getPrimitiveTypeInfo("bigint"));
+ theMap.put(FLOAT, TypeInfoFactory.getPrimitiveTypeInfo("float"));
+ theMap.put(DOUBLE, TypeInfoFactory.getPrimitiveTypeInfo("double"));
+ theMap.put(BYTES, TypeInfoFactory.getPrimitiveTypeInfo("binary"));
+ theMap.put(FIXED, TypeInfoFactory.getPrimitiveTypeInfo("binary"));
+ theMap.put(STRING, TypeInfoFactory.getPrimitiveTypeInfo("string"));
+ return Collections.unmodifiableMap(theMap);
+ }
+
+ /**
+ * Generate a list of TypeInfos from an Avro schema. This method is
+ * currently public due to some weirdness in deserializing unions, but
+ * will be made private once that is resolved.
+ * @param schema Schema to generate field types for
+ * @return List of TypeInfos, each element of which is a TypeInfo derived
+ * from the schema.
+ * @throws AvroSerdeException for problems during conversion.
+ */
+ public static List<TypeInfo> generateColumnTypes(Schema schema) throws
AvroSerdeException {
+ return generateColumnTypes(schema, null);
+ }
+
+ /**
+ * Generate a list of TypeInfos from an Avro schema. This method is
+ * currently public due to some weirdness in deserializing unions, but
+ * will be made private once that is resolved.
+ * @param schema Schema to generate field types for
+ * @param seenSchemas stores schemas processed in the parsing done so far,
+ * helping to resolve circular references in the schema
+ * @return List of TypeInfos, each element of which is a TypeInfo derived
+ * from the schema.
+ * @throws AvroSerdeException for problems during conversion.
+ */
+ public static List<TypeInfo> generateColumnTypes(Schema schema,
+ Set<Schema> seenSchemas)
throws AvroSerdeException {
+ List<Schema.Field> fields = schema.getFields();
+
+ List<TypeInfo> types = new ArrayList<TypeInfo>(fields.size());
+
+ for (Schema.Field field : fields) {
+ types.add(generateTypeInfo(field.schema(), seenSchemas));
+ }
+
+ return types;
+ }
+
+ static InstanceCache<Schema, TypeInfo> typeInfoCache = new
InstanceCache<Schema, TypeInfo>() {
+ @Override
+ protected TypeInfo makeInstance(Schema s,
+ Set<Schema> seenSchemas)
+ throws AvroSerdeException {
+ return generateTypeInfoWorker(s, seenSchemas);
+ }
+ };
+ /**
+ * Convert an Avro Schema into an equivalent Hive TypeInfo.
+ * @param schema to record. Must be of record type.
+ * @param seenSchemas stores schemas processed in the parsing done so far,
+ * helping to resolve circular references in the schema
+ * @return TypeInfo matching the Avro schema
+ * @throws AvroSerdeException for any problems during conversion.
+ */
+ public static TypeInfo generateTypeInfo(Schema schema,
+ Set<Schema> seenSchemas) throws
AvroSerdeException {
+ // For bytes type, it can be mapped to decimal.
+ Schema.Type type = schema.getType();
+ // HUDI MODIFICATION ADDED "|| type == FIXED"
+ if ((type == BYTES || type == FIXED) && AvroSerDe.DECIMAL_TYPE_NAME
+ .equalsIgnoreCase(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
+ int precision = 0;
+ int scale = 0;
+ try {
+ precision =
getIntValue(schema.getObjectProp(AvroSerDe.AVRO_PROP_PRECISION));
+ scale = getIntValue(schema.getObjectProp(AvroSerDe.AVRO_PROP_SCALE));
+ } catch (Exception ex) {
+ throw new AvroSerdeException("Failed to obtain scale value from file
schema: " + schema, ex);
+ }
+
+ try {
+ HiveDecimalUtils.validateParameter(precision, scale);
+ } catch (Exception ex) {
+ throw new AvroSerdeException("Invalid precision or scale for decimal
type", ex);
+ }
+
+ return TypeInfoFactory.getDecimalTypeInfo(precision, scale);
+ }
+
+ if (type == STRING
+ &&
AvroSerDe.CHAR_TYPE_NAME.equalsIgnoreCase(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE)))
{
+ int maxLength = 0;
+ try {
+ maxLength = getIntFromSchema(schema, AvroSerDe.AVRO_PROP_MAX_LENGTH);
+ } catch (Exception ex) {
+ throw new AvroSerdeException("Failed to obtain maxLength value from
file schema: " + schema, ex);
+ }
+ return TypeInfoFactory.getCharTypeInfo(maxLength);
+ }
+
+ if (type == STRING && AvroSerDe.VARCHAR_TYPE_NAME
+ .equalsIgnoreCase(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
+ int maxLength = 0;
+ try {
+ maxLength = getIntFromSchema(schema, AvroSerDe.AVRO_PROP_MAX_LENGTH);
+ } catch (Exception ex) {
+ throw new AvroSerdeException("Failed to obtain maxLength value from
file schema: " + schema, ex);
+ }
+ return TypeInfoFactory.getVarcharTypeInfo(maxLength);
+ }
+
+ if (type == INT
+ &&
AvroSerDe.DATE_TYPE_NAME.equals(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE)))
{
+ return TypeInfoFactory.dateTypeInfo;
+ }
+
+ if (type == LONG
+ &&
AvroSerDe.TIMESTAMP_TYPE_NAME.equals(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE)))
{
+ return TypeInfoFactory.timestampTypeInfo;
+ }
+
+ return typeInfoCache.retrieve(schema, seenSchemas);
+ }
+
+ private static boolean isEmpty(final CharSequence cs) {
+ return cs == null || cs.length() == 0;
+ }
+
+ // added this from StringUtils
+ private static boolean isNumeric(final CharSequence cs) {
+ if (isEmpty(cs)) {
+ return false;
+ }
+ final int sz = cs.length();
+ for (int i = 0; i < sz; i++) {
+ if (!Character.isDigit(cs.charAt(i))) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ // added this from hive latest
+ private static int getIntValue(Object obj) {
+ int value = 0;
+ if (obj instanceof Integer) {
+ value = (int) obj;
+ } else if (obj instanceof String && isNumeric((String)obj)) {
+ value = Integer.parseInt((String)obj);
+ }
+ return value;
+ }
+
+ // added this from AvroSerdeUtils in hive latest
+ public static int getIntFromSchema(Schema schema, String name) {
+ Object obj = schema.getObjectProp(name);
+ if (obj instanceof String) {
+ return Integer.parseInt((String) obj);
+ } else if (obj instanceof Integer) {
+ return (int) obj;
+ } else {
+ throw new IllegalArgumentException("Expect integer or string value from
property " + name
+ + " but found type " + obj.getClass().getName());
+ }
+ }
+
+ private static TypeInfo generateTypeInfoWorker(Schema schema,
+ Set<Schema> seenSchemas)
throws AvroSerdeException {
+ // Avro requires NULLable types to be defined as unions of some type T
+ // and NULL. This is annoying and we're going to hide it from the user.
+ if (AvroSerdeUtils.isNullableType(schema)) {
+ return generateTypeInfo(
+ AvroSerdeUtils.getOtherTypeFromNullableType(schema), seenSchemas);
+ }
+
+ Schema.Type type = schema.getType();
+ if (PRIMITIVE_TYPE_TO_TYPE_INFO.containsKey(type)) {
+ return PRIMITIVE_TYPE_TO_TYPE_INFO.get(type);
+ }
+
+ switch (type) {
+ case RECORD: return generateRecordTypeInfo(schema, seenSchemas);
+ case MAP: return generateMapTypeInfo(schema, seenSchemas);
+ case ARRAY: return generateArrayTypeInfo(schema, seenSchemas);
+ case UNION: return generateUnionTypeInfo(schema, seenSchemas);
+ case ENUM: return generateEnumTypeInfo(schema);
+ default: throw new AvroSerdeException("Do not yet support: " +
schema);
+ }
+ }
+
+ private static TypeInfo generateRecordTypeInfo(Schema schema,
+ Set<Schema> seenSchemas)
throws AvroSerdeException {
+ assert schema.getType().equals(Schema.Type.RECORD);
+
+ if (seenSchemas == null) {
+ seenSchemas = Collections.newSetFromMap(new IdentityHashMap<Schema,
Boolean>());
+ } else if (seenSchemas.contains(schema)) {
+ throw new AvroSerdeException(
+ "Recursive schemas are not supported. Recursive schema was " + schema
+ .getFullName());
+ }
+ seenSchemas.add(schema);
+
+ List<Schema.Field> fields = schema.getFields();
+ List<String> fieldNames = new ArrayList<String>(fields.size());
+ List<TypeInfo> typeInfos = new ArrayList<TypeInfo>(fields.size());
+
+ for (int i = 0; i < fields.size(); i++) {
+ fieldNames.add(i, fields.get(i).name());
+ typeInfos.add(i, generateTypeInfo(fields.get(i).schema(), seenSchemas));
+ }
+
+ return TypeInfoFactory.getStructTypeInfo(fieldNames, typeInfos);
+ }
+
+ /**
+ * Generate a TypeInfo for an Avro Map. This is made slightly simpler in
that
+ * Avro only allows maps with strings for keys.
+ */
+ private static TypeInfo generateMapTypeInfo(Schema schema,
+ Set<Schema> seenSchemas) throws
AvroSerdeException {
+ assert schema.getType().equals(Schema.Type.MAP);
+ Schema valueType = schema.getValueType();
+ TypeInfo ti = generateTypeInfo(valueType, seenSchemas);
+
+ return
TypeInfoFactory.getMapTypeInfo(TypeInfoFactory.getPrimitiveTypeInfo("string"),
ti);
+ }
+
+ private static TypeInfo generateArrayTypeInfo(Schema schema,
+ Set<Schema> seenSchemas)
throws AvroSerdeException {
+ assert schema.getType().equals(Schema.Type.ARRAY);
+ Schema itemsType = schema.getElementType();
+ TypeInfo itemsTypeInfo = generateTypeInfo(itemsType, seenSchemas);
+
+ return TypeInfoFactory.getListTypeInfo(itemsTypeInfo);
+ }
+
+ private static TypeInfo generateUnionTypeInfo(Schema schema,
+ Set<Schema> seenSchemas)
throws AvroSerdeException {
+ assert schema.getType().equals(Schema.Type.UNION);
+ List<Schema> types = schema.getTypes();
+
+
+ List<TypeInfo> typeInfos = new ArrayList<TypeInfo>(types.size());
+
+ for (Schema type : types) {
+ typeInfos.add(generateTypeInfo(type, seenSchemas));
+ }
+
+ return TypeInfoFactory.getUnionTypeInfo(typeInfos);
+ }
+
+ // Hive doesn't have an Enum type, so we're going to treat them as Strings.
+ // During the deserialize/serialize stage we'll check for enumness and
+ // convert as such.
+ private static TypeInfo generateEnumTypeInfo(Schema schema) {
+ assert schema.getType().equals(Schema.Type.ENUM);
+
+ return TypeInfoFactory.getPrimitiveTypeInfo("string");
+ }
+}
\ No newline at end of file
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java
index fd637490db8f..f743ad81e51f 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java
@@ -101,7 +101,7 @@ public class ObjectInspectorCache {
public ArrayWritableObjectInspector getObjectInspector(Schema schema) {
return objectInspectorCache.computeIfAbsent(schema, s -> {
- List<String> columnNameList =
s.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
+ List<String> columnNameList =
s.getFields().stream().map(Schema.Field::name).map(String::toLowerCase).collect(Collectors.toList());
List<TypeInfo> columnTypeList =
columnNameList.stream().map(columnTypeMap::get).collect(Collectors.toList());
StructTypeInfo rowTypeInfo = (StructTypeInfo)
TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
return new ArrayWritableObjectInspector(rowTypeInfo);
@@ -110,7 +110,7 @@ public class ObjectInspectorCache {
public GenericRecord serialize(ArrayWritable record, Schema schema) {
return serializerCache.computeIfAbsent(schema, s -> {
- List<String> columnNameList =
s.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
+ List<String> columnNameList =
s.getFields().stream().map(Schema.Field::name).map(String::toLowerCase).collect(Collectors.toList());
List<TypeInfo> columnTypeList =
columnNameList.stream().map(columnTypeMap::get).collect(Collectors.toList());
StructTypeInfo rowTypeInfo = (StructTypeInfo)
TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
return new HiveAvroSerializer(new
ArrayWritableObjectInspector(rowTypeInfo), columnNameList, columnTypeList);
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index 9158961b40c9..a351665bf9e5 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -19,29 +19,31 @@
package org.apache.hudi.common.table.read
-import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport,
SparkFileFormatInternalRowReaderContext}
+import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions,
SparkAdapterSupport, SparkFileFormatInternalRowReaderContext}
import org.apache.hudi.DataSourceWriteOptions.{OPERATION, PRECOMBINE_FIELD,
RECORDKEY_FIELD, TABLE_TYPE}
import org.apache.hudi.common.config.{HoodieReaderConfig, RecordMergeMode,
TypedProperties}
import org.apache.hudi.common.engine.HoodieReaderContext
import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
import org.apache.hudi.common.model.DefaultHoodieRecordPayload.{DELETE_KEY,
DELETE_MARKER}
-import org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE
+import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import
org.apache.hudi.common.table.read.TestHoodieFileGroupReaderOnSpark.getFileCount
-import org.apache.hudi.common.testutils.{HoodieTestUtils, RawTripTestPayload}
-import org.apache.hudi.common.util.{Option => HOption}
+import org.apache.hudi.common.testutils.{HoodieTestDataGenerator,
HoodieTestUtils}
+import org.apache.hudi.common.util.{CollectionUtils, Option => HOption}
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
import org.apache.hudi.storage.{StorageConfiguration, StoragePath}
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
import org.apache.avro.{Schema, SchemaBuilder}
+import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{HoodieSparkKryoRegistrar, SparkConf}
-import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.{Dataset, HoodieInternalRowUtils, Row, SaveMode,
SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
import org.apache.spark.sql.execution.datasources.parquet.SparkParquetReader
-import org.apache.spark.sql.types.StructType
+import
org.apache.spark.sql.internal.SQLConf.LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION
+import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType,
DataType, DoubleType, FloatType, IntegerType, LongType, MapType, StringType,
StructType}
import org.apache.spark.unsafe.types.UTF8String
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
@@ -77,6 +79,7 @@ class TestHoodieFileGroupReaderOnSpark extends
TestHoodieFileGroupReaderBase[Int
sparkConf.set("spark.kryo.registrator",
"org.apache.spark.HoodieSparkKryoRegistrar")
sparkConf.set("spark.sql.extensions",
"org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
sparkConf.set("spark.sql.parquet.enableVectorizedReader", "false")
+ sparkConf.set(LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION.key,
"true")
HoodieSparkKryoRegistrar.register(sparkConf)
spark = SparkSession.builder.config(sparkConf).getOrCreate
}
@@ -101,17 +104,22 @@ class TestHoodieFileGroupReaderOnSpark extends
TestHoodieFileGroupReaderBase[Int
new SparkFileFormatInternalRowReaderContext(reader, Seq.empty, Seq.empty,
getStorageConf, metaClient.getTableConfig)
}
- override def commitToTable(recordList: util.List[HoodieRecord[_]],
operation: String, options: util.Map[String, String]): Unit = {
- val recs = RawTripTestPayload.recordsToStrings(recordList)
- val inputDF: Dataset[Row] =
spark.read.json(spark.sparkContext.parallelize(recs.asScala.toList, 2))
+ override def commitToTable(recordList: util.List[HoodieRecord[_]],
+ operation: String,
+ firstCommit: Boolean,
+ options: util.Map[String, String],
+ schemaStr: String): Unit = {
+ val schema = new Schema.Parser().parse(schemaStr)
+ val genericRecords =
spark.sparkContext.parallelize(recordList.asScala.map(_.toIndexedRecord(schema,
CollectionUtils.emptyProps))
+ .filter(r => r.isPresent).map(r =>
r.get.getData.asInstanceOf[GenericRecord]).toSeq, 2)
+ val inputDF: Dataset[Row] =
AvroConversionUtils.createDataFrame(genericRecords, schemaStr, spark);
inputDF.write.format("hudi")
.options(options)
.option("hoodie.compact.inline", "false") // else fails due to
compaction & deltacommit instant times being same
.option("hoodie.datasource.write.operation", operation)
.option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
- .mode(if (operation.equalsIgnoreCase(WriteOperationType.INSERT.value()))
SaveMode.Overwrite
- else SaveMode.Append)
+ .mode(if (firstCommit) SaveMode.Overwrite else SaveMode.Append)
.save(getBasePath)
}
@@ -120,9 +128,46 @@ class TestHoodieFileGroupReaderOnSpark extends
TestHoodieFileGroupReaderBase[Int
override def assertRecordsEqual(schema: Schema, expected: InternalRow,
actual: InternalRow): Unit = {
assertEquals(expected.numFields, actual.numFields)
val expectedStruct =
sparkAdapter.getAvroSchemaConverters.toSqlType(schema)._1.asInstanceOf[StructType]
- expected.toSeq(expectedStruct).zip(actual.toSeq(expectedStruct)).foreach(
converted => {
- assertEquals(converted._1, converted._2)
- })
+
+
expected.toSeq(expectedStruct).zip(actual.toSeq(expectedStruct)).zipWithIndex.foreach
{
+ case ((v1, v2), i) =>
+ val fieldType = expectedStruct(i).dataType
+
+ (v1, v2, fieldType) match {
+ case (a1: Array[Byte], a2: Array[Byte], _) =>
+ assert(java.util.Arrays.equals(a1, a2), s"Mismatch at field $i:
expected ${a1.mkString(",")} but got ${a2.mkString(",")}")
+
+ case (m1: MapData, m2: MapData, MapType(keyType, valueType, _)) =>
+ val map1 = mapDataToScalaMap(m1, keyType, valueType)
+ val map2 = mapDataToScalaMap(m2, keyType, valueType)
+ assertEquals(map1, map2, s"Mismatch at field $i: maps not equal")
+
+ case _ =>
+ assertEquals(v1, v2, s"Mismatch at field $i")
+ }
+ }
+ }
+
+ def mapDataToScalaMap(mapData: MapData, keyType: DataType, valueType:
DataType): Map[Any, Any] = {
+ val keys = mapData.keyArray()
+ val values = mapData.valueArray()
+ (0 until mapData.numElements()).map { i =>
+ val k = extractValue(keys, i, keyType)
+ val v = extractValue(values, i, valueType)
+ k -> v
+ }.toMap
+ }
+
+ def extractValue(array: ArrayData, index: Int, dt: DataType): Any = dt match
{
+ case IntegerType => array.getInt(index)
+ case LongType => array.getLong(index)
+ case StringType => array.getUTF8String(index).toString
+ case DoubleType => array.getDouble(index)
+ case FloatType => array.getFloat(index)
+ case BooleanType => array.getBoolean(index)
+ case BinaryType => array.getBinary(index)
+ // Extend this to support StructType, ArrayType, etc. if needed
+ case other => throw new UnsupportedOperationException(s"Unsupported
type: $other")
}
@Test
@@ -142,7 +187,7 @@ class TestHoodieFileGroupReaderOnSpark extends
TestHoodieFileGroupReaderBase[Int
testGetOrderingValue(
sparkReaderContext, row, avroSchema, "col3",
UTF8String.fromString("blue"))
testGetOrderingValue(
- sparkReaderContext, row, avroSchema, "non_existent_col",
DEFAULT_ORDERING_VALUE)
+ sparkReaderContext, row, avroSchema, "non_existent_col",
HoodieRecord.DEFAULT_ORDERING_VALUE)
}
val expectedEventTimeBased: Seq[(Int, String, String, String, Double,
String)] = Seq(
@@ -342,6 +387,48 @@ class TestHoodieFileGroupReaderOnSpark extends
TestHoodieFileGroupReaderBase[Int
))
schema
}
+
+ override def assertRecordMatchesSchema(schema: Schema, record: InternalRow):
Unit = {
+ val structType = HoodieInternalRowUtils.getCachedSchema(schema)
+ assertRecordMatchesSchema(structType, record)
+ }
+
+ private def assertRecordMatchesSchema(structType: StructType, record:
InternalRow): Unit = {
+ val values = record.toSeq(structType)
+ structType.zip(values).foreach { r =>
+ r._1.dataType match {
+ case struct: StructType => assertRecordMatchesSchema(struct,
r._2.asInstanceOf[InternalRow])
+ case array: ArrayType => assertArrayMatchesSchema(array.elementType,
r._2.asInstanceOf[ArrayData])
+ case map: MapType => asserMapMatchesSchema(map,
r._2.asInstanceOf[MapData])
+ case _ =>
+ }
+ }
+ }
+
+ private def assertArrayMatchesSchema(schema: DataType, array: ArrayData):
Unit = {
+ val arrayValues = array.toSeq[Any](schema)
+ schema match {
+ case structType: StructType =>
+ arrayValues.foreach(v => assertRecordMatchesSchema(structType,
v.asInstanceOf[InternalRow]))
+ case arrayType: ArrayType =>
+ arrayValues.foreach(v =>
assertArrayMatchesSchema(arrayType.elementType, v.asInstanceOf[ArrayData]))
+ case mapType: MapType =>
+ arrayValues.foreach(v => asserMapMatchesSchema(mapType,
v.asInstanceOf[MapData]))
+ case _ =>
+ }
+ }
+
+ private def asserMapMatchesSchema(schema: MapType, map: MapData): Unit = {
+ assertArrayMatchesSchema(schema.keyType, map.keyArray())
+ assertArrayMatchesSchema(schema.valueType, map.valueArray())
+ }
+
+ override def getSchemaEvolutionConfigs:
HoodieTestDataGenerator.SchemaEvolutionConfigs = {
+ val configs = new HoodieTestDataGenerator.SchemaEvolutionConfigs()
+ configs.floatToDoubleSupport = false
+ configs
+ }
+
}
object TestHoodieFileGroupReaderOnSpark {